KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > JmsMultipleClientsTestSupport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import org.apache.activemq.command.ActiveMQDestination;
21 import org.apache.activemq.command.ActiveMQQueue;
22 import org.apache.activemq.command.ActiveMQTopic;
23 import org.apache.activemq.util.MessageIdList;
24 import org.apache.activemq.broker.BrokerService;
25 import org.apache.activemq.broker.BrokerFactory;
26
27 import java.util.List JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Map JavaDoc;
30 import java.util.HashMap JavaDoc;
31 import java.util.Iterator JavaDoc;
32 import java.util.Arrays JavaDoc;
33 import java.util.Collections JavaDoc;
34 import java.net.URI JavaDoc;
35
36 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
37
38 import javax.jms.*;
39
40 /**
41  * Test case support used to test multiple message comsumers and message producers connecting to a single broker.
42  *
43  * @version $Revision$
44  */

45 public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
46     private AtomicInteger JavaDoc producerLock;
47
48     protected Map JavaDoc consumers = new HashMap JavaDoc(); // Map of consumer with messages received
49
protected int consumerCount = 1;
50     protected int producerCount = 1;
51
52     protected int messageSize = 1024;
53
54     protected boolean useConcurrentSend = true;
55     protected boolean durable = false;
56     protected boolean topic = false;
57     protected boolean persistent = false;
58
59     protected BrokerService broker;
60     protected Destination destination;
61     protected List connections = Collections.synchronizedList(new ArrayList JavaDoc());
62     protected MessageIdList allMessagesList = new MessageIdList();
63
64     protected void startProducers(Destination dest, int msgCount) throws Exception JavaDoc {
65         startProducers(createConnectionFactory(), dest, msgCount);
66     }
67
68     protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception JavaDoc {
69         // Use concurrent send
70
if (useConcurrentSend) {
71             producerLock = new AtomicInteger JavaDoc(producerCount);
72
73             for (int i=0; i<producerCount; i++) {
74                 Thread JavaDoc t = new Thread JavaDoc(new Runnable JavaDoc() {
75                     public void run() {
76                         try {
77                             sendMessages(factory.createConnection(), dest, msgCount);
78                         } catch (Exception JavaDoc e) {
79                             e.printStackTrace();
80                         }
81
82                         synchronized (producerLock) {
83                             producerLock.decrementAndGet();
84                             producerLock.notifyAll();
85                         }
86                     }
87                 });
88
89                 t.start();
90             }
91
92             // Wait for all producers to finish sending
93
synchronized (producerLock) {
94                 while (producerLock.get() != 0) {
95                     producerLock.wait(2000);
96                 }
97             }
98
99
100         // Use serialized send
101
} else {
102             for (int i=0; i<producerCount; i++) {
103                 sendMessages(factory.createConnection(), dest, msgCount);
104             }
105         }
106     }
107
108     protected void sendMessages(Connection connection, Destination destination, int count) throws Exception JavaDoc {
109         connection.start();
110
111         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
112         MessageProducer producer = session.createProducer(destination);
113         producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
114         
115         for (int i = 0; i < count; i++) {
116             TextMessage msg = createTextMessage(session, "" + i);
117             producer.send(msg);
118         }
119
120         producer.close();
121         session.close();
122         connection.close();
123     }
124
125     protected TextMessage createTextMessage(Session session, String JavaDoc initText) throws Exception JavaDoc {
126         TextMessage msg = session.createTextMessage();
127
128         // Pad message text
129
if (initText.length() < messageSize) {
130             char[] data = new char[messageSize - initText.length()];
131             Arrays.fill(data, '*');
132             String JavaDoc str = new String JavaDoc(data);
133             msg.setText(initText + str);
134
135         // Do not pad message text
136
} else {
137             msg.setText(initText);
138         }
139
140         return msg;
141     }
142
143     protected void startConsumers(Destination dest) throws Exception JavaDoc {
144         startConsumers(createConnectionFactory(), dest);
145     }
146
147     protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception JavaDoc {
148         MessageConsumer consumer;
149         for (int i=0; i<consumerCount; i++) {
150             if (durable && topic) {
151                 consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i+1));
152             } else {
153                 consumer = createMessageConsumer(factory.createConnection(), dest);
154             }
155             MessageIdList list = new MessageIdList();
156             list.setParent(allMessagesList);
157             consumer.setMessageListener(list);
158             consumers.put(consumer, list);
159         }
160     }
161
162     protected MessageConsumer createMessageConsumer(Connection conn, Destination dest) throws Exception JavaDoc {
163         connections.add(conn);
164
165         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
166         final MessageConsumer consumer = sess.createConsumer(dest);
167         conn.start();
168
169         return consumer;
170     }
171
172     protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String JavaDoc name) throws Exception JavaDoc {
173         conn.setClientID(name);
174         connections.add(conn);
175         conn.start();
176
177         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
178         final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic JavaDoc)dest, name);
179
180         return consumer;
181     }
182
183     protected void waitForAllMessagesToBeReceived(int messageCount) throws Exception JavaDoc {
184         allMessagesList.waitForMessagesToArrive(messageCount);
185     }
186
187     protected ActiveMQDestination createDestination() throws JMSException {
188         String JavaDoc name = "." + getClass().getName() + "." + getName();
189         if (topic) {
190             destination = new ActiveMQTopic("Topic" + name);
191             return (ActiveMQDestination)destination;
192         } else {
193             destination = new ActiveMQQueue("Queue" + name);
194             return (ActiveMQDestination)destination;
195         }
196     }
197
198     protected ConnectionFactory createConnectionFactory() throws Exception JavaDoc {
199         return new ActiveMQConnectionFactory("vm://localhost");
200     }
201
202     protected BrokerService createBroker() throws Exception JavaDoc {
203         return BrokerFactory.createBroker(new URI JavaDoc("broker://()/localhost?persistent=false&useJmx=true"));
204     }
205
206     protected void setUp() throws Exception JavaDoc {
207         super.setAutoFail(true);
208         super.setUp();
209         broker = createBroker();
210         broker.start();
211     }
212
213     protected void tearDown() throws Exception JavaDoc {
214         for (Iterator JavaDoc iter = connections.iterator(); iter.hasNext();) {
215             Connection conn= (Connection) iter.next();
216             try {
217                 conn.close();
218             } catch (Throwable JavaDoc e) {
219             }
220         }
221         broker.stop();
222         allMessagesList.flushMessages();
223         consumers.clear();
224         super.tearDown();
225     }
226
227     /*
228      * Some helpful assertions for multiple consumers.
229      */

