KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > MessageConsumer


/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
 * Copyright (C) 1996 - 2000 Dyade
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (INRIA)
 * Contributor(s): ScalAgent Distributed Technologies
 */

24 package org.objectweb.joram.client.jms;
25
26 import javax.jms.IllegalStateException JavaDoc;
27 import javax.jms.InvalidDestinationException JavaDoc;
28 import javax.jms.InvalidSelectorException JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30 import javax.jms.JMSSecurityException JavaDoc;
31
32 import org.objectweb.joram.shared.client.ConsumerCloseSubRequest;
33 import org.objectweb.joram.shared.client.ConsumerSubRequest;
34 import org.objectweb.joram.shared.client.ConsumerUnsubRequest;
35
36 import org.objectweb.joram.shared.selectors.ClientSelector;
37
38 import org.objectweb.util.monolog.api.BasicLevel;
39 import org.objectweb.joram.shared.JoramTracing;
40
41 /**
42  * Implements the <code>javax.jms.MessageConsumer</code> interface.
43  */

44 public class MessageConsumer implements javax.jms.MessageConsumer JavaDoc {
45   /**
46    * Status of the message consumer.
47    */

48   private static class Status {
49     /**
50      * Status of the message consumer
51      * when it is open. It is the initial state.
52      */

53     public static final int OPEN = 0;
54     
55     /**
56      * Status of the message consumer when it is
57      * closed.
58      */

59     public static final int CLOSE = 1;
60     
61     private static final String JavaDoc[] names = {
62       "OPEN", "CLOSE"};
63     
64     public static String JavaDoc toString(int status) {
65       return names[status];
66     }
67   }
68
69   /** The selector for filtering messages. */
70   String JavaDoc selector;
71
72   /** <code>true</code> for a durable subscriber. */
73   private boolean durableSubscriber;
74
75   /** The destination the consumer gets its messages from. */
76   protected Destination dest;
77
78   /**
79    * <code>true</code> if the subscriber does not wish to consume messages
80    * produced by its connection.
81    */

82   protected boolean noLocal;
83
84   /** The session the consumer belongs to. */
85   protected Session sess;
86
87   /**
88    * The consumer server side target is either a queue or a subscription on
89    * its proxy.
90    */

91   String JavaDoc targetName;
92
93   /** <code>true</code> if the consumer is a queue consumer. */
94   boolean queueMode;
95
96   /**
97    * Message listener context (null if no message listener).
98    */

99   private MessageConsumerListener mcl;
100
101   /**
102    * Status of the message consumer
103    * OPEN, CLOSE
104    */

105   private int status;
106   
107   /**
108    * Used to synchonize the
109    * method close()
110    */

111   private Closer closer;
112
113   /**
114    * Constructs a consumer.
115    *
116    * @param sess The session the consumer belongs to.
117    * @param dest The destination the consumer gets messages from.
118    * @param selector Selector for filtering messages.
119    * @param subName The durableSubscriber subscription's name, if any.
120    * @param noLocal <code>true</code> for a subscriber not wishing to consume
121    * messages produced by its connection.
122    *
123    * @exception InvalidSelectorException If the selector syntax is invalid.
124    * @exception IllegalStateException If the connection is broken, or if the
125    * subscription is durable and already
126    * activated.
127    * @exception JMSException Generic exception.
128    */

129   MessageConsumer(Session sess,
130                   Destination dest,
131                   String JavaDoc selector,
132                   String JavaDoc subName,
133                   boolean noLocal) throws JMSException JavaDoc {
134     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
135       JoramTracing.dbgClient.log(
136         BasicLevel.DEBUG,
137         "MessageConsumer.<init>(" +
138         sess + ',' + dest + ',' + selector + ',' +
139         subName + ',' + noLocal + ')');
140     
141     if (dest == null)
142       throw new InvalidDestinationException JavaDoc("Invalid null destination.");
143
144     if (dest instanceof TemporaryQueue) {
145       Connection tempQCnx = ((TemporaryQueue) dest).getCnx();
146
147       if (tempQCnx == null || ! tempQCnx.equals(sess.getConnection()))
148         throw new JMSSecurityException JavaDoc("Forbidden consumer on this "
149                                        + "temporary destination.");
150     }
151     else if (dest instanceof TemporaryTopic) {
152       Connection tempTCnx = ((TemporaryTopic) dest).getCnx();
153     
154       if (tempTCnx == null || ! tempTCnx.equals(sess.getConnection()))
155         throw new JMSSecurityException JavaDoc("Forbidden consumer on this "
156                                        + "temporary destination.");
157     }
158
159     try {
160       ClientSelector.checks(selector);
161     } catch (org.objectweb.joram.shared.excepts.SelectorException sE) {
162       throw new InvalidSelectorException JavaDoc("Invalid selector syntax: " + sE);
163     }
164
165     // If the destination is a topic, the consumer is a subscriber:
166
if (dest instanceof javax.jms.Topic JavaDoc) {
167       if (subName == null) {
168         subName = sess.getConnection().nextSubName();
169         durableSubscriber = false;
170       } else {
171         durableSubscriber = true;
172       }
173       sess.syncRequest(
174         new ConsumerSubRequest(dest.getName(),
175                                subName,
176                                selector,
177                                noLocal,
178                                durableSubscriber));
179       targetName = subName;
180       this.noLocal = noLocal;
181       queueMode = false;
182     } else {
183       targetName = dest.getName();
184       queueMode = true;
185     }
186
187     this.sess = sess;
188     this.dest = dest;
189     this.selector = selector;
190     
191     closer = new Closer();
192
193     setStatus(Status.OPEN);
194   }
195
196   /**
197    * Constructs a consumer.
198    *
199    * @param sess The session the consumer belongs to.
200    * @param dest The destination the consumer gets messages from.
201    * @param selector Selector for filtering messages.
202    *
203    * @exception InvalidSelectorException If the selector syntax is invalid.
204    * @exception IllegalStateException If the connection is broken, or if the
205    * subscription is durable and already
206    * activated.
207    * @exception JMSException Generic exception.
208    */

209   MessageConsumer(Session sess,
210                   Destination dest,
211                   String JavaDoc selector) throws JMSException JavaDoc {
212     this(sess, dest, selector, null, false);
213   }
214
215   private synchronized void setStatus(int status) {
216     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
217       JoramTracing.dbgClient.log(
218         BasicLevel.DEBUG,
219         "MessageConsumer.setStatus(" + Status.toString(status) + ')');
220     this.status = status;
221   }
222
223   public final String JavaDoc getTargetName() {
224     return targetName;
225   }
226
227   public final boolean getQueueMode() {
228     return queueMode;
229   }
230
231   protected synchronized void checkClosed()
232     throws IllegalStateException JavaDoc {
233     if (status == Status.CLOSE)
234       throw new IllegalStateException JavaDoc("Forbidden call on a closed consumer.");
235   }
236
237   /** Returns a string view of this consumer. */
238   public String JavaDoc toString() {
239     return "Consumer:" + sess.getId();
240   }
241   
242   /**
243    * Sets the message consumer's MessageListener.
244    * API method.
245    * <p>
246    * This method must not be called if the connection the consumer belongs to
247    * is started, because the session would then be accessed by the thread
248    * calling this method and by the thread controlling asynchronous deliveries.
249    * This situation is clearly forbidden by the single threaded nature of
250    * sessions. Moreover, unsetting a message listener without stopping the
251    * connection may lead to the situation where asynchronous deliveries would
252    * arrive on the connection, the session or the consumer without being
253    * able to reach their target listener!
254    *
255    * @exception IllegalStateException If the consumer is closed, or if the
256    * connection is broken.
257    * @exception JMSException If the request fails for any other reason.
258    */

259   public synchronized void setMessageListener(
260     javax.jms.MessageListener JavaDoc messageListener)
261     throws JMSException JavaDoc {
262     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
263       JoramTracing.dbgClient.log(
264         BasicLevel.DEBUG,
265         "MessageConsumer.setMessageListener(" +
266         messageListener + ')');
267     checkClosed();
268     if (mcl != null) {
269       if (messageListener == null) {
270         sess.removeMessageListener(mcl, true);
271         mcl = null;
272       } else throw new IllegalStateException JavaDoc(
273         "Message listener not null");
274     } else {
275       if (messageListener != null) {
276         mcl = sess.addMessageListener(
277           new SingleSessionConsumer(
278             queueMode,
279             durableSubscriber,
280             selector,
281             targetName,
282             sess,
283             messageListener,
284             sess.getQueueMessageReadMax(),
285             sess.getTopicActivationThreshold(),
286             sess.getTopicPassivationThreshold(),
287             sess.getTopicAckBufferMax(),
288             sess.getRequestMultiplexer()));
289       }
290       // else idempotent
291
}
292   }
293
294   /**
295    * Gets the message consumer's MessageListener.
296    * API method.
297    *
298    * @exception IllegalStateException If the consumer is closed.
299    */

300   public synchronized javax.jms.MessageListener JavaDoc getMessageListener()
301     throws JMSException JavaDoc {
302     checkClosed();
303     if (mcl == null) return null;
304     return mcl.getMessageListener();
305   }
306
307   /**
308    * Gets this message consumer's message selector expression.
309    * API method.
310    *
311    * @exception IllegalStateException If the consumer is closed.
312    */

313   public final String JavaDoc getMessageSelector()
314     throws JMSException JavaDoc {
315     checkClosed();
316     return selector;
317   }
318
319   /**
320    * Receives the next message that arrives before the specified timeout.
321    * API method.
322    *
323    * @exception IllegalStateException If the consumer is closed, or if the
324    * connection is broken.
325    * @exception JMSSecurityException If the requester is not a READER on the
326    * destination.
327    * @exception JMSException If the request fails for any other reason.
328    */

329   public javax.jms.Message JavaDoc receive(long timeOut)
330     throws JMSException JavaDoc {
331     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
332       JoramTracing.dbgClient.log(
333         BasicLevel.DEBUG,
334         "MessageConsumer.receive(" + timeOut + ')');
335     checkClosed();
336     return sess.receive(timeOut, timeOut, this,
337                         targetName, selector, queueMode);
338   }
339
340   /**
341    * Receives the next message produced for this message consumer.
342    * API method.
343    *
344    * @exception IllegalStateException If the consumer is closed, or if the
345    * connection is broken.
346    * @exception JMSSecurityException If the requester is not a READER on the
347    * destination.
348    * @exception JMSException If the request fails for any other reason.
349    */

350   public javax.jms.Message JavaDoc receive()
351     throws JMSException JavaDoc {
352     return receive(0);
353   }
354
355   /**
356    * Receives the next message if one is immediately available.
357    * API method.
358    *
359    * @exception IllegalStateException If the consumer is closed, or if the
360    * connection is broken.
361    * @exception JMSSecurityException If the requester is not a READER on the
362    * destination.
363    * @exception JMSException If the request fails for any other reason.
364    */

365   public javax.jms.Message JavaDoc receiveNoWait()
366     throws JMSException JavaDoc {
367     checkClosed();
368     if (sess.getConnection().isStopped()) {
369       return null;
370     } else {
371       return sess.receive(-1, 0, this,
372                           targetName, selector, queueMode);
373     }
374   }
375
376   /**
377    * API method.
378    *
379    * @exception JMSException
380    */

381   public void close() throws JMSException JavaDoc {
382     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
383       JoramTracing.dbgClient.log(
384         BasicLevel.DEBUG,
385         "MessageConsumer.close()");
386     closer.close();
387   }
388
389   /**
390    * This class synchronizes the close.
391    * Close can't be synchronized with 'this'
392    * because the MessageConsumer must be accessed
393    * concurrently during its closure. So
394    * we need a second lock.
395    */

396   class Closer {
397     synchronized void close() throws JMSException JavaDoc {
398       doClose();
399     }
400   }
401
402   void doClose() throws JMSException JavaDoc {
403     synchronized (this) {
404       if (status == Status.CLOSE)
405         return;
406       // The status must be changed before
407
// the call to Session.closeConsumer
408
// in order to enable Session.preReceive
409
// to check if the consumer has been closed.
410
setStatus(Status.CLOSE);
411     }
412     
413     if (!queueMode) {
414       // For a topic, remove the subscription.
415
if (durableSubscriber) {
416         try {
417           sess.syncRequest(new ConsumerCloseSubRequest(targetName));
418         } catch (JMSException JavaDoc exc) {
419           if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
420             JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc);
421         }
422       } else {
423         try {
424           sess.syncRequest(new ConsumerUnsubRequest(targetName));
425         } catch (JMSException JavaDoc exc) {
426           if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
427             JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc);
428         }
429       }
430     }
431     
432     sess.closeConsumer(this);
433     
434     if (mcl != null) {
435       // Stop the listener.
436
sess.removeMessageListener(mcl, false);
437     }
438   }
439
440   void activateMessageInput() throws JMSException JavaDoc {
441     if (mcl != null)
442       mcl.activateMessageInput();
443   }
444
445   void passivateMessageInput() throws JMSException JavaDoc {
446     if (mcl != null)
447       mcl.passivateMessageInput();
448   }
449 }
450
Popular Tags