KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > api > jms > MantaSession


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Nimo 24-FEB-2004.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.api.jms;
47
48 import java.io.IOException JavaDoc;
49 import java.io.Serializable JavaDoc;
50 import java.util.ArrayList JavaDoc;
51 import java.util.Collection JavaDoc;
52 import java.util.Enumeration JavaDoc;
53 import java.util.Hashtable JavaDoc;
54 import java.util.Iterator JavaDoc;
55 import java.util.LinkedHashSet JavaDoc;
56 import java.util.List JavaDoc;
57 import java.util.Set JavaDoc;
58
59 import javax.jms.BytesMessage JavaDoc;
60 import javax.jms.Destination JavaDoc;
61 import javax.jms.IllegalStateException JavaDoc;
62 import javax.jms.InvalidDestinationException JavaDoc;
63 import javax.jms.JMSException JavaDoc;
64 import javax.jms.MapMessage JavaDoc;
65 import javax.jms.Message JavaDoc;
66 import javax.jms.MessageConsumer JavaDoc;
67 import javax.jms.MessageListener JavaDoc;
68 import javax.jms.MessageProducer JavaDoc;
69 import javax.jms.ObjectMessage JavaDoc;
70 import javax.jms.Queue JavaDoc;
71 import javax.jms.QueueBrowser JavaDoc;
72 import javax.jms.QueueReceiver JavaDoc;
73 import javax.jms.QueueSender JavaDoc;
74 import javax.jms.QueueSession JavaDoc;
75 import javax.jms.Session JavaDoc;
76 import javax.jms.StreamMessage JavaDoc;
77 import javax.jms.TemporaryQueue JavaDoc;
78 import javax.jms.TemporaryTopic JavaDoc;
79 import javax.jms.TextMessage JavaDoc;
80 import javax.jms.Topic JavaDoc;
81 import javax.jms.TopicPublisher JavaDoc;
82 import javax.jms.TopicSession JavaDoc;
83 import javax.jms.TopicSubscriber JavaDoc;
84
85 import org.apache.commons.logging.Log;
86 import org.apache.commons.logging.LogFactory;
87 import org.mr.IMessageListener;
88 import org.mr.MantaAgent;
89 import org.mr.MantaException;
90 import org.mr.api.jms.selector.syntax.Selector;
91 import org.mr.core.protocol.MantaBusMessage;
92 import org.mr.core.protocol.MantaBusMessageConsts;
93 import org.mr.core.util.Stage;
94 import org.mr.core.util.StageHandler;
95 import org.mr.core.util.StageParams;
96 import org.mr.core.util.SystemTime;
97 import org.mr.core.util.byteable.Byteable;
98 import org.mr.core.util.byteable.ByteableInputStream;
99 import org.mr.core.util.byteable.ByteableOutputStream;
100 import org.mr.core.util.byteable.ByteableRegistry;
101 import org.mr.kernel.security.MantaAuthorization;
102 import org.mr.kernel.security.SecurityActionTypes;
103 import org.mr.kernel.services.MantaService;
104 import org.mr.kernel.services.ServiceActor;
105 import org.mr.kernel.services.ServiceConsumer;
106 import org.mr.kernel.services.ServiceProducer;
107 import org.mr.kernel.services.topics.VirtualTopicManager;
108
109
110 /**
111  *
112  * @author Nimo 24-FEB-2004
113  *
114  * A Session object is a single-threaded context for producing and consuming messages. Although it may allocate
115  * provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
116  *
117  *
118  * A session serves several purposes:
119  * It is a factory for its message producers and consumers.
120  * It supplies provider-optimized message factories.
121  * It is a factory for TemporaryTopics and TemporaryQueues.
122  * It provides a way to create Queue or Topic objects for those clients that need to dynamically manipulate provider-specific destination names.
123  * It supports a single series of transactions that combine work spanning its producers and consumers into atomic units.
124  * It defines a serial order for the messages it consumes and the messages it produces.
125  * It retains messages it consumes until they have been acknowledged.
126  * It serializes execution of message listeners registered with its message consumers.
127  * It is a factory for QueueBrowsers.
128  * A session can create and service multiple message producers and consumers.
129  * One typical use is to have a thread block on a synchronous MessageConsumer until a message arrives. The thread may then use one or more of the Session's MessageProducers.
130  * If a client desires to have one thread produce messages while others consume them, the client should use a separate session for its producing thread.
131  * Once a connection has been started, any session with one or more registered message listeners is dedicated to the thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its constituent objects from another thread of control. The only exception to this rule is the use of the session or connection close method.
132  * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to start simply and incrementally add message processing complexity as their need for concurrency grows.
133  * The close method is the only session method that can be called while some other session method is being executed in another thread.
134  * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect, transactions organize a session's input message stream and output message stream into series of atomic units. When a transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically recovered.
135  * The content of a transaction's input and output units is simply those messages that have been produced and consumed within the session's current transaction.
136  * A transaction is completed using either its session's commit method or its session's rollback method. The completion of a session's current transaction automatically begins the next. The result is that a transacted session always has a current transaction within which its work is done.
137  * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are controlled via the Java Transaction API (JTA), use of the session's commit and rollback methods in this context is prohibited.
138  * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
139  * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the JMS API into their application server products.
140  *
141  */

