KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > mina > transport > AbstractTrafficControlTest


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *
19  */

20 package org.apache.mina.transport;
21
22 import java.net.SocketAddress JavaDoc;
23
24 import junit.framework.TestCase;
25
26 import org.apache.mina.common.ByteBuffer;
27 import org.apache.mina.common.ConnectFuture;
28 import org.apache.mina.common.IoAcceptor;
29 import org.apache.mina.common.IoHandler;
30 import org.apache.mina.common.IoHandlerAdapter;
31 import org.apache.mina.common.IoSession;
32 import org.apache.mina.common.TransportType;
33 import org.apache.mina.util.AvailablePortFinder;
34
35 /**
36  * Abstract base class for testing suspending and resuming reads and
37  * writes.
38  *
39  * @author The Apache Directory Project (mina-dev@directory.apache.org)
40  * @version $Rev$, $Date$
41  */

42 public abstract class AbstractTrafficControlTest extends TestCase {
43     protected int port = 0;
44
45     protected IoAcceptor acceptor;
46
47     protected TransportType transportType;
48
49     public AbstractTrafficControlTest(IoAcceptor acceptor) {
50         this.acceptor = acceptor;
51     }
52
53     protected void setUp() throws Exception JavaDoc {
54         super.setUp();
55
56         port = AvailablePortFinder.getNextAvailable();
57
58         acceptor.bind(createServerSocketAddress(port), new ServerIoHandler());
59
60     }
61
62     protected void tearDown() throws Exception JavaDoc {
63         super.tearDown();
64
65         acceptor.unbind(createServerSocketAddress(port));
66     }
67
68     protected abstract ConnectFuture connect(int port, IoHandler handler)
69             throws Exception JavaDoc;
70
71     protected abstract SocketAddress JavaDoc createServerSocketAddress(int port);
72
73     public void testSuspendResumeReadWrite() throws Exception JavaDoc {
74         ConnectFuture future = connect(port, new ClientIoHandler());
75         future.join();
76         IoSession session = future.getSession();
77
78         // We wait for the sessionCreated() event is fired becayse we cannot guarentee that
79
// it is invoked already.
80
while (session.getAttribute("lock") == null) {
81             Thread.yield();
82         }
83
84         Object JavaDoc lock = session.getAttribute("lock");
85         synchronized (lock) {
86
87             write(session, "1");
88             assertEquals('1', read(session));
89             assertEquals("1", getReceived(session));
90             assertEquals("1", getSent(session));
91
92             session.suspendRead();
93
94             write(session, "2");
95             assertFalse(canRead(session));
96             assertEquals("1", getReceived(session));
97             assertEquals("12", getSent(session));
98
99             session.suspendWrite();
100
101             write(session, "3");
102             assertFalse(canRead(session));
103             assertEquals("1", getReceived(session));
104             assertEquals("12", getSent(session));
105
106             session.resumeRead();
107
108             write(session, "4");
109             assertEquals('2', read(session));
110             assertEquals("12", getReceived(session));
111             assertEquals("12", getSent(session));
112
113             session.resumeWrite();
114             assertEquals('3', read(session));
115             assertEquals('4', read(session));
116
117             write(session, "5");
118             assertEquals('5', read(session));
119             assertEquals("12345", getReceived(session));
120             assertEquals("12345", getSent(session));
121
122             session.suspendWrite();
123
124             write(session, "6");
125             assertFalse(canRead(session));
126             assertEquals("12345", getReceived(session));
127             assertEquals("12345", getSent(session));
128
129             session.suspendRead();
130             session.resumeWrite();
131
132             write(session, "7");
133             assertFalse(canRead(session));
134             assertEquals("12345", getReceived(session));
135             assertEquals("1234567", getSent(session));
136
137             session.resumeRead();
138             assertEquals('6', read(session));
139             assertEquals('7', read(session));
140
141             assertEquals("1234567", getReceived(session));
142             assertEquals("1234567", getSent(session));
143
144         }
145
146         session.close().join();
147     }
148
149     private void write(IoSession session, String JavaDoc s) throws Exception JavaDoc {
150         session.write(ByteBuffer.wrap(s.getBytes("ASCII")));
151     }
152
153     private int read(IoSession session) throws Exception JavaDoc {
154         int pos = ((Integer JavaDoc) session.getAttribute("pos")).intValue();
155         for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
156             Object JavaDoc lock = session.getAttribute("lock");
157             lock.wait(200);
158         }
159         session.setAttribute("pos", new Integer JavaDoc(pos + 1));
160         return getReceived(session).charAt(pos);
161     }
162
163     private boolean canRead(IoSession session) throws Exception JavaDoc {
164         int pos = ((Integer JavaDoc) session.getAttribute("pos")).intValue();
165         Object JavaDoc lock = session.getAttribute("lock");
166         lock.wait(250);
167         String JavaDoc received = getReceived(session);
168         return pos < received.length();
169     }
170
171     private String JavaDoc getReceived(IoSession session) throws Exception JavaDoc {
172         return session.getAttribute("received").toString();
173     }
174
175     private String JavaDoc getSent(IoSession session) throws Exception JavaDoc {
176         return session.getAttribute("sent").toString();
177     }
178
179     public static class ClientIoHandler extends IoHandlerAdapter {
180         public void sessionCreated(IoSession session) throws Exception JavaDoc {
181             super.sessionCreated(session);
182             session.setAttribute("pos", new Integer JavaDoc(0));
183             session.setAttribute("received", new StringBuffer JavaDoc());
184             session.setAttribute("sent", new StringBuffer JavaDoc());
185             session.setAttribute("lock", new Object JavaDoc());
186         }
187
188         public void messageReceived(IoSession session, Object JavaDoc message)
189                 throws Exception JavaDoc {
190             ByteBuffer buffer = (ByteBuffer) message;
191             byte[] data = new byte[buffer.remaining()];
192             buffer.get(data);
193             Object JavaDoc lock = session.getAttribute("lock");
194             synchronized (lock) {
195                 StringBuffer JavaDoc sb = (StringBuffer JavaDoc) session
196                         .getAttribute("received");
197                 sb.append(new String JavaDoc(data, "ASCII"));
198                 lock.notifyAll();
199             }
200         }
201
202         public void messageSent(IoSession session, Object JavaDoc message)
203                 throws Exception JavaDoc {
204             ByteBuffer buffer = (ByteBuffer) message;
205             buffer.rewind();
206             byte[] data = new byte[buffer.remaining()];
207             buffer.get(data);
208             StringBuffer JavaDoc sb = (StringBuffer JavaDoc) session.getAttribute("sent");
209             sb.append(new String JavaDoc(data, "ASCII"));
210         }
211
212     }
213
214     private static class ServerIoHandler extends IoHandlerAdapter {
215         public void messageReceived(IoSession session, Object JavaDoc message)
216                 throws Exception JavaDoc {
217             // Just echo the received bytes.
218
ByteBuffer rb = (ByteBuffer) message;
219             ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
220             wb.put(rb);
221             wb.flip();
222             session.write(wb);
223         }
224     }
225 }
226
Popular Tags