1 20 package org.apache.mina.transport; 21 22 import java.net.SocketAddress ; 23 24 import junit.framework.TestCase; 25 26 import org.apache.mina.common.ByteBuffer; 27 import org.apache.mina.common.ConnectFuture; 28 import org.apache.mina.common.IoAcceptor; 29 import org.apache.mina.common.IoHandler; 30 import org.apache.mina.common.IoHandlerAdapter; 31 import org.apache.mina.common.IoSession; 32 import org.apache.mina.common.TransportType; 33 import org.apache.mina.util.AvailablePortFinder; 34 35 42 public abstract class AbstractTrafficControlTest extends TestCase { 43 protected int port = 0; 44 45 protected IoAcceptor acceptor; 46 47 protected TransportType transportType; 48 49 public AbstractTrafficControlTest(IoAcceptor acceptor) { 50 this.acceptor = acceptor; 51 } 52 53 protected void setUp() throws Exception { 54 super.setUp(); 55 56 port = AvailablePortFinder.getNextAvailable(); 57 58 acceptor.bind(createServerSocketAddress(port), new ServerIoHandler()); 59 60 } 61 62 protected void tearDown() throws Exception { 63 super.tearDown(); 64 65 acceptor.unbind(createServerSocketAddress(port)); 66 } 67 68 protected abstract ConnectFuture connect(int port, IoHandler handler) 69 throws Exception ; 70 71 protected abstract SocketAddress createServerSocketAddress(int port); 72 73 public void testSuspendResumeReadWrite() throws Exception { 74 ConnectFuture future = connect(port, new ClientIoHandler()); 75 future.join(); 76 IoSession session = future.getSession(); 77 78 while (session.getAttribute("lock") == null) { 81 Thread.yield(); 82 } 83 84 Object lock = session.getAttribute("lock"); 85 synchronized (lock) { 86 87 write(session, "1"); 88 assertEquals('1', read(session)); 89 assertEquals("1", getReceived(session)); 90 assertEquals("1", getSent(session)); 91 92 session.suspendRead(); 93 94 write(session, "2"); 95 assertFalse(canRead(session)); 96 assertEquals("1", getReceived(session)); 97 assertEquals("12", getSent(session)); 98 99 session.suspendWrite(); 100 101 write(session, "3"); 102 assertFalse(canRead(session)); 103 assertEquals("1", getReceived(session)); 104 assertEquals("12", getSent(session)); 105 106 session.resumeRead(); 107 108 write(session, "4"); 109 assertEquals('2', read(session)); 110 assertEquals("12", getReceived(session)); 111 assertEquals("12", getSent(session)); 112 113 session.resumeWrite(); 114 assertEquals('3', read(session)); 115 assertEquals('4', read(session)); 116 117 write(session, "5"); 118 assertEquals('5', read(session)); 119 assertEquals("12345", getReceived(session)); 120 assertEquals("12345", getSent(session)); 121 122 session.suspendWrite(); 123 124 write(session, "6"); 125 assertFalse(canRead(session)); 126 assertEquals("12345", getReceived(session)); 127 assertEquals("12345", getSent(session)); 128 129 session.suspendRead(); 130 session.resumeWrite(); 131 132 write(session, "7"); 133 assertFalse(canRead(session)); 134 assertEquals("12345", getReceived(session)); 135 assertEquals("1234567", getSent(session)); 136 137 session.resumeRead(); 138 assertEquals('6', read(session)); 139 assertEquals('7', read(session)); 140 141 assertEquals("1234567", getReceived(session)); 142 assertEquals("1234567", getSent(session)); 143 144 } 145 146 session.close().join(); 147 } 148 149 private void write(IoSession session, String s) throws Exception { 150 session.write(ByteBuffer.wrap(s.getBytes("ASCII"))); 151 } 152 153 private int read(IoSession session) throws Exception { 154 int pos = ((Integer ) session.getAttribute("pos")).intValue(); 155 for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) { 156 Object lock = session.getAttribute("lock"); 157 lock.wait(200); 158 } 159 session.setAttribute("pos", new Integer (pos + 1)); 160 return getReceived(session).charAt(pos); 161 } 162 163 private boolean canRead(IoSession session) throws Exception { 164 int pos = ((Integer ) session.getAttribute("pos")).intValue(); 165 Object lock = session.getAttribute("lock"); 166 lock.wait(250); 167 String received = getReceived(session); 168 return pos < received.length(); 169 } 170 171 private String getReceived(IoSession session) throws Exception { 172 return session.getAttribute("received").toString(); 173 } 174 175 private String getSent(IoSession session) throws Exception { 176 return session.getAttribute("sent").toString(); 177 } 178 179 public static class ClientIoHandler extends IoHandlerAdapter { 180 public void sessionCreated(IoSession session) throws Exception { 181 super.sessionCreated(session); 182 session.setAttribute("pos", new Integer (0)); 183 session.setAttribute("received", new StringBuffer ()); 184 session.setAttribute("sent", new StringBuffer ()); 185 session.setAttribute("lock", new Object ()); 186 } 187 188 public void messageReceived(IoSession session, Object message) 189 throws Exception { 190 ByteBuffer buffer = (ByteBuffer) message; 191 byte[] data = new byte[buffer.remaining()]; 192 buffer.get(data); 193 Object lock = session.getAttribute("lock"); 194 synchronized (lock) { 195 StringBuffer sb = (StringBuffer ) session 196 .getAttribute("received"); 197 sb.append(new String (data, "ASCII")); 198 lock.notifyAll(); 199 } 200 } 201 202 public void messageSent(IoSession session, Object message) 203 throws Exception { 204 ByteBuffer buffer = (ByteBuffer) message; 205 buffer.rewind(); 206 byte[] data = new byte[buffer.remaining()]; 207 buffer.get(data); 208 StringBuffer sb = (StringBuffer ) session.getAttribute("sent"); 209 sb.append(new String (data, "ASCII")); 210 } 211 212 } 213 214 private static class ServerIoHandler extends IoHandlerAdapter { 215 public void messageReceived(IoSession session, Object message) 216 throws Exception { 217 ByteBuffer rb = (ByteBuffer) message; 219 ByteBuffer wb = ByteBuffer.allocate(rb.remaining()); 220 wb.put(rb); 221 wb.flip(); 222 session.write(wb); 223 } 224 } 225 } 226 | Popular Tags |