142 public class MantaSession implements Serializable JavaDoc, Session JavaDoc, QueueSession JavaDoc, TopicSession JavaDoc, IMessageListener,StageHandler {
143
144
145     /**
146      * in order to be a good Serializable
147      */

148     private static final long serialVersionUID = -1698529734026002731L;
149     public Log log;
150     /**
151      * the facade of the manta
152      */

153     private MantaAgent manta;
154     protected Counter listenersCount;
155
156     // When the session is stopped, this object is enqueued to the
157
// stage's queue to release the Execution Thread in case the
158
// queue is empty.
159
private Object JavaDoc stopEvent = new Object JavaDoc();
160
161     // support in JCA 1.5
162
private DeliveryListener deliveryListener;
163
164     protected TransactionContext transactionContext;
165
166     // used by the connection consumer only
167
private IMessageListener busListener = null;
168
169     //constructors --------------------------------------------------------
170

171    /**
172     * The main constructor for a Session object.
173     * This constructor needs to have a session id, supplied by the connection,
174     * along with the creating connection reference.
175     * Additionally - transaction and ack mode for the session.
176     */

177     MantaSession(String JavaDoc csessId, MantaConnection con, int ackMode, boolean trx) throws JMSException JavaDoc{
178
179         if (con == null)
180             throw new JMSException JavaDoc("MNJMS00072 : FAILED ON SESSION CREATION. Connection WAS NULL.");
181         owningConnection = con;
182         sessId = csessId;
183         //sessionTransactedMode=trx;
184
if (trx)
185             sessionAcknowledgementMode=Session.SESSION_TRANSACTED;
186         //check ack_mode in the range Session_Transacted=0, DUPS_OK = 3.
187
else if (ackMode<Session.SESSION_TRANSACTED || ackMode>Session.DUPS_OK_ACKNOWLEDGE)
188                 throw new IllegalStateException JavaDoc("MNJMS00073 : FAILED ON SESSION CREATION. INAVLID ACKNOWLEDGE MODE : "+ackMode);
189         else
190             sessionAcknowledgementMode=ackMode;
191
192         // we don't know now if this session will participate in XA transactions so
193
// we create the lists anyway.
194
//if (sessionAcknowledgementMode == Session.SESSION_TRANSACTED ||
195
// sessionAcknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
196
unackedMessages = new LinkedHashSet JavaDoc();
197             heldMessages = new LinkedHashSet JavaDoc();
198         //}
199
//transactionContext = new TransactionContext(this);
200
transactionContext = new TransactionContext();
201         transactionContext.addSession(this);
202 // transactionContext.setLocalTransactionEventListener(new LocalTransactionEventListener() {
203
// public void beginEvent(String txnID) {
204
// System.out.println("local txn started: "+txnID);
205
// }
206
// public void commitEvent(String txnID) {
207
// System.out.println("local txn commited: "+txnID);
208
// }
209
// public void rollbackEvent(String txnID) {
210
// System.out.println("local txn rolledback: "+txnID);
211
// }
212
// });
213

214         isClosed = false;
215         isClosing = false;
216         isStopped = !con.isStarted;
217         log=LogFactory.getLog("MantaSession");
218
219         //create the parms for the staging mechanism and the inner stage.
220
StageParams innerQueueParms = new StageParams();
221         innerQueueParms.setBlocking(false);
222         innerQueueParms.setHandler(this);
223         innerQueueParms.setMaxNumberOfThreads(1);
224         innerQueueParms.setNumberOfStartThreads(1);
225         innerQueueParms.setPersistent(false);
226         innerQueueParms.setStageName("Session["+sessId+"]@");
227         innerQueueParms.setStagePriority(0);
228         innerQueue = new Stage(innerQueueParms);
229         lockMonitor = new Object JavaDoc();
230         manta = owningConnection.getChannel();
231         consumerMessages = new ArrayList JavaDoc();
232         listenersCount = new Counter();
233     }//MantaSession
234

235
236     /**
237      * This method is used by the connection to start a session.
238      * when a connection is started, a session should know that it can
239      * start too, and notify its consumers who might be waiting.
240      *
241      * @throws JMSException
242      */

243     void start() throws JMSException JavaDoc
244     {
245         checkLegalOperation();
246         if (!isStopped)
247             return;
248         isStopped = false;
249
250         synchronized (lockMonitor) {
251             lockMonitor.notifyAll();
252         }
253     }
254
255     /**
256      * This method is used by the connection to stop all its sessions upon
257      * stopping on itself. this is done so the sessions wouldn't try to do
258      * work when the connection is not in an open state.
259      *
260      * @throws JMSException
261      */

262     void stop() throws JMSException JavaDoc
263     {
264         checkLegalOperation();
265         if (isStopped)
266             return;
267
268         // in order to make this method blocking, we wait on the
269
// lockMonitor object. When the Execution Thread finds
270
// out that the session stopped it notifies about it going
271
// to sleep and the stop thread resumes and return.
272
synchronized (lockMonitor) {
273             // we start synchronizing here because we don't want the
274
// Execution Thread to notify, before we invoke the wait().
275
isStopped = true;
276
277             // When the satage queue is empty, it blocks the Execution
278
// Thread. To prevent that we enqueue a demi object.
279
innerQueue.enqueue(stopEvent);
280             try {
281                 lockMonitor.wait();
282             } catch (InterruptedException JavaDoc e) {
283                 if (log.isErrorEnabled())
284                     log.error("Error while stopping the session. ", e);
285             }
286         }
287     }
288
289     /**
290      * Creates a new <CODE>BytesMessage</CODE> object.
291      * @exception JMSException
292      * if the JMS provider fails to create this message due to
293      * some internal error.
294      */

295     public BytesMessage JavaDoc createBytesMessage() throws JMSException JavaDoc {
296         checkLegalOperation();
297         return new MantaBytesMessage(this);
298     }//createBytesMessage
299

300     /**
301      * Creates a new <CODE>MapMessage</CODE> object.
302      * @exception JMSException
303      * if the JMS provider fails to create this message due to
304      * some internal error.
305      */

306     public MapMessage JavaDoc createMapMessage() throws JMSException JavaDoc {
307         checkLegalOperation();
308         return new MantaMapMessage(this);
309     }//createMapMessage
310

311     /**
312      * Creates a <CODE>Message</CODE> object.
313      * @exception JMSException
314      * if the JMS provider fails to create this message due to
315      * some internal error.
316      */

317     public Message JavaDoc createMessage() throws JMSException JavaDoc {
318         checkLegalOperation();
319         return new MantaMessage(this);
320     }//createMessage
321

322     /**
323      * Creates an <CODE>ObjectMessage</CODE> object.
324      * @exception JMSException
325      * if the JMS provider fails to create this message due to
326      * some internal error.
327      */

328     public ObjectMessage JavaDoc createObjectMessage() throws JMSException JavaDoc {
329         checkLegalOperation();
330         return new MantaObjectMessage(this);
331     }//createObjectMessage
332

333     /**
334      * Creates an initialized <CODE>ObjectMessage</CODE> object.
335      *
336      * @param object
337      * the object to use to initialize this message
338      *
339      * @exception JMSException
340      * if the JMS provider fails to create this message due to
341      * some internal error.
342      */

343     public ObjectMessage JavaDoc createObjectMessage(Serializable JavaDoc object) throws JMSException JavaDoc {
344         checkLegalOperation();
345         return new MantaObjectMessage(this,object);
346     }//createObjectMessage
347

348     /**
349      * Creates a <CODE>StreamMessage</CODE> object.
350      * @exception JMSException
351      * if the JMS provider fails to create this message due to
352      * some internal error.
353      */

354     public StreamMessage JavaDoc createStreamMessage() throws JMSException JavaDoc {
355         checkLegalOperation();
356         return new MantaStreamMessage(this);
357     }//createStreamMessage
358

359     /**
360      * Creates a <CODE>TextMessage</CODE> object.
361      * @exception JMSException
362      * if the JMS provider fails to create this message due to
363      * some internal error.
364      */

365     public TextMessage JavaDoc createTextMessage() throws JMSException JavaDoc {
366         checkLegalOperation();
367         return new MantaTextMessage(this);
368     }//createTextMessage
369

370     /**
371      * Creates an initialized <CODE>TextMessage</CODE> object.
372      * @param text
373      * the string used to initialize this message
374      *
375      * @exception JMSException
376      * if the JMS provider fails to create this message due to
377      * some internal error.
378      */

379     public TextMessage JavaDoc createTextMessage(String JavaDoc text) throws JMSException JavaDoc {
380         checkLegalOperation();
381         return new MantaTextMessage(this,text);
382     }//createTextMessage
383

384     /**
385      * Indicates whether the session is in transacted mode.
386      * @return true if the session is in transacted mode
387      * @exception JMSException
388      * if the JMS provider fails to return the transaction mode
389      * due to some internal error.
390      */

391     public final boolean getTransacted() throws JMSException JavaDoc {
392
393         checkLegalOperation();
394         //return sessionTransactedMode;
395
return sessionAcknowledgementMode == Session.SESSION_TRANSACTED ||
396                                              transactionContext.isInXATransaction();
397
398     }//getTransacted
399

400
401     /**
402      * Returns the acknowledgement mode of the session. The acknowledgement
403      * mode is set at the time that the session is created. If the session is
404      * transacted, the acknowledgement mode is ignored.
405      *
406      * @return If the session is not transacted, returns the current
407      * acknowledgement mode for the session. If the session is
408      * transacted, returns SESSION_TRANSACTED.
409      *
410      * @exception JMSException
411      * if the JMS provider fails to return the acknowledgment
412      * mode due to some internal error.
413      *
414      * @see MantaConnection#createSession
415      * @since 1.1
416      */

417     public final int getAcknowledgeMode() throws JMSException JavaDoc {
418
419         checkLegalOperation();
420         return sessionAcknowledgementMode;
421     }//getAcknowledgeMode
422

423     /**
424      * Commits all messages done in this transaction and releases any locks
425      * currently held.
426      *
427      * @exception JMSException
428      * if the JMS provider fails to commit the transaction due
429      * to some internal error.
430      * @exception TransactionRolledBackException
431      * if the transaction is rolled back due to some internal
432      * error during commit.
433      * @exception IllegalStateException
434      * if the method is not called by a transacted session.
435      */

436     public void commit() throws JMSException JavaDoc {
437 // checkLegalOperation();
438
// if (!sessionTransactedMode)
439
// throw new IllegalStateException("MNJMS00074 : FAILED ON METHOD commit(). SESSION IS NOT TRANSACTED.");
440
transactionContext.commit();
441     }
442
443     /**
444      * callback from the TransactionContext to do the actual commit
445      */

446     protected void commitSession() throws JMSException JavaDoc {
447         //ack all received messages.
448
synchronized (unackedMessages) {
449             Iterator JavaDoc ackIterator = unackedMessages.iterator();
450             int size = unackedMessages.size();
451             for (int i = 0;i<size;i++) {
452                 MantaBusMessage mbm = (MantaBusMessage)ackIterator.next();
453                 owningConnection.ack(mbm);
454             }
455             unackedMessages.clear();
456         }
457         //send all messages and clear
458
sendAllMessages(heldMessages);
459         heldMessages.clear();
460     }//commit
461

462     /**
463      * Rolls back any messages done in this transaction and releases any locks
464      * currently held.
465      *
466      * @exception JMSException
467      * if the JMS provider fails to roll back the transaction
468      * due to some internal error.
469      * @exception IllegalStateException
470      * if the method is not called by a transacted session.
471      *
472      */

473     public void rollback() throws JMSException JavaDoc {
474 // checkLegalOperation();
475
// if (!sessionTransactedMode)
476
// throw new IllegalStateException("MNJMS00075 : FAILED ON METHOD rollback(). SESSION IS NOT TRANSACTED.");
477
transactionContext.rollback();
478     }//rollback
479

480
481     /**
482      * callback from the TransactionContext to do the actual rollback
483      */

484     protected void rollbackSession() throws JMSException JavaDoc {
485         heldMessages.clear();
486
487         synchronized(listenersCount) {
488             // wait for all listeners to finish dealing with messages
489
while (listenersCount.val() != 0) {
490                 try {
491                     listenersCount.wait();
492                 }
493                 catch (InterruptedException JavaDoc ie) {}
494             }
495
496             // redeliver all unacked messages to the consumers.
497
// mark all resent messages as redelivered.
498
sendUnackedMessages();
499         }
500     }//rollbackSession
501

502     /**
503      * Delivers all unacked messages to the consumers.
504      */

505     protected void sendUnackedMessages() throws JMSException JavaDoc {
506         MantaBusMessage mbm;
507         String JavaDoc consumer;
508         MantaMessageConsumer destConsumer;
509         synchronized(lockMonitor) {
510             List JavaDoc unackedMessagesCopy = new ArrayList JavaDoc();
511             synchronized(unackedMessages) {
512                 unackedMessagesCopy.addAll(unackedMessages);
513                 unackedMessages.clear();
514             }
515             Iterator JavaDoc unacked = unackedMessagesCopy.iterator();
516             while (unacked.hasNext()) {
517                 mbm = (MantaBusMessage) unacked.next();
518                 consumer = ((ServiceActor)mbm.getRecipient()).getId();
519                 destConsumer = (MantaMessageConsumer)messageConsumers.get(consumer);
520                 if (destConsumer != null) {
521                     synchronized (destConsumer) {
522                         if (!destConsumer.isClosed) {
523                             MantaMessage result = (MantaMessage) mbm.getPayload();
524                             result.flags = result.flags | MantaMessage.IS_REDELIVERED;
525                             destConsumer.feedMessageListener(mbm);
526                         }
527                     }
528                 }
529                 else {
530                     if (log.isInfoEnabled())
531                         log.info("A message cannot be sent to a closed consumer. Returning to wait.");
532                     unackedMessages.add(mbm);
533                 }
534             }
535         }
536     }//sendUnackedMessages
537

538
539     void startLocalTransactionIfNeeded() throws JMSException JavaDoc {
540         if (sessionAcknowledgementMode == Session.SESSION_TRANSACTED &&
541             !transactionContext.isInLocalTransaction() &&
542             !transactionContext.isInXATransaction()) {
543             transactionContext.begin();
544         }
545     }
546
547
548     /**
549      * Closes the session.
550      *
551      * This call will block until a <CODE>receive</CODE> call or message
552      * listener in progress has completed. A blocked message consumer <CODE>
553      * receive</CODE> call returns <CODE>null</CODE> when this session is
554      * closed.
555      *
556      * <P>
557      * Closing a transacted session must roll back the transaction in progress.
558      *
559      * <P>
560      * This method is the only <CODE>MantaSession</CODE> method that is allowed
561      * to come from a different control thread. hence - it is synchronized.
562      *
563      * <P>
564      * Invoking any other <CODE>MantaSession</CODE> method on a closed
565      * session must throw a <CODE>JMSException.IllegalStateException</CODE>.
566      * Closing a closed session must <I>not</I> throw an exception.
567      *
568      * @exception JMSException
569      * if the JMS provider fails to close the session due to
570      * some internal error.
571      */

572     public synchronized void close() throws JMSException JavaDoc {
573
574         if (isClosed || isClosing)
575             return;
576
577         stop();
578
579         // shai: we must rollback the session if it is transacted.
580
// However it's illegal to call it on XASession.
581
// Another way to do it is to override this method in the
582
// MantaXASession and remove this call. But then every change
583
// in this code will need to be copied to the MantaXASession
584
// class too. mmm.
585
//if (sessionTransactedMode && !(this instanceof MantaXASession))
586
if (transactionContext.isInLocalTransaction() && !(this instanceof MantaXASession))
587             rollback();
588
589         isClosing = true;
590
591         // close all producers
592
synchronized(messageProducers) {
593             List JavaDoc l = new ArrayList JavaDoc(messageProducers.size());
594             l.addAll(messageProducers.values());
595             Iterator JavaDoc producers = l.iterator();
596             MantaMessageProducer mmp;
597             while (producers.hasNext()) {
598                 mmp = (MantaMessageProducer)producers.next();
599                 if (mmp != null) {
600                     mmp.close();
601                 }
602             }
603             // log producers that stayed opened
604
if (messageProducers.size() > 0) {
605                 reportRemainingToLog(messageProducers, "producers");
606             }
607             messageProducers.clear();
608         }
609
610         // close all consumers
611
synchronized(messageConsumers) {
612             List JavaDoc l = new ArrayList JavaDoc(messageConsumers.size());
613             l.addAll(messageConsumers.values());
614             Iterator JavaDoc consumers = l.iterator();
615             MantaMessageConsumer mmc;
616             while (consumers.hasNext()) {
617                 mmc = (MantaMessageConsumer)consumers.next();
618                 if (mmc != null) {
619                     mmc.close();
620                 }
621             }
622             // log consumers that stayed opened
623
if (messageConsumers.size() > 0) {
624                 reportRemainingToLog(messageConsumers, "consumers");
625             }
626             messageConsumers.clear();
627         }
628
629         busListener = null;
630
631         owningConnection.deleteSession(this);
632         owningConnection = null;
633
634 // if (transactionContext.isInXATransaction() || transactionContext.isInLocalTransaction()) {
635
// transactionContext.saveTransactionNoPersist(heldMessages, unackedMessages);
636
// }
637

638         if (heldMessages != null) {
639             heldMessages.clear();
640         }
641
642         //mark the session as closed:
643
isClosed = true;
644         isClosing = false;
645         innerQueue.stop();
646
647         synchronized (lockMonitor) {
648             lockMonitor.notifyAll();
649         }
650         //no consumers are on right now, so all types are allowed.
651
}
652
653     //txn moved from MantaXASession due to RA requirements
654
void saveMessages(MantaXADescriptor descriptor) {
655         descriptor.addHeldMessages(heldMessages);
656         heldMessages.clear();
657         synchronized (unackedMessages) {
658             descriptor.addUnackedMessages(unackedMessages);
659             unackedMessages.clear();
660         }
661     }
662
663     // used to write to the log any producers/consumers that remained
664
// after session close.
665
private void reportRemainingToLog(Hashtable JavaDoc table, String JavaDoc listName) {
666         if (log.isInfoEnabled()) {
667             String JavaDoc delimiter = ", ";
668             StringBuffer JavaDoc buf = new StringBuffer JavaDoc();
669             Iterator JavaDoc i = table.values().iterator();
670             while (i.hasNext()) {
671                 buf.append(i.next());
672                 buf.append(delimiter);
673             }
674             log.info("Some "+listName+" remained after closing the session. These "+listName+" will be removed: "+buf.toString());
675         }
676     }
677
678     /**
679      * Stops message delivery in this session, and restarts message delivery
680      * with the oldest unacknowledged message.
681      *
682      * <P>
683      * All consumers deliver messages in a serial order. Acknowledging a
684      * received message automatically acknowledges all messages that have been
685      * delivered to the client.
686      *
687      * <P>
688      * Restarting a session causes it to take the following actions:
689      *
690      * <UL>
691      * <LI>Stop message delivery
692      * <LI>Mark all messages that might have been delivered but not
693      * acknowledged as "redelivered"
694      * <LI>Restart the delivery sequence including all unacknowledged messages
695      * that had been previously delivered. Redelivered messages do not have to
696      * be delivered in exactly their original delivery order.
697      * </UL>
698      *
699      * @exception JMSException
700      * if the JMS provider fails to stop and restart message
701      * delivery due to some internal error.
702      * @exception IllegalStateException
703      * if the method is called by a transacted session.
704      */

705     public void recover() throws JMSException JavaDoc {
706
707         checkLegalOperation();
708         //if (sessionTransactedMode)
709
if (this.getTransacted())
710             throw new IllegalStateException JavaDoc("MNJMS00078 : FAILED ON METHOD recover(). SESSION IS TRANSACTED.");
711
712         if (sessionAcknowledgementMode != Session.CLIENT_ACKNOWLEDGE)
713             return;
714
715         sendUnackedMessages();
716     }//recover
717

718
719     // used by the MantaConnectionConsumer to bypass a message consumer
720
// because it needs to get messges as MantaBusMessages
721
void setBusMessageListener(IMessageListener listener) {
722         busListener = listener;
723     }
724
725
726     /**
727      * This is for application servers. they use connection consumers,
728      * and therefore will want to use this and the run() method.
729      */

730     public void run() {
731         synchronized (consumerMessages) {
732             Message JavaDoc message;
733             MantaBusMessage busMessage;
734             while (!consumerMessages.isEmpty()) {
735                 busMessage = (MantaBusMessage) consumerMessages.remove(0);
736                 try {
737                     message = MantaMessageConsumer.convertToJMSMessage(busMessage, this);
738                     ackOrHold(busMessage);
739                 } catch (JMSException JavaDoc e) {
740                     e.printStackTrace();
741                     busMessage = null;
742                     message = null;
743                     continue;
744                 }
745
746
747                 // notify the application server before sending
748
// a message to the MDB (JCA 1.5)
749
if (deliveryListener != null)
750                     deliveryListener.beforeDelivery(this, message);
751
752                 // start local transaction if needed
753
try {
754                     this.startLocalTransactionIfNeeded();
755                 } catch (JMSException JavaDoc e) {
756                     e.printStackTrace();
757                 }
758
759                 // send the message
760
this.sessionListener.onMessage(message);
761
762                 // notify the application server after sending
763
// a message to the MDB (JCA 1.5)
764
if (deliveryListener != null)
765                     deliveryListener.afterDelivery(this, message);
766             }
767         }
768     }
769
770
771
772     /**
773      * Returns the session's distinguished message listener (optional).
774      *
775      * @return the message listener associated with this session
776      *
777      * @exception JMSException
778      * if the JMS provider fails to get the message listener due
779      * to an internal error.
780      *
781      * @see javax.jms.Session#setMessageListener
782      * @see javax.jms.ServerSessionPool
783      * @see javax.jms.ServerSession
784      */

785     public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc {
786
787         checkLegalOperation();
788         return this.sessionListener;
789     }//getMessageListener
790

791     /**
792      * Sets the session's distinguished message listener (optional).
793      *
794      * <P>
795      * When the distinguished message listener is set, no other form of message
796      * receipt in the session can be used; however, all forms of sending
797      * messages are still supported.
798      *
799      * <P>
800      * This is an expert facility not used by regular JMS clients.
801      *
802      * @param listener
803      * the message listener to associate with this session
804      *
805      * @exception JMSException
806      * if the JMS provider fails to set the message listener due
807      * to an internal error.
808      *
809      * @see javax.jms.Session#getMessageListener
810      * @see javax.jms.ServerSessionPool
811      * @see javax.jms.ServerSession
812      */

813     public final void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc {
814
815         checkLegalOperation();
816         sessionListener = listener;
817
818     }//setMessageListener
819

820
821
822     /**
823      * Creates a <CODE>MantaMessageProducer</CODE> to send messages to the
824      * specified destination.
825      *
826      * <P>
827      * A client uses a <CODE>MantaMessageProducer</CODE> object to send
828      * messages to a destination. Since <CODE>MantaQueue</CODE> and <CODE>
829      * MantaTopic</CODE> both inherit from <CODE>Destination</CODE>, they
830      * can be used in the destination parameter to create a <CODE>
831      * MantaMessageProducer</CODE> object.
832      *
833      * @param destination
834      * the <CODE>Destination</CODE> to send to, or null if this is
835      * a producer which does not have a specified destination.
836      *
837      * @exception JMSException
838      * if the session fails to create a MantaMessageProducer
839      * due to some internal error.
840      * @exception InvalidDestinationException
841      * if an invalid destination is specified.
842      *
843      * @since 1.1
844      *
845      */

846     public MessageProducer JavaDoc createProducer(Destination JavaDoc destination) throws JMSException JavaDoc {
847
848     checkLegalOperation();
849
850         MantaMessageProducer mmp = null;
851         if (destination == null ) //need producer with no specific service
852
mmp = new MantaMessageProducer(manta.getMessageId(),this);
853
854         else if (!destination.toString().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) {
855             MantaService service = null;
856             if(destination instanceof Queue JavaDoc){
857                 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_QUEUE,destination.toString() );
858                 service = manta.getService(destination.toString(),MantaService.SERVICE_TYPE_QUEUE);
859             }else{
860                 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_TOPIC,destination.toString() );
861                 service = manta.getService(destination.toString(),MantaService.SERVICE_TYPE_TOPIC);
862             // can't create producer to a hierarchy topic with wildcards
863
}if (service == null){
864                 throw new JMSException JavaDoc("MNJMS00079 : FAILED ON METHOD createProducer() FOR DESTINATION "+destination);
865             }
866             ServiceProducer sActor = ServiceProducer.createNew(service);
867
868             try {
869                 if (log.isInfoEnabled()) {
870                     log.info("Created local producer "+sActor);
871                 }
872                 manta.advertiseService(sActor);
873                 mmp = new MantaMessageProducer(sActor.getId(),this,destination,sActor);
874             }
875             catch (MantaException me) {
876                 mmp = null;
877                 sActor = null;
878                 throw new JMSException JavaDoc("MNJMS0007A : FAILED ON METHOD createProducer(). ERROR TEXT : "+me.getMessage());
879             }
880         }
881
882         else { //this is a temp destination!
883
ServiceProducer producer =null ;
884             if(destination instanceof Queue JavaDoc){
885                 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_QUEUE);
886                 producer = new ServiceProducer( manta.getAgentName(),destination.toString(), MantaService.SERVICE_TYPE_QUEUE);
887             }else{
888                 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_TOPIC);
889                 producer = new ServiceProducer( manta.getAgentName(),destination.toString(), MantaService.SERVICE_TYPE_TOPIC);
890             }
891             mmp = new MantaMessageProducer(
892                     manta.getMessageId(),
893                     this, destination,
894                     producer);
895
896         }
897         messageProducers.put(mmp.getClientId(),mmp);
898
899         // this delay is added to prevent a situation that in
900
// NAD producer will start sending messages before consumers
901
// are ready to receive thyem. (Bug 543)
902
int delay = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getIntProperty("jms.producer_discovery_delay",100);
903         try {
904             Thread.sleep(delay);
905         } catch (InterruptedException JavaDoc e) {
906             if (log.isWarnEnabled()) {
907                 log.warn("Interrupted during sleep.");
908             }
909         }
910         return mmp;
911     }//createProducer
912

