KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > MessageConsumerListener


/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
 * Copyright (C) 1996 - 2000 Dyade
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): ScalAgent Distributed Technologies
 */

23 package org.objectweb.joram.client.jms;
24
25 import java.util.Vector JavaDoc;
26
27 import javax.jms.MessageListener JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29
30 import org.objectweb.joram.shared.client.AbstractJmsReply;
31 import org.objectweb.joram.shared.client.ConsumerCloseSubRequest;
32 import org.objectweb.joram.shared.client.ConsumerMessages;
33 import org.objectweb.joram.shared.client.ConsumerSetListRequest;
34 import org.objectweb.joram.shared.client.ConsumerUnsetListRequest;
35 import org.objectweb.joram.shared.client.ConsumerAckRequest;
36 import org.objectweb.joram.shared.client.ActivateConsumerRequest;
37 import org.objectweb.joram.shared.client.ConsumerUnsubRequest;
38 import org.objectweb.joram.client.jms.connection.ReplyListener;
39 import org.objectweb.joram.client.jms.connection.AbortedRequestException;
40 import org.objectweb.joram.client.jms.connection.RequestMultiplexer;
41
42 import fr.dyade.aaa.util.StoppedQueueException;
43
44 import org.objectweb.util.monolog.api.BasicLevel;
45 import org.objectweb.util.monolog.api.Logger;
46 import fr.dyade.aaa.util.Debug;
47
48 /**
49  * This class listens to replies asynchronously returned by the user proxy
50  * for a message consumer.
51  */

