KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > client > JmsMessageConsumer


1 /**
2  * This file is part of Presumo.
3  *
4  * Presumo is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * Presumo is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with Presumo; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  *
19  * Copyright 2001 Dan Greff
20  */

21 package com.presumo.jms.client;
22
23 import com.presumo.util.log.Logger;
24 import com.presumo.util.log.LoggerFactory;
25 import com.presumo.jms.selector.JmsOperand;
26 import com.presumo.jms.selector.Parser;
27 import com.presumo.jms.resources.Resources;
28
29 import java.util.LinkedList JavaDoc;
30
31 import javax.jms.IllegalStateException JavaDoc;
32 import javax.jms.InvalidSelectorException JavaDoc;
33 import javax.jms.JMSException JavaDoc;
34 import javax.jms.Message JavaDoc;
35 import javax.jms.MessageConsumer JavaDoc;
36 import javax.jms.MessageListener JavaDoc;
37 import javax.jms.Session JavaDoc;
38
39 /**
40  * Implementation of the interface <code>javax.jms.MessageConsumer</code>.
41  * All code common to TopicSubscribers and QueueReceivers is contained
42  * within this class.
43  *
44  * @see javax.jms.MessageConsumer
45  * @author Dan Greff
46  */

47 public abstract class JmsMessageConsumer implements MessageConsumer JavaDoc
48 {
49
50     /////////////////////////////////////////////////////////////////////////
51
// Protected Instance Variables //
52
/////////////////////////////////////////////////////////////////////////
53
protected String JavaDoc selector;
54   protected final JmsSession mySession;
55   protected MessageListener JavaDoc messageListener;
56   protected LinkedList JavaDoc inbox;
57   protected volatile boolean closed;
58   
59     /////////////////////////////////////////////////////////////////////////
60
// Private Instance Variables //
61
/////////////////////////////////////////////////////////////////////////
62

63   /*
64    * Lock used to synchronize the inbox. It is a string with a useful
65    * name for so that you can tell what the lock is in a threaddump.
66    */

67   private final Object JavaDoc lock = new String JavaDoc("JmsMessageConsumer Lock");
68
69   private int consumerID;
70
71     ////////////////////////////////////////////////////////////////////////
72
// Constructors //
73
////////////////////////////////////////////////////////////////////////
74
JmsMessageConsumer(JmsSession session, String JavaDoc selector)
75     throws JMSException JavaDoc
76   {
77     logger.entry("JmsMessageConsumer");
78     
79     checkSelector(selector);
80     this.mySession = session;
81     this.selector = selector;
82     this.inbox = new LinkedList JavaDoc();
83
84     
85     // note: subclass constructors must add the consumer to the session
86
// once their initilization is complete.
87

88     logger.exit("JmsMessageConsumer");
89   }
90
91  
92
93     /////////////////////////////////////////////////////////////////////////
94
// Public Methods //
95
/////////////////////////////////////////////////////////////////////////
96

97
98   /**
99    * @see javax.jms.MessageConsumer#getMessageSelector
100    */

101   public final String JavaDoc getMessageSelector() throws JMSException JavaDoc
102   {
103     return this.selector;
104   }
105
106   
107   /**
108    * @see javax.jms.MessageConsumer#getMessageListener
109    */

110   public final MessageListener JavaDoc getMessageListener()
111   {
112     return this.messageListener;
113   }
114   
115   
116   /**
117    * @see javax.jms.MessageConsumer#setMessageListener
118    */

119   public final void setMessageListener(MessageListener JavaDoc listener)
120     throws JMSException JavaDoc
121   {
122     logger.entry("setMessageListener");
123     
124     if (listener == null)
125       throw new IllegalArgumentException JavaDoc("Attempt to register null MessageListener");
126
127     // If we didn't have a listener before we need to tell our session that
128
// it now has an asynchronous listener.
129
if (messageListener == null && listener != null) {
130       mySession.addAsynch();
131     }
132
133     this.messageListener = listener;
134     
135     logger.exit("setMessageListener");
136   }
137
138
139   /**
140    * @see javax.jms.MessageConsumer#receive
141    */

142   public final Message JavaDoc receive() throws JMSException JavaDoc
143   {
144     logger.entry("receive()");
145     
146     if (closed == true)
147       throw new IllegalStateException JavaDoc("receive called on a closed consumer");
148     
149     if (messageListener != null || mySession.hasAsynchronousListeners())
150       throw new IllegalStateException JavaDoc
151         ("receive called on a Session with an asynchronous listener");
152
153     Message JavaDoc message = null;
154     
155     synchronized (lock) {
156       
157       while (message == null && closed == false) {
158         if (inbox.size() != 0) {
159           message = (Message JavaDoc) inbox.removeFirst();
160         }
161         else {
162           try {
163             lock.wait();
164           } catch (InterruptedException JavaDoc ie) {}
165         }
166       }
167     }
168        
169     
170     if (mySession.autoAcknowledge()) {
171       mySession.acknowledge();
172     }
173     
174     logger.exit("receive()");
175     return message;
176   }
177
178
179   /**
180    * @see javax.jms.MessageConsumer#receive
181    */

182   public final Message JavaDoc receive(long timeout) throws JMSException JavaDoc
183   {
184     logger.entry("receive(long)");
185     
186     if (closed == true)
187       throw new IllegalStateException JavaDoc("receive called on a closed consumer");
188     
189     if (messageListener != null || mySession.hasAsynchronousListeners())
190       throw new IllegalStateException JavaDoc
191         ("receive called on a Session with an asynchronous listener");
192
193     Message JavaDoc message = null;
194     
195     synchronized (lock) {
196       
197       if (inbox.size() != 0)
198         message = (Message JavaDoc) inbox.removeFirst();
199       else {
200         try {
201           lock.wait(timeout);
202         } catch (InterruptedException JavaDoc ie) {}
203         
204         if (inbox.size() != 0)
205           message = (Message JavaDoc) inbox.removeFirst();
206       }
207     }
208     
209     
210     if (mySession.autoAcknowledge()) {
211       mySession.acknowledge();
212     }
213
214     logger.exit("receive(long)", message);
215     return message;
216   }
217
218
219   /**
220    * @see javax.jms.MessageConsumer#receiveNoWait
221    */

222   public final Message JavaDoc receiveNoWait() throws JMSException JavaDoc
223   {
224     // Can't put the value to zero since this will cause the value
225
// to be ignored.
226
return this.receive(1);
227   }
228
229
230   /**
231    * @see javax.jms.MessageConsumer#close
232    */

233   public final void close() throws JMSException JavaDoc
234   {
235     logger.entry("close");
236     
237     if (!closed) {
238       // End any pending receives
239
synchronized (lock) {
240         closed = true;
241         
242         mySession.removeConsumer(this);
243         if (messageListener != null) {
244           mySession.removeAsynch();
245           messageListener = null;
246         }
247         else {
248           lock.notifyAll();
249         }
250       }
251     }
252     
253     logger.exit("close");
254   }
255
256   
257   /**
258    * @secret
259    *
260    * Used by the routers for bookkeeping.
261    */

262   public final int getConsumerID()
263   {
264     return this.consumerID;
265   }
266   
267   /**
268    * @secret
269    *
270    * Used by the routers for bookkeeping.
271    */

272   public final void setConsumerID(int id)
273   {
274     this.consumerID = id;
275   }
276     
277     ///////////////////////////////////////////////////////////////////////////
278
// Package Methods //
279
///////////////////////////////////////////////////////////////////////////
280
abstract JmsOperand getFilter();
281   
282   /**
283    * Called by the session when it has a message to give to the consumer.
284    */

285   final void takeMessage(Message JavaDoc message)
286   {
287     logger.entry("consumer-takeMessage", message);
288
289     synchronized (lock) {
290
291       if (closed) return;
292            
293       if (messageListener == null) {
294         inbox.add(message);
295         lock.notifyAll();
296       }
297       else {
298       
299         // Messages could have been delivered before the setMessageListener
300
// was called, simply dequeue them here. Null out the inbox also
301
// since there is no way to remove a listener from a consumer. This
302
// should only be executed once
303
if (inbox != null) {
304           while (inbox.size() > 0 ) {
305             try {
306               messageListener.onMessage((Message JavaDoc)inbox.removeFirst());
307     
308               if (mySession.autoAcknowledge()) {
309                 mySession.acknowledge();
310               }
311             } catch (RuntimeException JavaDoc e) { mySession.reportException(e); }
312           }
313           inbox = null;
314         }
315
316         // Normal case of just handing the message off to the MessageListener
317
try {
318           messageListener.onMessage(message);
319
320           if (mySession.autoAcknowledge()) {
321             mySession.acknowledge();
322           }
323         } catch (RuntimeException JavaDoc e) { mySession.reportException(e); }
324       }
325     }
326     
327     logger.exit("consumer-takeMessage");
328   }
329     ///////////////////////////////////////////////////////////////////////////
330
// Protected Methods //
331
///////////////////////////////////////////////////////////////////////////
332

333   /**
334    * Used by QueueReceivers and DurableSubscriptions to interact with the
335    * QueueManager.
336    */

337   protected final String JavaDoc createUniqueID()
338   {
339     throw new RuntimeException JavaDoc("Not implemented");
340   }
341  
342   /**
343    * Used by QueueReceivers and DurableSubscriptions to interact with the
344    * QueueManager.
345    */

346   protected final String JavaDoc generateSystemFilter(String JavaDoc queueName, String JavaDoc id)
347   {
348     StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
349     buffer.append("QueueName = \'");
350     buffer.append(queueName);
351     buffer.append("\' AND ReceiverID = \'");
352     buffer.append(id);
353     buffer.append('\'');
354     return buffer.toString();
355   }
356   
357
358   
359     ///////////////////////////////////////////////////////////////////////////
360
// Private Methods //
361
///////////////////////////////////////////////////////////////////////////
362

363   /**
364    * Tests the validity of a user supplied selector.
365    */

366   private void checkSelector(String JavaDoc selector)
367     throws InvalidSelectorException JavaDoc
368   {
369     if (selector != null && selector.length() != 0) {
370       Parser parser = Parser.getInstance();
371       JmsOperand o = parser.parseFilter(selector);
372       parser.delete(o);
373     }
374   }
375   
376   ////////////////////////////// Misc stuff ////////////////////////////////
377

378   private static Logger logger =
379     LoggerFactory.getLogger(JmsMessageConsumer.class, Resources.getBundle());
380
381   ///////////////////////////////////////////////////////////////////////////
382

383 }
384
385
386
387
388
389
Popular Tags