package org.apache.activemq.proxy;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.Service;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Accepts transports on a bound {@link TransportServer} and, for each one,
 * opens a transport to the configured remote URI and pairs the two in a
 * {@link ProxyConnection}.
 *
 * Every live {@link ProxyConnection} is tracked in {@link #connections} so that
 * {@link #stop()} can shut them all down; a connection is dropped from that
 * list when its remote transport is stopped.
 */
public class ProxyConnector implements Service {

    private static final Log log = LogFactory.getLog(ProxyConnector.class);

    private TransportServer server;
    private URI bind;
    private URI remote;
    private URI localUri;
    private String name;

    /** Proxy connections created by the accept listener and not yet stopped. */
    CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();

    /**
     * Installs the accept listener on the server (creating the server from
     * {@code bind} if none was set) and starts the server.
     *
     * @throws Exception if the server cannot be created or started
     */
    public void start() throws Exception {

        this.getServer().setAcceptListener(new TransportAcceptListener() {
            public void onAccept(Transport localTransport) {
                try {
                    // The remote transport has to exist before the ProxyConnection can be
                    // constructed, so the connection is handed to the transport's stop hook
                    // through this reference once it has been created.
                    AtomicReference<ProxyConnection> owner = new AtomicReference<ProxyConnection>();
                    Transport remoteTransport = createRemoteTransport(owner);
                    ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport);
                    owner.set(connection);
                    connections.add(connection);
                    connection.start();
                } catch (Exception e) {
                    onAcceptError(e);
                }
            }

            public void onAcceptError(Exception error) {
                log.error("Could not accept connection: " + error, error);
            }
        });
        getServer().start();
        log.info("Proxy Connector " + getName() + " Started");

    }

    /**
     * Stops the server (if one exists) and then every tracked proxy connection.
     * All of them are attempted; the first failure recorded by the
     * {@link ServiceStopper} is rethrown afterwards.
     *
     * @throws Exception the first exception raised while stopping
     */
    public void stop() throws Exception {
        ServiceStopper ss = new ServiceStopper();
        if (this.server != null) {
            ss.stop(this.server);
        }
        // Connections remove themselves from the list while being stopped; iterating a
        // CopyOnWriteArrayList works on a snapshot, so that is safe here.
        for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) {
            log.info("Connector stopped: Stopping proxy.");
            ss.stop(iter.next());
        }
        ss.throwFirstException();
        log.info("Proxy Connector " + getName() + " Stopped");
    }

    /**
     * @return the URI that is added to the remote composite transport, or
     *         {@code null} if none was configured
     */
    public URI getLocalUri() {
        return localUri;
    }

    /**
     * @param localURI the URI to add to the remote composite transport
     */
    public void setLocalUri(URI localURI) {
        this.localUri = localURI;
    }

    /**
     * @return the URI the server is bound to when it is created by this connector
     */
    public URI getBind() {
        return bind;
    }

    /**
     * @param bind the URI to bind the server to; required unless a server is
     *             supplied through {@link #setServer(TransportServer)}
     */
    public void setBind(URI bind) {
        this.bind = bind;
    }

    /**
     * @return the URI that remote transports are connected to
     */
    public URI getRemote() {
        return remote;
    }

    /**
     * @param remote the URI that remote transports are connected to
     */
    public void setRemote(URI remote) {
        this.remote = remote;
    }

    /**
     * Returns the server, creating it through {@link #createServer()} on first
     * use if none has been set.
     *
     * @return the transport server used by this connector
     * @throws IOException if the server cannot be created
     * @throws URISyntaxException if the server cannot be created
     * @throws IllegalArgumentException if neither a server nor {@code bind} is set
     */
    public TransportServer getServer() throws IOException, URISyntaxException {
        if (server == null) {
            server = createServer();
        }
        return server;
    }

    /**
     * @param server the transport server to use instead of binding one from
     *               {@code bind}
     */
    public void setServer(TransportServer server) {
        this.server = server;
    }

    /**
     * Binds a new transport server on the {@code bind} URI.
     *
     * @return the newly bound server
     * @throws IOException if binding fails
     * @throws URISyntaxException if binding fails
     * @throws IllegalArgumentException if {@code bind} has not been set
     */
    protected TransportServer createServer() throws IOException, URISyntaxException {
        if (bind == null) {
            throw new IllegalArgumentException("You must specify either a server or the bind property");
        }
        return TransportFactory.bind(null, bind);
    }

    /**
     * Connects to the remote URI and wraps the resulting transport in a filter
     * that untracks the owning proxy connection when the transport is stopped.
     *
     * @param owner reference that the caller fills with the {@link ProxyConnection}
     *              built around the returned transport; it is read only when the
     *              transport is stopped
     * @return the wrapped remote transport
     * @throws Exception if the remote transport cannot be created
     */
    private Transport createRemoteTransport(final AtomicReference<ProxyConnection> owner) throws Exception {
        Transport transport = TransportFactory.compositeConnect(remote);
        CompositeTransport ct = (CompositeTransport) transport.narrow(CompositeTransport.class);
        if (ct != null && localUri != null) {
            ct.add(new URI[] {localUri});
        }

        // Add a transport filter so that we can track the transport life cycle.
        transport = new TransportFilter(transport) {
            @Override
            public void stop() throws Exception {
                log.info("Stopping proxy.");
                try {
                    super.stop();
                } finally {
                    // The list holds ProxyConnection objects, not this filter, so the
                    // owning connection is what has to be removed. Doing it in finally
                    // keeps a failed stop from leaving a dead entry behind.
                    ProxyConnection connection = owner.get();
                    if (connection != null) {
                        connections.remove(connection);
                    }
                }
            }
        };
        return transport;
    }

    /**
     * Returns the connector name. If no name was set, one is derived on first
     * call and remembered: the server's connect URI when a server exists,
     * otherwise the literal {@code "proxy"}.
     *
     * @return the connector name, never {@code null}
     */
    public String getName() {
        if (name == null) {
            if (server != null) {
                name = server.getConnectURI().toString();
            } else {
                name = "proxy";
            }
        }
        return name;
    }

    /**
     * @param name the name to report in log messages
     */
    public void setName(String name) {
        this.name = name;
    }

}