package org.apache.activemq.transport.tcp;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;

import javax.net.SocketFactory;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Exercises inactivity detection on a TCP transport pair.
 *
 * <p>{@link #setUp()} binds a transport server on {@code localhost:61616} with
 * {@code wireFormat.maxInactivityDuration=1000}; each test then connects a
 * client and checks the error counters maintained by the transport listeners
 * on both ends of the connection.
 */
public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener {

    private TransportServer server;
    private Transport clientTransport;
    private Transport serverTransport;

    private final AtomicInteger clientReceiveCount = new AtomicInteger(0);
    private final AtomicInteger clientErrorCount = new AtomicInteger(0);
    private final AtomicInteger serverReceiveCount = new AtomicInteger(0);
    private final AtomicInteger serverErrorCount = new AtomicInteger(0);

    // Set in tearDown() so that errors raised while shutting down are not counted.
    private final AtomicBoolean ignoreClientError = new AtomicBoolean(false);
    private final AtomicBoolean ignoreServerError = new AtomicBoolean(false);

    // Optional hooks run for every command received on the respective side.
    // NOTE(review): kept public because "serverRunOnCommand" is named in
    // addCombinationValues(...) below; presumably the combination support
    // assigns it by name -- confirm before narrowing the visibility.
    public Runnable serverRunOnCommand;
    public Runnable clientRunOnCommand;

    /**
     * Starts the transport server before each test.
     *
     * @throws Exception if the superclass set-up or the server start fails
     */
    protected void setUp() throws Exception {
        super.setUp();
        startTransportServer();
    }

    /**
     * Builds the listener installed on the client transport: it counts received
     * commands, runs {@link #clientRunOnCommand} when one is set, and counts
     * transport errors unless {@link #ignoreClientError} is set.
     */
    private TransportListener createClientListener() {
        return new TransportListener() {
            public void onCommand(Object command) {
                clientReceiveCount.incrementAndGet();
                if (clientRunOnCommand != null) {
                    clientRunOnCommand.run();
                }
            }

            public void onException(IOException error) {
                if (!ignoreClientError.get()) {
                    log.info("Client transport error:");
                    error.printStackTrace();
                    clientErrorCount.incrementAndGet();
                }
            }

            public void transportInterupted() {
            }

            public void transportResumed() {
            }
        };
    }

    /**
     * Connects a client through {@link TransportFactory} using the same
     * {@code maxInactivityDuration} as the server, installs the counting
     * listener and starts the transport.
     *
     * @throws Exception if the connection cannot be created or started
     */
    private void startClient() throws Exception {
        clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
        clientTransport.setTransportListener(createClientListener());
        clientTransport.start();
    }

    /**
     * Binds and starts the transport server, registering this test as the
     * accept listener.
     *
     * @throws Exception if the server cannot be bound or started
     */
    private void startTransportServer() throws Exception {
        server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
        server.setAcceptListener(this);
        server.start();
    }

    /**
     * Stops the client transport, the accepted server-side transport and the
     * server. Errors reported by the listeners from this point on are ignored.
     *
     * <p>Each resource is stopped in its own {@code try} so that a failure
     * stopping one of them does not leave the others running (and the port
     * bound) for the next test.
     *
     * @throws Exception if the superclass tear-down fails
     */
    protected void tearDown() throws Exception {
        ignoreClientError.set(true);
        ignoreServerError.set(true);
        try {
            if (clientTransport != null) {
                clientTransport.stop();
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
        try {
            if (serverTransport != null) {
                serverTransport.stop();
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
        try {
            if (server != null) {
                server.stop();
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
        super.tearDown();
    }

    /**
     * Called by the server for each accepted connection: remembers the
     * server-side transport, installs a listener that counts received commands
     * and errors, and starts it.
     *
     * @param transport the server-side end of the accepted connection
     */
    public void onAccept(Transport transport) {
        try {
            log.info("[" + getName() + "] Server Accepted a Connection");
            serverTransport = transport;
            serverTransport.setTransportListener(new TransportListener() {
                public void onCommand(Object command) {
                    serverReceiveCount.incrementAndGet();
                    if (serverRunOnCommand != null) {
                        serverRunOnCommand.run();
                    }
                }

                public void onException(IOException error) {
                    // Server-side errors are gated by the server flag (the
                    // original checked ignoreClientError here by mistake).
                    if (!ignoreServerError.get()) {
                        log.info("Server transport error:");
                        error.printStackTrace();
                        serverErrorCount.incrementAndGet();
                    }
                }

                public void transportInterupted() {
                }

                public void transportResumed() {
                }
            });
            serverTransport.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Called by the server when accepting a connection fails; the error is
     * only printed.
     *
     * @param error the failure reported by the server
     */
    public void onAcceptError(Exception error) {
        error.printStackTrace();
    }

    /**
     * Connects with a hand-built {@link TcpTransport}, sends a single
     * {@link WireFormatInfo}, then stays silent for three seconds. The server
     * is expected to report at least one error while the client reports none.
     *
     * @throws Exception if the client transport cannot be created or started
     */
    public void testClientHang() throws Exception {

        // The transport is constructed directly instead of going through
        // TransportFactory.connect(...).
        // NOTE(review): presumably this avoids the client-side inactivity
        // monitoring / keep-alive traffic the factory would add, so the client
        // looks hung to the server -- confirm against TransportFactory.
        clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"), null);
        clientTransport.setTransportListener(createClientListener());
        clientTransport.start();

        WireFormatInfo info = new WireFormatInfo();
        // NOTE(review): "seMaxInactivityDuration" is the method name used by the
        // original source; left untouched because WireFormatInfo is not visible
        // here -- verify whether a correctly spelled setter exists.
        info.seMaxInactivityDuration(1000);
        clientTransport.oneway(info);

        assertEquals(0, serverErrorCount.get());
        assertEquals(0, clientErrorCount.get());

        // Wait well past the 1000 ms inactivity limit configured on the server.
        Thread.sleep(3000);

        assertEquals(0, clientErrorCount.get());
        assertTrue(serverErrorCount.get() > 0);
    }

    /**
     * Connects a regular factory-built client and checks that neither side
     * reports an error after four seconds without application traffic.
     *
     * @throws Exception if the client cannot be started
     */
    public void testNoClientHang() throws Exception {
        startClient();

        assertEquals(0, serverErrorCount.get());
        assertEquals(0, clientErrorCount.get());

        Thread.sleep(4000);

        assertEquals(0, clientErrorCount.get());
        assertEquals(0, serverErrorCount.get());
    }

    /**
     * Registers the combination values for
     * {@link #testNoClientHangWithServerBlock()}: inactivity limits of 1000 and
     * a {@link #serverRunOnCommand} hook that sleeps for four seconds.
     *
     * @throws Exception if starting the client fails
     */
    public void initCombosForTestNoClientHangWithServerBlock() throws Exception {

        // NOTE(review): this call is kept from the original source, but the
        // test method starts a client itself. If this initialiser runs before
        // setUp(), the connect may fail and skip the registrations below --
        // confirm against CombinationTestSupport before removing or moving it.
        startClient();

        addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)});
        addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)});
        addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() {
            public void run() {
                try {
                    log.info("Sleeping");
                    Thread.sleep(4000);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: an interrupted sleep simply ends
                    // the simulated block early.
                }
            }
        }});
    }

    /**
     * Connects a regular client and checks that neither side reports an error
     * after four seconds; intended to run with the combination values
     * registered by {@link #initCombosForTestNoClientHangWithServerBlock()}.
     *
     * @throws Exception if the client cannot be started
     */
    public void testNoClientHangWithServerBlock() throws Exception {

        startClient();

        assertEquals(0, serverErrorCount.get());
        assertEquals(0, clientErrorCount.get());

        Thread.sleep(4000);

        assertEquals(0, clientErrorCount.get());
        assertEquals(0, serverErrorCount.get());
    }

}