KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > scalagent > kjoram > MessageConsumer


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (INRIA)
22  * Contributor(s): Nicolas Tachker (ScalAgent)
23  */

24 package com.scalagent.kjoram;
25
26 import com.scalagent.kjoram.jms.*;
27 import com.scalagent.kjoram.util.TimerTask;
28
29 import java.util.Vector JavaDoc;
30
31 import com.scalagent.kjoram.excepts.IllegalStateException;
32 import com.scalagent.kjoram.excepts.*;
33
34
35 public class MessageConsumer
36 {
37   /** The selector for filtering messages. */
38   private String JavaDoc selector;
39   /** The message listener, if any. */
40   private MessageListener messageListener = null;
41   /** <code>true</code> for a durable subscriber. */
42   private boolean durableSubscriber;
43   /** Pending "receive" or listener request. */
44   private AbstractJmsRequest pendingReq = null;
45   /**
46    * <code>true</code> if the consumer has a pending synchronous "receive"
47    * request.
48    */

49   private boolean receiving = false;
50   /** Task for replying to a pending synchronous "receive" with timer. */
51   private TimerTask replyingTask = null;
52
53   /** The destination the consumer gets its messages from. */
54   protected Destination dest;
55   /**
56    * <code>true</code> if the subscriber does not wish to consume messages
57    * produced by its connection.
58    */

59   protected boolean noLocal;
60   /** <code>true</code> if the consumer is closed. */
61   protected boolean closed = false;
62
63   /** The session the consumer belongs to. */
64   Session sess;
65   /**
66    * The consumer server side target is either a queue or a subscription on
67    * its proxy.
68    */

69   String JavaDoc targetName;
70   /** <code>true</code> if the consumer is a queue consumer. */
71   boolean queueMode;
72
73   /**
74    * Constructs a consumer.
75    *
76    * @param sess The session the consumer belongs to.
77    * @param dest The destination the consumer gets messages from.
78    * @param selector Selector for filtering messages.
79    * @param subName The durableSubscriber subscription's name, if any.
80    * @param noLocal <code>true</code> for a subscriber not wishing to consume
81    * messages produced by its connection.
82    *
83    * @exception InvalidSelectorException If the selector syntax is invalid.
84    * @exception IllegalStateException If the connection is broken.
85    * @exception JMSException If the creation fails for any other reason.
86    */

87   MessageConsumer(Session sess, Destination dest, String JavaDoc selector,
88                   String JavaDoc subName, boolean noLocal) throws JMSException
89   {
90     if (dest == null)
91       throw new InvalidDestinationException("Invalid null destination.");
92
93     if (dest instanceof TemporaryQueue) {
94       Connection tempQCnx = ((TemporaryQueue) dest).getCnx();
95
96       if (tempQCnx == null || ! tempQCnx.equals(sess.cnx))
97         throw new JMSSecurityException("Forbidden consumer on this "
98                                        + "temporary destination.");
99     }
100     else if (dest instanceof TemporaryTopic) {
101       Connection tempTCnx = ((TemporaryTopic) dest).getCnx();
102     
103       if (tempTCnx == null || ! tempTCnx.equals(sess.cnx))
104         throw new JMSSecurityException("Forbidden consumer on this "
105                                        + "temporary destination.");
106     }
107
108     // If the destination is a topic, the consumer is a subscriber:
109
if (dest instanceof Topic) {
110       if (subName == null) {
111         subName = sess.cnx.nextSubName();
112         durableSubscriber = false;
113       }
114       else
115         durableSubscriber = true;
116
117       sess.cnx.syncRequest(new ConsumerSubRequest(dest.getName(),
118                                                   subName,
119                                                   selector,
120                                                   noLocal,
121                                                   durableSubscriber));
122       targetName = subName;
123       this.noLocal = noLocal;
124       queueMode = false;
125     }
126     else {
127       targetName = dest.getName();
128       queueMode = true;
129     }
130
131     this.sess = sess;
132     this.dest = dest;
133     this.selector = selector;
134
135     sess.consumers.addElement(this);
136
137     if (JoramTracing.dbgClient)
138       JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
139   }
140
141   /**
142    * Constructs a consumer.
143    *
144    * @param sess The session the consumer belongs to.
145    * @param dest The destination the consumer gets messages from.
146    * @param selector Selector for filtering messages.
147    *
148    * @exception InvalidSelectorException If the selector syntax is invalid.
149    * @exception IllegalStateException If the connection is broken.
150    * @exception JMSException If the creation fails for any other reason.
151    */

152   MessageConsumer(Session sess, Destination dest,
153                   String JavaDoc selector) throws JMSException
154   {
155     this(sess, dest, selector, null, false);
156   }
157
158   /** Returns a string view of this consumer. */
159   public String JavaDoc toString()
160   {
161     return "Consumer:" + sess.ident;
162   }
163
164   /**
165    * API method.
166    * <p>
167    * This method must not be called if the connection the consumer belongs to
168    * is started, because the session would then be accessed by the thread
169    * calling this method and by the thread controlling asynchronous deliveries.
170    * This situation is clearly forbidden by the single threaded nature of
171    * sessions. Moreover, unsetting a message listener without stopping the
172    * connection may lead to the situation where asynchronous deliveries would
173    * arrive on the connection, the session or the consumer without being
174    * able to reach their target listener!
175    *
176    * @exception IllegalStateException If the consumer is closed, or if the
177    * connection is broken.
178    * @exception JMSException If the request fails for any other reason.
179    * @exception IllegalStateException If the consumer is closed.
180    */

181   public void setMessageListener(MessageListener messageListener)
182               throws JMSException
183   {
184     if (closed)
185       throw new IllegalStateException JavaDoc("Forbidden call on a closed consumer.");
186
187     if (JoramTracing.dbgClient)
188       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
189                        + ": setting MessageListener to "
190                        + messageListener);
191
192     if (sess.cnx.started && JoramTracing.dbgClient)
193       JoramTracing.log(JoramTracing.WARN, this + ": improper call"
194                        + " on a started connection.");
195     
196     // If unsetting the listener:
197
if (this.messageListener != null && messageListener == null) {
198       if (JoramTracing.dbgClient)
199         JoramTracing.log(JoramTracing.DEBUG, this + ": unsets"
200                          + " listener request.");
201
202       sess.cnx.requestsTable.remove(pendingReq.getKey());
203
204       this.messageListener = messageListener;
205       sess.msgListeners--;
206
207       ConsumerUnsetListRequest unsetLR = null;
208       if (queueMode) {
209         unsetLR = new ConsumerUnsetListRequest(true);
210         unsetLR.setCancelledRequestId(pendingReq.getRequestId());
211       }
212       else {
213         unsetLR = new ConsumerUnsetListRequest(false);
214         unsetLR.setTarget(targetName);
215       }
216
217       try {
218         sess.cnx.syncRequest(unsetLR);
219       }
220       // A JMSException might be caught if the connection is broken.
221
catch (JMSException jE) {}
222       pendingReq = null;
223
224       // Stopping the daemon if not needed anymore:
225
if (sess.msgListeners == 0 && sess.started) {
226         if (JoramTracing.dbgClient)
227           JoramTracing.log(JoramTracing.DEBUG, this + ": stops the"
228                            + " session daemon.");
229         sess.daemon.stop();
230         sess.daemon = null;
231         sess.started = false;
232       }
233     }
234     // Else, if setting a new listener:
235
else if (this.messageListener == null && messageListener != null) {
236       sess.msgListeners++;
237
238       if (sess.msgListeners == 1
239           && (sess.started || sess.cnx.started)) {
240         if (JoramTracing.dbgClient)
241           JoramTracing.log(JoramTracing.DEBUG, this + ": starts the"
242                            + " session daemon.");
243         sess.daemon = new SessionDaemon(sess);
244         sess.daemon.setDaemon(false);
245         sess.daemon.start();
246         sess.started = true;
247       }
248
249       this.messageListener = messageListener;
250       pendingReq = new ConsumerSetListRequest(targetName, selector, queueMode);
251       pendingReq.setRequestId(sess.cnx.nextRequestId());
252       sess.cnx.requestsTable.put(pendingReq.getKey(), this);
253       sess.cnx.asyncRequest(pendingReq);
254     }
255
256     if (JoramTracing.dbgClient)
257       JoramTracing.log(JoramTracing.DEBUG, this + ": MessageListener"
258                        + " set.");
259   }
260
261   /**
262    * API method.
263    *
264    * @exception IllegalStateException If the consumer is closed.
265    */

