KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > client > JmsSession


1 /**
2  * This file is part of Presumo.
3  *
4  * Presumo is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * Presumo is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with Presumo; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  *
19  * Copyright (c) 2001, 2002 Dan Greff
20  */

21 package com.presumo.jms.client;
22
23 import com.presumo.jms.message.AckHelper;
24 import com.presumo.jms.message.JmsBytesMessage;
25 import com.presumo.jms.message.JmsMessage;
26 import com.presumo.jms.message.JmsMapMessage;
27 import com.presumo.jms.message.JmsObjectMessage;
28 import com.presumo.jms.message.JmsStreamMessage;
29 import com.presumo.jms.message.JmsTextMessage;
30 import com.presumo.jms.message.MessageStateListener;
31
32 import com.presumo.jms.resources.Resources;
33 import com.presumo.jms.router.Router;
34 import com.presumo.jms.router.RouterAdapter;
35 import com.presumo.jms.router.RoutingTarget;
36 import com.presumo.jms.selector.Parser;
37 import com.presumo.jms.selector.JmsOperand;
38 import com.presumo.jms.plugin.implementation.MemoryMessageQueue;
39 import com.presumo.util.log.Logger;
40 import com.presumo.util.log.LoggerFactory;
41
42 import java.io.IOException JavaDoc;
43 import java.io.Serializable JavaDoc;
44 import java.util.BitSet JavaDoc;
45 import java.util.ArrayList JavaDoc;
46
47 import javax.jms.BytesMessage JavaDoc;
48 import javax.jms.Destination JavaDoc;
49 import javax.jms.ExceptionListener JavaDoc;
50 import javax.jms.IllegalStateException JavaDoc;
51 import javax.jms.JMSException JavaDoc;
52 import javax.jms.MapMessage JavaDoc;
53 import javax.jms.Message JavaDoc;
54 import javax.jms.MessageListener JavaDoc;
55 import javax.jms.ObjectMessage JavaDoc;
56 import javax.jms.Session JavaDoc;
57 import javax.jms.StreamMessage JavaDoc;
58 import javax.jms.TextMessage JavaDoc;
59
60
61 /**
62  * Implementation of <code>javax.jms.Session</code>. Also contains plenty
63  * of routing logic since the session talks to the Router.
64  *
65  * @see javax.jms.Session
66  * @author Dan Greff
67  */

