KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > connector > InboundSession


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - 2006 ScalAgent Distributed Technologies
4  * Copyright (C) 2004 - 2006 Bull SA
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (Bull SA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  * Benoit Pelletier (Bull SA)
24  */

25 package org.objectweb.joram.client.connector;
26
27 import javax.jms.JMSException JavaDoc;
28 import javax.jms.Session JavaDoc;
29 import javax.jms.XAConnection JavaDoc;
30 import javax.jms.XASession JavaDoc;
31 import javax.resource.spi.endpoint.MessageEndpoint JavaDoc;
32 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
33 import javax.resource.spi.work.WorkManager JavaDoc;
34 import javax.transaction.xa.XAResource JavaDoc;
35
36 import org.objectweb.util.monolog.api.BasicLevel;
37
38 /**
39  * An <code>InboundSession</code> instance is responsible for processing
40  * delivered messages within a <code>javax.resource.spi.Work</code> instance,
41  * and passing them to a set of application server endpoints.
42  */

43 class InboundSession implements javax.jms.ServerSession JavaDoc,
44                                 javax.resource.spi.work.Work JavaDoc,
45                                 javax.jms.MessageListener JavaDoc
46 {
47   /** <code>InboundConsumer</code> instance this session belongs to. */
48   private InboundConsumer consumer;
49
50   /** Application server's <code>WorkManager</code> instance. */
51   private WorkManager JavaDoc workManager;
52   /** Application's endpoints factory. */
53   private MessageEndpointFactory JavaDoc endpointFactory;
54
55   /**
56    * <code>javax.jms.Session</code> instance dedicated to processing
57    * the delivered messages.
58    */

59   private Session JavaDoc session;
60   /** <code>XAResource</code> instance, if any. */
61   private XAResource JavaDoc xaResource = null;
62
63
64   /**
65    * Constructs an <code>InboundSession</code> instance.
66    *
67    * @param consumer InboundConsumer creating this session.
68    * @param workManager Application server's <code>WorkManager</code>
69    * instance.
70    * @param endpointFactory Application's endpoints factory.
71    * @param cnx Connection to the underlying JORAM server.
72    * @param transacted <code>true</code> if deliveries occur within a
73    * XA transaction.
74    */

75   InboundSession(InboundConsumer consumer,
76                  WorkManager JavaDoc workManager,
77                  MessageEndpointFactory JavaDoc endpointFactory,
78                  XAConnection JavaDoc cnx,
79                  boolean transacted,
80                  int ackMode) {
81     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
82       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
83                                     "InboundSession(" + consumer +
84                                     "," + workManager +
85                                     "," + endpointFactory +
86                                     "," + cnx +
87                                     "," + transacted +
88                                     "," + ackMode + ")");
89     
90     this.consumer = consumer;
91     this.workManager = workManager;
92     this.endpointFactory = endpointFactory;
93
94     try {
95       if (transacted) {
96         session = cnx.createXASession();
97         xaResource = ((XASession JavaDoc) session).getXAResource();
98         if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
99           AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
100                                         "InboundSession xaResource = " + xaResource);
101       }
102       else {
103         session = cnx.createSession(false, ackMode);
104       }
105
106       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
107         AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
108                                       "InboundSession session = " + session);
109       session.setMessageListener(this);
110     }
111     catch (JMSException JavaDoc exc) {}
112   }
113
114
115   /**
116    * Provides the wrapped <code>javax.jms.Session</code> instance for
117    * processing delivered messages.
118    *
119    * @exception JMSException Never thrown.
120    */

121   public Session JavaDoc getSession() throws JMSException JavaDoc
122   {
123     return session;
124   }
125
126   /**
127    * Notifies that the messages are ready to be processed.
128    *
129    * @exception JMSException If submitting the processing work fails.
130    */

131   public void start() throws JMSException JavaDoc
132   {
133     try {
134       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
135         AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
136                                       "ServerSession submits Work instance.");
137       workManager.scheduleWork(this);
138     }
139     catch (Exception JavaDoc exc) {
140       throw new JMSException JavaDoc("Can't start the adapter session for processing "
141                              + "the delivered messages: " + exc);
142     }
143   }
144
145   /** <code>javax.resource.spi.Work</code> method, not effective. */
146   public void release() {
147     try {
148       session.close();
149     } catch (JMSException JavaDoc exc) {
150     }
151   }
152
153   /** Runs the wrapped session for processing the messages. */
154   public void run()
155   {
156     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
157       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
158                                     "ServerSession runs wrapped Session.");
159     session.run();
160     consumer.releaseSession(this);
161   }
162
163   /** Forwards a processed message to an endpoint. */
164   public void onMessage(javax.jms.Message JavaDoc message) {
165     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
166       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
167                                     this + " onMessage(" + message + ")");
168
169     MessageEndpoint JavaDoc endpoint = null;
170     try {
171       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
172         AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
173                                       "ServerSession passes message to listener.");
174       endpoint = endpointFactory.createEndpoint(xaResource);
175       ((javax.jms.MessageListener JavaDoc) endpoint).onMessage(message);
176       endpoint.release();
177     } catch (Exception JavaDoc exc) {
178       try {
179         // try to clean the context for next invocation
180
if (endpoint != null) endpoint.release();
181       } catch (Exception JavaDoc e) {
182         // ignore the exception
183
}
184       throw new java.lang.IllegalStateException JavaDoc("Could not get endpoint "
185                                                 + "instance: " + exc);
186     }
187   }
188 }
189
Popular Tags