266   public MessageListener getMessageListener() throws JMSException
267   {
268     if (closed)
269       throw new IllegalStateException JavaDoc("Forbidden call on a closed consumer.");
270
271     return messageListener;
272   }
273
274   /**
275    * API method.
276    *
277    * @exception IllegalStateException If the consumer is closed.
278    */

279   public String JavaDoc getMessageSelector() throws JMSException
280   {
281     if (closed)
282       throw new IllegalStateException JavaDoc("Forbidden call on a closed consumer.");
283
284     return selector;
285   }
286
287   /**
288    * API method implemented in subclasses.
289    *
290    * @exception IllegalStateException If the consumer is closed, or if the
291    * connection is broken.
292    * @exception JMSSecurityException If the requester is not a READER on the
293    * destination.
294    * @exception JMSException If the request fails for any other reason.
295    */

296   public Message receive(long timeOut) throws JMSException
297   {
298     // Synchronizing with a possible "close".
299
synchronized(this) {
300       if (JoramTracing.dbgClient)
301         JoramTracing.log(JoramTracing.DEBUG, "--- " + this
302                          + ": requests to receive a message.");
303
304       if (closed)
305         throw new IllegalStateException JavaDoc("Forbidden call on a closed consumer.");
306
307       if (messageListener != null) {
308         if (JoramTracing.dbgClient)
309           JoramTracing.log(JoramTracing.WARN, "Improper call as a"
310                            + " listener exists for this consumer.");
311       }
312       else if (sess.msgListeners > 0) {
313         if (JoramTracing.dbgClient)
314           JoramTracing.log(JoramTracing.WARN, "Improper call as"
315                            + " asynchronous consumers have already"
316                            + " been set on the session.");
317       }
318       pendingReq = new ConsumerReceiveRequest(targetName, selector, timeOut,
319                                               queueMode);
320       pendingReq.setRequestId(sess.cnx.nextRequestId());
321       receiving = true;
322
323       // In case of a timer, scheduling the receive:
324
if (timeOut > 0) {
325         replyingTask = new ConsumerReplyTask(pendingReq);
326         sess.schedule(replyingTask, timeOut);
327       }
328     }
329
330     // Expecting an answer:
331
ConsumerMessages reply =
332      (ConsumerMessages) sess.cnx.syncRequest(pendingReq);
333
334     // Synchronizing again with a possible "close":
335
synchronized(this) {
336       receiving = false;
337       pendingReq = null;
338       if (replyingTask != null)
339         replyingTask.cancel();
340       if (JoramTracing.dbgClient)
341         JoramTracing.log(JoramTracing.DEBUG, this + ": received a"
342                          + " reply.");
343
344       Vector JavaDoc msgs = reply.getMessages();
345       if (msgs != null && ! msgs.isEmpty()) {
346         com.scalagent.kjoram.messages.Message msg =
347           (com.scalagent.kjoram.messages.Message) msgs.elementAt(0);
348         String JavaDoc msgId = msg.getIdentifier();
349         // Auto ack: acknowledging the message:
350
if (sess.autoAck)
351           sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
352                                                        queueMode));
353         // Session ack: passing the id for later ack or deny:
354
else
355           sess.prepareAck(targetName, msgId, queueMode);
356
357         return Message.wrapMomMessage(sess, msg);
358       }
359       else
360         return null;
361       }
362     }
363
364   /**
365    * API method.
366    *
367    * @exception IllegalStateException If the consumer is closed, or if the
368    * connection is broken.
369    * @exception JMSSecurityException If the requester is not a READER on the
370    * destination.
371    * @exception JMSException If the request fails for any other reason.
372    */