68 public abstract class JmsSession extends RouterAdapter
69   implements Session JavaDoc, RoutingTarget, MessageStateListener
70 {
71
72   // todo:: point 2 point
73
// note: also defined in QueueManager
74
static final int QUEUE_RECEIVER_CRT = 0;
75   static final int QUEUE_RECEIVER_CLOSE = 1;
76   static final int QUEUE_BROWSER_CRT = 2;
77   static final int QUEUE_BROWSER_CLOSE = 3;
78   static final int DURABLE_SUBSCRIBER_CRT = 4;
79   static final int DURABLE_SUBSCRIBER_CLOSE = 5;
80   static final int DURABLE_SUBSCRIBER_DELETE = 6;
81
82
83   /** Indicator for transacted Sessions **/
84   protected final boolean transacted;
85
86   /** Acknowledge mode for this session **/
87   protected final int acknowledgeMode;
88
89   /** Number of asynchronous consumers on this session **/
90   protected int asynchCount;
91
92   /** Set to true if this session has been closed **/
93   protected volatile boolean closed;
94
95   /** Router for this connection **/
96   protected final Router router;
97
98   /** Connection for this session **/
99   protected final JmsConnection connx;
100
101   /** Parser for evaulating selection filters **/
102   protected final Parser parser = Parser.getInstance();
103
104   /** Filter of all consumers's filters joined together **/
105   protected JmsOperand joinedFilter;
106  
107   /** Consumers on this session **/
108   private final ArrayList JavaDoc consumers = new ArrayList JavaDoc(0);
109
110   /** Number of Consumers on this session **/
111   private int numOfConsumers;
112
113   /** Producers on this session **/
114   private final ArrayList JavaDoc producers = new ArrayList JavaDoc(0);
115
116   private final ArrayList JavaDoc outgoingMsgs;
117   private final ArrayList JavaDoc unacknowledgedMsgs;
118   
119     ///////////////////////////////////////////////////////////////////////////
120
// Constructors //
121
///////////////////////////////////////////////////////////////////////////
122
public JmsSession(Router router,
123                     boolean transacted,
124                     int acknowledgeMode,
125                     JmsConnection connx)
126     throws JMSException JavaDoc
127   {
128     super(new MemoryMessageQueue(), 1, "JmsSession Router");
129     
130     logger.entry("JmsSession");
131                       
132     this.router = router;
133     this.transacted = transacted;
134     this.acknowledgeMode = acknowledgeMode;
135     this.connx = connx;
136
137     unacknowledgedMsgs = new ArrayList JavaDoc();
138     if (transacted == true) {
139       outgoingMsgs = new ArrayList JavaDoc();
140     } else {
141       outgoingMsgs = null;
142     }
143
144
145     // Note: This session is added to the Router as a RoutingTarget only when
146
// the first subscriber is added, and removed when the last subscriber
147
// is closed. This results in the router not knowing about sessions with
148
// only publishers.
149

150     logger.exit("JmsSession");
151   }
152  
153     ///////////////////////////////////////////////////////////////////////////
154
// Public Methods - javax.jms.Session //
155
///////////////////////////////////////////////////////////////////////////
156

157   public final BytesMessage createBytesMessage() throws JMSException JavaDoc
158   {
159     return new JmsBytesMessage(router.getName());
160   }
161
162
163   public final MapMessage createMapMessage() throws JMSException JavaDoc
164   {
165     return new JmsMapMessage(router.getName());
166   }
167
168
169   public final Message createMessage() throws JMSException JavaDoc
170   {
171     return new JmsMessage(router.getName());
172   }
173
174
175   public final ObjectMessage createObjectMessage() throws JMSException JavaDoc
176   {
177     return new JmsObjectMessage(router.getName());
178   }
179
180
181   public final ObjectMessage createObjectMessage(Serializable JavaDoc object)
182     throws JMSException JavaDoc
183   {
184     ObjectMessage msg = new JmsObjectMessage(router.getName());
185     msg.setObject(object);
186     return msg;
187   }
188
189
190   public final StreamMessage createStreamMessage() throws JMSException JavaDoc
191   {
192     return new JmsStreamMessage(router.getName());
193   }
194
195
196   public final TextMessage createTextMessage() throws JMSException JavaDoc
197   {
198     return new JmsTextMessage(router.getName());
199   }
200
201
202   public final TextMessage createTextMessage(String JavaDoc text) throws JMSException JavaDoc
203   {
204     TextMessage msg = new JmsTextMessage(router.getName());
205     msg.setText(text);
206     return msg;
207   }
208
209   public final boolean getTransacted() throws JMSException JavaDoc
210   {
211     return this.transacted;
212   }
213
214
215   public final void commit() throws JMSException JavaDoc
216   {
217     if (!transacted) {
218       throw new IllegalStateException JavaDoc("Commit called on non-transacted session");
219     }
220
221     // Ack unacknowledged messages
222
synchronized(unacknowledgedMsgs) {
223       for (int i=0; i < unacknowledgedMsgs.size(); i++) {
224         JmsMessage msg = (JmsMessage) unacknowledgedMsgs.get(i);
225         msg.getAckHelper().routedAck(this);
226       }
227     }
228
229     // Send unsent messages
230
JmsMessage [] msgs = new JmsMessage[outgoingMsgs.size()];
231     msgs = (JmsMessage[]) outgoingMsgs.toArray(msgs);
232     try {
233       router.routeMessages(msgs);
234     } catch(IOException JavaDoc ioe) {
235       JMSException JavaDoc jmsex = new JMSException JavaDoc("");
236       jmsex.setLinkedException(ioe);
237       throw jmsex;
238     }
239
240     outgoingMsgs.clear();
241
242     // Wait for acks to be confirmed
243
synchronized(unacknowledgedMsgs) {
244       while(unacknowledgedMsgs.size() != 0) {
245         try {
246           unacknowledgedMsgs.wait(3000);
247         } catch (InterruptedException JavaDoc ie) {}
248       }
249     }
250   }
251
252   public final void rollback()
253     throws JMSException JavaDoc
254   {
255     if (!transacted) {
256       throw new IllegalStateException JavaDoc("rollback called on non-transacted session");
257     }
258
259     outgoingMsgs.clear();
260     JmsMessage [] msgs = new JmsMessage[unacknowledgedMsgs.size()];
261     msgs = (JmsMessage[])unacknowledgedMsgs.toArray(msgs);
262     unacknowledgedMsgs.clear();
263     try {
264       queueMessages(msgs);
265     } catch (IOException JavaDoc ioe) {
266       // memory queue should never throw this
267
ioe.printStackTrace();
268     }
269     
270   }
271
272
273   public final void recover() throws JMSException JavaDoc
274   {
275     if (transacted) {
276       throw new IllegalStateException JavaDoc("Recover called on transacted session");
277     }
278
279     if (unacknowledgedMsgs != null) {
280       JmsMessage [] msgs = new JmsMessage[unacknowledgedMsgs.size()];
281       msgs = (JmsMessage[]) unacknowledgedMsgs.toArray(msgs);
282       unacknowledgedMsgs.clear();
283       try {
284         queueMessages(msgs);
285       } catch (IOException JavaDoc ioe) {
286         // memory queue should never throw this
287
ioe.printStackTrace();
288       }
289     }
290   }
291
292
293   public void close() throws JMSException JavaDoc
294   {
295     logger.entry("close");
296     
297     if (!closed) {
298       super.stopRouter();
299       super.closeRouter();
300       
301       closed = true;
302       synchronized (consumers) {
303         while( consumers.size() > 0) {
304           ( (JmsMessageConsumer) consumers.get(consumers.size()-1) ).close();
305         }
306       }
307
308       synchronized (producers) {
309         while( producers.size() > 0) {
310           ( (JmsMessageProducer) producers.get(producers.size()-1) ).close();
311         }
312       }
313       connx.removeSession(this);
314     }
315     logger.exit("close");
316   }
317
318
319   public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc
320   {
321     throw new JMSException JavaDoc("Operation not supported");
322   }
323
324   public void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc
325   {
326     throw new JMSException JavaDoc("Operation not supported");
327   }
328
329
330   public void acknowledge()
331   {
332     logger.entry("acknowledge");
333     synchronized (unacknowledgedMsgs) {
334
335       // Get copy of array list because the messages may get deleted
336
// as we are iterating through and acking the message
337
Object JavaDoc [] acks = unacknowledgedMsgs.toArray();
338       for (int i=0; i < acks.length; i++) {
339         JmsMessage msg = (JmsMessage) acks[i];
340         logger.debug("acknowledge - Acking message: " + msg.toString());
341         msg.getAckHelper().routedAck(this);
342       }
343
344       if (acknowledgeMode != Session.DUPS_OK_ACKNOWLEDGE) {
345         while(unacknowledgedMsgs.size() != 0) {
346           try {
347             logger.debug("acknowledge - waiting for deletion acks");
348             unacknowledgedMsgs.wait(3000);
349           } catch (InterruptedException JavaDoc ie) {}
350         }
351       }
352     }
353     logger.exit("acknowledge");
354   }
355
356
357     /////////////////////////////////////////////////////////////////////////
358
// RoutingTarget Implemenation Methods //
359
/////////////////////////////////////////////////////////////////////////
360

361   /**
362    * @secret
363    */

364   public final JmsMessage takeMessage(JmsMessage msg)
365   {
366     if (logger.isDebugEnabled())
367       logger.entry("takeMessage " + msg.toString());
368
369     boolean cloned = false;
370     synchronized (consumers)
371     {
372       int size = consumers.size();
373       BitSet JavaDoc routingMask = null;;
374       for (int i=0; i < size; ++i) {
375         JmsMessageConsumer c = (JmsMessageConsumer) consumers.get(i);
376         if (parser.evaluate(c.getFilter(), msg)) {
377           if (!cloned) {
378             msg = (JmsMessage) msg.clone();
379             cloned = true;
380             routingMask = new BitSet JavaDoc(size);
381           }
382           routingMask.set(i);
383         }
384       }
385       if (cloned) {
386         msg.setRoutingMask(routingMask);
387
388         // Do everything necessary for persistent messages
389
if (msg.getJMSDeliveryMode() == javax.jms.DeliveryMode.PERSISTENT) {
390           AckHelper ack = msg.getAckHelper();
391           ack.addDeletionListener(this);
392           ack.safeAck(this);
393
394           if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
395             msg.setSessionCallback(this);
396           }
397         }
398
399         try {
400           queueMessage(msg);
401         } catch (IOException JavaDoc ioe) {
402           // memory queue should never throw this
403
ioe.printStackTrace();
404         }
405       }
406     }
407     
408     if (logger.isDebugEnabled())
409       logger.exit("takeMessage", new Boolean JavaDoc(cloned));
410     return msg;
411   }
412
413   /**
414    * @secret
415    */

