1 17 package org.apache.activemq.transport.tcp; 18 19 import java.io.IOException ; 20 import java.net.URI ; 21 import java.net.URISyntaxException ; 22 23 import org.apache.activemq.CombinationTestSupport; 24 import org.apache.activemq.command.CommandTypes; 25 import org.apache.activemq.command.WireFormatInfo; 26 import org.apache.activemq.openwire.OpenWireFormat; 27 import org.apache.activemq.transport.Transport; 28 import org.apache.activemq.transport.TransportAcceptListener; 29 import org.apache.activemq.transport.TransportFactory; 30 import org.apache.activemq.transport.TransportListener; 31 import org.apache.activemq.transport.TransportServer; 32 33 import javax.net.SocketFactory; 34 35 import java.util.concurrent.CountDownLatch ; 36 import java.util.concurrent.TimeUnit ; 37 import java.util.concurrent.atomic.AtomicBoolean ; 38 import java.util.concurrent.atomic.AtomicInteger ; 39 import java.util.concurrent.atomic.AtomicReference ; 40 41 42 public class WireformatNegociationTest extends CombinationTestSupport { 43 44 private TransportServer server; 45 private Transport clientTransport; 46 private Transport serverTransport; 47 48 private final AtomicReference <WireFormatInfo> clientWF = new AtomicReference <WireFormatInfo>(); 49 private final AtomicReference <WireFormatInfo> serverWF = new AtomicReference <WireFormatInfo>(); 50 private final AtomicReference <Exception > asyncError = new AtomicReference <Exception >(); 51 private final AtomicBoolean ignoreAsycError = new AtomicBoolean (); 52 53 private final CountDownLatch negociationCounter = new CountDownLatch (2); 54 55 protected void setUp() throws Exception { 56 super.setUp(); 57 } 58 59 63 private void startClient(String uri) throws Exception , URISyntaxException { 64 clientTransport = TransportFactory.connect(new URI (uri)); 65 clientTransport.setTransportListener(new TransportListener() { 66 public void onCommand(Object command) { 67 if( command instanceof WireFormatInfo ) { 68 clientWF.set((WireFormatInfo) command); 69 negociationCounter.countDown(); 70 } 71 } 72 public void onException(IOException error) { 73 if( !ignoreAsycError.get() ) { 74 log.info("Client transport error: ", error); 75 asyncError.set(error); 76 negociationCounter.countDown(); 77 } 78 } 79 public void transportInterupted() { 80 } 81 public void transportResumed() { 82 }}); 83 clientTransport.start(); 84 } 85 86 91 private void startServer(String uri ) throws IOException , URISyntaxException , Exception { 92 server = TransportFactory.bind("localhost", new URI (uri)); 93 server.setAcceptListener(new TransportAcceptListener(){ 94 public void onAccept(Transport transport) { 95 try { 96 log.info("["+getName()+"] Server Accepted a Connection"); 97 serverTransport = transport; 98 serverTransport.setTransportListener(new TransportListener() { 99 public void onCommand(Object command) { 100 if( command instanceof WireFormatInfo ) { 101 serverWF.set((WireFormatInfo) command); 102 negociationCounter.countDown(); 103 } 104 } 105 public void onException(IOException error) { 106 if( !ignoreAsycError.get() ) { 107 log.info("Server transport error: ", error); 108 asyncError.set(error); 109 negociationCounter.countDown(); 110 } 111 } 112 public void transportInterupted() { 113 } 114 public void transportResumed() { 115 }}); 116 serverTransport.start(); 117 } catch (Exception e) { 118 e.printStackTrace(); 119 } 120 } 121 122 public void onAcceptError(Exception error) { 123 error.printStackTrace(); 124 } 125 }); 126 server.start(); 127 } 128 129 protected void tearDown() throws Exception { 130 ignoreAsycError.set(true); 131 try { 132 if( clientTransport!=null ) 133 clientTransport.stop(); 134 if( serverTransport!=null ) 135 serverTransport.stop(); 136 if( server!=null ) 137 server.stop(); 138 } catch (Throwable e) { 139 e.printStackTrace(); 140 } 141 super.tearDown(); 142 } 143 144 145 148 public void testWireFomatInfoSeverVersion1() throws Exception { 149 150 startServer("tcp://localhost:61616?wireFormat.version=1"); 151 startClient("tcp://localhost:61616"); 152 153 assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS)); 154 assertNull("Async error: "+asyncError, asyncError.get()); 155 156 assertNotNull(clientWF.get()); 157 assertEquals(1, clientWF.get().getVersion()); 158 159 assertNotNull(serverWF.get()); 160 assertEquals(1, serverWF.get().getVersion()); 161 } 162 163 166 public void testWireFomatInfoClientVersion1() throws Exception { 167 168 startServer("tcp://localhost:61616"); 169 startClient("tcp://localhost:61616?wireFormat.version=1"); 170 171 assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS)); 172 assertNull("Async error: "+asyncError, asyncError.get()); 173 174 assertNotNull(clientWF.get()); 175 assertEquals(1, clientWF.get().getVersion()); 176 177 assertNotNull(serverWF.get()); 178 assertEquals(1, serverWF.get().getVersion()); 179 } 180 181 184 public void testWireFomatInfoCurrentVersion() throws Exception { 185 186 startServer("tcp://localhost:61616"); 187 startClient("tcp://localhost:61616"); 188 189 assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS)); 190 assertNull("Async error: "+asyncError, asyncError.get()); 191 192 assertNotNull(clientWF.get()); 193 assertEquals(CommandTypes.PROTOCOL_VERSION, clientWF.get().getVersion()); 194 195 assertNotNull(serverWF.get()); 196 assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion()); 197 } 198 199 } 200 | Popular Tags |