1 18 package org.apache.activemq.network; 19 20 import javax.jms.DeliveryMode ; 21 22 import org.apache.activemq.broker.StubConnection; 23 import org.apache.activemq.command.ActiveMQDestination; 24 import org.apache.activemq.command.ConnectionInfo; 25 import org.apache.activemq.command.ConsumerInfo; 26 import org.apache.activemq.command.Message; 27 import org.apache.activemq.command.ProducerInfo; 28 import org.apache.activemq.command.SessionInfo; 29 import org.apache.activemq.network.DemandForwardingBridge; 30 31 import junit.framework.Test; 32 33 34 public class DemandForwardingBridgeTest extends NetworkTestSupport { 35 36 public ActiveMQDestination destination; 37 public byte destinationType; 38 public int deliveryMode; 39 private DemandForwardingBridge bridge; 40 41 public void initCombosForTestSendThenAddConsumer() { 42 addCombinationValues( "deliveryMode", new Object []{ 43 new Integer (DeliveryMode.NON_PERSISTENT), 44 new Integer (DeliveryMode.PERSISTENT)} ); 45 addCombinationValues( "destinationType", new Object []{ 46 new Byte (ActiveMQDestination.QUEUE_TYPE), 47 } ); 48 } 49 public void testSendThenAddConsumer() throws Exception { 50 51 StubConnection connection1 = createConnection(); 53 ConnectionInfo connectionInfo1 = createConnectionInfo(); 54 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 55 ProducerInfo producerInfo = createProducerInfo(sessionInfo1); 56 connection1.send(connectionInfo1); 57 connection1.send(sessionInfo1); 58 connection1.send(producerInfo); 59 60 destination = createDestinationInfo(connection1, connectionInfo1, destinationType); 61 62 StubConnection connection2 = createRemoteConnection(); 64 ConnectionInfo connectionInfo2 = createConnectionInfo(); 65 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 66 connection2.send(connectionInfo2); 67 connection2.send(sessionInfo2); 68 69 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 71 72 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 74 connection1.send(consumerInfo1); 75 Message m = receiveMessage(connection1); 76 assertNotNull(m); 77 connection1.send(consumerInfo1.createRemoveCommand()); 79 80 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 82 connection2.send(consumerInfo2); 83 84 m = receiveMessage(connection2); 86 assertNotNull(m); 87 } 88 89 public void initCombosForTestAddConsumerThenSend() { 90 addCombinationValues( "deliveryMode", new Object []{ 91 new Integer (DeliveryMode.NON_PERSISTENT), 92 new Integer (DeliveryMode.PERSISTENT)} ); 93 addCombinationValues( "destinationType", new Object []{ 94 new Byte (ActiveMQDestination.QUEUE_TYPE), 95 new Byte (ActiveMQDestination.TOPIC_TYPE), 96 } ); 97 } 98 public void testAddConsumerThenSend() throws Exception { 99 100 StubConnection connection1 = createConnection(); 102 ConnectionInfo connectionInfo1 = createConnectionInfo(); 103 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 104 ProducerInfo producerInfo = createProducerInfo(sessionInfo1); 105 connection1.send(connectionInfo1); 106 connection1.send(sessionInfo1); 107 connection1.send(producerInfo); 108 109 destination = createDestinationInfo(connection1, connectionInfo1, destinationType); 110 111 StubConnection connection2 = createRemoteConnection(); 113 ConnectionInfo connectionInfo2 = createConnectionInfo(); 114 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 115 connection2.send(connectionInfo2); 116 connection2.send(sessionInfo2); 117 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination); 118 connection2.send(consumerInfo); 119 120 connection1.request(createMessage(producerInfo, destination, deliveryMode)); 122 Message m = receiveMessage(connection2); 124 } 125 126 protected void setUp() throws Exception { 127 super.setUp(); 128 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 129 config.setBrokerName("local"); 130 config.setDispatchAsync(false); 131 bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport()); 132 bridge.start(); 133 134 try { 136 Thread.sleep(1000); 137 } catch (InterruptedException ie) { 138 ie.printStackTrace(); 139 } 140 } 141 142 protected void tearDown() throws Exception { 143 bridge.stop(); 144 super.tearDown(); 145 } 146 147 public static Test suite() { 148 return suite(DemandForwardingBridgeTest.class); 149 } 150 151 public static void main(String [] args) { 152 junit.textui.TestRunner.run(suite()); 153 } 154 155 } 156 | Popular Tags |