416   public final JmsOperand getRoutingFilter()
417   {
418     synchronized(consumers) {
419       return joinedFilter;
420     }
421   }
422   
423   /**
424    * @secret
425    */

426   public final void setRemoteRoutingFilter(JmsOperand filter, boolean add)
427   {
428     // Do nothing since this routing target does not need
429
// filter updates, only RemoteSessions need filter updates
430
}
431   
432   /**
433    * @secret Not used by JmsSession, but used by RemoteSession
434    */

435   public final void setTargetID(int id)
436   {
437   }
438   
439   /**
440    * @secret
441    */

442   public final boolean needsFilterUpdates()
443   {
444     return false;
445   }
446
447
448
449     /////////////////////////////////////////////////////////////////////////
450
// MessageStateListener Implementation Methods //
451
/////////////////////////////////////////////////////////////////////////
452

453   public final void messageRouted(JmsMessage msg)
454   {
455   }
456
457   public final void messageDeleted(JmsMessage msg)
458   {
459     logger.entry("messageDeleted", msg);
460
461     synchronized (unacknowledgedMsgs) {
462
463       logger.debug("messageDeleted - unacks size: " + unacknowledgedMsgs.size());
464       unacknowledgedMsgs.remove(msg);
465       if (unacknowledgedMsgs.size() == 0) {
466         logger.debug("messageDelted - notifyall");
467         unacknowledgedMsgs.notifyAll();
468       } else {
469         logger.debug("messageDeleted - unacks size: " + unacknowledgedMsgs.size());
470       }
471
472     }
473     logger.exit("messageDeleted");
474   }
475
476
477     ///////////////////////////////////////////////////////////////////////////
478
// Package Methods //
479
///////////////////////////////////////////////////////////////////////////
480