52 abstract class MessageConsumerListener implements ReplyListener {
53   
54   public static Logger logger =
55     Debug.getLogger(MessageConsumerListener.class.getName());
56
57   /**
58    * Status of the message consumer listener.
59    */

60   protected static class Status {
61     public static final int INIT = 0;
62     public static final int RUN = 1;
63     public static final int ON_MSG = 2;
64     public static final int CLOSE = 3;
65
66     private static final String JavaDoc[] names = {
67       "INIT", "RUN", "ON_MSG", "CLOSE"};
68
69     public static String JavaDoc toString(int status) {
70       return names[status];
71     }
72   }
73   
74   private static class ReceiveStatus {
75     public static final int INIT = 0;
76     
77     public static final int WAIT_FOR_REPLY = 1;
78
79     public static final int CONSUMING_REPLY = 2;
80
81     private static final String JavaDoc[] names = {
82         "INIT", "WAIT_FOR_REPLY", "CONSUMING_REPLY" };
83
84     public static String JavaDoc toString(int status) {
85       return names[status];
86     }
87   }
88
89   private boolean queueMode;
90   
91   private boolean durable;
92   
93   private String JavaDoc selector;
94   
95   private String JavaDoc targetName;
96
97   /**
98    * The identifier of the subscription request.
99    */

100   private volatile int requestId;
101
102   private int status;
103
104   private Vector JavaDoc messagesToAck;
105   
106   /**
107    * The number of messages which are in queue (Session.qin)
108    * waiting for being consumed.
109    */

110   private volatile int messageCount;
111   
112   /**
113    * The receive status of this message listener:
114    * - WAIT_FOR_REPLY if a reply is expected from the destination
115    * - CONSUMING_REPLY if a reply is being consumed and no new request has
116    * been sent
117    */

118   private volatile int receiveStatus;
119   
120   /**
121    * Indicates whether the topic message input has been passivated or not.
122    */

123   private boolean topicMsgInputPassivated;
124   
125   private int queueMessageReadMax;
126   
127   private RequestMultiplexer rm;
128   
129   private int topicActivationThreshold;
130   
131   private int topicPassivationThreshold;
132   
133   private int topicAckBufferMax;
134   
135   private MessageListener JavaDoc listener;
136   
137   MessageConsumerListener(boolean queueMode,
138                           boolean durable,
139                           String JavaDoc selector,
140                           String JavaDoc targetName,
141                           MessageListener JavaDoc listener,
142                           int queueMessageReadMax,
143                           int topicActivationThreshold,
144                           int topicPassivationThreshold,
145                           int topicAckBufferMax,
146                           RequestMultiplexer reqMultiplexer) {
147     if (logger.isLoggable(BasicLevel.DEBUG))
148       logger.log(BasicLevel.DEBUG,
149                  "MessageConsumerListener(" + queueMode +
150                  ',' + durable + ',' + selector + ',' + targetName +
151                  ',' + listener + ',' + queueMessageReadMax +
152                  ',' + topicActivationThreshold +
153                  ',' + topicPassivationThreshold +
154                  ',' + topicAckBufferMax + ',' + reqMultiplexer + ')');
155     this.queueMode = queueMode;
156     this.durable = durable;
157     this.selector = selector;
158     this.targetName = targetName;
159     this.listener = listener;
160     this.queueMessageReadMax = queueMessageReadMax;
161     this.topicActivationThreshold = topicActivationThreshold;
162     this.topicPassivationThreshold = topicPassivationThreshold;
163     this.topicAckBufferMax = topicAckBufferMax;
164     rm = reqMultiplexer;
165     messagesToAck = new Vector JavaDoc(0);
166     requestId = -1;
167     messageCount = 0;
168     topicMsgInputPassivated = false;
169     setStatus(Status.INIT);
170     setReceiveStatus(ReceiveStatus.INIT);
171   }
172   
173   protected final int getStatus() {
174     return status;
175   }
176
177   protected void setStatus(int status) {
178     if (logger.isLoggable(BasicLevel.DEBUG))
179       logger.log(BasicLevel.DEBUG,
180                  "MessageConsumerListener.setStatus(" + Status.toString(status) + ')');
181     this.status = status;
182   }
183   
184   private void setReceiveStatus(int s) {
185     if (logger.isLoggable(BasicLevel.DEBUG))
186       logger.log(BasicLevel.DEBUG,
187                  "MessageConsumerListener.setReceiveStatus(" + ReceiveStatus.toString(s) + ')');
188     receiveStatus = s;
189   }
190   
191   /**
192    * Decrease the message count.
193    * Synchronized with the method replyReceived() that increments the
194    * messageCount += cm.getMessageCount();
195    *
196    * @return the decreased value
197    */

198   private int decreaseMessageCount(int ackMode) throws JMSException JavaDoc {
199     if (logger.isLoggable(BasicLevel.DEBUG))
200       logger.log(BasicLevel.DEBUG,
201                  "MessageConsumerListener.decreaseMessageCount()");
202     
203     synchronized (this) {
204       messageCount--;
205     }
206     
207     if (queueMode) {
208       boolean subscribe = false;
209       String JavaDoc[] toAck = null;
210       synchronized (this) {
211         if (logger.isLoggable(BasicLevel.DEBUG))
212           logger.log(BasicLevel.DEBUG, " -> messageCount = " + messageCount);
213         // Consume in advance (default is one message in advance)
214
if (messageCount < queueMessageReadMax
215             && receiveStatus == ReceiveStatus.CONSUMING_REPLY) {
216           subscribe = true;
217           if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE) {
218             synchronized (messagesToAck) {
219               if (messagesToAck.size() > 0) {
220                 toAck = new String JavaDoc[messagesToAck.size()];
221                 messagesToAck.copyInto(toAck);
222                 messagesToAck.clear();
223               }
224             }
225           }
226         }
227       }
228       if (subscribe) {
229         // out of the synchronized block
230
subscribe(toAck);
231       }
232     } else {
233       synchronized (this) {
234         if (topicMsgInputPassivated) {
235           if (messageCount < topicActivationThreshold) {
236             activateMessageInput();
237             topicMsgInputPassivated = false;
238           }
239         } else {
240           if (messageCount > topicPassivationThreshold) {
241             passivateMessageInput();
242             topicMsgInputPassivated = true;
243           }
244         }
245       }
246     }
247     
248     if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE
249         && messageCount == 0) {
250       // Need to acknowledge the received messages
251
// if we are in lazy mode (DUPS_OK)
252
acknowledge(0);
253     }
254     
255     return messageCount;
256   }
257  
258   /**
259    * Called by Session.
260    */