373   public Message receive() throws JMSException
374   {
375     return receive(0);
376   }
377
378   /**
379    * API method.
380    *
381    * @exception IllegalStateException If the consumer is closed, or if the
382    * connection is broken.
383    * @exception JMSSecurityException If the requester is not a READER on the
384    * destination.
385    * @exception JMSException If the request fails for any other reason.
386    */

387   public Message receiveNoWait() throws JMSException
388   {
389     return receive(-1);
390   }
391
392   /**
393    * API method.
394    *
395    * @exception JMSException Actually never thrown.
396    */

397   public void close() throws JMSException
398   {
399     // Ignoring the call if consumer is already closed:
400
if (closed)
401       return;
402
403     if (JoramTracing.dbgClient)
404       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
405                        + ": closing...");
406
407     // Synchronizig with a possible receive() or onMessage() ongoing process.
408
syncro();
409
410     // Removing this resource's reference from everywhere:
411
Object JavaDoc lock = null;
412     if (pendingReq != null)
413       lock = sess.cnx.requestsTable.remove(pendingReq.getKey());
414     sess.consumers.removeElement(this);
415
416     // Unsetting the listener, if any:
417
try {
418       if (messageListener != null) {
419         if (JoramTracing.dbgClient)
420           JoramTracing.log(JoramTracing.DEBUG, "Unsetting listener.");
421
422         if (queueMode) {
423           ConsumerUnsetListRequest unsetLR =
424             new ConsumerUnsetListRequest(true);
425           unsetLR.setCancelledRequestId(pendingReq.getRequestId());
426           sess.cnx.syncRequest(unsetLR);
427         }
428       }
429
430       if (durableSubscriber)
431         sess.cnx.syncRequest(new ConsumerCloseSubRequest(targetName));
432       else if (! queueMode)
433         sess.cnx.syncRequest(new ConsumerUnsubRequest(targetName));
434     }
435     // A JMSException might be caught if the connection is broken.
436
catch (JMSException jE) {}
437
438     // In the case of a pending "receive" request, replying by a null to it:
439
if (lock != null && receiving) {
440       if (JoramTracing.dbgClient)
441         JoramTracing.log(JoramTracing.DEBUG, "Replying to the"
442                          + " pending receive "
443                          + pendingReq.getRequestId()
444                          + " with a null message.");
445
446       sess.cnx.repliesTable.put(pendingReq.getKey(), new ConsumerMessages());
447
448       synchronized(lock) {
449         lock.notify();
450       }
451     }
452
453     // Synchronizing again:
454
syncro();
455
456     closed = true;
457     
458     if (JoramTracing.dbgClient)
459       JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
460   }
461
462   /**
463    * Returns when the consumer isn't busy executing a "onMessage" or a
464    * "receive" anymore; method called for synchronization purposes.
465    */

