1 18 package org.apache.activemq.usecases; 19 20 import org.apache.activemq.ActiveMQConnectionFactory; 21 import org.apache.activemq.CombinationTestSupport; 22 import org.apache.activemq.util.MessageIdList; 23 import org.apache.activemq.command.ActiveMQQueue; 24 import org.apache.activemq.broker.BrokerService; 25 import org.apache.activemq.xbean.XBeanBrokerFactory; 26 27 import java.net.URI ; 28 import java.util.Arrays ; 29 30 import junit.framework.Test; 31 32 import javax.jms.Destination ; 33 import javax.jms.ConnectionFactory ; 34 import javax.jms.Connection ; 35 import javax.jms.Session ; 36 import javax.jms.JMSException ; 37 import javax.jms.MessageConsumer ; 38 import javax.jms.MessageProducer ; 39 import javax.jms.TextMessage ; 40 41 public class TwoBrokerMulticastQueueTest extends CombinationTestSupport { 42 43 public static Test suite() { 44 return suite(TwoBrokerMulticastQueueTest.class); 45 } 46 47 public static void main(String [] args) { 48 junit.textui.TestRunner.run(suite()); 49 } 50 51 public static final int MESSAGE_COUNT = 100; 52 public static final int BROKER_COUNT = 2; 53 public static final int CONSUMER_COUNT = 20; 54 55 private BrokerService[] brokers; 56 public String sendUri, recvUri; 57 58 public void setUp() throws Exception { 59 super.setAutoFail(true); 60 super.setUp(); 61 } 62 63 public void tearDown() throws Exception { 64 if (brokers != null) { 65 for (int i=0; i<BROKER_COUNT; i++) { 66 if (brokers[i] != null) { 67 brokers[i].stop(); 68 } 69 } 70 super.tearDown(); 71 } 72 } 73 74 private void doSendReceiveTest() throws Exception { 75 Destination dest = new ActiveMQQueue("TEST.FOO"); 76 77 ConnectionFactory sendFactory = createConnectionFactory(sendUri); 78 79 Connection conn = createConnection(sendFactory); 80 sendMessages(conn, dest, MESSAGE_COUNT); 81 82 Thread.sleep(500); 83 84 ConnectionFactory recvFactory = createConnectionFactory(recvUri); 85 assertEquals(MESSAGE_COUNT, receiveMessages(createConnection(recvFactory), dest, 0)); 86 } 87 88 private void doMultipleConsumersConnectTest() throws Exception { 89 Destination dest = new ActiveMQQueue("TEST.FOO"); 90 91 ConnectionFactory sendFactory = createConnectionFactory(sendUri); 92 93 Connection conn = createConnection(sendFactory); 94 sendMessages(conn, dest, MESSAGE_COUNT); 95 96 Thread.sleep(500); 97 98 ConnectionFactory recvFactory = createConnectionFactory(recvUri); 99 assertEquals(MESSAGE_COUNT, receiveMessages(createConnection(recvFactory), dest, 0)); 100 101 for (int i=0; i<(CONSUMER_COUNT-1); i++) { 102 assertEquals(0, receiveMessages(createConnection(recvFactory), dest, 200)); 103 } 104 } 105 106 public void initCombosForTestSendReceive() { 107 addCombinationValues("sendUri", new Object [] { 108 "tcp://localhost:61616", "tcp://localhost:61617" 109 }); 110 addCombinationValues("recvUri", new Object [] { 111 "tcp://localhost:61616", "tcp://localhost:61617" 112 }); 113 } 114 115 public void testSendReceive() throws Exception { 116 createMulticastBrokerNetwork(); 117 doSendReceiveTest(); 118 } 119 120 public void initCombosForTestMultipleConsumersConnect() { 121 addCombinationValues("sendUri", new Object [] { 122 "tcp://localhost:61616", "tcp://localhost:61617", 123 }); 124 addCombinationValues("recvUri", new Object [] { 125 "tcp://localhost:61616", "tcp://localhost:61617" 126 }); 127 } 128 129 public void testMultipleConsumersConnect() throws Exception { 130 createMulticastBrokerNetwork(); 131 doMultipleConsumersConnectTest(); 132 } 133 134 public void testSendReceiveUsingFailover() throws Exception { 135 sendUri = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; 136 recvUri = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; 137 createMulticastBrokerNetwork(); 138 doSendReceiveTest(); 139 } 140 141 public void testMultipleConsumersConnectUsingFailover() throws Exception { 142 sendUri = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; 143 recvUri = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; 144 createMulticastBrokerNetwork(); 145 doMultipleConsumersConnectTest(); 146 } 147 148 public void testSendReceiveUsingDiscovery() throws Exception { 149 sendUri = "discovery:multicast://default"; 150 recvUri = "discovery:multicast://default"; 151 createMulticastBrokerNetwork(); 152 doSendReceiveTest(); 153 } 154 155 public void testMultipleConsumersConnectUsingDiscovery() throws Exception { 156 sendUri = "discovery:multicast://default"; 157 recvUri = "discovery:multicast://default"; 158 createMulticastBrokerNetwork(); 159 doMultipleConsumersConnectTest(); 160 } 161 162 public void testSendReceiveUsingAutoAssignFailover() throws Exception { 163 sendUri = "failover:(discovery:multicast://default)"; 164 recvUri = "failover:(discovery:multicast://default)"; 165 createAutoAssignMulticastBrokerNetwork(); 166 doSendReceiveTest(); 167 } 168 169 public void testMultipleConsumersConnectUsingAutoAssignFailover() throws Exception { 170 sendUri = "failover:(discovery:multicast://default)"; 171 recvUri = "failover:(discovery:multicast://default)"; 172 createAutoAssignMulticastBrokerNetwork(); 173 doMultipleConsumersConnectTest(); 174 } 175 176 public void testSendReceiveUsingAutoAssignDiscovery() throws Exception { 177 sendUri = "discovery:multicast://default"; 178 recvUri = "discovery:multicast://default"; 179 createAutoAssignMulticastBrokerNetwork(); 180 doSendReceiveTest(); 181 } 182 183 public void testMultipleConsumersConnectUsingAutoAssignDiscovery() throws Exception { 184 sendUri = "discovery:multicast://default"; 185 recvUri = "discovery:multicast://default"; 186 createAutoAssignMulticastBrokerNetwork(); 187 doMultipleConsumersConnectTest(); 188 } 189 190 protected void createMulticastBrokerNetwork() throws Exception { 191 192 brokers = new BrokerService[BROKER_COUNT]; 193 for (int i=0; i<BROKER_COUNT; i++) { 194 brokers[i] = createBroker("org/apache/activemq/usecases/multicast-broker-" + (i+1)+ ".xml"); 195 brokers[i].start(); 196 } 197 198 Thread.sleep(1000); 200 } 201 202 protected void createAutoAssignMulticastBrokerNetwork() throws Exception { 203 brokers = new BrokerService[BROKER_COUNT]; 204 for (int i=0; i<BROKER_COUNT; i++) { 205 brokers[i] = createBroker("org/apache/activemq/usecases/multicast-broker-auto.xml"); 206 brokers[i].start(); 207 } 208 209 Thread.sleep(1000); 211 } 212 213 protected BrokerService createBroker(String uri) throws Exception { 214 return (new XBeanBrokerFactory()).createBroker(new URI (uri)); 215 } 216 217 protected ConnectionFactory createConnectionFactory(String uri) { 218 return new ActiveMQConnectionFactory(uri); 219 } 220 221 protected Connection createConnection(ConnectionFactory factory) throws JMSException { 222 Connection conn = factory.createConnection(); 223 return conn; 224 } 225 226 protected int receiveMessages(Connection conn, Destination dest, int waitTime) throws JMSException , InterruptedException { 227 conn.start(); 228 MessageIdList list = new MessageIdList(); 229 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 230 MessageConsumer consumer = sess.createConsumer(dest); 231 consumer.setMessageListener(list); 232 233 if (waitTime > 0) { 234 Thread.sleep(waitTime); 235 } else { 236 list.waitForMessagesToArrive(MESSAGE_COUNT); 237 } 238 239 conn.close(); 240 241 return list.getMessageCount(); 242 } 243 244 protected void sendMessages(Connection conn, Destination dest, int count) throws JMSException { 245 conn.start(); 246 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 247 MessageProducer prod = sess.createProducer(dest); 248 249 for (int i=0; i<count; i++) { 250 prod.send(createTextMessage(sess, "Message " + i, 1024)); 251 } 252 253 conn.close(); 254 } 255 256 protected TextMessage createTextMessage(Session session, String initText, int messageSize) throws JMSException { 257 TextMessage msg = session.createTextMessage(); 258 259 if (initText.length() < messageSize) { 261 char[] data = new char[messageSize - initText.length()]; 262 Arrays.fill(data, '*'); 263 String str = new String (data); 264 msg.setText(initText + str); 265 266 } else { 268 msg.setText(initText); 269 } 270 271 return msg; 272 } 273 274 } 275 | Popular Tags |