1 18 package org.apache.activemq.proxy; 19 20 import java.io.IOException ; 21 import org.apache.activemq.Service; 22 import org.apache.activemq.command.ShutdownInfo; 23 import org.apache.activemq.transport.DefaultTransportListener; 24 import org.apache.activemq.transport.Transport; 25 import org.apache.activemq.transport.TransportListener; 26 import org.apache.activemq.util.IOExceptionSupport; 27 import org.apache.activemq.util.ServiceStopper; 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 import java.util.concurrent.atomic.AtomicBoolean ; 31 32 class ProxyConnection implements Service { 33 34 static final private Log log = LogFactory.getLog(ProxyConnection.class); 35 36 private final Transport localTransport; 37 private final Transport remoteTransport; 38 private AtomicBoolean shuttingDown = new AtomicBoolean (false); 39 private AtomicBoolean running = new AtomicBoolean (false); 40 41 public ProxyConnection(Transport localTransport, Transport remoteTransport) { 42 this.localTransport = localTransport; 43 this.remoteTransport = remoteTransport; 44 } 45 46 public void onFailure(IOException e) { 47 if( !shuttingDown.get() ) { 48 log.debug("Transport error: "+e,e); 49 try { 50 stop(); 51 } catch (Exception ignore) { 52 } 53 } 54 } 55 56 public void start() throws Exception { 57 if( !running.compareAndSet(false, true) ) { 58 return; 59 } 60 61 this.localTransport.setTransportListener(new DefaultTransportListener() { 62 public void onCommand(Object command) { 63 boolean shutdown=false; 64 if( command.getClass() == ShutdownInfo.class ) { 65 shuttingDown.set(true); 66 shutdown=true; 67 } 68 try { 69 remoteTransport.oneway(command); 70 if( shutdown ) 71 stop(); 72 } catch (IOException error) { 73 onFailure(error); 74 } catch (Exception error) { 75 onFailure(IOExceptionSupport.create(error)); 76 } 77 } 78 public void onException(IOException error) { 79 onFailure(error); 80 } 81 }); 82 83 this.remoteTransport.setTransportListener(new DefaultTransportListener() { 84 public void onCommand(Object command) { 85 try { 86 localTransport.oneway(command); 87 } catch (IOException error) { 88 onFailure(error); 89 } 90 } 91 public void onException(IOException error) { 92 onFailure(error); 93 } 94 }); 95 96 localTransport.start(); 97 remoteTransport.start(); 98 } 99 100 public void stop() throws Exception { 101 if( !running.compareAndSet(true, false) ) { 102 return; 103 } 104 shuttingDown.set(true); 105 ServiceStopper ss = new ServiceStopper(); 106 ss.stop(localTransport); 107 ss.stop(remoteTransport); 108 ss.throwFirstException(); 109 } 110 111 } 112 | Popular Tags |