1 18 package org.apache.activemq; 19 20 import javax.jms.*; 21 22 import org.apache.activemq.network.DiscoveryNetworkConnector; 23 import org.apache.activemq.network.NetworkConnector; 24 import org.apache.activemq.util.MessageIdList; 25 import org.apache.activemq.util.IdGenerator; 26 import org.apache.activemq.command.ActiveMQDestination; 27 import org.apache.activemq.command.ActiveMQTopic; 28 import org.apache.activemq.command.ActiveMQQueue; 29 import org.apache.activemq.xbean.BrokerFactoryBean; 30 import org.apache.activemq.broker.BrokerService; 31 import org.apache.activemq.broker.BrokerFactory; 32 import org.apache.activemq.broker.TransportConnector; 33 import org.apache.activemq.CombinationTestSupport; 34 import org.apache.activemq.ActiveMQConnectionFactory; 35 import org.apache.activemq.ConnectionClosedException; 36 import org.springframework.core.io.Resource; 37 38 import java.util.List ; 39 import java.util.Map ; 40 import java.util.HashMap ; 41 import java.util.ArrayList ; 42 import java.util.Iterator ; 43 import java.util.Collections ; 44 import java.util.Arrays ; 45 import java.util.Collection ; 46 import java.util.concurrent.CountDownLatch ; 47 import java.net.URI ; 48 49 54 public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { 55 public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0"; 56 public static int MAX_SETUP_TIME = 5000; 57 58 protected Map brokers; 59 protected Map destinations; 60 61 protected int messageSize = 1; 62 63 protected boolean persistentDelivery = true; 64 protected boolean verbose = false; 65 66 protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception { 67 return bridgeBrokers(localBrokerName,remoteBrokerName,false,1); 68 } 69 70 71 protected void bridgeBrokers(String localBrokerName, String remoteBrokerName,boolean dynamicOnly) throws Exception { 72 BrokerService localBroker = ((BrokerItem)brokers.get(localBrokerName)).broker; 73 BrokerService remoteBroker = ((BrokerItem)brokers.get(remoteBrokerName)).broker; 74 75 bridgeBrokers(localBroker, remoteBroker,dynamicOnly,1); 76 } 77 78 protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName,boolean dynamicOnly, int networkTTL) throws Exception { 79 BrokerService localBroker = ((BrokerItem)brokers.get(localBrokerName)).broker; 80 BrokerService remoteBroker = ((BrokerItem)brokers.get(remoteBrokerName)).broker; 81 82 return bridgeBrokers(localBroker, remoteBroker,dynamicOnly,networkTTL); 83 } 84 85 86 87 protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker,boolean dynamicOnly, int networkTTL) throws Exception { 90 List transportConnectors = remoteBroker.getTransportConnectors(); 91 URI remoteURI; 92 if (!transportConnectors.isEmpty()) { 93 remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri(); 94 NetworkConnector connector=new DiscoveryNetworkConnector(new URI ("static:" + remoteURI)); 95 connector.setDynamicOnly(dynamicOnly); 96 connector.setNetworkTTL(networkTTL); 97 localBroker.addNetworkConnector(connector); 98 MAX_SETUP_TIME = 2000; 99 return connector; 100 } else { 101 throw new Exception ("Remote broker has no registered connectors."); 102 } 103 104 } 105 106 protected void bridgeAllBrokers() throws Exception { 108 bridgeAllBrokers("default"); 109 } 110 111 protected void bridgeAllBrokers(String groupName) throws Exception { 112 Collection brokerList = brokers.values(); 113 for (Iterator i=brokerList.iterator(); i.hasNext();) { 114 BrokerService broker = ((BrokerItem)i.next()).broker; 115 List transportConnectors = broker.getTransportConnectors(); 116 117 if (transportConnectors.isEmpty()) { 118 broker.addConnector(new URI (AUTO_ASSIGN_TRANSPORT)); 119 transportConnectors = broker.getTransportConnectors(); 120 } 121 122 TransportConnector transport = (TransportConnector)transportConnectors.get(0); 123 transport.setDiscoveryUri(new URI ("multicast://" + groupName)); 124 broker.addNetworkConnector("multicast://" + groupName); 125 } 126 127 MAX_SETUP_TIME = 8000; 129 } 130 131 protected void startAllBrokers() throws Exception { 132 Collection brokerList = brokers.values(); 133 for (Iterator i=brokerList.iterator(); i.hasNext();) { 134 BrokerService broker = ((BrokerItem)i.next()).broker; 135 broker.start(); 136 } 137 138 Thread.sleep(MAX_SETUP_TIME); 139 } 140 141 protected BrokerService createBroker(String brokerName) throws Exception { 142 BrokerService broker = new BrokerService(); 143 broker.setBrokerName(brokerName); 144 brokers.put(brokerName, new BrokerItem(broker)); 145 146 return broker; 147 } 148 149 protected BrokerService createBroker(URI brokerUri) throws Exception { 150 BrokerService broker = BrokerFactory.createBroker(brokerUri); 151 brokers.put(broker.getBrokerName(), new BrokerItem(broker)); 152 153 return broker; 154 } 155 156 protected BrokerService createBroker(Resource configFile) throws Exception { 157 BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile); 158 brokerFactory.afterPropertiesSet(); 159 160 BrokerService broker = brokerFactory.getBroker(); 161 brokers.put(broker.getBrokerName(), new BrokerItem(broker)); 162 163 return broker; 164 } 165 166 protected ConnectionFactory getConnectionFactory(String brokerName) throws Exception { 167 BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); 168 if (brokerItem != null) { 169 return brokerItem.factory; 170 } 171 return null; 172 } 173 174 protected Connection createConnection(String brokerName) throws Exception { 175 BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); 176 if (brokerItem != null) { 177 return brokerItem.createConnection(); 178 } 179 return null; 180 } 181 182 protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception { 183 return createConsumer(brokerName, dest, null); 184 } 185 186 protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch) throws Exception { 187 BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); 188 if (brokerItem != null) { 189 return brokerItem.createConsumer(dest, latch); 190 } 191 return null; 192 } 193 194 protected MessageConsumer createDurableSubscriber(String brokerName, Topic dest, String name) throws Exception { 195 BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); 196 if (brokerItem != null) { 197 return brokerItem.createDurableSubscriber(dest, name); 198 } 199 return null; 200 } 201 202 protected MessageIdList getBrokerMessages(String brokerName) { 203 BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); 204 if (brokerItem != null) { 205 return brokerItem.getAllMessages(); 206 } 207 return null; 208 } 209 210 protected MessageIdList getConsumerMessages(String brokerName, MessageConsumer consumer) { 211 BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); 212 if (brokerItem != null) { 213 return brokerItem.getConsumerMessages(consumer); 214 } 215 return null; 216 } 217 218 protected void sendMessages(String brokerName, Destination destination, int count) throws Exception { 219 BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); 220 221 Connection conn = brokerItem.createConnection(); 222 conn.start(); 223 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 224 225 MessageProducer producer = brokerItem.createProducer(destination, sess); 226 producer.setDeliveryMode(persistentDelivery ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 227 228 for (int i = 0; i < count; i++) { 229 TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i); 230 producer.send(msg); 231 } 232 233 producer.close(); 234 sess.close(); 235 conn.close(); 236 brokerItem.connections.remove(conn); 237 } 238 239 protected TextMessage createTextMessage(Session session, String initText) throws Exception { 240 TextMessage msg = session.createTextMessage(); 241 242 if (initText.length() < messageSize) { 244 char[] data = new char[messageSize - initText.length()]; 245 Arrays.fill(data, '*'); 246 String str = new String (data); 247 msg.setText(initText + str); 248 249 } else { 251 msg.setText(initText); 252 } 253 254 return msg; 255 } 256 257 protected ActiveMQDestination createDestination(String name, boolean topic) throws JMSException { 258 Destination dest; 259 if (topic) { 260 dest = new ActiveMQTopic(name); 261 destinations.put(name, dest); 262 return (ActiveMQDestination)dest; 263 } else { 264 dest = new ActiveMQQueue(name); 265 destinations.put(name, dest); 266 return (ActiveMQDestination)dest; 267 } 268 } 269 270 protected void setUp() throws Exception { 271 super.setUp(); 272 brokers = new HashMap (); 273 destinations = new HashMap (); 274 } 275 276 protected void tearDown() throws Exception { 277 destroyAllBrokers(); 278 super.tearDown(); 279 } 280 281 protected void destroyBroker(String brokerName) throws Exception { 282 BrokerItem brokerItem = (BrokerItem)brokers.remove(brokerName); 283 284 if (brokerItem != null) { 285 brokerItem.destroy(); 286 } 287 } 288 289 protected void destroyAllBrokers() throws Exception { 290 for (Iterator i=brokers.values().iterator(); i.hasNext();) { 291 BrokerItem brokerItem = (BrokerItem)i.next(); 292 brokerItem.destroy(); 293 } 294 brokers.clear(); 295 } 296 297 298 public class BrokerItem { 300 public BrokerService broker; 301 public ActiveMQConnectionFactory factory; 302 public List connections; 303 public Map consumers; 304 public MessageIdList allMessages = new MessageIdList(); 305 306 private IdGenerator id; 307 308 public boolean persistent = false; 309 310 public BrokerItem(BrokerService broker) throws Exception { 311 this.broker = broker; 312 313 factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); 314 consumers = Collections.synchronizedMap(new HashMap ()); 315 connections = Collections.synchronizedList(new ArrayList ()); 316 allMessages.setVerbose(verbose); 317 id = new IdGenerator(broker.getBrokerName() + ":"); 318 } 319 320 public Connection createConnection() throws Exception { 321 Connection conn = factory.createConnection(); 322 conn.setClientID(id.generateId()); 323 324 connections.add(conn); 325 return conn; 326 } 327 328 public MessageConsumer createConsumer(Destination dest) throws Exception { 329 return createConsumer(dest, null); 330 } 331 332 public MessageConsumer createConsumer(Destination dest, CountDownLatch latch) throws Exception { 333 Connection c = createConnection(); 334 c.start(); 335 Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); 336 return createConsumerWithSession(dest, s, latch); 337 } 338 339 public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception { 340 return createConsumerWithSession(dest, sess, null); 341 } 342 public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch) throws Exception { 343 MessageConsumer client = sess.createConsumer(dest); 344 MessageIdList messageIdList = new MessageIdList(); 345 messageIdList.setCountDownLatch(latch); 346 messageIdList.setParent(allMessages); 347 client.setMessageListener(messageIdList); 348 consumers.put(client, messageIdList); 349 return client; 350 } 351 352 public MessageConsumer createDurableSubscriber(Topic dest, String name) throws Exception { 353 Connection c = createConnection(); 354 c.start(); 355 Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); 356 return createDurableSubscriber(dest, s, name); 357 } 358 359 public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String name) throws Exception { 360 MessageConsumer client = sess.createDurableSubscriber((Topic)dest, name); 361 MessageIdList messageIdList = new MessageIdList(); 362 messageIdList.setParent(allMessages); 363 client.setMessageListener(messageIdList); 364 consumers.put(client, messageIdList); 365 366 return client; 367 } 368 369 public MessageIdList getAllMessages() { 370 return allMessages; 371 } 372 373 public MessageIdList getConsumerMessages(MessageConsumer consumer) { 374 return (MessageIdList)consumers.get(consumer); 375 } 376 377 public MessageProducer createProducer(Destination dest) throws Exception { 378 Connection c = createConnection(); 379 c.start(); 380 Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); 381 return createProducer(dest, s); 382 } 383 384 public MessageProducer createProducer(Destination dest, Session sess) throws Exception { 385 MessageProducer client = sess.createProducer(dest); 386 client.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 387 return client; 388 } 389 390 public void destroy() throws Exception { 391 while (!connections.isEmpty()) { 392 Connection c = (Connection)connections.remove(0); 393 try { 394 c.close(); 395 } catch (ConnectionClosedException e) { 396 } 397 } 398 399 broker.stop(); 400 consumers.clear(); 401 402 broker = null; 403 connections = null; 404 consumers = null; 405 factory = null; 406 } 407 } 408 409 } 410 | Popular Tags |