913     /**
914      * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
915      * Since <CODE>Queue</CODE> and <CODE>MantaTopic</CODE> both inherit
916      * from <CODE>Destination</CODE>, they can be used in the destination
917      * parameter to create a <CODE>MessageConsumer</CODE>.
918      *
919      * @param destination
920      * the <CODE>Destination</CODE> to access.
921      *
922      * @exception JMSException
923      * if the session fails to create a consumer due to some
924      * internal error.
925      * @exception InvalidDestinationException
926      * if an invalid destination is specified.
927      *
928      * @since 1.1
929      */

930     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination) throws JMSException JavaDoc {
931
932         return createConsumer(destination,null,false);
933
934     }//createConsumer
935

936     /**
937      * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
938      * using a message selector. Since <CODE>Queue</CODE> and <CODE>
939      * MantaTopic</CODE> both inherit from <CODE>Destination</CODE>, they
940      * can be used in the destination parameter to create a <CODE>
941      * MessageConsumer</CODE>.
942      *
943      * <P>
944      * A client uses a <CODE>MessageConsumer</CODE> object to receive
945      * messages that have been sent to a destination.
946      *
947      * @param destination
948      * the <CODE>Destination</CODE> to access
949      * @param messageSelector
950      * only messages with properties matching the message selector
951      * expression are delivered. A value of null or an empty string
952      * indicates that there is no message selector for the message
953      * consumer.
954      *
955      *
956      * @exception JMSException
957      * if the session fails to create a MessageConsumer due to
958      * some internal error.
959      * @exception InvalidDestinationException
960      * if an invalid destination is specified.
961      *
962      * @exception InvalidSelectorException
963      * if the message selector is invalid.
964      *
965      * @since 1.1
966      */

