KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > client > impl > AbstractConsumer


1 package com.ubermq.jms.client.impl;
2
3 import EDU.oswego.cs.dl.util.concurrent.*;
4 import com.ubermq.jms.client.*;
5 import com.ubermq.jms.common.datagram.*;
6 import com.ubermq.jms.common.routing.*;
7 import com.ubermq.jms.common.routing.impl.*;
8 import com.ubermq.kernel.*;
9 import com.ubermq.kernel.overflow.*;
10 import java.io.*;
11 import java.util.*;
12 import javax.jms.*;
13
14 /**
15  * The abstract base class for any message consumer
16  * implementation. Implements logic for starting and
17  * stopping connections, and receiving and enqueuing incoming messages.<P>
18  */

19 abstract class AbstractConsumer
20     implements javax.jms.MessageConsumer JavaDoc,
21     IDatagramEndpoint,
22     IMessageSender,
23     IAcknowledgeHandler
24 {
25     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AbstractConsumer.class);
26     
27     /**
28      * JMS properties
29      */

30     private String JavaDoc selectorValue;
31     private Selector selector;
32     private MessageListener messageListener;
33
34     /**
35      * The session
36      */

37     private Session session;
38     private IClientProcessor proc;
39     private IDeliveryManager delivery;
40
41     private Channel receiveQueue;
42     private boolean isClosing = false;
43
44     private List pausedQueue;
45     private boolean isPaused = false;
46
47     /**
48      * Constructs a consumer.
49      * @param session the session
50      * @param selector the message selector, or null if none
51      * @param delivery the delivery manager for this consumer
52      * @param receiveQueue a Channel in which to store messages queued
53      * for delivery.
54      */

55     AbstractConsumer(Session session,
56                      String JavaDoc selector,
57                      IDeliveryManager delivery,
58                      Channel receiveQueue)
59         throws InvalidSelectorException
60     {
61         if (selector != null &&
62             selector.length() > 0)
63         {
64             this.selectorValue = selector;
65             this.selector = new SimpleSelector(this.selectorValue);
66         }
67         this.session = session;
68         this.proc = session.conn.getClientProcessor();
69         this.delivery = delivery;
70
71         // setup receive q using natural message ordering,
72
// which honors priority, then sender-sequence.
73
this.receiveQueue = receiveQueue;
74
75         // add myself
76
session.addConsumer(this);
77     }
78
79     public String JavaDoc getMessageSelector() {return this.selectorValue;}
80
81     /**
82      * @return the parent session.
83      */

84     protected Session getSession() {return session;}
85
86     /**
87      * @return the parent connection.
88      */

89     protected Connection getConnection() {return session.conn;}
90
91     /**
92      * @return the client processor.
93      */

94     protected IClientProcessor getClientProcessor() {return proc;}
95
96     /**
97      * Get the message consumer's MessageListener.
98      *
99      * @return the listener for the message consumer, or null if there isn't
100      * one set.
101      *
102      * @exception JMSException if JMS fails to get message
103      * listener due to some JMS error
104      */

105     public MessageListener getMessageListener() throws JMSException
106     {
107         return messageListener;
108     }
109
110     /**
111      * Set the message consumer's MessageListener.
112      *
113      * <P>Setting the message listener to null is the equivalent of
114      * unsetting the message listener for the message consumer.
115      *
116      * <P>Calling the setMessageListener method of MessageConsumer
117      * while messages are being consumed by an existing listener
118      * or the consumer is being used to synchronously consume messages
119      * is undefined.
120      *
121      * @param messageListener the messages are delivered to this listener
122      */

123
124     public void setMessageListener( MessageListener listener )
125         throws JMSException
126     {
127         messageListener = listener;
128         session.requestDeliveryThread();
129         session.checkDeliveryThread();
130     }
131
132
133     /**
134      * Receive the next message produced for this message consumer.
135      *
136      * <P>This call blocks indefinitely until a message is produced.
137      *
138      * <P>If this receive is done within a transaction, the message
139      * remains on the consumer until the transaction commits.
140      *
141      * @exception JMSException if JMS fails to receive the next
142      * message due to some error.
143      */

144     public javax.jms.Message JavaDoc receive()
145         throws JMSException
146     {
147         if(isClosing() ||
148            session.isClosing())
149         {
150             throw new javax.jms.IllegalStateException JavaDoc("Subscriber is closed");
151         }
152
153         try {
154             javax.jms.Message JavaDoc m = (javax.jms.Message JavaDoc)receiveQueue.take();
155             return m;
156         } catch(InterruptedException JavaDoc ie) {
157             throw new JMSException(ie.toString());
158         }
159     }
160
161     /**
162      * Receive the next message that arrives within the specified
163      * timeout interval.
164      *
165      * <P>This call blocks until either a message arrives or the
166      * timeout expires.
167      *
168      * @param timeout the timeout value (in milliseconds)
169      *
170      */

171     public javax.jms.Message JavaDoc receive( long timeout ) throws JMSException
172     {
173         if(isClosing() ||
174            session.isClosing())
175         {
176             throw new javax.jms.IllegalStateException JavaDoc("Subscriber is closed");
177         }
178
179         try {
180             javax.jms.Message JavaDoc m = (javax.jms.Message JavaDoc)receiveQueue.poll(timeout);
181             return m;
182         } catch(InterruptedException JavaDoc ie) {
183             throw new JMSException(ie.toString());
184         }
185     }
186
187     /**
188      * Receive the next message if one is immediately available.
189      *
190      * @exception JMSException if JMS fails to receive the next
191      * message due to some error.
192      * @return the next message produced for this message consumer, or
193      * null if one is not available.
194      */