481
482   final void start()
483   {
484     logger.entry("start");
485     
486     this.startRouter();
487     
488     logger.exit("start");
489   }
490   
491   final void stop()
492   {
493     logger.entry("stop");
494     
495     this.stopRouter();
496
497     logger.exit("stop");
498   }
499
500   
501   final void send(JmsMessage msg) throws JMSException JavaDoc
502   {
503     if (transacted) {
504       outgoingMsgs.add(msg);
505     } else {
506       try {
507         router.routeMessage(msg);
508       } catch(IOException JavaDoc ioe) {
509         JMSException JavaDoc jmsex = new JMSException JavaDoc("");
510         jmsex.setLinkedException(ioe);
511         throw jmsex;
512       }
513     }
514   }
515
516   final boolean autoAcknowledge()
517   {
518     return ( (!transacted) &&
519              ((acknowledgeMode == Session.AUTO_ACKNOWLEDGE) ||
520               (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)) );
521   }
522   
523   final void addProducer(JmsMessageProducer producer)
524   {
525     synchronized (producers) {
526       producers.add(producer);
527     }
528   }
529   
530   final void removeProducer(JmsMessageProducer producer)
531   {
532     synchronized (producers) {
533       producers.remove(producer);
534     }
535   }
536
537   /**
538    * Call back used by the MessageConsumerConstructor
539    */

540   void addConsumer(JmsMessageConsumer consumer)
541   {
542     synchronized (consumers) {
543       consumers.add(consumer);
544       ++numOfConsumers;
545       this.recalculateJoinedFilter();
546       if (numOfConsumers == 1)
547         router.addTarget(this);
548       else
549         router.recalculateFilters(true);
550     }
551   }
552   
553   /**
554    * Callback used by the MessageConsumer close() method
555    */

556   void removeConsumer(JmsMessageConsumer consumer)
557   {
558     synchronized (consumers) {
559       
560       int index = consumers.indexOf(consumer);
561       if (index == consumers.size()-1)
562         consumers.remove(consumer);
563       else
564         consumers.set(index, null); // can't change the relative location
565
// of the other consumers.
566
--numOfConsumers;
567       this.recalculateJoinedFilter();
568       if (numOfConsumers == 0)
569         router.removeTarget(this);
570       else
571         router.recalculateFilters(false);
572     }
573   }
574
575   /**
576    * reports an exception to the exception listener
577    */

578   final void reportException(Exception JavaDoc e)
579   {
580     logger.exception(e);
581     ExceptionListener JavaDoc el = null;
582     try {
583       el = connx.getExceptionListener();
584     } catch (JMSException JavaDoc jmsex) {}
585
586     if (el != null) {
587       JMSException JavaDoc jmsex;
588       if (e instanceof JMSException JavaDoc) {
589         jmsex = (JMSException JavaDoc) e;
590       } else {
591         jmsex = new JMSException JavaDoc("An exception within the connection: " +
592                                  e.toString());
593         jmsex.setLinkedException(e);
594       }
595       el.onException(jmsex);
596     }
597   }
598
599   /**
600    * Every consumer that has a message listener set for the first time will
601    * call this method.
602    */

