KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > QueueBrowserEndpoint


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: QueueBrowserEndpoint.java,v 1.2 2005/03/18 03:58:39 tanderson Exp $
44  */

45 package org.exolab.jms.messagemgr;
46
47 import java.util.Vector JavaDoc;
48 import java.sql.Connection JavaDoc;
49 import javax.jms.InvalidSelectorException JavaDoc;
50 import javax.jms.JMSException JavaDoc;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54
55 import org.exolab.jms.client.JmsQueue;
56 import org.exolab.jms.message.MessageImpl;
57 import org.exolab.jms.scheduler.Scheduler;
58 import org.exolab.jms.server.JmsServerSession;
59
60
61 /**
62  * A QueueBrowserEndpoint is a QueueListener to a QueueDestinationCache. This
63  * enables it to receive all the messages, which it then feeds down to the
64  * client side.
65  *
66  * @author <a HREF="mailto:jima@comware.com.au">Jim Alateras</a>
67  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson
68  * @version $Revision: 1.2 $ $Date: 2005/03/18 03:58:39 $
69  */

70 public class QueueBrowserEndpoint extends AbstractConsumerEndpoint {
71
72     /**
73      * Cache of all handles for this consumer.
74      */

75     private MessageQueue _handles = new MessageQueue();
76
77     /**
78      * The destination that this consumer subscribes to.
79      */

80     private QueueDestinationCache _cache;
81
82     /**
83      * The logger.
84      */

85     private static final Log _log =
86             LogFactory.getLog(QueueBrowserEndpoint.class);
87
88
89     /**
90      * Create a new <code>QueueBrowserEndpoint</code>.
91      *
92      * @param consumerId the identity of this consumer
93      * @param session the owning session
94      * @param queue the queue to browse
95      * @param selector the message selector. May be <code>null</code>
96      * @param scheduler used to schedule asynchronous message delivery.
97      * @throws InvalidSelectorException if <code>selector</code> is invalid
98      * @throws JMSException if the destination cache can't be
99      * created
100      */

101     public QueueBrowserEndpoint(long consumerId, JmsServerSession session,
102                                 JmsQueue queue, String JavaDoc selector,
103                                 Scheduler scheduler)
104             throws InvalidSelectorException JavaDoc, JMSException JavaDoc {
105         super(consumerId, session, queue, selector, false, scheduler);
106
107         _cache = (QueueDestinationCache)
108                 DestinationManager.instance().getDestinationCache(queue);
109
110         // set up the message cache and register itself as a listener to the
111
// cache
112
_cache.addQueueListener(this);
113         _cache.playbackMessages(this);
114     }
115
116     /**
117      * This event is called when a non-persistent message is added to a
118      * <code>DestinationCache</code>..
119      *
120      * @param handle a handle to the added message
121      * @param message the added message
122      * @return <code>true</code> if the listener accepted the message; otherwise
123      * <code>false</ode>
124      */

125     public boolean messageAdded(MessageHandle handle, MessageImpl message) {
126         addMessage(handle);
127         return true;
128     }
129
130     /**
131      * This event is called when a persistent message is added to the
132      * <code>DestinationCache</code>.
133      *
134      * @param handle a handle to the added message
135      * @param message the added message
136      * @param connection the database connection
137      * @return <code>true</code>
138      */

139     public boolean persistentMessageAdded(MessageHandle handle,
140                                           MessageImpl message,
141                                           Connection JavaDoc connection) {
142         return messageAdded(handle, message);
143     }
144
145     /**
146      * This event is called when a message is removed from the
147      * <code>DestinationCache</code>.
148      *
149      * @param messageId the identifier of the removed message
150      */

151     public void messageRemoved(String JavaDoc messageId) {
152         _handles.remove(messageId);
153     }
154
155     /**
156      * This event is called when a message is removed from the
157      * <code>DestinationCache</code>.
158      *
159      * @param messageId a handle to the removed message
160      * @param connection the database connection
161      */

162     public void persistentMessageRemoved(String JavaDoc messageId,
163                                          Connection JavaDoc connection) {
164         messageRemoved(messageId);
165     }
166
167     /**
168      * Return the next available message to the client.
169      * <p/>
170      * This operation is not supported for QueueBrowsers.
171      *
172      * @param wait number of milliseconds to wait
173      * @return null
174      * @throws JMSException if invoked
175      */

176     public MessageHandle receive(long wait) throws JMSException JavaDoc {
177         throw new JMSException JavaDoc("Cannot call receive for QueueBrowser");
178     }
179
180     /**
181      * Return, at most, count messages from the cache.
182      *
183      * @param count the max number of messages to receive
184      * @return a list of <code>MessageHandle</code> instances
185      */

186     public Vector JavaDoc receiveMessages(int count) {
187         Vector JavaDoc messages = new Vector JavaDoc();
188         int index = 0;
189         while (index < count) {
190
191             // check if we should exit the loop
192
if (isStopped() || getMessageCount() == 0) {
193                 break;
194             }
195
196             // remove the first message from the list and check
197
// that it is not null. Synchronize the removal of
198
// the message but not the sending to the remote
199
// consumer
200
try {
201                 MessageHandle handle = removeFirstMessage();
202                 if (handle != null) {
203                     MessageImpl m = handle.getMessage();
204                     if (m != null) {
205                         // add it to the list of messages to send
206
// but only deliver messages that satisfy the
207
// selection criteria.
208
if (selects(m)) {
209                             messages.addElement(handle);
210                             ++index;
211                         } else {
212                             // drop the message
213
}
214                     } else {
215                         // message may have been consumed in the interim
216
}
217                 }
218             } catch (Exception JavaDoc exception) {
219                 _log.error(exception, exception);
220             }
221         }
222
223         return messages;
224     }
225
226     /**
227      * Set the message listener for this consumer. If a message listener is set
228      * then messages will be scheduled to be sent to it when they are available.
229      *
230      * @param listener the message listener to add, or <code>null</code> to
231      * remove an existing listener
232      */

233     public void setMessageListener(ConsumerEndpointListener listener) {
234         _log.error("QueueBrowserEndpoint.setMessageListener "
235                     + "should never be called");
236     }
237
238     /**
239      * Return the number of unsent messages in the cache for this consumer.
240      *
241      * @return the number of unsent messages
242      */

243     public int getMessageCount() {
244         return _handles.size();
245     }
246
247     /**
248      * Add the handle to the cache.
249      *
250      * @param handle the message handle to add
251      */

252     protected void addMessage(MessageHandle handle) {
253         _handles.add(handle);
254
255         // notify the consumer
256
notifyMessageAvailable();
257     }
258
259     /**
260      * Deliver messages in the cache to the consumer.
261      * <p> This is not relevant to QueueBrowsers, and thus shouldn't be invoked.
262      *
263      * @return <code>false</code>
264      */

265     protected boolean deliverMessages() {
266         _log.error(
267                 "QueueBrowserEndpoint.deliverMessages() should never be called",
268                 new Exception JavaDoc());
269         return false;
270     }
271
272     /**
273      * Remove the handle from the cache.
274      *
275      * @param handle the handle to remove
276      * @return the removed handle, or <code>null</code> if it doesn't exist
277      */

278     protected MessageHandle removeMessage(MessageHandle handle) {
279         return _handles.remove(handle);
280     }
281
282     /**
283      * Return the first message handle in the cache.
284      *
285      * @return the first message or null if cache is empty
286      */

287     protected MessageHandle removeFirstMessage() {
288         return _handles.removeFirst();
289     }
290
291     /**
292      * Closes this endpoint.
293      */

294     protected void doClose() {
295         // unregister from the DestinationCache
296
_cache.removeQueueListener(this);
297     }
298
299 }
300
Popular Tags