KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > test > JmsSendReceiveTestSupport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.test;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.Arrays JavaDoc;
22 import java.util.Collections JavaDoc;
23 import java.util.Date JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.List JavaDoc;
26
27 import javax.jms.DeliveryMode JavaDoc;
28 import javax.jms.Destination JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30 import javax.jms.Message JavaDoc;
31 import javax.jms.MessageConsumer JavaDoc;
32 import javax.jms.MessageListener JavaDoc;
33 import javax.jms.MessageProducer JavaDoc;
34 import javax.jms.Session JavaDoc;
35 import javax.jms.TextMessage JavaDoc;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39
40 /**
41  * @version $Revision: 1.2 $
42  */

43 public abstract class JmsSendReceiveTestSupport extends TestSupport implements MessageListener JavaDoc {
44     protected static final Log log = LogFactory.getLog(JmsSendReceiveTestSupport.class);
45     
46     protected int messageCount = 100;
47     protected String JavaDoc[] data;
48     protected Session JavaDoc session;
49     protected Session JavaDoc consumeSession;
50     protected MessageConsumer JavaDoc consumer;
51     protected MessageProducer JavaDoc producer;
52     protected Destination JavaDoc consumerDestination;
53     protected Destination JavaDoc producerDestination;
54     protected List JavaDoc messages = createConcurrentList();
55     protected boolean topic = true;
56     protected boolean durable = false;
57     protected int deliveryMode = DeliveryMode.PERSISTENT;
58     protected final Object JavaDoc lock = new Object JavaDoc();
59     protected boolean verbose = false;
60     protected boolean useSeparateSession = false;
61     protected boolean largeMessages = false;
62     protected int largeMessageLoopSize = 4 * 1024;
63
64     /*
65      * @see junit.framework.TestCase#setUp()
66      */

67     protected void setUp() throws Exception JavaDoc {
68         super.setUp();
69         String JavaDoc temp = System.getProperty("messageCount");
70         
71         if (temp != null) {
72             int i = Integer.parseInt(temp);
73             if (i > 0) {
74                 messageCount = i;
75             }
76         }
77         
78         log.info("Message count for test case is: " + messageCount);
79         data = new String JavaDoc[messageCount];
80         
81         for (int i = 0; i < messageCount; i++) {
82             data[i] = createMessageText(i);
83         }
84     }
85
86
87     protected String JavaDoc createMessageText(int i) {
88         if (largeMessages) {
89             return createMessageBodyText();
90         }
91         else {
92             return "Text for message: " + i + " at " + new Date JavaDoc();
93         }
94     }
95
96     protected String JavaDoc createMessageBodyText() {
97         StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
98         for (int i = 0; i < largeMessageLoopSize; i++) {
99             buffer.append("0123456789");
100         }
101         return buffer.toString();
102     }
103
104     /**
105      * Test if all the messages sent are being received.
106      *
107      * @throws Exception
108      */

109     public void testSendReceive() throws Exception JavaDoc {
110
111         Thread.sleep(1000);
112         messages.clear();
113         
114         for (int i = 0; i < data.length; i++) {
115             Message JavaDoc message = createMessage(i);
116             configureMessage(message);
117             if (verbose) {
118                 log.info("About to send a message: " + message + " with text: " + data[i]);
119             }
120             
121             producer.send(producerDestination, message);
122         }
123         
124         assertMessagesAreReceived();
125         log.info("" + data.length + " messages(s) received, closing down connections");
126     }
127
128
129     protected Message JavaDoc createMessage(int index) throws JMSException JavaDoc {
130         Message JavaDoc message = session.createTextMessage(data[index]);
131         return message;
132     }
133
134     /**
135      * A hook to allow the message to be configured such as adding extra headers
136      * @throws JMSException
137      */

138     protected void configureMessage(Message JavaDoc message) throws JMSException JavaDoc {
139     }
140
141
142     /**
143      * Waits to receive the messages and performs the test if all messages have been received and
144      * are in sequential order.
145      *
146      * @throws JMSException
147      */

148     protected void assertMessagesAreReceived() throws JMSException JavaDoc {
149         waitForMessagesToBeDelivered();
150         assertMessagesReceivedAreValid(messages);
151     }
152
153     /**
154      * Tests if the messages have all been received and are in sequential order.
155      *
156      * @param receivedMessages
157      * @throws JMSException
158      */

159     protected void assertMessagesReceivedAreValid(List JavaDoc receivedMessages) throws JMSException JavaDoc {
160         List JavaDoc copyOfMessages = Arrays.asList(receivedMessages.toArray());
161         int counter = 0;
162         
163         if (data.length != copyOfMessages.size()) {
164             for (Iterator JavaDoc iter = copyOfMessages.iterator(); iter.hasNext();) {
165                 Object JavaDoc message = iter.next();
166                 log.info("<== " + counter++ + " = " + message);
167             }
168         }
169         
170         assertEquals("Not enough messages received", data.length, receivedMessages.size());
171         
172         for (int i = 0; i < data.length; i++) {
173             Message JavaDoc received = (Message JavaDoc) receivedMessages.get(i);
174             assertMessageValid(i, received);
175         }
176     }
177
178
179     protected void assertMessageValid(int index, Message JavaDoc message) throws JMSException JavaDoc {
180         TextMessage JavaDoc textMessage = (TextMessage JavaDoc) message;
181         String JavaDoc text = textMessage.getText();
182       
183         if (verbose) {
184             log.info("Received Text: " + text);
185         }
186         
187         assertEquals("Message: " + index, data[index], text);
188     }
189
190     /**
191      * Waits for the messages to be delivered or when the wait time has been reached.
192      */

193     protected void waitForMessagesToBeDelivered() {
194         long maxWaitTime = 60000;
195         long waitTime = maxWaitTime;
196         long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
197         
198         synchronized (lock) {
199             while (messages.size() < data.length && waitTime >= 0) {
200                 try {
201                     lock.wait(200);
202                 }
203                 catch (InterruptedException JavaDoc e) {
204                     e.printStackTrace();
205                 }
206                 
207                 waitTime = maxWaitTime - (System.currentTimeMillis() - start);
208             }
209         }
210     }
211
212
213     /**
214      * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
215      */

216     public synchronized void onMessage(Message JavaDoc message) {
217         consumeMessage(message, messages);
218     }
219
220     /**
221      * Consumes a received message.
222      *
223      * @param message - a newly received message.
224      * @param messageList - list containing the received messages.
225      */

226     protected void consumeMessage(Message JavaDoc message, List JavaDoc messageList) {
227         if (verbose) {
228             log.info("Received message: " + message);
229         }
230        
231         messageList.add(message);
232        
233         if (messageList.size() >= data.length) {
234             synchronized (lock) {
235                 lock.notifyAll();
236             }
237         }
238     }
239
240     /**
241      * Creates a synchronized list.
242      *
243      * @return a synchronized view of the specified list.
244      */

245     protected List JavaDoc createConcurrentList() {
246         return Collections.synchronizedList(new ArrayList JavaDoc());
247     }
248 }
249
Popular Tags