967     public final MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector) throws JMSException JavaDoc {
968
969         return createConsumer(destination,messageSelector,false);
970
971     }//createConsumer
972

973
974     /**
975      * Creates <CODE>MessageConsumer</CODE> for the specified destination,
976      * using a message selector. This method can specify whether messages
977      * published by its own connection should be delivered to it, if the
978      * destination is a topic.
979      * <P>
980      * Since <CODE>Queue</CODE> and <CODE>MantaTopic</CODE> both inherit
981      * from <CODE>Destination</CODE>, they can be used in the destination
982      * parameter to create a <CODE>MessageConsumer</CODE>.
983      * <P>
984      * A client uses a <CODE>MessageConsumer</CODE> object to receive
985      * messages that have been published to a destination.
986      *
987      * <P>
988      * In some cases, a connection may both publish and subscribe to a topic.
989      * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
990      * inhibit the delivery of messages published by its own connection. The
991      * default value for this attribute is False. The <CODE>noLocal</CODE>
992      * value must be supported by destinations that are topics.
993      *
994      * @param destination
995      * the <CODE>Destination</CODE> to access
996      * @param messageSelector
997      * only messages with properties matching the message selector
998      * expression are delivered. A value of null or an empty string
999      * indicates that there is no message selector for the message
1000     * consumer.
1001     * @param noLocal
1002     * if true, and the destination is a topic, inhibits the
1003     * delivery of messages published by its own connection. The
1004     * behavior for <CODE>NoLocal</CODE> is not specified if the
1005     * destination is a queue.
1006     *
1007     * @exception JMSException
1008     * if the session fails to create a MessageConsumer due to
1009     * some internal error.
1010     * @exception InvalidDestinationException
1011     * if an invalid destination is specified.
1012     *
1013     * @exception InvalidSelectorException
1014     * if the message selector is invalid.
1015     *
1016     * @since 1.1
1017     *
1018     */

1019    public final MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector, boolean noLocal) throws JMSException JavaDoc {
1020
1021        checkLegalOperation();
1022        if (destination==null)
1023            throw new InvalidDestinationException JavaDoc("MNJMS0007B : FAILED ON METHOD createConsumer(). NULL DESTINATION WAS SUPPLIED.");
1024
1025        //this will throw an InvalidSelectorException if the selector is bad.
1026
if(messageSelector!= null)
1027            messageSelector = messageSelector.trim();
1028        Selector s = new Selector(messageSelector);
1029        MantaMessageConsumer mmc;
1030        //Check that the destination exists on the network.
1031
MantaService service = null;
1032        byte type;
1033        if(destination instanceof Queue JavaDoc){
1034            owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_CONSUMER_FOR_QUEUE,destination.toString() );
1035            service = manta.getService(destination.toString(), MantaService.SERVICE_TYPE_QUEUE);
1036            type = MantaService.SERVICE_TYPE_QUEUE;
1037        }else{
1038            owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_CONSUMER_FOR_TOPIC,destination.toString() );
1039            service = manta.getService(destination.toString(), MantaService.SERVICE_TYPE_TOPIC);
1040            type = MantaService.SERVICE_TYPE_TOPIC;
1041        }
1042
1043        // For topics we get a service object only if the topic is real,
1044
// meaning the name doesn't contain wildcards.
1045
boolean isWildCard = (type == MantaService.SERVICE_TYPE_TOPIC) &&
1046                             VirtualTopicManager.isWildCardTopic(destination.toString());
1047        if (service == null && !isWildCard)
1048            throw new JMSException JavaDoc("MNJMS0007C : FAILED ON METHOD createConsumer(). COULD NOT REGISTER ON DESTINATION "+destination);
1049
1050        ServiceConsumer sActor;
1051        try {
1052            sActor = new ServiceConsumer(manta.getAgentName(), manta.getDomainName(),
1053                    destination.toString() , type,(byte)getAcknowledgeMode());
1054            if (type==MantaService.SERVICE_TYPE_TOPIC) {
1055                //register the session as a listener on that service
1056

1057                registerListener(sActor.getId());
1058
1059
1060            }
1061            sActor.setSelectorStatment(messageSelector);
1062
1063            mmc = new MantaMessageConsumer(sActor.getId(),this, destination,messageSelector,noLocal,sActor);
1064            messageConsumers.put(mmc.getClientId(),mmc);
1065            if (log.isInfoEnabled()) {
1066                log.info("Created local consumer "+sActor);
1067            }
1068            manta.advertiseService(sActor);
1069        }
1070        catch (MantaException me) {
1071            mmc = null;
1072            sActor = null;
1073            throw new JMSException JavaDoc("MNJMS0007D : FAILED ON METHOD createConsumer(). ERROR TEXT : "+me.getMessage());
1074
1075        }
1076        return mmc;
1077    }//createConsumer
1078

1079    /**
1080     * Creates a queue identity given a <CODE>Queue</CODE> name.
1081     *
1082     * <P>
1083     * This facility is provided for the rare cases where clients need to
1084     * dynamically manipulate queue identity. It allows the creation of a queue
1085     * identity with a provider-specific name. Clients that depend on this
1086     * ability are not portable.
1087     *
1088     * <P>
1089     * Note that this method is not for creating the physical queue. The
1090     * physical creation of queues is an administrative task and is not to be
1091     * initiated by the JMS API. The one exception is the creation of temporary
1092     * queues, which is accomplished with the <CODE>createTemporaryQueue
1093     * </CODE> method.
1094     *
1095     * @param queueName
1096     * the name of this <CODE>Queue</CODE>
1097     *
1098     * @return a <CODE>Queue</CODE> with the given name
1099     *
1100     * @exception JMSException
1101     * if the session fails to create a queue due to some
1102     * internal error.
1103     * @since 1.1
1104     */

1105    public Queue JavaDoc createQueue(String JavaDoc queueName) throws JMSException JavaDoc {
1106
1107        checkLegalOperation();
1108        return new MantaQueue(queueName);
1109    }//createQueue
1110

1111    /**
1112     * Creates a topic identity given a <CODE>MantaTopic</CODE> name.
1113     *
1114     * <P>
1115     * This facility is provided for the rare cases where clients need to
1116     * dynamically manipulate topic identity. This allows the creation of a
1117     * topic identity with a provider-specific name. Clients that depend on
1118     * this ability are not portable.
1119     *
1120     * <P>
1121     * Note that this method is not for creating the physical topic. The
1122     * physical creation of topics is an administrative task and is not to be
1123     * initiated by the JMS API. The one exception is the creation of temporary
1124     * topics, which is accomplished with the <CODE>createTemporaryTopic
1125     * </CODE> method.
1126     *
1127     * @param topicName
1128     * the name of this <CODE>MantaTopic</CODE>
1129     *
1130     * @return a <CODE>MantaTopic</CODE> with the given name
1131     *
1132     * @exception JMSException
1133     * if the session fails to create a topic due to some
1134     * internal error.
1135     * @since 1.1
1136     */

1137    public Topic JavaDoc createTopic(String JavaDoc topicName) throws JMSException JavaDoc {
1138
1139        checkLegalOperation();
1140        return new MantaTopic(topicName);
1141    }//createTopic
1142

1143
1144    /**
1145     * Creates a durable subscriber to the specified topic.
1146     *
1147     * <P>
1148     * If a client needs to receive all the messages published on a topic,
1149     * including the ones published while the subscriber is inactive, it uses a
1150     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1151     * record of this durable subscription and insures that all messages from
1152     * the topic's publishers are retained until they are acknowledged by this
1153     * durable subscriber or they have expired.
1154     *
1155     * <P>
1156     * Sessions with durable subscribers must always provide the same client
1157     * identifier. In addition, each client must specify a name that uniquely
1158     * identifies (within client identifier) each durable subscription it
1159     * creates. Only one session at a time can have a <CODE>TopicSubscriber
1160     * </CODE> for a particular durable subscription.
1161     *
1162     * <P>
1163     * A client can change an existing durable subscription by creating a
1164     * durable <CODE>TopicSubscriber</CODE> with the same name and a new
1165     * topic and/or message selector. Changing a durable subscriber is
1166     * equivalent to unsubscribing (deleting) the old one and creating a new
1167     * one.
1168     *
1169     * <P>
1170     * In some cases, a connection may both publish and subscribe to a topic.
1171     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1172     * inhibit the delivery of messages published by its own connection. The
1173     * default value for this attribute is false.
1174     *
1175     * @param topic
1176     * the non-temporary <CODE>MantaTopic</CODE> to subscribe to
1177     * @param name
1178     * the name used to identify this subscription
1179     *
1180     * @exception JMSException
1181     * if the session fails to create a subscriber due to some
1182     * internal error.
1183     * @exception InvalidDestinationException
1184     * if an invalid topic is specified.
1185     *
1186     * @since 1.1
1187     */

1188    public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name) throws JMSException JavaDoc {
1189
1190        return createDurableSubscriber(topic,name,null,false);
1191    }//createDurableSubscriber
1192

1193
1194    /**
1195     * Creates a durable subscriber to the specified topic, using a message
1196     * selector and specifying whether messages published by its own connection
1197     * should be delivered to it.
1198     *
1199     * <P>
1200     * If a client needs to receive all the messages published on a topic,
1201     * including the ones published while the subscriber is inactive, it uses a
1202     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1203     * record of this durable subscription and insures that all messages from
1204     * the topic's publishers are retained until they are acknowledged by this
1205     * durable subscriber or they have expired.
1206     *
1207     * <P>
1208     * Sessions with durable subscribers must always provide the same client
1209     * identifier. In addition, each client must specify a name which uniquely
1210     * identifies (within client identifier) each durable subscription it
1211     * creates. Only one session at a time can have a <CODE>TopicSubscriber
1212     * </CODE> for a particular durable subscription. An inactive durable
1213     * subscriber is one that exists but does not currently have a message
1214     * consumer associated with it.
1215     *
1216     * <P>
1217     * A client can change an existing durable subscription by creating a
1218     * durable <CODE>TopicSubscriber</CODE> with the same name and a new
1219     * topic and/or message selector. Changing a durable subscriber is
1220     * equivalent to unsubscribing (deleting) the old one and creating a new
1221     * one.
1222     *
1223     * @param topic
1224     * the non-temporary <CODE>MantaTopic</CODE> to subscribe to
1225     * @param name
1226     * the name used to identify this subscription
1227     * @param messageSelector
1228     * only messages with properties matching the message selector
1229     * expression are delivered. A value of null or an empty string
1230     * indicates that there is no message selector for the message
1231     * consumer.
1232     * @param noLocal
1233     * if set, inhibits the delivery of messages published by its
1234     * own connection
1235     *
1236     * @exception JMSException
1237     * if the session fails to create a subscriber due to some
1238     * internal error.
1239     * @exception InvalidDestinationException
1240     * if an invalid topic is specified.
1241     * @exception InvalidSelectorException
1242     * if the message selector is invalid.
1243     *
1244     * @since 1.1
1245     */

