1 18 19 package org.apache.activemq.transport; 20 import java.net.URI ; 21 import java.util.ArrayList ; 22 import java.util.List ; 23 24 import javax.jms.Connection ; 25 import javax.jms.DeliveryMode ; 26 import javax.jms.Destination ; 27 import javax.jms.JMSException ; 28 import javax.jms.Message ; 29 import javax.jms.MessageConsumer ; 30 import javax.jms.MessageListener ; 31 import javax.jms.MessageProducer ; 32 import javax.jms.Session ; 33 import javax.jms.TextMessage ; 34 35 import junit.framework.TestCase; 36 37 import org.apache.activemq.ActiveMQConnectionFactory; 38 import org.apache.activemq.broker.BrokerService; 39 import org.apache.activemq.broker.TransportConnector; 40 import org.apache.activemq.command.ActiveMQQueue; 41 import org.apache.activemq.command.ActiveMQTextMessage; 42 import org.apache.activemq.command.ActiveMQTopic; 43 import org.apache.activemq.util.ServiceStopper; 44 import org.apache.commons.logging.Log; 45 import org.apache.commons.logging.LogFactory; 46 47 import java.util.concurrent.atomic.AtomicInteger ; 48 49 52 public class TopicClusterTest extends TestCase implements MessageListener { 53 protected final static Log log = LogFactory.getLog(TopicClusterTest.class); 54 protected Destination destination; 55 protected boolean topic = true; 56 protected AtomicInteger receivedMessageCount = new AtomicInteger (0); 57 protected static int MESSAGE_COUNT = 50; 58 protected static int NUMBER_IN_CLUSTER = 3; 59 protected int deliveryMode = DeliveryMode.NON_PERSISTENT; 60 protected MessageProducer [] producers; 61 protected Connection [] connections; 62 protected List services = new ArrayList (); 63 64 protected void setUp() throws Exception { 65 connections = new Connection [NUMBER_IN_CLUSTER]; 66 producers = new MessageProducer [NUMBER_IN_CLUSTER]; 67 Destination destination = createDestination(); 68 int portStart = 50000; 69 String root = System.getProperty("activemq.store.dir"); 70 if (root == null) { 71 root = "target/store"; 72 } 73 try { 74 for (int i = 0;i < NUMBER_IN_CLUSTER;i++) { 75 76 System.setProperty("activemq.store.dir", root + "_broker_" + i); 77 connections[i] = createConnection("broker-" + i); 78 connections[i].setClientID("ClusterTest" + i); 79 connections[i].start(); 80 Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); 81 producers[i] = session.createProducer(destination); 82 producers[i].setDeliveryMode(deliveryMode); 83 MessageConsumer consumer = createMessageConsumer(session,destination); 84 consumer.setMessageListener(this); 85 86 } 87 log.info("Sleeping to ensure cluster is fully connected"); 88 Thread.sleep(5000); 89 } finally { 90 System.setProperty("activemq.store.dir", root); 91 } 92 } 93 94 protected void tearDown() throws Exception { 95 if (connections != null) { 96 for (int i = 0;i < connections.length;i++) { 97 connections[i].close(); 98 } 99 } 100 ServiceStopper stopper = new ServiceStopper(); 101 stopper.stopServices(services); 102 } 103 104 protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException { 105 return session.createConsumer(destination); 106 } 107 108 protected ActiveMQConnectionFactory createGenericClusterFactory(String brokerName) throws Exception { 109 BrokerService container = new BrokerService(); 110 container.setBrokerName(brokerName); 111 112 String url = "tcp://localhost:0"; 113 TransportConnector connector = container.addConnector(url); 114 connector.setDiscoveryUri(new URI ("multicast://default")); 115 container.addNetworkConnector("multicast://default"); 116 container.start(); 117 118 services.add(container); 119 120 return new ActiveMQConnectionFactory("vm://"+brokerName); 121 } 122 123 protected int expectedReceiveCount() { 124 return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER; 125 } 126 127 protected Connection createConnection(String name) throws Exception { 128 return createGenericClusterFactory(name).createConnection(); 129 } 130 131 protected Destination createDestination() { 132 return createDestination(getClass().getName()); 133 } 134 135 protected Destination createDestination(String name) { 136 if (topic) { 137 return new ActiveMQTopic(name); 138 } 139 else { 140 return new ActiveMQQueue(name); 141 } 142 } 143 144 145 148 public void onMessage(Message msg) { 149 receivedMessageCount.incrementAndGet(); 151 synchronized (receivedMessageCount) { 152 if (receivedMessageCount.get() >= expectedReceiveCount()) { 153 receivedMessageCount.notify(); 154 } 155 } 156 } 157 158 161 public void testSendReceive() throws Exception { 162 for (int i = 0;i < MESSAGE_COUNT;i++) { 163 TextMessage textMessage = new ActiveMQTextMessage(); 164 textMessage.setText("MSG-NO:" + i); 165 for (int x = 0;x < producers.length;x++) { 166 producers[x].send(textMessage); 167 } 168 } 169 synchronized (receivedMessageCount) { 170 if (receivedMessageCount.get() < expectedReceiveCount()) { 171 receivedMessageCount.wait(20000); 172 } 173 } 174 Thread.sleep(2000); 176 System.err.println("GOT: " + receivedMessageCount.get()); 177 assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get()); 178 } 179 180 } 181 | Popular Tags |