1 18 package org.apache.activemq; 19 20 import java.util.concurrent.TimeUnit ; 21 import java.util.concurrent.atomic.AtomicBoolean ; 22 23 import org.apache.activemq.ActiveMQConnectionFactory; 24 import org.apache.activemq.broker.Broker; 25 import org.apache.activemq.broker.BrokerFactory; 26 import org.apache.activemq.broker.BrokerService; 27 import org.apache.activemq.broker.StubConnection; 28 import org.apache.activemq.broker.TransportConnector; 29 import org.apache.activemq.command.ActiveMQDestination; 30 import org.apache.activemq.command.ConnectionId; 31 import org.apache.activemq.command.ConnectionInfo; 32 import org.apache.activemq.command.ConsumerInfo; 33 import org.apache.activemq.command.Message; 34 import org.apache.activemq.command.MessageAck; 35 import org.apache.activemq.command.MessageDispatch; 36 import org.apache.activemq.command.RemoveInfo; 37 import org.apache.activemq.command.SessionInfo; 38 import org.apache.activemq.transport.TransportFactory; 39 40 import javax.jms.JMSException ; 41 42 import java.io.File ; 43 import java.io.IOException ; 44 import java.net.URI ; 45 import java.net.URISyntaxException ; 46 47 import junit.framework.TestCase; 48 49 public class ClientTestSupport extends TestCase { 50 51 private ActiveMQConnectionFactory connFactory; 52 protected BrokerService broker; 53 private String brokerURL = "vm://localhost?broker.persistent=false"; 54 55 protected long idGenerator=0; 56 57 public void setUp() throws Exception { 58 final AtomicBoolean connected = new AtomicBoolean (false); 59 TransportConnector connector; 60 61 try { 63 broker = BrokerFactory.createBroker(new URI (this.brokerURL)); 64 String brokerId = broker.getBrokerName(); 65 connector = new TransportConnector(broker.getBroker(), TransportFactory.bind(brokerId,new URI (this.brokerURL))) { 66 protected org.apache.activemq.broker.Connection createConnection(org.apache.activemq.transport.Transport transport) throws IOException { 68 connected.set(true); 69 return super.createConnection(transport); 70 } 71 }; 72 connector.start(); 73 broker.start(); 74 75 } catch (IOException e) { 76 throw new JMSException ("Error creating broker " + e); 77 } catch (URISyntaxException e) { 78 throw new JMSException ("Error creating broker " + e); 79 } 80 81 URI connectURI; 82 connectURI = connector.getServer().getConnectURI(); 83 84 connFactory = new ActiveMQConnectionFactory(connectURI); 86 } 87 88 89 protected void tearDown() throws Exception { 90 super.tearDown(); 91 if (broker != null) { 92 broker.stop(); 93 } 94 } 95 96 97 public ActiveMQConnectionFactory getConnectionFactory() throws JMSException { 98 if(this.connFactory == null){ 99 throw new JMSException ("ActiveMQConnectionFactory is null "); 100 } 101 return this.connFactory; 102 } 103 104 protected ConnectionInfo createConnectionInfo() throws Exception { 106 ConnectionInfo info = new ConnectionInfo(); 107 info.setConnectionId(new ConnectionId("connection:"+(++idGenerator))); 108 info.setClientId( info.getConnectionId().getValue() ); 109 return info; 110 } 111 112 protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception { 113 SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); 114 return info; 115 } 116 117 protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception { 118 ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); 119 info.setBrowser(false); 120 info.setDestination(destination); 121 info.setPrefetchSize(1000); 122 info.setDispatchAsync(false); 123 return info; 124 } 125 126 protected RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) { 127 return consumerInfo.createRemoveCommand(); 128 } 129 130 protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) { 131 MessageAck ack = new MessageAck(); 132 ack.setAckType(ackType); 133 ack.setConsumerId(consumerInfo.getConsumerId()); 134 ack.setDestination( msg.getDestination() ); 135 ack.setLastMessageId( msg.getMessageId() ); 136 ack.setMessageCount(count); 137 return ack; 138 } 139 140 protected Message receiveMessage(StubConnection connection, int MAX_WAIT) throws InterruptedException { 141 while( true ) { 142 Object o = connection.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS); 143 144 if( o == null ) 145 return null; 146 147 if( o instanceof MessageDispatch ) { 148 MessageDispatch dispatch = (MessageDispatch)o; 149 return dispatch.getMessage(); 150 } 151 } 152 } 153 154 protected Broker getBroker() throws Exception { 155 return this.broker != null?this.broker.getBroker():null; 156 } 157 158 public static void removeMessageStore() { 159 if( System.getProperty("activemq.store.dir")!=null ) { 160 recursiveDelete(new File (System.getProperty("activemq.store.dir"))); 161 } 162 if( System.getProperty("derby.system.home")!=null ) { 163 recursiveDelete(new File (System.getProperty("derby.system.home"))); 164 } 165 } 166 167 public static void recursiveDelete(File f) { 168 if( f.isDirectory() ) { 169 File [] files = f.listFiles(); 170 for (int i = 0; i < files.length; i++) { 171 recursiveDelete(files[i]); 172 } 173 } 174 f.delete(); 175 } 176 177 } 178 | Popular Tags |