1 18 19 package org.apache.activemq.transport.peer; 20 import org.apache.activemq.ActiveMQConnectionFactory; 21 import org.apache.activemq.advisory.AdvisorySupport; 22 import org.apache.activemq.command.ActiveMQDestination; 23 import org.apache.activemq.command.ActiveMQMessage; 24 import org.apache.activemq.command.ActiveMQQueue; 25 import org.apache.activemq.command.ActiveMQTextMessage; 26 import org.apache.activemq.command.ActiveMQTopic; 27 import org.apache.activemq.command.ConsumerInfo; 28 import org.apache.activemq.util.MessageIdList; 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 32 import javax.jms.Connection ; 33 import javax.jms.DeliveryMode ; 34 import javax.jms.Destination ; 35 import javax.jms.JMSException ; 36 import javax.jms.Message ; 37 import javax.jms.MessageConsumer ; 38 import javax.jms.MessageProducer ; 39 import javax.jms.Session ; 40 import javax.jms.TextMessage ; 41 42 import junit.framework.TestCase; 43 44 47 public class PeerTransportTest extends TestCase { 48 protected Log log = LogFactory.getLog(getClass()); 49 protected ActiveMQDestination destination; 50 protected boolean topic = true; 51 protected static int MESSAGE_COUNT = 50; 52 protected static int NUMBER_IN_CLUSTER = 3; 53 protected int deliveryMode = DeliveryMode.NON_PERSISTENT; 54 protected MessageProducer [] producers; 55 protected Connection [] connections; 56 protected MessageIdList messageIdList[]; 57 58 protected void setUp() throws Exception { 59 60 connections = new Connection [NUMBER_IN_CLUSTER]; 61 producers = new MessageProducer [NUMBER_IN_CLUSTER]; 62 messageIdList = new MessageIdList[NUMBER_IN_CLUSTER]; 63 ActiveMQDestination destination = createDestination(); 64 65 String root = System.getProperty("activemq.store.dir"); 66 67 68 for (int i = 0;i < NUMBER_IN_CLUSTER;i++) { 69 connections[i] = createConnection(i); 70 connections[i].setClientID("ClusterTest" + i); 71 connections[i].start(); 72 73 Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); 74 producers[i] = session.createProducer(destination); 75 producers[i].setDeliveryMode(deliveryMode); 76 MessageConsumer consumer = createMessageConsumer(session, destination); 77 messageIdList[i] = new MessageIdList(); 78 consumer.setMessageListener(messageIdList[i]); 79 } 80 81 log.info("Waiting for cluster to be fully connected"); 82 83 ActiveMQDestination advisoryDest = AdvisorySupport.getConsumerAdvisoryTopic(destination); 85 for (int i = 0;i < NUMBER_IN_CLUSTER;i++) { 86 Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); 87 MessageConsumer consumer = createMessageConsumer(session, advisoryDest); 88 89 int j=0; 90 while(j < NUMBER_IN_CLUSTER) { 91 ActiveMQMessage message = (ActiveMQMessage) consumer.receive(1000); 92 if( message == null ) { 93 fail("Connection "+i+" saw "+j+" consumers, expected: "+NUMBER_IN_CLUSTER); 94 } 95 if( message.getDataStructure()!=null && message.getDataStructure().getDataStructureType()==ConsumerInfo.DATA_STRUCTURE_TYPE ) { 96 j++; 97 } 98 } 99 100 session.close(); 101 } 102 103 log.info("Cluster is online."); 104 } 105 106 protected void tearDown() throws Exception { 107 if (connections != null) { 108 for (int i = 0;i < connections.length;i++) { 109 connections[i].close(); 110 } 111 } 112 } 113 114 protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException { 115 return session.createConsumer(destination); 116 } 117 118 protected Connection createConnection(int i) throws JMSException { 119 log.info("creating connection ...."); 120 ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("peer://" + getClass().getName()+"/node"+i); 121 return fac.createConnection(); 122 } 123 124 protected ActiveMQDestination createDestination() { 125 return createDestination(getClass().getName()); 126 } 127 128 protected ActiveMQDestination createDestination(String name) { 129 if (topic) { 130 return new ActiveMQTopic(name); 131 } 132 else { 133 return new ActiveMQQueue(name); 134 } 135 } 136 137 138 141 public void testSendReceive() throws Exception { 142 for (int i = 0;i < MESSAGE_COUNT;i++) { 143 for (int x = 0;x < producers.length;x++) { 144 TextMessage textMessage = new ActiveMQTextMessage(); 145 textMessage.setText("MSG-NO: " + i + " in cluster: " + x); 146 producers[x].send(textMessage); 147 } 148 } 149 150 for (int i = 0;i < NUMBER_IN_CLUSTER;i++) { 151 messageIdList[i].assertMessagesReceived(expectedReceiveCount()); 152 } 153 } 154 155 protected int expectedReceiveCount() { 156 return MESSAGE_COUNT * NUMBER_IN_CLUSTER; 157 } 158 159 } 160 | Popular Tags |