1246    public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name, String JavaDoc messageSelector, boolean noLocal) throws JMSException JavaDoc {
1247
1248        checkLegalOperation();
1249        owningConnection.authorize(SecurityActionTypes.ACTION_SUBSCRIBE_DURABLE_ON_TOPIC,topic.toString() );
1250
1251        if (topic==null)
1252            throw new InvalidDestinationException JavaDoc("MNJMS0007E : FAILED ON METHOD createDurableSubscriber(). A NULL TOPIC WAS SPECIFIED.");
1253
1254        //this will throw an InvalidSelectorException when the selector is bad
1255
//Aviad - add the same fix for empty selector String as for createConsumer
1256
if(messageSelector!= null) {
1257            messageSelector = messageSelector.trim();
1258        }
1259        Selector s = new Selector(messageSelector);
1260
1261        // removing an old subscription before creating a new one
1262
cleanSubscriptionIfNeeded(topic, name, messageSelector, noLocal);
1263
1264        // now we can create a new subscription.
1265
MantaTopicSubscriber newSub = null;
1266        MantaService service = manta.getService(topic.toString(), MantaService.SERVICE_TYPE_TOPIC);
1267        // For topics we get a service object only if the topic is real,
1268
// meaning the name doesn't contain wildcards.
1269
boolean isWildCard = VirtualTopicManager.isWildCardTopic(topic.toString());
1270        if (service == null && !isWildCard)
1271            throw new InvalidDestinationException JavaDoc("MNJMS0007F : FAILED ON METHOD createDurableSubscriber(). TOPIC "+topic+" NOT VALID.");
1272
1273        //register as a durable subscriber for that topic.
1274
ServiceConsumer sActor;
1275
1276        try {
1277            sActor = new ServiceConsumer(manta.getAgentName(), manta.getDomainName(),
1278                    topic.toString() , MantaService.SERVICE_TYPE_TOPIC,(byte)getAcknowledgeMode(),name);
1279            DurableSubscribers.put(name,sActor);
1280            registerListener(sActor.getId());
1281            newSub = new MantaTopicSubscriber(sActor.getId(),this,topic,noLocal,true,name,messageSelector,sActor);
1282            //register ServiceActor and subscriber in the internal registries.
1283
messageConsumers.put(newSub.getClientId(),newSub);
1284            sActor.setSelectorStatment(messageSelector);
1285            //bug:421 sActor.setNoLocal(noLocal);
1286
if (log.isInfoEnabled()) {
1287                log.info("Created local durable subscriber "+sActor);
1288            }
1289            manta.advertiseService(sActor);
1290
1291        }
1292        catch (MantaException me) {
1293            newSub = null;
1294            sActor = null;
1295            throw new JMSException JavaDoc("MNJMS00080 : FAILED ON METHOD CreateDurableSubscriber(). ERROR TEXT : "+me.getMessage());
1296        }
1297        return newSub;
1298
1299    }//createDurableSubscriber
1300

1301
1302    private void cleanSubscriptionIfNeeded(Topic JavaDoc topic, String JavaDoc name, String JavaDoc messageSelector, boolean noLocal) throws JMSException JavaDoc {
1303        MantaTopicSubscriber old = (MantaTopicSubscriber)messageConsumers.get(name);
1304        if (old != null) {
1305            // if the subscription's topic/selector/noLocal were changed
1306
// we need to recreate the subscription. If the application tries
1307
// to create a new durable subscriber but with the same name
1308
// an exception is thrown.
1309
if (//bug:421 old.getNoLocal() != noLocal ||
1310
!checkEqual(old.getMessageSelector(), messageSelector) ||
1311                !((Topic JavaDoc)old.getDestination()).getTopicName().equals(topic.getTopicName())) {
1312                if (log.isDebugEnabled()) {
1313                    log.debug("The durable subscriber '"+name+"' was changed. Deleting old subscription and creating new subscription.");
1314                }
1315                old.close();
1316                unsubscribe(name);
1317                return;
1318            }
1319            else {
1320                throw new JMSException JavaDoc("A durable subscriber with the name '"+name+"' already exists.");
1321            }
1322        }
1323        // if we are here no message consumers was up.
1324
// see that a subscription exists. if it's parameters are chaged delete it.
1325
ServiceConsumer durable = (ServiceConsumer)DurableSubscribers.get(name);
1326        if(durable != null) {
1327            if (//bug:421 durable.getNoLocal() != noLocal ||
1328
!checkEqual(durable.getSelectorStatment(), messageSelector) ||
1329                !durable.getServiceName().equals(topic.getTopicName())) {
1330                if (log.isDebugEnabled()) {
1331                    log.debug("The durable subscriber '"+name+"' was changed. Deleting old subscription and creating new subscription.");
1332                }
1333                unsubscribe(name);
1334            }
1335        }
1336    }
1337
1338    private boolean checkEqual(Object JavaDoc o1, Object JavaDoc o2) {
1339        if (o1 == null) {
1340            return o2 == null;
1341        }
1342        if (o2 != null) {
1343            return o1.equals(o2);
1344        }
1345        return false;
1346    }
1347
1348    /**
1349     *
1350     * this method is the receiving method for getting messages.
1351     * all consumers (that are not listeners) use this method in order
1352     * to receive their messages.
1353     * corresponding to the spec, this is synchronized, as only one
1354     * consumer is allowed access to the session at any given time
1355     * listeners should not be defined on a session that has synch (blocking)
1356     * receivers - so that's alright.
1357     *
1358     * @param destination - the queue/topic to listen on
1359     * @param timeout - -1 : indefinite.
1360     * - 0 : nowait receive.
1361     * - positive millis to wait.
1362     * @return a Message.
1363     */

1364    public MantaBusMessage receive(ServiceConsumer consumer, long timeout) throws JMSException JavaDoc, MantaException
1365    {
1366        checkLegalOperation();
1367        MantaBusMessage msg = null;
1368        synchronized (lockMonitor) {
1369            if (isClosing || isClosed)
1370                return null;
1371
1372            if (!owningConnection.isStarted()) {
1373                long startTime = System.currentTimeMillis();
1374                try {
1375                    lockMonitor.wait(timeout);
1376                } catch (InterruptedException JavaDoc e) {
1377                    if (log.isErrorEnabled()) {
1378                        log.error("Error while waiting for the session to resume. ", e);
1379                    }
1380                }
1381                timeout = timeout-(System.currentTimeMillis()-startTime);
1382                if (timeout < 1000) //not enough for a receiveNoWait even
1383
return null;
1384            }
1385
1386            //check first if this is a topic. if it is - there's a special
1387
//handling for a topic's receive.
1388
if ( (manta.getService(consumer.getServiceName(), consumer.getType())).getServiceType()==MantaService.SERVICE_TYPE_TOPIC) {
1389
1390                if (timeout==-1) {
1391                    return null; //we can not do that in our imp.
1392
}
1393
1394
1395                //deregister the topic subscriber as a listener on the topic.
1396
//this is done because, when going into a receive on a topic, you
1397
//can not use it to get asynch, but - if we leave it in - then the consumer
1398
//itself will get the message and not our internal listener.
1399
removeSessionFrom(consumer.getId());
1400                //register a nice listener.
1401
ReceiveListener listen = new ReceiveListener();
1402                manta.subscribeMessageListener(listen,consumer.getId());
1403                msg = listen.waitForInfo(timeout);
1404            }
1405            else { //this is a queue - a regular receive is good enough.
1406
if (timeout == 0) {
1407                    msg = manta.receive(consumer);
1408                }
1409                else if (timeout == -1) {
1410                    msg = manta.receiveNoWait(consumer);
1411                }
1412                else { //timeout was given
1413
msg = manta.receive(consumer,timeout);
1414                }
1415            }
1416        }
1417
1418        if (msg != null) {
1419            // start a transaction if needed
1420
this.startLocalTransactionIfNeeded();
1421
1422            if (sessionAcknowledgementMode == Session.CLIENT_ACKNOWLEDGE || this.getTransacted()) {
1423                synchronized (unackedMessages) {
1424                    unackedMessages.add(msg);
1425                }
1426            }
1427        }
1428
1429        return msg;
1430    }
1431
1432    /**
1433     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1434     * the specified queue.
1435     *
1436     * @param queue
1437     * the <CODE>queue</CODE> to access
1438     *
1439     *
1440     * @exception JMSException
1441     * if the session fails to create a browser due to some
1442     * internal error.
1443     * @exception InvalidDestinationException
1444     * if an invalid destination is specified
1445     *
1446     * @since 1.1
1447     */

1448    public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue) throws JMSException JavaDoc
1449    {
1450        return createBrowser(queue, null);
1451    }
1452
1453    /**
1454     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1455     * the specified queue using a message selector.
1456     *
1457     * @param queue
1458     * the <CODE>queue</CODE> to access
1459     *
1460     * @param messageSelector
1461     * only messages with properties matching the message selector
1462     * expression are delivered. A value of null or an empty string
1463     * indicates that there is no message selector for the message
1464     * consumer.
1465     *
1466     * @exception JMSException
1467     * if the session fails to create a browser due to some
1468     * internal error.
1469     * @exception InvalidDestinationException
1470     * if an invalid destination is specified
1471     * @exception InvalidSelectorException
1472     * if the message selector is invalid.
1473     *
1474     * @since 1.1
1475     */

1476    public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc
1477    {
1478        checkLegalOperation();
1479        owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_BROSWER_FOR_QUEUE,queue.toString() );
1480
1481        if (queue == null)
1482            throw new InvalidDestinationException JavaDoc("MNJMS00081 : FAILED ON METHOD createBrowser(). A NULL QUEUE WAS SPECIFIED.");
1483
1484        MantaQueueBrowser mqb = null;
1485        MantaService service = manta.getService(queue.toString(), MantaService.SERVICE_TYPE_QUEUE);
1486        if (service == null)
1487            throw new InvalidDestinationException JavaDoc("MNJMS00082 : FAILED ON METHOD createBrowser(). QUEUE "+queue+" NOT VALID.");
1488
1489        ServiceConsumer sActor;
1490
1491        try {
1492            sActor = new ServiceConsumer(manta.getAgentName(), manta.getDomainName(),
1493                    service.getServiceName() , service.getServiceType(),(byte)getAcknowledgeMode());
1494            mqb = new MantaQueueBrowser(sActor.getId(),this, queue,messageSelector,sActor);
1495            sActor.setSelectorStatment(messageSelector);
1496            if (log.isInfoEnabled()) {
1497                log.info("Created local queue browser "+sActor);
1498            }
1499            manta.advertiseService(sActor);
1500
1501        }
1502        catch (MantaException me) {
1503            mqb = null;
1504            sActor = null;
1505            throw new JMSException JavaDoc("MNJMS00083 : FAILED ON METHOD createBrowser(). ERROR TEXT : "+me.getMessage());
1506
1507        }
1508
1509
1510
1511        return mqb;
1512
1513    }
1514
1515
1516    /**
1517     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be
1518     * that of the <CODE>MantaConnection</CODE> unless it is deleted
1519     * earlier.
1520     *
1521     * @return a temporary queue identity
1522     *
1523     * @exception JMSException
1524     * if the session fails to create a temporary queue due to
1525     * some internal error.
1526     *
1527     * @since 1.1
1528     */