230     protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) {
231         MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
232         messageIdList.assertAtLeastMessagesReceived(msgCount);
233     }
234
235     protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) {
236         MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
237         messageIdList.assertAtMostMessagesReceived(msgCount);
238     }
239
240     protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) {
241         MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
242         messageIdList.assertMessagesReceivedNoWait(msgCount);
243     }
244
245     protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
246         for (Iterator JavaDoc i=consumers.keySet().iterator();i.hasNext();) {
247             assertConsumerReceivedAtLeastXMessages((MessageConsumer)i.next(), msgCount);
248         }
249     }
250
251     protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) {
252         for (Iterator JavaDoc i=consumers.keySet().iterator();i.hasNext();) {
253             assertConsumerReceivedAtMostXMessages((MessageConsumer)i.next(), msgCount);
254         }
255     }
256
257     protected void assertEachConsumerReceivedXMessages(int msgCount) {
258         for (Iterator JavaDoc i=consumers.keySet().iterator();i.hasNext();) {
259             assertConsumerReceivedXMessages((MessageConsumer)i.next(), msgCount);
260         }
261     }
262
263     protected void assertTotalMessagesReceived(int msgCount) {
264         allMessagesList.assertMessagesReceivedNoWait(msgCount);
265         
266         // now lets count the individual messages received
267
int totalMsg = 0;
268         for (Iterator JavaDoc i=consumers.keySet().iterator(); i.hasNext();) {
269             MessageIdList messageIdList = (MessageIdList)consumers.get(i.next());
270             totalMsg += messageIdList.getMessageCount();
271         }
272         assertEquals("Total of consumers message count", msgCount, totalMsg);
273     }
274 }
275
Popular Tags