package org.apache.activemq.broker;

import java.io.IOException;

import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.ServiceSupport;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A connection stub that forwards commands either directly to a broker-side
 * {@link Connection} or over a {@link Transport}, depending on which
 * constructor was used.
 * <p>
 * When backed by a transport, every command received from the transport is
 * placed on an internal queue that callers can inspect through
 * {@link #getDispatchQueue()}.
 */
public class StubConnection implements Service {

    /** Commands handed to {@link #dispatch(Object)}, in arrival order. */
    private final BlockingQueue<Object> dispatchQueue = new LinkedBlockingQueue<Object>();
    private Connection connection;
    private Transport transport;

    /*
     * Set by stop() on the caller's thread and by the transport listener when a
     * ShutdownInfo arrives; read by the listener's onException(). Declared
     * volatile so that a write on one thread is visible to the other.
     */
    volatile boolean shuttingDown = false;

    /**
     * Queues a received command, blocking if the queue cannot accept it
     * immediately.
     *
     * @param command the command to queue
     * @throws InterruptedException if interrupted while waiting to enqueue
     * @throws IOException declared for subclasses; not thrown by this implementation
     */
    protected void dispatch(Object command) throws InterruptedException, IOException {
        dispatchQueue.put(command);
    }

    /**
     * Creates a transport-backed stub connected to the given broker's VM
     * connector URI.
     *
     * @param broker the broker to connect to
     * @throws Exception if the transport cannot be created or started
     */
    public StubConnection(BrokerService broker) throws Exception {
        this(TransportFactory.connect(broker.getVmConnectorURI()));
    }

    /**
     * Creates a stub that passes commands straight to the given connection.
     *
     * @param connection the connection that will service commands
     */
    public StubConnection(Connection connection) {
        this.connection = connection;
    }

    /**
     * Creates a stub that sends commands over the given transport. A listener
     * is installed that queues every inbound command, and the transport is
     * started before this constructor returns.
     *
     * @param transport the transport to use
     * @throws Exception if the transport fails to start
     */
    public StubConnection(Transport transport) throws Exception {
        this.transport = transport;
        transport.setTransportListener(new DefaultTransportListener() {
            public void onCommand(Object command) {
                try {
                    if (command.getClass() == ShutdownInfo.class) {
                        shuttingDown = true;
                    }
                    StubConnection.this.dispatch(command);
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Do not lose the interrupt request: restore the flag
                        // so code further up the stack can observe it.
                        Thread.currentThread().interrupt();
                    }
                    // Keep the original message text but attach the cause so
                    // the underlying stack trace is not discarded.
                    IOException wrapped = new IOException("" + e);
                    wrapped.initCause(e);
                    onException(wrapped);
                }
            }

            public void onException(IOException error) {
                // Transport errors are expected once shutdown has begun.
                if (!shuttingDown) {
                    error.printStackTrace();
                }
            }
        });
        transport.start();
    }

    /**
     * @return the live queue of commands passed to {@link #dispatch(Object)}
     */
    public BlockingQueue<Object> getDispatchQueue() {
        return dispatchQueue;
    }

    /**
     * Sends a command without requesting a response.
     * <p>
     * With a connection, the command is serviced directly and an exception
     * response is rethrown. With a transport, the command is sent one-way.
     * If neither is set, nothing is sent.
     *
     * @param command the command to send
     * @throws Exception if the connection returns an exception response or the
     *         underlying send fails
     */
    public void send(Command command) throws Exception {
        prepare(command, false);
        if (connection != null) {
            rethrowIfException(connection.service(command));
        }
        else if (transport != null) {
            transport.oneway(command);
        }
    }

    /**
     * Sends a command and returns its response.
     *
     * @param command the command to send
     * @return the response, or {@code null} if neither a connection nor a
     *         transport is set
     * @throws Exception if the response is an exception response or the
     *         underlying request fails
     */
    public Response request(Command command) throws Exception {
        prepare(command, true);
        if (connection != null) {
            return rethrowIfException(connection.service(command));
        }
        else if (transport != null) {
            return rethrowIfException((Response) transport.request(command));
        }
        return null;
    }

    /**
     * Applies the pre-send fix-ups shared by {@link #send(Command)} and
     * {@link #request(Command)}: copies the producer id from a message's
     * message id, then sets the response-required flag.
     */
    private static void prepare(Command command, boolean responseRequired) {
        if (command instanceof Message) {
            Message message = (Message) command;
            message.setProducerId(message.getMessageId().getProducerId());
        }
        command.setResponseRequired(responseRequired);
    }

    /**
     * Returns the response unchanged unless it is an exception response, in
     * which case the carried exception is converted and thrown.
     */
    private static Response rethrowIfException(Response response) throws Exception {
        if (response != null && response.isException()) {
            ExceptionResponse er = (ExceptionResponse) response;
            throw JMSExceptionSupport.create(er.getException());
        }
        return response;
    }

    /**
     * @return the connection passed to the constructor, or {@code null} if
     *         this stub is transport-backed
     */
    public Connection getConnection() {
        return connection;
    }

    /**
     * @return the transport in use, or {@code null} if this stub is
     *         connection-backed
     */
    public Transport getTransport() {
        return transport;
    }

    /**
     * Does nothing; a transport-backed stub is already started by its
     * constructor.
     */
    public void start() throws Exception {
    }

    /**
     * Marks the stub as shutting down and, if a transport is in use, sends a
     * {@link ShutdownInfo} one-way and then disposes of the transport.
     */
    public void stop() throws Exception {
        shuttingDown = true;
        if (transport != null) {
            try {
                transport.oneway(new ShutdownInfo());
            }
            catch (IOException ignored) {
                // Best effort: a failure to deliver the shutdown notice must
                // not prevent the transport from being disposed below.
            }
            ServiceSupport.dispose(transport);
        }
    }
}