1529    public TemporaryQueue JavaDoc createTemporaryQueue() throws JMSException JavaDoc {
1530
1531        checkLegalOperation();
1532
1533        return owningConnection.addTempQueue();
1534
1535    }//createTemporaryQueue
1536

1537
1538    /**
1539     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be
1540     * that of the <CODE>MantaConnection</CODE> unless it is deleted
1541     * earlier.
1542     *
1543     * @return a temporary topic identity
1544     *
1545     * @exception JMSException
1546     * if the session fails to create a temporary topic due to
1547     * some internal error.
1548     *
1549     * @since 1.1
1550     */

1551    public TemporaryTopic JavaDoc createTemporaryTopic() throws JMSException JavaDoc {
1552
1553        checkLegalOperation();
1554        TemporaryTopic JavaDoc t = owningConnection.addTempTopic();
1555        return new MantaTemporaryTopic(t.getTopicName(),this.owningConnection);
1556    }//createTemporaryTopic
1557

1558
1559    /**
1560     * Unsubscribes a durable subscription that has been created by a client.
1561     *
1562     * <P>
1563     * This method deletes the state being maintained on behalf of the
1564     * subscriber by its provider.
1565     *
1566     * <P>
1567     * It is erroneous for a client to delete a durable subscription while
1568     * there is an active <CODE>MessageConsumer</CODE> or <CODE>
1569     * TopicSubscriber</CODE> for the subscription, or while a consumed
1570     * message is part of a pending transaction or has not been acknowledged in
1571     * the session.
1572     *
1573     * @param name
1574     * the name used to identify this subscription
1575     *
1576     * @exception JMSException
1577     * if the session fails to unsubscribe to the durable
1578     * subscription due to some internal error.
1579     * @exception InvalidDestinationException
1580     * if an invalid subscription name is specified.
1581     *
1582     * @since 1.1
1583     */

1584    public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc {
1585
1586        checkLegalOperation();
1587        // implemented by Amir Shevat
1588
ServiceActor durable = (ServiceActor) DurableSubscribers.remove(name);
1589        if(durable != null){
1590            if (log.isInfoEnabled())
1591                log.info("Unsubscribing durable service consumer: "+durable);
1592            try {
1593                manta.recallDurableSubscription(durable);
1594            } catch (MantaException me) {
1595                throw new JMSException JavaDoc("MNJMS00083 : FAILED ON METHOD unsubscribe(). FROM TOPIC "+name+" ERROR TEXT : "+me.getMessage());
1596
1597            }
1598        }
1599
1600    }//unsubscribe
1601

1602    final void removeSessionFrom(String JavaDoc dest) {
1603       manta.unsubscribeFromTopic(this,dest);
1604    }
1605
1606    /*
1607     * The send()method is used by all MessageProviders to send messages to their various
1608     * destinations via the session.
1609     * The spec enforces a messaging order. Therefore, the send method must be
1610     * synchronized.
1611     *
1612     * @param msg - the message to send.
1613     */

1614     synchronized void sendMessage(ServiceProducer sp, Message JavaDoc orig) throws JMSException JavaDoc {
1615
1616        checkLegalOperation();
1617
1618        MantaMessage msg;
1619        if(orig instanceof MantaMessage){
1620            msg = ((MantaMessage)orig).makeCopy();
1621        }else{
1622            msg = fromForeignMsgToManta(orig);
1623        }
1624
1625        Destination JavaDoc dest = msg.getJMSDestination();
1626
1627        //if transacted - cache message without sending until commit.
1628

1629        this.startLocalTransactionIfNeeded();
1630
1631        if (this.sessionAcknowledgementMode == SESSION_TRANSACTED) {
1632            msg.setJMSMessageID("ID:in-transaction");
1633            orig.setJMSMessageID("ID:in-transaction");
1634            heldMessages.add(new HeldMessage(sp,msg));
1635            if (log.isDebugEnabled()) {
1636                log.debug("Transacted session: Caching message until commit is invoked. Message="+msg);
1637            }
1638        }
1639        else{
1640            //send the message to the owning connection for sending:
1641
//differentiate if topic or queue.
1642
//Aviad - the time stamp should be set duringthe send for all session types - moving to MantaMessageProducer.send
1643
//msg.setJMSTimestamp(SystemTime.gmtCurrentTimeMillis());
1644
MantaBusMessage mbm = prepareMessageForSending(msg);
1645            orig.setJMSMessageID(msg.JMSMessageId);
1646            msg.setWriteableState(false);
1647            if (sp.getServiceType()==MantaService.SERVICE_TYPE_QUEUE) {
1648                try {
1649                    if (log.isDebugEnabled()) {
1650                        log.debug("About to send massage to queue. Message ID="+mbm.getMessageId()+", Queue="+sp.getServiceName());
1651                    }
1652                    manta.enqueueMessage(mbm,
1653                                         sp,
1654                                         (byte)msg.getJMSDeliveryMode(),
1655                                         (byte)msg.getJMSPriority(),
1656                                         msg.getJMSExpiration());
1657                }
1658                catch (MantaException me) {
1659                    mbm = null;
1660                    throw new JMSException JavaDoc("MNJMS00084 : FAILED ON METHOD sendMessage(). ERROR TEXT : "+me.getMessage());
1661                }
1662            }
1663            //this is a topic - use the publish method.
1664
else {
1665                try {
1666                    if (log.isDebugEnabled()) {
1667                        log.debug("Sending massage to topic. Message ID="+mbm.getMessageId()+", Topic="+sp.getServiceName());
1668                    }
1669                    manta.publish(mbm,sp);
1670                }
1671                catch (MantaException me) {
1672                    mbm = null;
1673                    throw new JMSException JavaDoc("MNJMS00085 : COULD NOT PUBLISH TO "+sp.getServiceName()+" FAILED ON METHOD sendMessage(). ERROR TEXT : "+me.getMessage());
1674                }
1675            }
1676            //after the client has sent the message, and upon its
1677
//return, the message should be writable again.
1678
}
1679        if (orig instanceof MantaMessage)
1680            ((MantaMessage) orig).setWriteableState(true);
1681     }
1682
1683
1684     void ackAllMessages(Collection JavaDoc msgs) throws JMSException JavaDoc {
1685         Iterator JavaDoc ackIterator = msgs.iterator();
1686         while (ackIterator.hasNext()) {
1687             MantaBusMessage mbm = (MantaBusMessage)ackIterator.next();
1688             owningConnection.ack(mbm);
1689         }
1690         msgs.clear();
1691     }
1692
1693     //used to send all messages in case of a transacted session.
1694
//this goes over the repository and sends all messages
1695
//withheld - waiting for commit.
1696

1697     protected void sendAllMessages(Collection JavaDoc msgs) throws JMSException JavaDoc {
1698
1699        Destination JavaDoc dest;
1700        MantaMessage msg;
1701        Iterator JavaDoc msgIterator = msgs.iterator();
1702        HeldMessage hm;
1703
1704        while (msgIterator.hasNext()) {
1705            hm = (HeldMessage)msgIterator.next();
1706            msg = hm.msg;
1707            dest = msg.getJMSDestination();
1708//Aviad this should be done only during send and not during commit
1709
// msg.setJMSTimestamp(System.currentTimeMillis());
1710
MantaBusMessage mbm = prepareMessageForSending(msg);
1711            ServiceProducer sp = hm.service;
1712            if (dest instanceof Queue JavaDoc) {
1713                try {
1714                    manta.enqueueMessage(mbm,
1715                                         sp,
1716                                         (byte)msg.getJMSDeliveryMode(),
1717                                         (byte)msg.getJMSPriority(),
1718                                         msg.getJMSExpiration());
1719                }
1720                catch (MantaException me) {
1721                    mbm = null;
1722                    sp = null;
1723                    throw new JMSException JavaDoc("MNJMS00086 : FAILED ON METHOD sendAllMessages(). ERROR TEXT : "+me.getMessage());
1724                }
1725            }
1726            //this is a topic - use the publish method.
1727
else {
1728                try {
1729                    manta.publish(mbm,sp);
1730                }
1731                catch (Exception JavaDoc me) { //need to be MantaException
1732
mbm = null;
1733                    throw new JMSException JavaDoc("MNJMS00087 : FAILED ON METHOD sendAllMessages(). ERROR TEXT : "+me.getMessage());
1734                }
1735            }
1736            //delete the message that was just sent
1737
msgIterator.remove();
1738        }
1739     }
1740
1741
1742    /*
1743     * This method is called by every other method on the session, and only allows
1744     * operations when the session is functional.
1745     * If the session is closed, or in the process of closing - this method
1746     * will indicate so with an exception.
1747     *
1748     * @throws JMSException - when the session is in an invalid state.
1749     */

1750     void checkLegalOperation() throws JMSException JavaDoc
1751    {
1752        if (isClosed || isClosing)
1753          throw new IllegalStateException JavaDoc("MNJMS00088 : OPERATION FAILED ON METHOD checkLegalOperation(). SESSION IS CLOSED.");
1754    }
1755
1756     /**
1757     * When registering a session as a listener - you have to have this
1758     * method - so the listened objects can call on it.
1759     * After doing so, the session should pass the message on to the correct
1760     * consumer, as identified by the message credentials.
1761     */

1762    public void onMessage(MantaBusMessage msg) {
1763        innerQueue.enqueue(msg);
1764    }
1765
1766
1767    void ackOrHold(MantaBusMessage mbm) throws JMSException JavaDoc {
1768        if (getTransacted() || sessionAcknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
1769            if (mbm != null) {
1770                synchronized(unackedMessages) {
1771                    unackedMessages.add(mbm);
1772                }
1773            }
1774        }
1775        else {
1776            ackMessage(mbm);
1777        }
1778    }
1779    /**
1780     * This method is used to ack messages that the client wishes to
1781     * ack by himself - the ACKNOWLEDGE messages.
1782     * All it actually does is ask the MantaAgent itself for the ack
1783     * method, after checking that the mode is on.
1784     *
1785     * @param msg
1786     * @throws JMSException
1787     */

1788     void ackMessage (MantaBusMessage msg) throws JMSException JavaDoc {
1789        checkLegalOperation();
1790
1791        //if (sessionTransactedMode)
1792
if (getTransacted())
1793            return;
1794
1795        if (sessionAcknowledgementMode != Session.CLIENT_ACKNOWLEDGE) {
1796            if (msg != null)
1797                owningConnection.ack(msg);
1798        }
1799        else {
1800            synchronized (unackedMessages) {
1801                Iterator JavaDoc ackIterator = unackedMessages.iterator();
1802                while (ackIterator.hasNext()) {
1803                    MantaBusMessage cbm = (MantaBusMessage)ackIterator.next();
1804                    owningConnection.ack(cbm);
1805                }
1806                unackedMessages.clear();
1807            }
1808        }
1809    }
1810
1811
1812    //topic specific methods:
1813

