1 18 package org.apache.activemq.transport.vm; 19 20 import java.io.IOException ; 21 import java.net.InetSocketAddress ; 22 import java.net.URI ; 23 24 import org.apache.activemq.command.BrokerInfo; 25 import org.apache.activemq.transport.MutexTransport; 26 import org.apache.activemq.transport.ResponseCorrelator; 27 import org.apache.activemq.transport.Transport; 28 import org.apache.activemq.transport.TransportAcceptListener; 29 import org.apache.activemq.transport.TransportServer; 30 31 import java.util.concurrent.atomic.AtomicInteger ; 32 33 37 public class VMTransportServer implements TransportServer { 38 39 private TransportAcceptListener acceptListener; 40 private final URI location; 41 private boolean disposed; 42 43 private final AtomicInteger connectionCount = new AtomicInteger (0); 44 private final boolean disposeOnDisconnect; 45 46 50 public VMTransportServer(URI location, boolean disposeOnDisconnect) { 51 this.location = location; 52 this.disposeOnDisconnect=disposeOnDisconnect; 53 } 54 55 58 public String toString(){ 59 return "VMTransportServer(" + location +")"; 60 } 61 62 66 public VMTransport connect() throws IOException { 67 TransportAcceptListener al; 68 synchronized (this) { 69 if( disposed ) 70 throw new IOException ("Server has been disposed."); 71 al = acceptListener; 72 } 73 if( al == null) 74 throw new IOException ("Server TransportAcceptListener is null."); 75 76 connectionCount.incrementAndGet(); 77 VMTransport client = new VMTransport(location) { 78 public void stop() throws Exception { 79 if( disposed ) 80 return; 81 super.stop(); 82 if( connectionCount.decrementAndGet()==0 && disposeOnDisconnect ) { 83 VMTransportServer.this.stop(); 84 } 85 }; 86 }; 87 88 VMTransport server = new VMTransport(location); 89 client.setPeer(server); 90 server.setPeer(client); 91 al.onAccept(configure(server)); 92 return client; 93 } 94 95 100 public static Transport configure(Transport transport) { 101 transport = new MutexTransport(transport); 102 transport = new ResponseCorrelator(transport); 103 return transport; 104 } 105 106 107 112 synchronized public void setAcceptListener(TransportAcceptListener acceptListener) { 113 this.acceptListener = acceptListener; 114 } 115 116 public void start() throws IOException { 117 } 118 119 public void stop() throws IOException { 120 VMTransportFactory.stopped(this); 121 } 122 123 public URI getConnectURI() { 124 return location; 125 } 126 127 public URI getBindURI() { 128 return location; 129 } 130 131 public void setBrokerInfo(BrokerInfo brokerInfo) { 132 } 133 134 public InetSocketAddress getSocketAddress() { 135 return null; 136 } 137 } 138 | Popular Tags |