KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > AbstractConsumerEndpoint


/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
 */

package org.exolab.jms.messagemgr;

import javax.jms.InvalidSelectorException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.scheduler.Scheduler;
import org.exolab.jms.selector.Selector;
import org.exolab.jms.server.ServerConnection;
import org.exolab.jms.server.JmsServerSession;

58 /**
59  * Abstract implementation of the {@link ConsumerEndpoint} interface.
60  *
61  * @author <a HREF="mailto:jima@exoffice.com">Jim Alateras</a>
62  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
63  * @version $Revision: 1.1 $ $Date: 2005/03/18 03:58:38 $
64  */

65 public abstract class AbstractConsumerEndpoint implements ConsumerEndpoint {
66
67     /**
68      * The listener for messages.
69      */

70     private final ConsumerEndpointListener _session;
71
72     /**
73      * The identity of this consumer.
74      */

75     private final long _id;
76
77     /**
78      * The identity of the connection that owns this consumer.
79      */

80     private final long _connectionId;
81
82     /**
83      * The destination the consumer is acceesing.
84      */

85     private final JmsDestination _destination;
86
87     /**
88      * The message selector associated with this consumer. May be
89      * <code>null</code>.
90      */

91     private final Selector _selector;
92
93     /**
94      * If true, and the destination is a topic, inhibits the delivery of
95      * messages published by its own connection.
96      */

97     private final boolean _noLocal;
98
99     /**
100      * This determines whether message delivery to the registered listener is
101      * enabled or disabled.
102      */

103     private volatile boolean _stopped = true;
104
105     /**
106      * Identifies this endpoint as being closed.
107      */

108     private volatile boolean _closed = false;
109
110     /**
111      * Indicates whether the this cache has been scheduled with the dispatcher
112      * for asynchronous message delivery.
113      */

114     private boolean _scheduled = false;
115
116     /**
117      * Synchronization helper for close() and deliverMessages().
118      */

119     private final Object JavaDoc _lock = new Object JavaDoc();
120
121     /**
122      * Flag to indicate that the consumer is waiting for a message.
123      */

124     private boolean _waitingForMessage = false;;
125
126     /**
127      * Holds the consumer's message listener. This means that messages will be
128      * pushed down.
129      */

130     protected ConsumerEndpointListener _listener = null;
131
132     /**
133      * Maintains the maximum size of this cache.
134      */

135     protected int _size = 1000;
136
137     /**
138      * This is the scheduler that is used to deliver messages if a consumer has
139      * a registered listener.
140      */

141     protected transient Scheduler _scheduler = null;
142
143     /**
144      * The logger.
145      */

146     private static final Log _log = LogFactory.getLog(
147             AbstractConsumerEndpoint.class);
148
149
150     /**
151      * Construct a new <code>ConsumerEndpoint</code>.
152      * <p/>
153      * The destination and selector determine where it will be sourcing its
154      * messages from, and scheduler is used to asynchronously deliver messages
155      * to the consumer.
156      *
157      * @param consumerId the identity of this consumer
158      * @param session the owning session
159      * @param destination the destination to access
160      * @param selector the message selector. May be <code>null</code>
161      * @param noLocal if true, and the destination is a topic, inhibits the
162      * delivery of messages published by its own connection.
163      * @param scheduler used to schedule asynchronous message delivery.
164      * @throws InvalidSelectorException if the selector is not well formed
165      */

166     public AbstractConsumerEndpoint(long consumerId, JmsServerSession session,
167                                     JmsDestination destination,
168                                     String JavaDoc selector, boolean noLocal,
169                                     Scheduler scheduler)
170             throws InvalidSelectorException JavaDoc {
171         if (session == null) {
172             throw new IllegalArgumentException JavaDoc("Argument 'session' is null");
173         }
174         if (destination == null) {
175             throw new IllegalArgumentException JavaDoc(
176                     "Argument 'destination' is null");
177         }
178         if (scheduler == null) {
179             throw new IllegalArgumentException JavaDoc("Argument 'scheduler' is null");
180         }
181
182         _id = consumerId;
183         _connectionId = session.getConnectionId();
184         _destination = destination;
185         _selector = (selector != null) ? new Selector(selector) : null;
186         _noLocal = noLocal;
187         _session = session;
188         _scheduler = scheduler;
189     }
190
191     /**
192      * Returns the identity of this consumer.
193      *
194      * @return the identity of this consumer
195      */

196     public long getId() {
197         return _id;
198     }
199
200     /**
201      * Determines if this is a persistent or non-persistent consumer.
202      * <p/>
203      * If persistent, then the consumer is persistent accross subscriptions and
204      * server restarts, and {@link #getPersistentId} returns a non-null value.
205      *
206      * @return <code>false</code>
207      */

208     public boolean isPersistent() {
209         return false;
210     }
211
212     /**
213      * Returns the persistent identifier for this consumer. This is the identity
214      * of the consumer which is persistent across subscriptions and server
215      * restarts.
216      *
217      * @return <code>null</code>
218      */

219     public String JavaDoc getPersistentId() {
220         return null;
221     }
222
223     /**
224      * Return the destination that this consumer is accessing.
225      *
226      * @return the destination that this consumer is accessing
227      */

228     public JmsDestination getDestination() {
229         return _destination;
230     }
231
232     /**
233      * Returns the identity of the connection that owns this consumer.
234      *
235      * @return the identity of the connection
236      * @see ServerConnection#getConnectionId
237      */

238     public long getConnectionId() {
239         return _connectionId;
240     }
241
242     /**
243      * Returns the message selector.
244      *
245      * @return the message selector, or <code>null</code> if none was specified
246      * by the client
247      */

248     public Selector getSelector() {
249         return _selector;
250     }
251
252
253     /**
254      * Returns if locally produced messages are being inhibited.
255      *
256      * @return <code>true</code> if locally published messages are being
257      * inhibited.
258      */

259     public boolean getNoLocal() {
260         return _noLocal;
261     }
262
263     /**
264      * Stop/start message delivery.
265      *
266      * @param stop if <code>true</code> to stop message delivery, otherwise
267      * start it
268      */

269     public synchronized void setStopped(boolean stop) {
270         if (stop) {
271             _stopped = true;
272         } else {
273             _stopped = false;
274             // schedule message delivery if needed
275
schedule();
276         }
277     }
278
279     /**
280      * Set the message listener for this consumer. If a message listener is set
281      * then messages will be scheduled to be sent to it when they are available
282      * <p/>
283      *
284      * @param listener the message listener to add, or <code>null</code> to
285      * remove an existing listener
286      */

287     public synchronized void setMessageListener(
288             ConsumerEndpointListener listener) {
289         _listener = listener;
290         if (listener == null) {
291             // remove this from the scheduler
292
_scheduler.remove(this);
293             _scheduled = false;
294         } else {
295             // schedule it to run
296
schedule();
297         }
298     }
299
300     /**
301      * The run method is used to asynchronously deliver the messages in the
302      * cache to the consumer, by invoking {@link #deliverMessages}.
303      * <p/>
304      * It is scheduled by the {@link Scheduler}.
305      */

306     public void run() {
307         synchronized (_lock) {
308             if (!_closed) {
309                 boolean reschedule = deliverMessages();
310                 _scheduled = false;
311                 if (reschedule) {
312                     schedule();
313                 }
314             }
315         }
316     }
317
318
319     /**
320      * Close this endpoint.
321      * <p/>
322      * This synchronizes with {@link #deliverMessages} before invoking
323      * {@link #doClose}
324      */

325     public final void close() {
326         _stopped = true;
327
328         synchronized (_lock) {
329             // synchronize with deliverMessages()
330
_scheduler.remove(this); // remove this, if it is scheduled
331
_scheduled = false;
332
333         }
334
335         synchronized (this) {
336             doClose();
337             _closed = true;
338         }
339     }
340
341     /**
342      * Returns a stringified version of the consumer.
343      *
344      * @return a stringified version of the consumer
345      */

346     public String JavaDoc toString() {
347         return _id + ":" + getDestination();
348     }
349
350
351     /**
352      * Deliver messages in the cache to the consumer.
353      *
354      * @return <code>true</code> if the endpoint should be rescheduled
355      */

356     protected abstract boolean deliverMessages();
357
358     /**
359      * Closes the endpoint.
360      */

361     protected abstract void doClose();
362
363     /**
364      * Schedule asynchronouse message delivery.
365      */

366     protected void schedule() {
367         if (!_stopped && !_closed && _listener != null && !_scheduled) {
368             _scheduled = true;
369             _scheduler.add(this);
370         }
371     }
372
373     /**
374      * Determines if this endpoint has been stopped.
375      *
376      * @return <code>true</code> if this endpoint has been stopped
377      */

378     protected final boolean isStopped() {
379         return _stopped;
380     }
381
382     /**
383      * Check if the consumer is waiting for a message. If it is then notify it
384      * that a message has arrived.
385      */

386     protected void notifyMessageAvailable() {
387         // if we need to notify then send out the request
388
if (isWaitingForMessage()) {
389             clearWaitingForMessage();
390
391             try {
392                 _session.onMessageAvailable(getId());
393             } catch (Exception JavaDoc exception) {
394                 if (_log.isDebugEnabled()) {
395                     _log.debug("Failed to notify consumer of available message",
396                                exception);
397                 }
398             }
399         }
400     }
401
402     /**
403      * Determines if the endpoint is waiting for a message.
404      *
405      * @return <code>true</code> if the endpoint is waiting
406      */

407     protected final boolean isWaitingForMessage() {
408         return _waitingForMessage;
409     }
410
411     /**
412      * Set the waiting for message flag.
413      */

414     protected final void setWaitingForMessage() {
415         _waitingForMessage = true;
416     }
417
418     /**
419      * Clear the waiting for message flag.
420      */

421     protected final void clearWaitingForMessage() {
422         _waitingForMessage = false;
423     }
424
425     /**
426      * Helper for {@link #deliverMessages} implementations, to determine if
427      * asynchronous message delivery should be stopped.
428      *
429      * @return <code>true</code> if asynchronous message delivery should be
430      * stopped
431      */

432     protected boolean stopDelivery() {
433         return (_stopped || getMessageCount() == 0 || _listener == null);
434     }
435
436     /**
437      * Determines if a message is selected by the consumer.
438      *
439      * @param message the message to check
440      * @return <code>true</code> if the message is selected; otherwise
441      * <code>false</code>
442      */

443     protected boolean selects(MessageImpl message) {
444         return (_selector == null || _selector.selects(message));
445     }
446
447 }
448
Popular Tags