KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > peer > PeerTransportTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq.transport.peer;
20 import org.apache.activemq.ActiveMQConnectionFactory;
21 import org.apache.activemq.advisory.AdvisorySupport;
22 import org.apache.activemq.command.ActiveMQDestination;
23 import org.apache.activemq.command.ActiveMQMessage;
24 import org.apache.activemq.command.ActiveMQQueue;
25 import org.apache.activemq.command.ActiveMQTextMessage;
26 import org.apache.activemq.command.ActiveMQTopic;
27 import org.apache.activemq.command.ConsumerInfo;
28 import org.apache.activemq.util.MessageIdList;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31
32 import javax.jms.Connection JavaDoc;
33 import javax.jms.DeliveryMode JavaDoc;
34 import javax.jms.Destination JavaDoc;
35 import javax.jms.JMSException JavaDoc;
36 import javax.jms.Message JavaDoc;
37 import javax.jms.MessageConsumer JavaDoc;
38 import javax.jms.MessageProducer JavaDoc;
39 import javax.jms.Session JavaDoc;
40 import javax.jms.TextMessage JavaDoc;
41
42 import junit.framework.TestCase;
43
44 /**
45  * @version $Revision: 1.1.1.1 $
46  */

47 public class PeerTransportTest extends TestCase {
48     protected Log log = LogFactory.getLog(getClass());
49     protected ActiveMQDestination destination;
50     protected boolean topic = true;
51     protected static int MESSAGE_COUNT = 50;
52     protected static int NUMBER_IN_CLUSTER = 3;
53     protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
54     protected MessageProducer JavaDoc[] producers;
55     protected Connection JavaDoc[] connections;
56     protected MessageIdList messageIdList[];
57
58     protected void setUp() throws Exception JavaDoc {
59         
60         connections = new Connection JavaDoc[NUMBER_IN_CLUSTER];
61         producers = new MessageProducer JavaDoc[NUMBER_IN_CLUSTER];
62         messageIdList = new MessageIdList[NUMBER_IN_CLUSTER];
63         ActiveMQDestination destination = createDestination();
64
65         String JavaDoc root = System.getProperty("activemq.store.dir");
66
67         
68         for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
69             connections[i] = createConnection(i);
70             connections[i].setClientID("ClusterTest" + i);
71             connections[i].start();
72
73             Session JavaDoc session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
74             producers[i] = session.createProducer(destination);
75             producers[i].setDeliveryMode(deliveryMode);
76             MessageConsumer JavaDoc consumer = createMessageConsumer(session, destination);
77             messageIdList[i] = new MessageIdList();
78             consumer.setMessageListener(messageIdList[i]);
79         }
80         
81         log.info("Waiting for cluster to be fully connected");
82         
83         // Each connection should see that NUMBER_IN_CLUSTER consumers get registered on the destination.
84
ActiveMQDestination advisoryDest = AdvisorySupport.getConsumerAdvisoryTopic(destination);
85         for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
86             Session JavaDoc session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
87             MessageConsumer JavaDoc consumer = createMessageConsumer(session, advisoryDest);
88             
89             int j=0;
90             while(j < NUMBER_IN_CLUSTER) {
91                 ActiveMQMessage message = (ActiveMQMessage) consumer.receive(1000);
92                 if( message == null ) {
93                     fail("Connection "+i+" saw "+j+" consumers, expected: "+NUMBER_IN_CLUSTER);
94                 }
95                 if( message.getDataStructure()!=null && message.getDataStructure().getDataStructureType()==ConsumerInfo.DATA_STRUCTURE_TYPE ) {
96                     j++;
97                 }
98             }
99             
100             session.close();
101         }
102         
103         log.info("Cluster is online.");
104     }
105
106     protected void tearDown() throws Exception JavaDoc {
107         if (connections != null) {
108             for (int i = 0;i < connections.length;i++) {
109                 connections[i].close();
110             }
111         }
112     }
113
114     protected MessageConsumer JavaDoc createMessageConsumer(Session JavaDoc session, Destination destination) throws JMSException JavaDoc {
115         return session.createConsumer(destination);
116     }
117
118     protected Connection JavaDoc createConnection(int i) throws JMSException JavaDoc {
119         log.info("creating connection ....");
120         ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("peer://" + getClass().getName()+"/node"+i);
121         return fac.createConnection();
122     }
123
124     protected ActiveMQDestination createDestination() {
125         return createDestination(getClass().getName());
126     }
127
128     protected ActiveMQDestination createDestination(String JavaDoc name) {
129         if (topic) {
130             return new ActiveMQTopic(name);
131         }
132         else {
133             return new ActiveMQQueue(name);
134         }
135     }
136
137
138     /**
139      * @throws Exception
140      */

141     public void testSendReceive() throws Exception JavaDoc {
142         for (int i = 0;i < MESSAGE_COUNT;i++) {
143             for (int x = 0;x < producers.length;x++) {
144                 TextMessage textMessage = new ActiveMQTextMessage();
145                 textMessage.setText("MSG-NO: " + i + " in cluster: " + x);
146                 producers[x].send(textMessage);
147             }
148         }
149         
150         for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
151             messageIdList[i].assertMessagesReceived(expectedReceiveCount());
152         }
153     }
154     
155     protected int expectedReceiveCount() {
156         return MESSAGE_COUNT * NUMBER_IN_CLUSTER;
157     }
158
159 }
160
Popular Tags