261   synchronized void start() throws JMSException JavaDoc {
262     if (logger.isLoggable(BasicLevel.DEBUG))
263       logger.log(
264         BasicLevel.DEBUG, "MessageConsumerListener.start()");
265     if (status == Status.INIT) {
266       subscribe(null);
267       setStatus(Status.RUN);
268     } else {
269       // Should not happen
270
throw new IllegalStateException JavaDoc("Status error");
271     }
272   }
273
274   private void subscribe(String JavaDoc[] toAck) throws JMSException JavaDoc {
275     if (logger.isLoggable(BasicLevel.DEBUG))
276       logger.log(
277         BasicLevel.DEBUG, "MessageConsumerListener.subscribe()");
278     
279     ConsumerSetListRequest req =
280       new ConsumerSetListRequest(
281         targetName,
282         selector,
283         queueMode,
284         toAck,
285         queueMessageReadMax);
286     
287     // Change the receive status before sending
288
// the request. subscribe() is not synchronized
289
// so the reply can be received before the end
290
// of this method.
291
setReceiveStatus(ReceiveStatus.WAIT_FOR_REPLY);
292     rm.sendRequest(req, this);
293     requestId = req.getRequestId();
294   }
295
296   /**
297    * Called by Session.
298    */

299   public void close() throws JMSException JavaDoc {
300     if (logger.isLoggable(BasicLevel.DEBUG))
301       logger.log(
302         BasicLevel.DEBUG, "MessageConsumerListener.close()");
303
304     synchronized (this) {
305       while (status == Status.ON_MSG) {
306         try {
307           // Wait for the message listener to return from
308
// onMessage()
309
wait();
310         } catch (InterruptedException JavaDoc exc) {}
311       }
312       
313       if (status == Status.INIT ||
314           status == Status.CLOSE) return;
315       
316       rm.abortRequest(requestId);
317
318       // If session ack mode is DUPS_OK
319
acknowledge(0);
320
321       setStatus(Status.CLOSE);
322     }
323     
324     if (queueMode) {
325       // Out of the synchronized block because it could
326
// lead to a dead lock with
327
// the connection driver thread calling replyReceived.
328
ConsumerUnsetListRequest unsetLR = new ConsumerUnsetListRequest(
329           queueMode);
330       unsetLR.setTarget(targetName);
331       unsetLR.setCancelledRequestId(requestId);
332       rm.sendRequest(unsetLR);
333     }
334     // else useless for a topic
335
// because the subscription
336
// is deleted (see MessageConsumer.close())
337
}
338
339   private void acknowledge(int threshold) {
340     try {
341       synchronized (messagesToAck) {
342         if (messagesToAck.size() > threshold) {
343           ConsumerAckRequest ack = new ConsumerAckRequest(
344               targetName,
345               queueMode);
346           for (int i = 0; i < messagesToAck.size(); i++) {
347             String JavaDoc msgId = (String JavaDoc) messagesToAck.elementAt(i);
348             ack.addId(msgId);
349           }
350           rm.sendRequest(ack);
351           messagesToAck.clear();
352         }
353       }
354     } catch (JMSException JavaDoc exc) {
355       if (logger.isLoggable(BasicLevel.ERROR))
356         logger.log(
357           BasicLevel.ERROR, "", exc);
358     }
359   }
360   
361   /**
362    * Called by RequestMultiplexer.
363    */

364   public synchronized boolean replyReceived(AbstractJmsReply reply)
365     throws AbortedRequestException {
366     if (logger.isLoggable(BasicLevel.DEBUG))
367       logger.log(
368         BasicLevel.DEBUG, "MessageConsumerListener.replyReceived(" +
369         reply + ')');
370     
371     if (status == Status.CLOSE) {
372       throw new AbortedRequestException();
373     } else {
374       if (queueMode) {
375         // 1- Change the status before pushing the
376
// messages into the session queue.
377
setReceiveStatus(ReceiveStatus.CONSUMING_REPLY);
378       }
379       try {
380         ConsumerMessages cm = (ConsumerMessages)reply;
381         // 2- increment messageCount (synchronized)
382
messageCount += cm.getMessageCount();
383         
384         pushMessages(cm);
385       } catch (StoppedQueueException exc) {
386         throw new AbortedRequestException();
387       } catch (JMSException JavaDoc exc) {
388         throw new AbortedRequestException();
389       }
390       if (queueMode) {
391         return true;
392       } else {
393         return false;
394       }
395     }
396   }
397   
398   /**
399    * Pushes the received messages.
400    * Currently two behaviors:
401    * 1- SingleSessionConsumer pushes the message
402    * in a single session (standard JMS)
403    * 2- MultiSessionConsumer pushes the message
404    * in several session (from a session pool)
405    *
406    * @param cm
407    */

