KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > connector > InboundConsumer


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - Bull SA
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): Frederic Maistre (Bull SA)
21  * Contributor(s): Nicolas Tachker (Bull SA)
22  */

23 package org.objectweb.joram.client.connector;
24
25 import javax.jms.*;
26 import javax.resource.NotSupportedException JavaDoc;
27 import javax.resource.ResourceException JavaDoc;
28 import javax.resource.spi.CommException JavaDoc;
29 import javax.resource.spi.SecurityException JavaDoc;
30 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
31 import javax.resource.spi.work.WorkManager JavaDoc;
32
33 import java.util.Vector JavaDoc;
34
35 import org.objectweb.util.monolog.api.BasicLevel;
36
37
38 /**
39  * An <code>InboundConsumer</code> instance is responsible for consuming
40  * messages from a given JORAM destination and through a given JORAM
41  * connection.
42  */

43 class InboundConsumer implements javax.jms.ServerSessionPool JavaDoc
44 {
45   /** Application server's <code>WorkManager</code> instance. */
46   private WorkManager JavaDoc workManager;
47   /** Application's endpoints factory. */
48   private MessageEndpointFactory JavaDoc endpointFactory;
49
50   /** The provided connection to the underlying JORAM server. */
51   private XAConnection cnx;
52   /** The durable subscription name, if provided. */
53   private String JavaDoc subName = null;
54
55   /** <code>true</code> if message consumption occurs in a transaction. */
56   private boolean transacted;
57
58   /** Maximum number of Work instances to be submitted (0 for infinite). */
59   private int maxWorks;
60
61   private int ackMode;
62
63   /** for closing durable subscription */
64   private boolean closeDurSub;
65
66   /** Wrapped <code>ConnectionConsumer</code> instance. */
67   private ConnectionConsumer cnxConsumer;
68   /** Number of created server sessions. */
69   private int serverSessions = 0;
70
71   /** Pool of server sessions. */
72   private Vector JavaDoc pool;
73
74
75   /**
76    * Constructs an <code>InboundConsumer</code> instance.
77    *
78    * @param workManager Application server's <code>WorkManager</code>
79    * instance.
80    * @param endpointFactory Application's endpoints factory.
81    * @param cnx Connection to the JORAM server.
82    * @param dest Destination to get messages from.
83    * @param selector Selector for filtering messages.
84    * @param durable <code>true</code> for durably subscribing.
85    * @param subName Durable subscription name.
86    * @param transacted <code>true</code> if deliveries will occur in a
87    * XA transaction.
88    * @param maxWorks Max number of Work instances to be submitted.
89    *
90    * @exception NotSupportedException If the activation parameters are
91    * invalid.
92    * @exception SecurityException If the target destination is not
93    * readable.
94    * @exception CommException If the connection with the JORAM server
95    * is lost.
96    * @exception ResourceException Generic exception.
97    */

98   InboundConsumer(WorkManager JavaDoc workManager,
99                   MessageEndpointFactory JavaDoc endpointFactory,
100                   XAConnection cnx,
101                   Destination dest,
102                   String JavaDoc selector,
103                   boolean durable,
104                   String JavaDoc subName,
105                   boolean transacted,
106                   int maxWorks,
107                   int maxMessages,
108                   int ackMode,
109                   boolean closeDurSub) throws ResourceException JavaDoc {
110     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
111       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, "InboundConsumer(" + workManager +
112                                     ", " + endpointFactory +
113                                     ", " + cnx +
114                                     ", " + dest +
115                                     ", " + selector +
116                                     ", " + durable +
117                                     ", " + subName +
118                                     ", " + transacted +
119                                     ", " + maxWorks +
120                                     ", " + maxMessages +
121                                     "," + ackMode +
122                                     "," + closeDurSub + ")");
123
124     this.workManager = workManager;
125     this.endpointFactory = endpointFactory;
126     this.cnx = cnx;
127     this.transacted = transacted;
128     this.ackMode = ackMode;
129
130     if (maxWorks < 0) maxWorks = 0;
131     this.maxWorks = maxWorks;
132
133     pool = new Vector JavaDoc(maxWorks);
134
135     try {
136       if (durable) {
137         if (! (dest instanceof javax.jms.Topic JavaDoc))
138           throw new NotSupportedException JavaDoc("Can't set a durable subscription "
139                                           + "on a JMS queue.");
140
141         if (subName == null)
142           throw new NotSupportedException JavaDoc("Missing durable "
143                                           + "subscription name.");
144
145         this.subName = subName;
146
147         cnxConsumer =
148           cnx.createDurableConnectionConsumer((javax.jms.Topic JavaDoc) dest,
149                                               subName,
150                                               selector,
151                                               this,
152                                               maxMessages);
153       } else {
154         cnxConsumer = cnx.createConnectionConsumer(dest,
155                                                    selector,
156                                                    this,
157                                                    maxMessages);
158       }
159
160       cnx.start();
161     }
162     catch (JMSSecurityException exc) {
163       throw new SecurityException JavaDoc("Target destination not readble: "
164                                   + exc);
165     }
166     catch (javax.jms.IllegalStateException JavaDoc exc) {
167       throw new CommException JavaDoc("Connection with the JORAM server is lost.");
168     }
169     catch (JMSException exc) {
170       throw new ResourceException JavaDoc("Could not set asynchronous consumer: "
171                                   + exc);
172     }
173   }
174
175   /**
176    * Provides a new <code>InboundSession</code> instance for processing
177    * incoming messages.
178    *
179    * @exception JMSException Never thrown.
180    */

181   public ServerSession getServerSession()
182     throws JMSException {
183     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
184       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getServerSession()");
185
186     try {
187       synchronized (pool) {
188         if (pool.isEmpty()) {
189           if (maxWorks > 0) {
190             if (serverSessions < maxWorks) {
191               // Allocates a new ServerSession
192
return newSession();
193             } else {
194               // Wait for a free ServerSession
195
if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
196                 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
197                                               "ServerSessionPool waits for "
198                                               + "a free ServerSession.");
199               pool.wait();
200               return (ServerSession) pool.remove(0);
201             }
202           } else {
203             // Allocates a new ServerSession
204
return newSession();
205           }
206         } else {
207           return (ServerSession) pool.remove(0);
208         }
209       }
210     } catch (Exception JavaDoc exc) {
211       throw new JMSException("Error while getting server session from pool: "
212                              + exc);
213     }
214   }
215
216   private InboundSession newSession() {
217     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
218       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
219                                     "ServerSessionPool provides "
220                                     + "new ServerSession.");
221     serverSessions++;
222     return new InboundSession(this,
223                               workManager,
224                               endpointFactory,
225                               cnx,
226                               transacted,
227                               ackMode);
228   }
229
230   /** Releases an <code>InboundSession</code> instance. */
231   void releaseSession(InboundSession session) {
232     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
233       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " releaseSession(" + session + ")");
234
235     try {
236       synchronized (pool) {
237         pool.add(session);
238         pool.notify();
239       }
240     } catch (Exception JavaDoc exc) {}
241   }
242
243   /** Closes the consumer. */
244   void close() {
245       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
246           AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " close() unsubscribe subscription: "+ closeDurSub);
247
248       try {
249           cnxConsumer.close();
250
251           if (closeDurSub) {
252               if (subName != null) {
253                   Session session = cnx.createSession(true, 0);
254                   session.unsubscribe(subName);
255               }
256           }
257
258           cnx.close();
259       }
260       catch (JMSException exc) {}
261   }
262 }
263
Popular Tags