603   final void addAsynch()
604   {
605     ++asynchCount;
606   }
607
608   /**
609    * Called by Consumer when it is closed if it had a message listener
610    * set.
611    */

612   final void removeAsynch()
613   {
614     --asynchCount;
615   }
616
617   /**
618    * Used by by by consumers to determine if a receive() call is erroneous
619    * due to other asynchronus consumers on the session.
620    */

621   final boolean hasAsynchronousListeners()
622   {
623     return asynchCount != 0;
624   }
625
626  
627   /**
628    * Used by QueueReceivers and DurableSubscriptions to interact with the
629    * QueueManager.
630    */

631   final void sendQueueRequest(String JavaDoc queueName,
632                               String JavaDoc receiverID,
633                               String JavaDoc filter,
634                               int type)
635     throws JMSException JavaDoc
636   {
637     throw new RuntimeException JavaDoc("Not implemented");
638   }
639   
640     ///////////////////////////////////////////////////////////////////////////
641
// Protected Methods //
642
///////////////////////////////////////////////////////////////////////////
643

644   /*
645    * Called by the thread within RouterAdapater when there are messages
646    * sitting on the MessageQueue.
647    */

648   protected final void routeMessages(int batchSize)
649   {
650     logger.entry("routeMessages");
651     
652     synchronized (consumers) {
653       int i, j;
654       JmsMessage [] msgs = null;
655       try {
656         msgs = super.getNext(batchSize);
657       } catch (IOException JavaDoc ioe) {
658         // In memory message queue shouldn't throw this
659
ioe.printStackTrace();
660       }
661
662       if (msgs == null) return;
663       for (i=0; i < msgs.length; i++) {
664
665         JmsMessage msg = msgs[i];
666         
667         logger.debug("routeMessages: Processing message: " + msg);
668         BitSet JavaDoc routingMask = msg.getRoutingMask();
669
670         if (msg.getJMSDeliveryMode() == javax.jms.DeliveryMode.PERSISTENT) {
671           // Persistent message send
672
synchronized(unacknowledgedMsgs) {
673             unacknowledgedMsgs.add(msg);
674             boolean msgSent = false;
675             for (j = 0; j < consumers.size(); j++) {
676               if (routingMask.get(j)) {
677                 JmsMessageConsumer target = (JmsMessageConsumer) consumers.get(j);
678                 if (target != null) { // possible if the consumer was removed
679
target.takeMessage(msg);
680                   msgSent = true;
681                 }
682               }
683             }
684
685             // If it wasn't sent it must be acked, if it was sent the consumer
686
// will take care of acknowledgement.
687
if (!msgSent) {
688               unacknowledgedMsgs.remove(msg);
689               msg.getAckHelper().routedAck(this);
690             }
691           }
692         }
693         else {
694           // non-perstent message send
695
for (j = 0; j < consumers.size(); j++) {
696             if (routingMask.get(j)) {
697               JmsMessageConsumer target = (JmsMessageConsumer) consumers.get(j);
698               if (target != null) { // possible if the consumer was removed
699
target.takeMessage(msg);
700               }
701             }
702           }
703         }
704
705         
706       } // end for(i
707
} // end synchronized
708

709     logger.exit("routeMessages");
710   }
711
712   /**
713    * DO NOT rely on this method.
714    */

715   protected void finalize() throws Throwable JavaDoc
716   {
717     this.close();
718   }
719
720     ///////////////////////////////////////////////////////////////////////////
721
// Private Methods //
722
///////////////////////////////////////////////////////////////////////////
723

724   private void recalculateJoinedFilter()
725   {
726     if (numOfConsumers == 0) {
727       joinedFilter = null;
728       return;
729     }
730     
731     JmsOperand [] allFilters = new JmsOperand[numOfConsumers];
732     for (int i=0, j=0; i < consumers.size(); ++i) {
733       JmsMessageConsumer c = (JmsMessageConsumer) consumers.get(i);
734       if (c != null) {
735         allFilters[j] = c.getFilter();
736         ++j;
737       }
738     }
739     joinedFilter = parser.orTogether(allFilters);
740   }
741   
742   ////////////////////////////// Misc stuff ////////////////////////////////
743

744   private static Logger logger =
745     LoggerFactory.getLogger(JmsSession.class, Resources.getBundle());
746
747   ///////////////////////////////////////////////////////////////////////////
748

749 }
750
751
752
753
Popular Tags