408   public abstract void pushMessages(ConsumerMessages cm) throws JMSException JavaDoc;
409   
410   public void replyAborted(int requestId) {
411     // Nothing to do.
412
}
413
414   public synchronized boolean isClosed() {
415     return (status == Status.CLOSE);
416   }
417   
418   public final MessageListener JavaDoc getMessageListener() {
419     return listener;
420   }
421   
422   public final boolean getQueueMode() {
423     return queueMode;
424   }
425   
426   public final String JavaDoc getTargetName() {
427     return targetName;
428   }
429
430   
431   protected void activateListener(
432       Message msg, MessageListener JavaDoc listener, int ackMode)
433     throws JMSException JavaDoc {
434     if (logger.isLoggable(BasicLevel.DEBUG))
435       logger.log(BasicLevel.DEBUG,
436                  "MessageConsumerListener.onMessage(" + msg + ')');
437     
438     // Consume one message
439
decreaseMessageCount(ackMode);
440
441     try {
442       listener.onMessage(msg);
443
444       if (logger.isLoggable(BasicLevel.DEBUG))
445         logger.log(BasicLevel.DEBUG,
446                    " -> consumer.onMessage(" + msg + ") returned");
447     } catch (RuntimeException JavaDoc re) {
448       if (logger.isLoggable(BasicLevel.DEBUG))
449         logger.log(BasicLevel.DEBUG, "", re);
450       JMSException JavaDoc exc = new JMSException JavaDoc(re.toString());
451       exc.setLinkedException(re);
452       throw exc;
453     }
454   }
455   
456   public abstract void onMessage(
457       Message msg, MessageListener JavaDoc listener, int ackMode)
458     throws JMSException JavaDoc;
459   
460   /**
461    * Called by Session (standard JMS, mono-threaded
462    */

463   public void onMessage(Message msg, int ackMode) throws JMSException JavaDoc {
464     if (logger.isLoggable(BasicLevel.DEBUG))
465       logger.log(BasicLevel.DEBUG, "MessageConsumerListener.onMessage(" + msg + ')');
466     if (listener != null) {
467         synchronized (this) {
468           if (status == Status.RUN) {
469             setStatus(Status.ON_MSG);
470           } else {
471             // Notify threads trying to close the listener.
472
notifyAll();
473             throw new javax.jms.IllegalStateException JavaDoc("Message listener closed");
474           }
475         }
476
477       try {
478         activateListener(msg, listener, ackMode);
479       } finally {
480         synchronized (this) {
481           if (status == Status.ON_MSG)
482             setStatus(Status.RUN);
483
484           // Notify threads trying to close the listener.
485
notifyAll();
486         }
487       }
488     } else {
489       throw new JMSException JavaDoc("Null listener");
490     }
491   }
492
493   void ack(String JavaDoc msgId, int ackMode)
494       throws JMSException JavaDoc {
495     if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE) {
496       // All the operations on messagesToAck are synchronized
497
// on the vector (see subscribe() and acknowledge()).
498
messagesToAck.addElement(msgId);
499       if (! queueMode) {
500         acknowledge(topicAckBufferMax);
501       }
502     } else {
503       ConsumerAckRequest ack = new ConsumerAckRequest(targetName, queueMode);
504       ack.addId(msgId);
505       rm.sendRequest(ack);
506     }
507   }
508
509   void activateMessageInput() throws JMSException JavaDoc {
510     rm.sendRequest(
511       new ActivateConsumerRequest(targetName, true));
512   }
513
514   void passivateMessageInput() throws JMSException JavaDoc {
515     rm.sendRequest(
516       new ActivateConsumerRequest(targetName, false));
517   }
518 }
519
Popular Tags