1 18 package org.apache.activemq.broker; 19 20 import javax.jms.DeliveryMode ; 21 import junit.framework.Test; 22 import org.apache.activemq.broker.StubConnection; 23 import org.apache.activemq.command.ActiveMQDestination; 24 import org.apache.activemq.command.ActiveMQQueue; 25 import org.apache.activemq.command.ConnectionInfo; 26 import org.apache.activemq.command.ConsumerInfo; 27 import org.apache.activemq.command.Message; 28 import org.apache.activemq.command.MessageAck; 29 import org.apache.activemq.command.ProducerInfo; 30 import org.apache.activemq.command.SessionInfo; 31 import org.apache.activemq.network.NetworkTestSupport; 32 33 38 39 public class DoubleSubscriptionTest extends NetworkTestSupport { 40 41 public ActiveMQDestination destination; 42 public int deliveryMode; 43 44 private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; 45 46 public static Test suite() { 47 return suite(DoubleSubscriptionTest.class); 48 } 49 50 public static void main(String [] args) { 51 junit.textui.TestRunner.run(suite()); 52 } 53 54 public void initCombosForTestDoubleSubscription() { 55 addCombinationValues("destination", new Object [] { new ActiveMQQueue("TEST"), new ActiveMQQueue("TEST"), }); 56 } 57 public void testDoubleSubscription() throws Exception { 58 59 StubConnection connection1 = createRemoteConnection(); 61 ConnectionInfo connectionInfo1 = createConnectionInfo(); 62 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 63 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 64 connection1.send(connectionInfo1); 65 connection1.send(sessionInfo1); 66 connection1.request(consumerInfo1); 67 68 StubConnection connection2 = createRemoteConnection(); 70 ConnectionInfo connectionInfo2 = createConnectionInfo(); 71 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 72 ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); 73 connection2.send(connectionInfo2); 74 connection2.send(sessionInfo2); 75 connection2.request(producerInfo2); 76 77 connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT)); 79 80 Message m1 = receiveMessage(connection1); 81 assertNotNull(m1); 82 assertNoMessagesLeft(connection1); 83 84 connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 85 86 connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT)); 88 89 connection1.send(connectionInfo1); 92 connection1.send(sessionInfo1); 93 connection1.request(consumerInfo1); 94 95 connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT)); 97 98 connection1.stop(); 100 StubConnection connection3 = createRemoteConnection(); 101 ConnectionInfo connectionInfo3 = createConnectionInfo(); 102 SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3); 103 ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo3, destination); 104 connection3.send(connectionInfo3); 105 connection3.send(sessionInfo3); 106 connection3.request(consumerInfo3); 107 108 assertNotNull(receiveMessage(connection3)); 110 assertNotNull(receiveMessage(connection3)); 111 assertNoMessagesLeft(connection3); 112 } 113 114 protected String getRemoteURI() { 115 return remoteURI; 116 } 117 118 } 119 | Popular Tags |