KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > JmsSendReceiveTestSupport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import javax.jms.DeliveryMode JavaDoc;
21 import javax.jms.Destination JavaDoc;
22 import javax.jms.JMSException JavaDoc;
23 import javax.jms.Message JavaDoc;
24 import javax.jms.MessageConsumer JavaDoc;
25 import javax.jms.MessageListener JavaDoc;
26 import javax.jms.MessageProducer JavaDoc;
27 import javax.jms.Session JavaDoc;
28 import javax.jms.TextMessage JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.Arrays JavaDoc;
31 import java.util.Collections JavaDoc;
32 import java.util.Date JavaDoc;
33 import java.util.Iterator JavaDoc;
34 import java.util.List JavaDoc;
35
36 /**
37  * @version $Revision: 1.7 $
38  */

39 public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener JavaDoc {
40     private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
41             .getLog(JmsSendReceiveTestSupport.class);
42     
43     protected int messageCount = 100;
44     protected String JavaDoc[] data;
45     protected Session JavaDoc session;
46     protected MessageConsumer JavaDoc consumer;
47     protected MessageProducer JavaDoc producer;
48     protected Destination JavaDoc consumerDestination;
49     protected Destination JavaDoc producerDestination;
50     protected List JavaDoc messages = createConcurrentList();
51     protected boolean topic = true;
52     protected boolean durable = false;
53     protected int deliveryMode = DeliveryMode.PERSISTENT;
54     protected final Object JavaDoc lock = new Object JavaDoc();
55     protected boolean verbose = false;
56
57     /*
58      * @see junit.framework.TestCase#setUp()
59      */

60     protected void setUp() throws Exception JavaDoc {
61         super.setUp();
62         String JavaDoc temp = System.getProperty("messageCount");
63         
64         if (temp != null) {
65             int i = Integer.parseInt(temp);
66             if (i > 0) {
67                 messageCount = i;
68             }
69         }
70         
71         log.info("Message count for test case is: " + messageCount);
72         data = new String JavaDoc[messageCount];
73         
74         for (int i = 0; i < messageCount; i++) {
75             data[i] = "Text for message: " + i + " at " + new Date JavaDoc();
76         }
77     }
78     
79     /**
80      * Sends and consumes the messages.
81      *
82      * @throws Exception
83      */

84     public void testSendReceive() throws Exception JavaDoc {
85         messages.clear();
86         
87         for (int i = 0; i < data.length; i++) {
88             Message message = session.createTextMessage(data[i]);
89             message.setStringProperty("stringProperty",data[i]);
90             message.setIntProperty("intProperty",i);
91         
92             if (verbose) {
93                 if (log.isDebugEnabled()) {
94                     log.debug("About to send a message: " + message + " with text: " + data[i]);
95                 }
96             }
97             
98             producer.send(producerDestination, message);
99             messageSent();
100         }
101         
102         assertMessagesAreReceived();
103         log.info("" + data.length + " messages(s) received, closing down connections");
104     }
105
106     /**
107      * Asserts messages are received.
108      *
109      * @throws JMSException
110      */

111     protected void assertMessagesAreReceived() throws JMSException JavaDoc {
112         waitForMessagesToBeDelivered();
113         assertMessagesReceivedAreValid(messages);
114     }
115
116     /**
117      * Tests if the messages received are valid.
118      *
119      * @param receivedMessages - list of received messages.
120      * @throws JMSException
121      */

122     protected void assertMessagesReceivedAreValid(List JavaDoc receivedMessages) throws JMSException JavaDoc {
123         List JavaDoc copyOfMessages = Arrays.asList(receivedMessages.toArray());
124         int counter = 0;
125         
126         if (data.length != copyOfMessages.size()) {
127             for (Iterator JavaDoc iter = copyOfMessages.iterator(); iter.hasNext();) {
128                 TextMessage JavaDoc message = (TextMessage JavaDoc) iter.next();
129                 if (log.isDebugEnabled()) {
130                     log.info("<== " + counter++ + " = " + message);
131                 }
132             }
133         }
134         
135         assertEquals("Not enough messages received", data.length, receivedMessages.size());
136         
137         for (int i = 0; i < data.length; i++) {
138             TextMessage JavaDoc received = (TextMessage JavaDoc) receivedMessages.get(i);
139             String JavaDoc text = received.getText();
140             String JavaDoc stringProperty = received.getStringProperty("stringProperty");
141             int intProperty = received.getIntProperty("intProperty");
142             
143             if (verbose) {
144                 if (log.isDebugEnabled()) {
145                     log.info("Received Text: " + text);
146                 }
147             }
148             
149             assertEquals("Message: " + i, data[i], text);
150             assertEquals(data[i],stringProperty);
151             assertEquals(i,intProperty) ;
152         }
153     }
154
155     /**
156      * Waits for messages to be delivered.
157      */

158     protected void waitForMessagesToBeDelivered() {
159         long maxWaitTime = 30000;
160         long waitTime = maxWaitTime;
161         long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
162         
163         synchronized (lock) {
164             while (messages.size() < data.length && waitTime >= 0) {
165                 try {
166                     lock.wait(200);
167                 }
168                 catch (InterruptedException JavaDoc e) {
169                     e.printStackTrace();
170                 }
171         
172                 waitTime = maxWaitTime - (System.currentTimeMillis() - start);
173             }
174         }
175     }
176
177     /* (non-Javadoc)
178      * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
179      */

180     public synchronized void onMessage(Message message) {
181         consumeMessage(message, messages);
182     }
183
184     /**
185      * Consumes messages.
186      *
187      * @param message - message to be consumed.
188      * @param messageList -list of consumed messages.
189      */

190     protected void consumeMessage(Message message, List JavaDoc messageList) {
191         if (verbose) {
192             if (log.isDebugEnabled()) {
193                 log.info("Received message: " + message);
194             }
195         }
196         
197         messageList.add(message);
198         
199         if (messageList.size() >= data.length) {
200             synchronized (lock) {
201                 lock.notifyAll();
202             }
203         }
204     }
205
206     /**
207      * Returns the ArrayList as a synchronized list.
208      *
209      * @return List
210      */

211     protected List JavaDoc createConcurrentList() {
212         return Collections.synchronizedList(new ArrayList JavaDoc());
213     }
214     
215     /**
216      * Just a hook so can insert failure tests
217      * @throws Exception
218      *
219      */

220     protected void messageSent() throws Exception JavaDoc{
221         
222     }
223 }
224
Popular Tags