KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > spring > ConsumerBean


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.spring;
19
20 import javax.jms.Message JavaDoc;
21 import javax.jms.MessageListener JavaDoc;
22 import java.util.ArrayList JavaDoc;
23 import java.util.List JavaDoc;
24
25 import junit.framework.Assert;
26
27 public class ConsumerBean extends Assert implements MessageListener JavaDoc {
28     private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
29             .getLog(ConsumerBean.class);
30
31     private List JavaDoc messages = new ArrayList JavaDoc();
32     private Object JavaDoc semaphore;
33     private boolean verbose;
34
35     /**
36      * Constructor.
37      */

38     public ConsumerBean() {
39         this(new Object JavaDoc());
40     }
41
42     /**
43      * Constructor, initialized semaphore object.
44      *
45      * @param semaphore
46      */

47     public ConsumerBean(Object JavaDoc semaphore) {
48         this.semaphore = semaphore;
49     }
50
51     /**
52      * @return all the messages on the list so far, clearing the buffer
53      */

54     public synchronized List JavaDoc flushMessages() {
55         List JavaDoc answer = new ArrayList JavaDoc(messages);
56         messages.clear();
57         return answer;
58     }
59
60     /**
61      * Method implemented from MessageListener interface.
62      *
63      * @param message
64      */

65     public synchronized void onMessage(Message JavaDoc message) {
66         messages.add(message);
67         if (verbose) {
68             log.info("Received: " + message);
69         }
70         synchronized (semaphore) {
71             semaphore.notifyAll();
72         }
73     }
74
75     /**
76      * Use to wait for a single message to arrive.
77      */

78     public void waitForMessageToArrive() {
79         log.info("Waiting for message to arrive");
80
81         long start = System.currentTimeMillis();
82
83         try {
84             if (hasReceivedMessage()) {
85                 synchronized (semaphore) {
86                     semaphore.wait(4000);
87                 }
88             }
89         }
90         catch (InterruptedException JavaDoc e) {
91             log.info("Caught: " + e);
92         }
93         long end = System.currentTimeMillis() - start;
94
95         log.info("End of wait for " + end + " millis");
96     }
97
98     /**
99      * Used to wait for a message to arrive given a particular message count.
100      *
101      * @param messageCount
102      */

103     public void waitForMessagesToArrive(int messageCount) {
104         log.info("Waiting for message to arrive");
105
106         long start = System.currentTimeMillis();
107
108         for (int i = 0; i < 10; i++) {
109             try {
110                 if (hasReceivedMessages(messageCount)) {
111                     break;
112                 }
113                 synchronized (semaphore) {
114                     semaphore.wait(1000);
115                 }
116             }
117             catch (InterruptedException JavaDoc e) {
118                 log.info("Caught: " + e);
119             }
120         }
121         long end = System.currentTimeMillis() - start;
122
123         log.info("End of wait for " + end + " millis");
124     }
125
126     public void assertMessagesArrived(int total) {
127         waitForMessagesToArrive(total);
128         synchronized (this) {
129             int count = messages.size();
130
131             assertEquals("Messages received", total, count);
132         }
133     }
134
135     public boolean isVerbose() {
136         return verbose;
137     }
138
139     public void setVerbose(boolean verbose) {
140         this.verbose = verbose;
141     }
142
143     /**
144      * Identifies if the message is empty.
145      *
146      * @return
147      */

148     protected boolean hasReceivedMessage() {
149         return messages.isEmpty();
150     }
151
152     /**
153      * Identifies if the message count has reached the total size of message.
154      *
155      * @param messageCount
156      * @return
157      */

158     protected synchronized boolean hasReceivedMessages(int messageCount) {
159         return messages.size() >= messageCount;
160     }
161 }
162
Popular Tags