1 18 package org.apache.activemq.usecases; 19 20 import org.apache.activemq.ActiveMQConnectionFactory; 21 import org.apache.activemq.command.ActiveMQTopic; 22 import org.apache.activemq.test.JmsSendReceiveTestSupport; 23 24 import javax.jms.Connection ; 25 import javax.jms.Destination ; 26 import javax.jms.JMSException ; 27 import javax.jms.Message ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.MessageListener ; 30 import javax.jms.Session ; 31 import java.util.List ; 32 33 36 public class CompositePublishTest extends JmsSendReceiveTestSupport { 37 38 protected Connection sendConnection; 39 protected Connection receiveConnection; 40 protected Session receiveSession; 41 protected MessageConsumer [] consumers; 42 protected List [] messageLists; 43 44 protected void setUp() throws Exception { 45 super.setUp(); 46 47 connectionFactory = createConnectionFactory(); 48 49 sendConnection = createConnection(); 50 sendConnection.start(); 51 52 receiveConnection = createConnection(); 53 receiveConnection.start(); 54 55 log.info("Created sendConnection: " + sendConnection); 56 log.info("Created receiveConnection: " + receiveConnection); 57 58 session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 59 receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 60 61 log.info("Created sendSession: " + session); 62 log.info("Created receiveSession: " + receiveSession); 63 64 producer = session.createProducer(null); 65 66 log.info("Created producer: " + producer); 67 68 if (topic) { 69 consumerDestination = session.createTopic(getConsumerSubject()); 70 producerDestination = session.createTopic(getProducerSubject()); 71 } 72 else { 73 consumerDestination = session.createQueue(getConsumerSubject()); 74 producerDestination = session.createQueue(getProducerSubject()); 75 } 76 77 log.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass()); 78 log.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass()); 79 80 Destination [] destinations = getDestinations(); 81 consumers = new MessageConsumer [destinations.length]; 82 messageLists = new List [destinations.length]; 83 for (int i = 0; i < destinations.length; i++) { 84 Destination dest = destinations[i]; 85 messageLists[i] = createConcurrentList(); 86 consumers[i] = receiveSession.createConsumer(dest); 87 consumers[i].setMessageListener(createMessageListener(i, messageLists[i])); 88 } 89 90 91 log.info("Started connections"); 92 } 93 94 protected MessageListener createMessageListener(int i, final List messageList) { 95 return new MessageListener () { 96 public void onMessage(Message message) { 97 consumeMessage(message, messageList); 98 } 99 }; 100 } 101 102 105 protected String getSubject() { 106 return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y"; 107 } 108 109 112 protected Destination [] getDestinations() { 113 return new Destination []{new ActiveMQTopic(getPrefix() + "FOO.BAR"), new ActiveMQTopic(getPrefix() + "FOO.*"), new ActiveMQTopic(getPrefix() + "FOO.X.Y")}; 114 } 115 116 protected String getPrefix() { 117 return super.getSubject() + "."; 118 } 119 120 protected void assertMessagesAreReceived() throws JMSException { 121 waitForMessagesToBeDelivered(); 122 123 for (int i = 0, size = messageLists.length; i < size; i++) { 124 log.info("Message list: " + i + " contains: " + messageLists[i].size() + " message(s)"); 125 } 126 127 for (int i = 0, size = messageLists.length; i < size; i++) { 128 assertMessagesReceivedAreValid(messageLists[i]); 129 } 130 } 131 132 protected ActiveMQConnectionFactory createConnectionFactory() { 133 return new ActiveMQConnectionFactory("vm://localhost"); 134 } 135 136 protected void tearDown() throws Exception { 137 session.close(); 138 receiveSession.close(); 139 140 sendConnection.close(); 141 receiveConnection.close(); 142 } 143 } 144 | Popular Tags |