1814    /**
1815     * Creates a TopicPublisher for this session
1816     */

1817
1818    public TopicPublisher JavaDoc createPublisher(Topic JavaDoc topic) throws JMSException JavaDoc{
1819        return (TopicPublisher JavaDoc) createProducer(topic);
1820    }//createPublisher
1821

1822    /**
1823     * Creates a TopicSubscriber for this session
1824     */

1825    public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic) throws JMSException JavaDoc {
1826        return (TopicSubscriber JavaDoc) createConsumer(topic);
1827    }
1828
1829    /**
1830     * Creates a TopicSubscriber for this session
1831     */

1832    public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic, String JavaDoc messageSelector, boolean noLocal) throws JMSException JavaDoc {
1833
1834        //create the consumer as subscriber
1835
TopicSubscriber JavaDoc theSub = (TopicSubscriber JavaDoc)createConsumer(topic, messageSelector, noLocal);
1836
1837        return theSub;
1838    }
1839
1840
1841    //queue specific stuff
1842

1843    /**
1844     * Creates a QueueReceiver on this session.
1845     */

1846    public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc {
1847      return (QueueReceiver JavaDoc) createConsumer(queue, messageSelector);
1848    }
1849
1850    /**
1851     * Creates a QueueReceiver on this session.
1852     */

1853    public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue) throws JMSException JavaDoc
1854    {
1855      return (QueueReceiver JavaDoc) createConsumer(queue);
1856    }
1857
1858    /**
1859     * Creates a QueueSender on this session.
1860     */

1861    public QueueSender JavaDoc createSender(Queue JavaDoc queue) throws JMSException JavaDoc
1862    {
1863      return (QueueSender JavaDoc) createProducer(queue);
1864    }
1865
1866
1867    /*
1868     * This method will prepare a MantaBusMessage for sending.
1869     *
1870     * @param message the JMS message to be sent
1871     * @return the new MantaBusMessage
1872     */

1873    private MantaBusMessage prepareMessageForSending(Message JavaDoc message) throws JMSException JavaDoc {
1874
1875
1876        MantaBusMessage mantaBusMessage = manta.getMantaBusMessage();
1877        message.setJMSMessageID("ID:"+mantaBusMessage.getMessageId());
1878        mantaBusMessage.setPayload((Byteable)message);
1879        mantaBusMessage.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
1880        mantaBusMessage.setPriority((byte)(message.getJMSPriority()));
1881        mantaBusMessage.addHeader(MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE,MantaBusMessageConsts.PAYLOAD_TYPE_JMS);
1882        mantaBusMessage.setDeliveryMode((byte) message.getJMSDeliveryMode());
1883        mantaBusMessage.setValidUntil(message.getJMSExpiration());
1884
1885        return mantaBusMessage;
1886
1887    }//prepareMessageForSend
1888

1889    /*
1890     * a consumer uses this method to remove itself from the session upon closing.
1891     */

1892     void removeConsumer(MantaMessageConsumer mc) throws JMSException JavaDoc {
1893
1894         //if (isClosed || isClosing)
1895
if (isClosed) {
1896             return;
1897         }
1898
1899         messageConsumers.remove(mc.getClientId());
1900         manta.unsubscribeMessageListener(this, mc.getService().getId());
1901
1902         if (!mc.getService().getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) {
1903             // allow acks get there before the recall does
1904
try {
1905                 Thread.sleep(500);
1906             } catch (InterruptedException JavaDoc ie) {
1907                 if(log.isInfoEnabled()) {
1908                     log.info("removeConsumer() : acks may have been lost for this consumer - "+mc);
1909                 }
1910             }
1911         }
1912
1913         // remove from manta agent
1914
try {
1915             if(mc.getService()!= null){
1916                 if (log.isInfoEnabled()) {
1917                     if (!mc.getService().isDurable()) {
1918                         log.info("Recalling local consumer "+mc.getService());
1919                     }
1920                     else {
1921                         log.info("Recalling local durable subscriber "+mc.getService());
1922                     }
1923                 }
1924                 manta.recallService(mc.getService());
1925             }
1926         }
1927         catch (MantaException ce) {
1928             if (log.isErrorEnabled()) {
1929                 log.error("removeConsumer(): could not remove service "+mc.getService().getServiceName(), ce);
1930             }
1931             throw new JMSException JavaDoc("MNJMS00077 : FAILED ON close(). CONSUMER ON SERVICE "+
1932                        mc.getService().getServiceName()+" WAS NOT RECALLED. ERROR TEXT : "+ce.getMessage());
1933         }
1934    }
1935
1936    /*
1937     * this is used specifically for a queue browser, since all consumers
1938     * go to the same repository, but a queue browser is not a consumer.
1939     */

1940     void removeBrowser(MantaQueueBrowser qb) throws JMSException JavaDoc {
1941
1942         if (log.isInfoEnabled()) {
1943            log.info("Recalling local queue browser "+qb.service);
1944         }
1945        try {
1946            manta.recallService(qb.service);
1947        }
1948        catch (MantaException me) {
1949            if (log.isErrorEnabled()) {
1950                log.error("removeBrowser(): could not remove browser "+qb.getService(),me);
1951            }
1952            throw new JMSException JavaDoc(me.getMessage());
1953        }
1954    }
1955
1956     /*
1957      * used to remove a producer from the session.
1958      *
1959      * @param mp - the producer to remove
1960      */

1961     void removeProducer(MantaMessageProducer mp) throws JMSException JavaDoc {
1962
1963         //if (isClosed||isClosing)
1964
if (isClosed)
1965            return;
1966
1967        messageProducers.remove(mp.getClientId());
1968
1969        //remove from the connection.
1970
try {
1971            if(mp.getService()!= null){
1972                if (log.isInfoEnabled()) {
1973                    log.info("Recalling local producer "+mp.getService());
1974                }
1975                manta.recallService(mp.getService());
1976            }
1977        }
1978        catch (MantaException ce) {
1979            if (log.isErrorEnabled()) {
1980                log.error("removeProducer(): could not remove service "+mp.getService().getServiceName(), ce);
1981            }
1982            throw new JMSException JavaDoc("MNJMS00076 : FAILED ON close(). PRODUCER ON SERVICE "+
1983                    mp.getService().getServiceName()+" WAS NOT RECALLED. ERROR TEXT : "+ce.getMessage());
1984        }
1985
1986    }
1987
1988     /*
1989      * Get messages for a queue browser.
1990      * @param qb - the QueueBrowser to get the messages for
1991      * @return - an Enumeration containing the messages
1992      *
1993      * @throws JMSException
1994      */

1995     Enumeration JavaDoc getMessagesFor(MantaQueueBrowser qb) throws JMSException JavaDoc {
1996        try {
1997
1998            return manta.peekAtQueue(qb.getService());
1999        }
2000        catch (MantaException me) {
2001           throw new JMSException JavaDoc("MNJMS0008A : FAILED ON METHOD getMessagesFor(). ERROR TEXT : "+me.getMessage());
2002        }
2003
2004     }
2005
2006     /*
2007      * Add a ConnectionConsumer message.
2008      */

2009     void addConsumerMessage(MantaBusMessage msg) {
2010        synchronized (consumerMessages) {
2011            this.consumerMessages.add(msg);
2012        }
2013     }
2014
2015
2016     // called during rollback to redeliver unacked messaged
2017
// the messages are added to the beginning of the list.
2018
void addConsumerMessages(Collection JavaDoc msgs) {
2019        synchronized (consumerMessages) {
2020            this.consumerMessages.addAll(0,msgs);
2021        }
2022     }
2023
2024     public boolean hasConsumerMessages() {
2025         synchronized (consumerMessages) {
2026             return !consumerMessages.isEmpty();
2027         }
2028     }
2029
2030
2031    /*
2032     * register a listener for a specific service.
2033     */

2034    void registerListener(String JavaDoc regString) throws MantaException {
2035
2036            manta.subscribeMessageListener(this,regString);
2037    }
2038
2039    /*
2040     * Registers as a queue-listener for a MessageConsumer
2041     *
2042     * @param mmc - the MantaMessageConsumer object.
2043     */

2044    void listenToQueue(MantaMessageConsumer mmc) throws JMSException JavaDoc {
2045
2046        try {
2047             this.manta.subscribeToQueue(mmc.theService, this);
2048
2049          } catch (Exception JavaDoc e) {
2050              throw new InvalidDestinationException JavaDoc("MNJMS0008B : FAILED ON METHOD listenToQueue(). ERROR TEXT : "+e.getMessage());
2051
2052          }
2053
2054    }
2055
2056    void deregisterFromQueue(MantaMessageConsumer mmc) {
2057        try {
2058            this.manta.unsubscribeFromQueue(mmc.theService,this);
2059        } catch (Exception JavaDoc e) {
2060            e.printStackTrace();
2061        }
2062    }
2063
2064
// Session state. The notes below describe only how each field is used in this file.
// Acknowledgement mode; compared against CLIENT_ACKNOWLEDGE and SESSION_TRANSACTED.
2065    protected int sessionAcknowledgementMode = AUTO_ACKNOWLEDGE;
2066    //protected boolean sessionTransactedMode = false;
2067
// While set, handle() parks its calling thread on lockMonitor.
protected boolean isStopped;
// Closed / closing flags; checkLegalOperation() rejects calls when either is set.
2068    protected boolean isClosed = false;
2069    protected boolean isClosing = false;
// Connection used for authorization, acks and temporary destinations.
2070    protected MantaConnection owningConnection;
2071    protected MessageListener JavaDoc sessionListener = null;
// Monitor used for wait/notify by receive() and handle().
2072    protected Object JavaDoc lockMonitor = new Object JavaDoc();
2073    protected static long internalId = 0;
2074    protected String JavaDoc sessId;
// HeldMessage entries cached by sendMessage() in a transacted session.
2075    protected Set JavaDoc heldMessages;
// Messages added via addConsumerMessage(s); accessed under its own lock.
2076    protected ArrayList JavaDoc consumerMessages;
// Received messages not yet acknowledged (client-ack / transacted sessions).
2077    protected LinkedHashSet JavaDoc unackedMessages;
// Consumer / producer registries; entries are removed by client id.
2078    protected Hashtable JavaDoc messageConsumers = new Hashtable JavaDoc();
2079    protected Hashtable JavaDoc messageProducers = new Hashtable JavaDoc();
// Durable subscriptions, looked up by subscription name in unsubscribe().
2080    protected Hashtable JavaDoc DurableSubscribers = new Hashtable JavaDoc();
// Stage that onMessage() enqueues incoming bus messages into.
2081    protected Stage innerQueue;
2082
2083    /*
2084     *
2085     * @author Nimo
2086     *
2087     * A class that encapsulates a message held by this session when committed.
2088     */