466   synchronized void syncro() {}
467   
468   /**
469    * Method called by the session daemon for passing an asynchronous message
470    * delivery to the listener.
471    */

472   synchronized void onMessage(com.scalagent.kjoram.messages.Message message)
473   {
474     String JavaDoc msgId = message.getIdentifier();
475
476     try {
477       // If the listener has been unset without having stopped the
478
// connection, this case might happen:
479
if (messageListener == null) {
480         if (JoramTracing.dbgClient)
481           JoramTracing.log(JoramTracing.WARN, this + ": an"
482                            + " asynchronous delivery arrived"
483                            + " for an improperly unset listener:"
484                            + " denying the message.");
485         sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
486                                                      queueMode, true));
487       }
488       else {
489         // In session ack mode, preparing later ack or deny:
490
if (! sess.autoAck)
491           sess.prepareAck(targetName, msgId, queueMode);
492
493         try {
494           messageListener.onMessage(Message.wrapMomMessage(sess, message));
495           // Auto ack: acknowledging the message:
496
if (sess.autoAck)
497             sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
498                                                          queueMode));
499         }
500         // Catching a JMSException means that the building of the Joram
501
// message went wrong: denying as expected by the spec:
502
catch (JMSException jE) {
503           JoramTracing.log(JoramTracing.ERROR, this
504                            + ": error while processing the"
505                            + " received message: " + jE);
506           
507           if (queueMode)
508             sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
509                                                          queueMode));
510           else
511             sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
512                                                           queueMode));
513         }
514         // Catching a RuntimeException means that the client onMessage() code
515
// is incorrect; denying as expected by the JMS spec:
516
catch (RuntimeException JavaDoc rE) {
517           JoramTracing.log(JoramTracing.ERROR, this
518                            + ": RuntimeException thrown"
519                            + " by the listener: " + rE);
520
521           if (sess.autoAck && queueMode)
522             sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
523                                                          queueMode));
524           else if (sess.autoAck && ! queueMode)
525             sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
526                                                           queueMode));
527         }
528         // Sending a new request if queue mode:
529
if (queueMode) {
530           pendingReq = new ConsumerSetListRequest(targetName, selector, true);
531           pendingReq.setRequestId(sess.cnx.nextRequestId());
532           sess.cnx.requestsTable.put(pendingReq.getKey(), this);
533           sess.cnx.asyncRequest(pendingReq);
534         }
535       }
536     }
537     // Catching an IllegalStateException means that the acknowledgement or
538
// denying went wrong because the connection has been lost. Nothing more
539
// can be done here.
540
catch (JMSException jE) {
541       JoramTracing.log(JoramTracing.ERROR, this + ": " + jE);
542     }
543   }
544
545   /**
546    * The <code>ConsumerReplyTask</code> class is used by "receive" requests
547    * with timer for taking care of answering them if the timer expires.
548    */

549   private class ConsumerReplyTask extends TimerTask
550   {
551     /** The request to answer. */
552     private AbstractJmsRequest request;
553     /** The reply to put in the connection's table. */
554     private ConsumerMessages nullReply;
555
556     /**
557      * Constructs a <code>ConsumerReplyTask</code> instance.
558      *
559      * @param requestId The request to answer.
560      */

561     ConsumerReplyTask(AbstractJmsRequest request)
562     {
563       this.request = request;
564       this.nullReply = new ConsumerMessages(request.getRequestId(),
565                                             targetName,
566                                             queueMode);
567     }
568
569     /**
570      * Method called when the timer expires, actually putting a null answer
571      * in the replies table and unlocking the requester.
572      */

573     public void run()
574     {
575       try {
576         if (JoramTracing.dbgClient)
577           JoramTracing.log(JoramTracing.WARN, "Receive request" +
578                            " answered because timer expired");
579
580         Lock lock = (Lock) sess.cnx.requestsTable.remove(request.getKey());
581
582         if (lock == null)
583           return;
584
585         synchronized (lock) {
586           sess.cnx.repliesTable.put(request.getKey(), nullReply);
587           lock.notify();
588         }
589       }
590       catch (Exception JavaDoc e) {}
591     }
592   }
593 }
594
Popular Tags