KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ActiveMQQueueBrowser


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import java.util.Enumeration JavaDoc;
21
22 import javax.jms.IllegalStateException JavaDoc;
23 import javax.jms.JMSException JavaDoc;
24 import javax.jms.Message JavaDoc;
25 import javax.jms.Queue JavaDoc;
26 import javax.jms.QueueBrowser JavaDoc;
27
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.ConsumerId;
30 import org.apache.activemq.command.MessageDispatch;
31
32 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
33
34 /**
35  * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
36  * queue without removing them.
37  * <p/>
38  * <P>
39  * The <CODE>getEnumeration</CODE> method returns a <CODE>
40  * java.util.Enumeration</CODE> that is used to scan the queue's messages. It
41  * may be an enumeration of the entire content of a queue, or it may contain
42  * only the messages matching a message selector.
43  * <p/>
44  * <P>
45  * Messages may be arriving and expiring while the scan is done. The JMS API
46  * does not require the content of an enumeration to be a static snapshot of
47  * queue content. Whether these changes are visible or not depends on the JMS
48  * provider.
49  * <p/>
50  * <P>
51  * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
52  * </CODE> or a <CODE>QueueSession</CODE>.
53  *
54  * @see javax.jms.Session#createBrowser
55  * @see javax.jms.QueueSession#createBrowser
56  * @see javax.jms.QueueBrowser
57  * @see javax.jms.QueueReceiver
58  */

59
60 public class ActiveMQQueueBrowser implements
61         QueueBrowser JavaDoc, Enumeration JavaDoc {
62
63     private final ActiveMQSession session;
64     private final ActiveMQDestination destination;
65     private final String JavaDoc selector;
66     
67     private ActiveMQMessageConsumer consumer;
68     private boolean closed;
69     private final ConsumerId consumerId;
70     private final AtomicBoolean JavaDoc browseDone = new AtomicBoolean JavaDoc(true);
71     private final boolean dispatchAsync;
72     private Object JavaDoc semaphore = new Object JavaDoc();
73     
74     /**
75      * Constructor for an ActiveMQQueueBrowser - used internally
76      *
77      * @param theSession
78      * @param dest
79      * @param selector
80      * @throws JMSException
81      */

82     protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String JavaDoc selector, boolean dispatchAsync) throws JMSException JavaDoc {
83         this.session = session;
84         this.consumerId = consumerId;
85         this.destination = destination;
86         this.selector = selector;
87         this.dispatchAsync=dispatchAsync;
88         this.consumer = createConsumer();
89     }
90
91     /**
92      * @param session
93      * @param originalDestination
94      * @param selectorExpression
95      * @param cnum
96      * @return
97      * @throws JMSException
98      */

99     private ActiveMQMessageConsumer createConsumer() throws JMSException JavaDoc {
100         browseDone.set(false);
101         ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
102         return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(),
103                 prefetchPolicy.getMaximumPendingMessageLimit(), false, true, dispatchAsync) {
104             public void dispatch(MessageDispatch md) {
105                 if( md.getMessage()==null ) {
106                     browseDone.set(true);
107                 } else {
108                     super.dispatch(md);
109                 }
110                 notifyMessageAvailable();
111             }
112         };
113     }
114
115     private void destroyConsumer() {
116         if( consumer == null )
117             return;
118         try {
119             consumer.close();
120             consumer=null;
121         } catch (JMSException JavaDoc e) {
122             e.printStackTrace();
123         }
124     }
125
126     /**
127      * Gets an enumeration for browsing the current queue messages in the order
128      * they would be received.
129      *
130      * @return an enumeration for browsing the messages
131      * @throws JMSException if the JMS provider fails to get the enumeration for this
132      * browser due to some internal error.
133      */

134
135     public Enumeration JavaDoc getEnumeration() throws JMSException JavaDoc {
136         checkClosed();
137         if( consumer==null )
138             consumer = createConsumer();
139         return this;
140     }
141
142     private void checkClosed() throws IllegalStateException JavaDoc {
143         if (closed) {
144             throw new IllegalStateException JavaDoc("The Consumer is closed");
145         }
146     }
147
148     /**
149      * @return true if more messages to process
150      */

151     public boolean hasMoreElements() {
152         while( true ) {
153             
154             synchronized(this) {
155                 if( consumer==null )
156                     return false;
157             }
158             
159             if( consumer.getMessageSize() > 0 ) {
160                 return true;
161             }
162             
163             if( browseDone.get() || !session.isRunning() ) {
164                 destroyConsumer();
165                 return false;
166             }
167             
168             waitForMessage();
169         }
170     }
171
172
173     /**
174      * @return the next message
175      */

176     public Object JavaDoc nextElement() {
177         while( true ) {
178             
179             synchronized(this) {
180                 if( consumer==null )
181                     return null;
182             }
183             
184             try {
185                 Message answer = consumer.receiveNoWait();
186                 if( answer!=null )
187                     return answer;
188             } catch (JMSException JavaDoc e) {
189                 this.session.connection.onAsyncException(e);
190                 return null;
191             }
192             
193             if( browseDone.get() || !session.isRunning() ) {
194                 destroyConsumer();
195                 return null;
196             }
197             
198             waitForMessage();
199         }
200     }
201
202     synchronized public void close() throws JMSException JavaDoc {
203         destroyConsumer();
204         closed=true;
205     }
206
207     /**
208      * Gets the queue associated with this queue browser.
209      *
210      * @return the queue
211      * @throws JMSException if the JMS provider fails to get the queue associated
212      * with this browser due to some internal error.
213      */

214
215     public Queue JavaDoc getQueue() throws JMSException JavaDoc {
216         return (Queue JavaDoc) destination;
217     }
218
219
220     public String JavaDoc getMessageSelector() throws JMSException JavaDoc {
221         return selector;
222     }
223
224
225     // Implementation methods
226
// -------------------------------------------------------------------------
227

228     /**
229      * Wait on a semaphore for a fixed amount of time for a message to come in.
230      */

231     protected void waitForMessage() {
232         try {
233             synchronized (semaphore) {
234                 semaphore.wait(2000);
235             }
236         }
237         catch (InterruptedException JavaDoc e) {
238             Thread.currentThread().interrupt();
239         }
240     }
241     
242     
243     protected void notifyMessageAvailable() {
244         synchronized (semaphore ) {
245             semaphore.notifyAll();
246         }
247     }
248     
249     public String JavaDoc toString() {
250         return "ActiveMQQueueBrowser { value=" +consumerId+" }";
251     }
252
253 }
254
Popular Tags