1 18 package org.apache.activemq.transport; 19 20 import java.io.IOException ; 21 import java.io.InterruptedIOException ; 22 23 import org.apache.activemq.command.Command; 24 import org.apache.activemq.command.WireFormatInfo; 25 import org.apache.activemq.openwire.OpenWireFormat; 26 import org.apache.activemq.util.IOExceptionSupport; 27 import org.apache.commons.logging.Log; 28 import org.apache.commons.logging.LogFactory; 29 30 import java.util.concurrent.CountDownLatch ; 31 import java.util.concurrent.TimeUnit ; 32 import java.util.concurrent.atomic.AtomicBoolean ; 33 34 35 38 public class WireFormatNegotiator extends TransportFilter { 39 40 private static final Log log = LogFactory.getLog(WireFormatNegotiator.class); 41 42 private OpenWireFormat wireFormat; 43 private final int minimumVersion; 44 private long negotiateTimeout=15000; 45 46 private final AtomicBoolean firstStart=new AtomicBoolean (true); 47 private final CountDownLatch readyCountDownLatch = new CountDownLatch (1); 48 private final CountDownLatch wireInfoSentDownLatch = new CountDownLatch (1); 49 50 55 public WireFormatNegotiator(Transport next, OpenWireFormat wireFormat, int minimumVersion) { 56 super(next); 57 this.wireFormat = wireFormat; 58 if (minimumVersion <= 0) { 59 minimumVersion = 1; 60 } 61 this.minimumVersion = minimumVersion; 62 } 63 64 65 public void start() throws Exception { 66 super.start(); 67 if( firstStart.compareAndSet(true, false) ) { 68 try { 69 WireFormatInfo info = wireFormat.getPreferedWireFormatInfo(); 70 if (log.isDebugEnabled()) { 71 log.debug("Sending: " + info); 72 } 73 sendWireFormat(info); 74 } finally { 75 wireInfoSentDownLatch.countDown(); 76 } 77 } 78 } 79 80 public void stop() throws Exception { 81 super.stop(); 82 readyCountDownLatch.countDown(); 83 } 84 85 public void oneway(Object command) throws IOException { 86 try { 87 if( !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS) ) 88 throw new IOException ("Wire format negotiation timeout: peer did not send his wire format."); 89 } catch (InterruptedException e) { 90 Thread.currentThread().interrupt(); 91 throw new InterruptedIOException (); 92 } 93 super.oneway(command); 94 } 95 96 97 public void onCommand(Object o) { 98 Command command = (Command) o; 99 if( command.isWireFormatInfo() ) { 100 WireFormatInfo info = (WireFormatInfo) command; 101 if (log.isDebugEnabled()) { 102 log.debug("Received WireFormat: " + info); 103 } 104 105 try { 106 wireInfoSentDownLatch.await(); 107 108 if (log.isDebugEnabled()) { 109 log.debug(this + " before negotiation: " + wireFormat); 110 } 111 if( !info.isValid() ) { 112 onException(new IOException ("Remote wire format magic is invalid")); 113 } else if( info.getVersion() < minimumVersion ) { 114 onException(new IOException ("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")")); 115 } 116 117 wireFormat.renegotiateWireFormat(info); 118 119 if (log.isDebugEnabled()) { 120 log.debug(this + " after negotiation: " + wireFormat); 121 } 122 123 } catch (IOException e) { 124 onException(e); 125 } catch (InterruptedException e) { 126 onException((IOException ) new InterruptedIOException ().initCause(e)); 127 } catch (Exception e) { 128 onException(IOExceptionSupport.create(e)); 129 } 130 readyCountDownLatch.countDown(); 131 onWireFormatNegotiated(info); 132 } 133 getTransportListener().onCommand(command); 134 } 135 136 137 public void onException(IOException error) { 138 readyCountDownLatch.countDown(); 139 147 super.onException(error); 148 } 149 150 public String toString() { 151 return next.toString(); 152 } 153 154 protected void sendWireFormat(WireFormatInfo info) throws IOException { 155 next.oneway(info); 156 } 157 158 protected void onWireFormatNegotiated(WireFormatInfo info) { 159 } 160 161 162 public long getNegotiateTimeout() { 163 return negotiateTimeout; 164 } 165 166 public void setNegotiateTimeout(long negotiateTimeout) { 167 this.negotiateTimeout = negotiateTimeout; 168 } 169 } 170 | Popular Tags |