KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > util > MessageIdList


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.util;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.List JavaDoc;
22 import java.util.concurrent.CountDownLatch JavaDoc;
23
24 import javax.jms.JMSException JavaDoc;
25 import javax.jms.Message JavaDoc;
26 import javax.jms.MessageListener JavaDoc;
27
28 import junit.framework.Assert;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 /**
34  * A simple container of messages for performing testing and rendezvous style
35  * code. You can use this class a {@link MessageListener} and then make
36  * assertions about how many messages it has received allowing a certain maximum
37  * amount of time to ensure that the test does not hang forever.
38  *
39  * Also you can chain these instances together with the
40  * {@link #setParent(MessageListener)} method so that you can aggregate the
41  * total number of messages consumed across a number of consumers.
42  *
43  * @version $Revision: 1.6 $
44  */

45 public class MessageIdList extends Assert implements MessageListener JavaDoc {
46     
47     protected static final Log log = LogFactory.getLog(MessageIdList.class);
48
49     private List JavaDoc messageIds = new ArrayList JavaDoc();
50     private Object JavaDoc semaphore;
51     private boolean verbose;
52     private MessageListener JavaDoc parent;
53     private long maximumDuration = 15000L;
54
55     private CountDownLatch JavaDoc countDownLatch;
56
57     public MessageIdList() {
58         this(new Object JavaDoc());
59     }
60
61     public MessageIdList(Object JavaDoc semaphore) {
62         this.semaphore = semaphore;
63     }
64
65     public boolean equals(Object JavaDoc that) {
66         if (that instanceof MessageIdList) {
67             MessageIdList thatList = (MessageIdList) that;
68             return getMessageIds().equals(thatList.getMessageIds());
69         }
70         return false;
71     }
72
73     public int hashCode() {
74         synchronized (semaphore) {
75             return messageIds.hashCode() + 1;
76         }
77     }
78
79     public String JavaDoc toString() {
80         synchronized (semaphore) {
81             return messageIds.toString();
82         }
83     }
84
85     /**
86      * @return all the messages on the list so far, clearing the buffer
87      */

88     public List JavaDoc flushMessages() {
89         synchronized (semaphore) {
90             List JavaDoc answer = new ArrayList JavaDoc(messageIds);
91             messageIds.clear();
92             return answer;
93         }
94     }
95
96     public synchronized List JavaDoc getMessageIds() {
97         synchronized (semaphore) {
98             return new ArrayList JavaDoc(messageIds);
99         }
100     }
101
102     public void onMessage(Message JavaDoc message) {
103         String JavaDoc id=null;
104         try {
105             if( countDownLatch != null )
106                 countDownLatch.countDown();
107             
108             id = message.getJMSMessageID();
109             synchronized (semaphore) {
110                 messageIds.add(id);
111                 semaphore.notifyAll();
112             }
113             if (log.isDebugEnabled()) {
114                 log.debug("Received message: " + message);
115             }
116         } catch (JMSException JavaDoc e) {
117             e.printStackTrace();
118         }
119         if (parent != null) {
120             parent.onMessage(message);
121         }
122     }
123
124     public int getMessageCount() {
125         synchronized (semaphore) {
126             return messageIds.size();
127         }
128     }
129
130     public void waitForMessagesToArrive(int messageCount) {
131         log.info("Waiting for " + messageCount + " message(s) to arrive");
132
133         long start = System.currentTimeMillis();
134
135         for (int i = 0; i < messageCount; i++) {
136             try {
137                 if (hasReceivedMessages(messageCount)) {
138                     break;
139                 }
140                 long duration = System.currentTimeMillis() - start;
141                 if (duration >= maximumDuration ) {
142                     break;
143                 }
144                 synchronized (semaphore) {
145                     semaphore.wait(maximumDuration-duration);
146                 }
147             }
148             catch (InterruptedException JavaDoc e) {
149                 log.info("Caught: " + e);
150             }
151         }
152         long end = System.currentTimeMillis() - start;
153
154         log.info("End of wait for " + end + " millis and received: " + getMessageCount() + " messages");
155     }
156
157     /**
158      * Performs a testing assertion that the correct number of messages have
159      * been received without waiting
160      *
161      * @param messageCount
162      */

163     public void assertMessagesReceivedNoWait(int messageCount) {
164         assertEquals("expected number of messages when received", messageCount, getMessageCount());
165     }
166     
167     /**
168      * Performs a testing assertion that the correct number of messages have
169      * been received waiting for the messages to arrive up to a fixed amount of time.
170      *
171      * @param messageCount
172      */

173     public void assertMessagesReceived(int messageCount) {
174         waitForMessagesToArrive(messageCount);
175
176         assertMessagesReceivedNoWait(messageCount);
177     }
178
179     /**
180      * Asserts that there are at least the given number of messages received without waiting.
181      */

182     public void assertAtLeastMessagesReceived(int messageCount) {
183         int actual = getMessageCount();
184         assertTrue("at least: " + messageCount + " messages received. Actual: " + actual, actual >= messageCount);
185     }
186
187     /**
188      * Asserts that there are at most the number of messages received without waiting
189      * @param messageCount
190      */

191     public void assertAtMostMessagesReceived(int messageCount) {
192         int actual = getMessageCount();
193         assertTrue("at most: " + messageCount + " messages received. Actual: " + actual, actual <= messageCount);
194     }
195
196     public boolean hasReceivedMessage() {
197         return getMessageCount() == 0;
198     }
199
200     public boolean hasReceivedMessages(int messageCount) {
201         return getMessageCount() >= messageCount;
202     }
203
204     public boolean isVerbose() {
205         return verbose;
206     }
207
208     public void setVerbose(boolean verbose) {
209         this.verbose = verbose;
210     }
211
212     public MessageListener JavaDoc getParent() {
213         return parent;
214     }
215
216     /**
217      * Allows a parent listener to be specified such as to aggregate messages
218      * consumed across consumers
219      */

220     public void setParent(MessageListener JavaDoc parent) {
221         this.parent = parent;
222     }
223
224     
225     /**
226      * @return the maximumDuration
227      */

228     public long getMaximumDuration(){
229         return this.maximumDuration;
230     }
231
232     
233     /**
234      * @param maximumDuration the maximumDuration to set
235      */

236     public void setMaximumDuration(long maximumDuration){
237         this.maximumDuration=maximumDuration;
238     }
239
240     public void setCountDownLatch(CountDownLatch JavaDoc countDownLatch) {
241         this.countDownLatch = countDownLatch;
242     }
243
244 }
245
Popular Tags