2089    class HeldMessage implements Byteable{
2090        ServiceProducer service;
2091        MantaMessage msg;
2092
2093        public HeldMessage(ServiceProducer sp, MantaMessage m) {
2094            msg = m;
2095            service = sp;
2096
2097        }
2098
2099        public String JavaDoc getByteableName() {
2100
2101            return "org.mr.api.jms.MantaSession$HeldMessage";
2102        }
2103
2104        public void toBytes(ByteableOutputStream out) throws IOException JavaDoc {
2105
2106            out.writeByteable(service);
2107            out.writeByteable(msg);
2108        }
2109
2110        public Byteable createInstance(ByteableInputStream in) throws IOException JavaDoc {
2111
2112
2113            HeldMessage hm = new HeldMessage((ServiceProducer) in.readByteable(),(MantaMessage)in.readByteable());
2114            return hm;
2115        }
2116
2117        public void registerToByteableRegistry() {
2118            ByteableRegistry.registerByteableFactory(getByteableName() , this);
2119
2120        }
2121    }
2122
2123    /* (non-Javadoc)
2124     * @see org.mr.core.util.StageHandler#handle(java.lang.Object)
2125     */

2126    public boolean handle(Object JavaDoc event) {
2127
2128        while (isStopped) {
2129            // this is a loop in order to prevent the case when the
2130
// thread is interrupted and released prematurely.
2131
try {
2132                synchronized(lockMonitor) {
2133                    // The stop() thread is waiting to hear that the
2134
// Execution thread is suspened. Notify it.
2135
lockMonitor.notify();
2136
2137                    lockMonitor.wait();
2138                }
2139            }
2140            catch(InterruptedException JavaDoc ie) {
2141                // can that happen? not taking any risks!
2142
if (log.isErrorEnabled()) {
2143                    log.error("Error while waiting for the session to resume. ", ie);
2144                }
2145            }
2146
2147            // If the session was closed after it was stopped,
2148
// return and kill the thread
2149
if (this.isClosing || this.isClosed) {
2150                return false;
2151            }
2152        }
2153
2154        // when stopping the session, the stopEvent object was entered
2155
// into the stage queue. If this is the current object return
2156
// and dequeue again.
2157
if (event == stopEvent) {
2158            return true;
2159        }
2160
2161        MantaBusMessage msg = (MantaBusMessage) event;
2162
2163        if (msg!=null) {
2164
2165            // this code is for connection consumers to get bus messages
2166
// to forward to the server session's session.
2167
if (busListener != null) {
2168                busListener.onMessage(msg);
2169                return true;
2170            }
2171
2172            String JavaDoc consumer = ((ServiceActor)msg.getRecipient()).getId();
2173            MantaMessageConsumer destConsumer =
2174            (MantaMessageConsumer)messageConsumers.get(consumer);
2175
2176            if (destConsumer != null) {
2177                synchronized (listenersCount) {
2178                    listenersCount.add();
2179                }
2180                synchronized (destConsumer) {
2181                    if (!destConsumer.isClosed) {
2182                        try {
2183                            destConsumer.feedMessageListener(msg);
2184                        }
2185                        catch (JMSException JavaDoc jmse) {
2186                            log.error("Exception occured in listeners feeding stage", jmse);
2187                        }
2188                    }
2189                }
2190                synchronized (listenersCount) {
2191                    listenersCount.remove();
2192                    if (listenersCount.val() == 0) {
2193                        listenersCount.notifyAll();
2194                    }
2195                }
2196            }
2197            else {
2198                if (log.isDebugEnabled())
2199                    log.debug("A message arrived for a recipient that's closed or not registered on this session.");
2200            }
2201        }
2202        return true;
2203    }
2204
2205    /**
2206     * converts non mantaRay message to mantaray message
2207     * @return
2208     * @throws JMSException
2209     */

2210    private MantaMessage fromForeignMsgToManta(Message JavaDoc foreignMessage) throws JMSException JavaDoc {
2211
2212            MantaMessage mantaResult = null;
2213            if (foreignMessage instanceof TextMessage JavaDoc) {
2214                // covert from Foreign text to Manta Text
2215
TextMessage JavaDoc mantaTextMsg = (TextMessage JavaDoc) foreignMessage;
2216                MantaTextMessage msg = new MantaTextMessage();
2217                msg.setText(mantaTextMsg.getText());
2218                mantaResult = msg;
2219            }
2220            else if (foreignMessage instanceof ObjectMessage JavaDoc)
2221            {
2222// covert from Foreign object to Manta object
2223
ObjectMessage JavaDoc mantaObjectMsg = (ObjectMessage JavaDoc) foreignMessage;
2224                MantaObjectMessage msg = new MantaObjectMessage();
2225                msg.setObject(mantaObjectMsg.getObject());
2226                mantaResult = msg;
2227            }
2228            else if (foreignMessage instanceof MapMessage JavaDoc)
2229            {
2230// covert from Foreign map to Manta map
2231
MapMessage JavaDoc mantaMapMsg = (MapMessage JavaDoc) foreignMessage;
2232                MantaMapMessage msg = new MantaMapMessage();
2233                for (Enumeration JavaDoc iter = mantaMapMsg.getMapNames(); iter.hasMoreElements();) {
2234                    String JavaDoc name = iter.nextElement().toString();
2235                    msg.setObject(name, mantaMapMsg.getObject(name));
2236                }
2237                mantaResult = msg;
2238            }
2239            else if (foreignMessage instanceof BytesMessage JavaDoc)
2240            {
2241// covert from Foreign bytes to Manta bytes
2242
BytesMessage JavaDoc mantaBytesMsg = (BytesMessage JavaDoc) foreignMessage;
2243                mantaBytesMsg.reset();
2244                MantaBytesMessage msg = new MantaBytesMessage();
2245                try {
2246                    while(true) {
2247                        msg.writeByte(mantaBytesMsg.readByte());
2248                    }
2249                }
2250                catch (JMSException JavaDoc e) {
2251                    // do nothing
2252
}
2253                mantaResult = msg;
2254            }
2255            else if (foreignMessage instanceof StreamMessage JavaDoc)
2256            {
2257                StreamMessage JavaDoc mantaStreamMessage = (StreamMessage JavaDoc) foreignMessage;
2258// covert from Foreign stream to Manta stream
2259
mantaStreamMessage.reset();
2260                MantaStreamMessage mantaStreamMsg = new MantaStreamMessage();
2261                Object JavaDoc obj = null;
2262                try {
2263                    while ((obj = mantaStreamMessage.readObject()) != null) {
2264                        mantaStreamMsg.writeObject(obj);
2265                    }
2266                }
2267                catch (JMSException JavaDoc e) {
2268                    // do nothing
2269
}
2270                mantaResult = mantaStreamMsg;
2271            }
2272
2273            mantaResult.setJMSTimestamp(foreignMessage.getJMSTimestamp());
2274            mantaResult.setJMSReplyTo(fromForeignDesToManta(foreignMessage.getJMSReplyTo()));
2275            mantaResult.setJMSMessageID(foreignMessage.getJMSMessageID());
2276            mantaResult.setJMSCorrelationID(foreignMessage.getJMSCorrelationID());
2277            mantaResult.setJMSExpiration(foreignMessage.getJMSExpiration());
2278            mantaResult.setJMSDestination(fromForeignDesToManta(foreignMessage.getJMSDestination()));
2279            mantaResult.setJMSPriority(foreignMessage.getJMSPriority());
2280            mantaResult.setJMSDeliveryMode(foreignMessage.getJMSDeliveryMode());
2281
2282            if (foreignMessage.getJMSRedelivered())
2283                mantaResult.flags=mantaResult.flags|MantaMessage.IS_REDELIVERED;
2284
2285            mantaResult.setJMSPriority(foreignMessage.getJMSPriority());
2286            Enumeration JavaDoc propertyKeys = foreignMessage.getPropertyNames();
2287
2288            while( propertyKeys.hasMoreElements())
2289            {
2290                String JavaDoc key = propertyKeys.nextElement().toString();
2291                Object JavaDoc obj = foreignMessage.getObjectProperty(key);
2292                mantaResult.setObjectProperty(key, obj);
2293            }
2294            return mantaResult;
2295    }
2296
2297    public boolean isStopped() {
2298        return isStopped;
2299    }
2300    /**
2301     * @param destination
2302     * @return a MantaDestination
2303     * @throws JMSException if an error occurs
2304     */

2305    private MantaDestination fromForeignDesToManta(Destination JavaDoc destination) throws JMSException JavaDoc {
2306
2307        if (destination==null)
2308            return null;
2309
2310        if (destination instanceof MantaDestination)
2311            return (MantaDestination)destination;
2312
2313        MantaDestination result = null;
2314        if (destination instanceof TemporaryQueue JavaDoc) {
2315            result = new MantaTemporaryQueue(((Queue JavaDoc) destination).getQueueName(),null);
2316        }
2317        else if (destination instanceof TemporaryTopic JavaDoc) {
2318            result = new MantaTemporaryTopic(((Topic JavaDoc) destination).getTopicName(),null);
2319        }
2320        else if (destination instanceof Queue JavaDoc) {
2321            result = new MantaQueue(((Queue JavaDoc) destination).getQueueName());
2322        }
2323        else if (destination instanceof Topic JavaDoc) {
2324            result = new MantaTopic(((Topic JavaDoc) destination).getTopicName());
2325        }
2326
2327        return result;
2328
2329    }//fromForeignDesToManta
2330

2331
2332    public DeliveryListener getDeliveryListener() {
2333        return deliveryListener;
2334    }
2335
2336
2337    public void setDeliveryListener(DeliveryListener deliveryListener) {
2338        this.deliveryListener = deliveryListener;
2339    }
2340
2341    /**
2342     * The JCA resource adapter needs to replace the transaction context
2343     * of the session with another that operates at the connection level.
2344     * @param transactionContext
2345     */

2346    public void setTransactionContext(TransactionContext newContext) {
2347        if (transactionContext != null) {
2348            transactionContext.removeSession(this);
2349        }
2350        transactionContext = newContext;
2351        transactionContext.addSession(this);
2352    }
2353
2354    /**
2355     * Returns the transaction context.
2356     * @return
2357     */

2358    public TransactionContext getTransactionContext() {
2359        return transactionContext;
2360    }
2361
2362
2363    protected class Counter {
2364        int count=0;
2365        void add() {
2366            count++;
2367        }
2368        void remove() {
2369            count--;
2370        }
2371        int val() {
2372            return count;
2373        }
2374
2375    }
2376
2377    /**
2378     * This interface is used to support the JCA 1.5 standard, to notify
2379     * the application server before and after sending a message
2380     * to an endpoint (MDB)
2381     * @author shaiw
2382     */

2383    public static interface DeliveryListener {
2384        public void beforeDelivery(MantaSession session, Message JavaDoc msg);
2385        public void afterDelivery(MantaSession session, Message JavaDoc msg);
2386    }
2387}//MantaSession
2388
Popular Tags