1 17 package org.apache.activemq.broker.virtual; 18 19 import org.apache.activemq.EmbeddedBrokerTestSupport; 20 import org.apache.activemq.broker.BrokerService; 21 import org.apache.activemq.command.ActiveMQQueue; 22 import org.apache.activemq.command.ActiveMQTopic; 23 import org.apache.activemq.spring.ConsumerBean; 24 import org.apache.activemq.xbean.XBeanBrokerFactory; 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 28 import javax.jms.Connection ; 29 import javax.jms.Destination ; 30 import javax.jms.MessageConsumer ; 31 import javax.jms.MessageProducer ; 32 import javax.jms.Session ; 33 import javax.jms.TextMessage ; 34 import javax.jms.JMSException ; 35 36 import java.net.URI ; 37 38 42 public class CompositeQueueTest extends EmbeddedBrokerTestSupport { 43 44 private static final Log log = LogFactory.getLog(CompositeQueueTest.class); 45 46 private Connection connection; 47 48 protected int total = 10; 49 50 51 public void testVirtualTopicCreation() throws Exception { 52 if (connection == null) { 53 connection = createConnection(); 54 } 55 connection.start(); 56 57 ConsumerBean messageList1 = new ConsumerBean(); 58 ConsumerBean messageList2 = new ConsumerBean(); 59 messageList1.setVerbose(true); 60 messageList2.setVerbose(true); 61 62 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 63 64 Destination producerDestination = getProducerDestination(); 65 Destination destination1 = getConsumer1Dsetination(); 66 Destination destination2 = getConsumer2Dsetination(); 67 68 log.info("Sending to: " + producerDestination); 69 log.info("Consuming from: " + destination1 + " and " + destination2); 70 71 MessageConsumer c1 = session.createConsumer(destination1); 72 MessageConsumer c2 = session.createConsumer(destination2); 73 74 c1.setMessageListener(messageList1); 75 c2.setMessageListener(messageList2); 76 77 MessageProducer producer = session.createProducer(producerDestination); 79 assertNotNull(producer); 80 81 for (int i = 0; i < total; i++) { 82 producer.send(createMessage(session, i)); 83 } 84 85 assertMessagesArrived(messageList1, messageList2); 86 } 87 88 protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { 89 messageList1.assertMessagesArrived(total); 90 messageList2.assertMessagesArrived(total); 91 } 92 93 protected TextMessage createMessage(Session session, int i) throws JMSException { 94 TextMessage textMessage = session.createTextMessage("message: " + i); 95 if (i % 2 == 1) { 96 textMessage.setStringProperty("odd", "yes"); 97 } 98 textMessage.setIntProperty("i", i); 99 return textMessage; 100 } 101 102 protected Destination getConsumer1Dsetination() { 103 return new ActiveMQQueue("FOO"); 104 } 105 106 protected Destination getConsumer2Dsetination() { 107 return new ActiveMQTopic("BAR"); 108 } 109 110 protected Destination getProducerDestination() { 111 return new ActiveMQQueue("MY.QUEUE"); 112 } 113 114 protected void tearDown() throws Exception { 115 if (connection != null) { 116 connection.close(); 117 } 118 super.tearDown(); 119 } 120 121 protected BrokerService createBroker() throws Exception { 122 XBeanBrokerFactory factory = new XBeanBrokerFactory(); 123 BrokerService answer = factory.createBroker(new URI (getBrokerConfigUri())); 124 return answer; 125 } 126 127 protected String getBrokerConfigUri() { 128 return "org/apache/activemq/broker/virtual/composite-queue.xml"; 129 } 130 } 131 | Popular Tags |