KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > TopicClusterTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq.transport;
20 import java.net.URI JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.List JavaDoc;
23
24 import javax.jms.Connection JavaDoc;
25 import javax.jms.DeliveryMode JavaDoc;
26 import javax.jms.Destination JavaDoc;
27 import javax.jms.JMSException JavaDoc;
28 import javax.jms.Message JavaDoc;
29 import javax.jms.MessageConsumer JavaDoc;
30 import javax.jms.MessageListener JavaDoc;
31 import javax.jms.MessageProducer JavaDoc;
32 import javax.jms.Session JavaDoc;
33 import javax.jms.TextMessage JavaDoc;
34
35 import junit.framework.TestCase;
36
37 import org.apache.activemq.ActiveMQConnectionFactory;
38 import org.apache.activemq.broker.BrokerService;
39 import org.apache.activemq.broker.TransportConnector;
40 import org.apache.activemq.command.ActiveMQQueue;
41 import org.apache.activemq.command.ActiveMQTextMessage;
42 import org.apache.activemq.command.ActiveMQTopic;
43 import org.apache.activemq.util.ServiceStopper;
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46
47 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
48
49 /**
50  * @version $Revision: 475999 $
51  */

52 public class TopicClusterTest extends TestCase implements MessageListener JavaDoc {
53     protected final static Log log = LogFactory.getLog(TopicClusterTest.class);
54     protected Destination JavaDoc destination;
55     protected boolean topic = true;
56     protected AtomicInteger JavaDoc receivedMessageCount = new AtomicInteger JavaDoc(0);
57     protected static int MESSAGE_COUNT = 50;
58     protected static int NUMBER_IN_CLUSTER = 3;
59     protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
60     protected MessageProducer JavaDoc[] producers;
61     protected Connection JavaDoc[] connections;
62     protected List JavaDoc services = new ArrayList JavaDoc();
63
64     protected void setUp() throws Exception JavaDoc {
65         connections = new Connection JavaDoc[NUMBER_IN_CLUSTER];
66         producers = new MessageProducer JavaDoc[NUMBER_IN_CLUSTER];
67         Destination JavaDoc destination = createDestination();
68         int portStart = 50000;
69         String JavaDoc root = System.getProperty("activemq.store.dir");
70         if (root == null) {
71             root = "target/store";
72         }
73         try {
74             for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
75
76                 System.setProperty("activemq.store.dir", root + "_broker_" + i);
77                 connections[i] = createConnection("broker-" + i);
78                 connections[i].setClientID("ClusterTest" + i);
79                 connections[i].start();
80                 Session JavaDoc session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
81                 producers[i] = session.createProducer(destination);
82                 producers[i].setDeliveryMode(deliveryMode);
83                 MessageConsumer JavaDoc consumer = createMessageConsumer(session,destination);
84                 consumer.setMessageListener(this);
85
86             }
87             log.info("Sleeping to ensure cluster is fully connected");
88             Thread.sleep(5000);
89         } finally {
90             System.setProperty("activemq.store.dir", root);
91         }
92     }
93
94     protected void tearDown() throws Exception JavaDoc {
95         if (connections != null) {
96             for (int i = 0;i < connections.length;i++) {
97                 connections[i].close();
98             }
99         }
100         ServiceStopper stopper = new ServiceStopper();
101         stopper.stopServices(services);
102     }
103     
104     protected MessageConsumer JavaDoc createMessageConsumer(Session JavaDoc session, Destination JavaDoc destination) throws JMSException JavaDoc{
105         return session.createConsumer(destination);
106     }
107
108     protected ActiveMQConnectionFactory createGenericClusterFactory(String JavaDoc brokerName) throws Exception JavaDoc {
109         BrokerService container = new BrokerService();
110         container.setBrokerName(brokerName);
111       
112         String JavaDoc url = "tcp://localhost:0";
113         TransportConnector connector = container.addConnector(url);
114         connector.setDiscoveryUri(new URI JavaDoc("multicast://default"));
115         container.addNetworkConnector("multicast://default");
116         container.start();
117         
118         services.add(container);
119         
120         return new ActiveMQConnectionFactory("vm://"+brokerName);
121     }
122
123     protected int expectedReceiveCount() {
124         return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
125     }
126
127     protected Connection JavaDoc createConnection(String JavaDoc name) throws Exception JavaDoc {
128         return createGenericClusterFactory(name).createConnection();
129     }
130
131     protected Destination JavaDoc createDestination() {
132         return createDestination(getClass().getName());
133     }
134
135     protected Destination JavaDoc createDestination(String JavaDoc name) {
136         if (topic) {
137             return new ActiveMQTopic(name);
138         }
139         else {
140             return new ActiveMQQueue(name);
141         }
142     }
143
144
145     /**
146      * @param msg
147      */

148     public void onMessage(Message JavaDoc msg) {
149         //log.info("GOT: " + msg);
150
receivedMessageCount.incrementAndGet();
151         synchronized (receivedMessageCount) {
152             if (receivedMessageCount.get() >= expectedReceiveCount()) {
153                 receivedMessageCount.notify();
154             }
155         }
156     }
157
158     /**
159      * @throws Exception
160      */

161     public void testSendReceive() throws Exception JavaDoc {
162         for (int i = 0;i < MESSAGE_COUNT;i++) {
163             TextMessage JavaDoc textMessage = new ActiveMQTextMessage();
164             textMessage.setText("MSG-NO:" + i);
165             for (int x = 0;x < producers.length;x++) {
166                 producers[x].send(textMessage);
167             }
168         }
169         synchronized (receivedMessageCount) {
170             if (receivedMessageCount.get() < expectedReceiveCount()) {
171                 receivedMessageCount.wait(20000);
172             }
173         }
174         //sleep a little - to check we don't get too many messages
175
Thread.sleep(2000);
176         System.err.println("GOT: " + receivedMessageCount.get());
177         assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
178     }
179
180 }
181
Popular Tags