195     public javax.jms.Message JavaDoc receiveNoWait() throws JMSException
196     {
197         if(isClosing() ||
198            session.isClosing() )
199         {
200             throw new javax.jms.IllegalStateException JavaDoc("Subscriber is closed");
201         }
202
203         try {
204             javax.jms.Message JavaDoc m = (javax.jms.Message JavaDoc)receiveQueue.poll(0);
205             return m;
206         } catch(InterruptedException JavaDoc ie) {
207             return null;
208         }
209     }
210
211     public void close() throws JMSException
212     {
213         isClosing = true;
214         session.removeConsumer(this);
215     }
216
217     /**
218      * @return Whether the consumer is closing.
219      */

220     boolean isClosing()
221     {
222         return isClosing;
223     }
224
225     /**
226      * Delivers a datagram to this subscriber.
227      */

228     public void deliver(IDatagram d)
229     {
230         // wrap the datagram with a javax.jms.message
231
// and queue it in our subscriber buffer
232
IMessageDatagram md = (IMessageDatagram)d;
233         javax.jms.Message JavaDoc msg = LocalMessage.getMessage(md, this);
234
235         // do acknowledgement if we are auto ack mode.
236
if (session.ackMode == TopicSession.AUTO_ACKNOWLEDGE) {
237             try {
238                 internalAcknowledge(md);
239             }
240             catch (IOException e) {
241                 // we are auto ACK, so we can't do anything here.
242
}
243         }
244
245         // also check our local selector, if necessary
246
if (selector != null &&
247             !selector.accept(md))
248         {
249             return;
250         }
251
252         // otherwise we forward it using our gap/dup detector.
253
delivery.deliver(md.getSenderId(),
254                          md.getSequence(),
255                          msg,
256                          this);
257     }
258
259     /**
260      * Sends an acknowledgement for the given message datagram. The default
261      * implementation sends an acknowledgement to the remote side using
262      * <code>outputAndWait</code> for all subscriber types.
263      * <P>
264      * Subclasses may choose not to send acknowledgements in cases where
265      * it is not necessary.
266      * <P>
267      *
268      * @param md the message datagram to acknowledge
269      * @throws IOException if the acknowledgement cannot be sent
270      * due to transport layer problems.
271      */

272     protected void internalAcknowledge(IMessageDatagram md)
273         throws IOException
274     {
275         getConnection().output(getConnection().factories.ackFactory().ack(md.getIncomingMessageId()),
276                                new ExponentialBackoff());
277     }
278
279     /**
280      * Acknowledges a message datagram.
281      */

282     public void acknowledge(IMessageDatagram md)
283         throws IOException, javax.jms.IllegalStateException JavaDoc
284     {
285         if (session.isClosing())
286             throw new javax.jms.IllegalStateException JavaDoc("session is closed");
287
288         if (session.ackMode == TopicSession.AUTO_ACKNOWLEDGE) {
289             // noop because we have already done this for the client
290
} else if (session.ackMode == TopicSession.CLIENT_ACKNOWLEDGE) {
291             internalAcknowledge(md);
292         } else if (session.ackMode == TopicSession.DUPS_OK_ACKNOWLEDGE) {
293             // never ack here.
294
}
295     }
296
297     /**
298      * Sends a message to this subscriber, by enqueuing it in the session
299      * delivery thread or enqueuing it locally, as required by the current
300      * delivery strategy (async or sync).
301      */

302     public void sendMessage(javax.jms.Message JavaDoc msg)
303     {
304         try {
305             if (messageListener != null)
306                 session.asyncDelivery(msg, this, messageListener);
307             else {
308                 receiveQueue.put(msg);
309             }
310         } catch(Exception JavaDoc ie) {
311             log.error("", ie);
312         }
313     }
314
315     /**
316      * pauses delivery of messages to this subscriber. This
317      * applies when <code>receive</code> methods are being used.
318      */

319     synchronized void pause()
320     {
321         if (!isPaused)
322         {
323             isPaused = true;
324
325             // accumulate the list of things
326
pausedQueue = new ArrayList();
327
328             try
329             {
330                 Object JavaDoc o;
331                 while((o = receiveQueue.poll(0)) != null)
332                 {
333                     pausedQueue.add(o);
334                 }
335             }
336             catch (InterruptedException JavaDoc e) {}
337         }
338     }
339
340     /**
341      * Resumes delivery of messages to this subscriber. This
342      * applies when <code>receive</code> methods are being used.
343      */

344     synchronized void resume()
345     {
346         if (isPaused)
347         {
348             isPaused = false;
349
350             Iterator iter = pausedQueue.iterator();
351             while (iter.hasNext())
352             {
353                 try
354                 {
355                     receiveQueue.offer(iter.next(), 0);
356                 }
357                 catch (InterruptedException JavaDoc e) {}
358                 iter.remove();
359             }
360         }
361     }
362
363 }
364
Popular Tags