KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > scalagent > kjoram > ConnectionConsumer


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (INRIA)
22  * Contributor(s): Nicolas Tachker (ScalAgent)
23  */

24 package com.scalagent.kjoram;
25
26 import com.scalagent.kjoram.jms.*;
27 import com.scalagent.kjoram.excepts.*;
28
29 import java.util.Vector JavaDoc;
30 import java.util.Enumeration JavaDoc;
31
32
33 public class ConnectionConsumer
34 {
35   /** The connection the consumer belongs to. */
36   private Connection cnx;
37   /** <code>true</code> if the consumer is a durable topic subscriber. */
38   private boolean durable = false;
39   /** The selector for filtering messages. */
40   private String JavaDoc selector;
41   /** The session pool provided by the application server. */
42   private ServerSessionPool sessionPool;
43   /** The maximum number of messages a session may process at once. */
44   private int maxMessages;
45   /**
46    * The daemon taking care of distributing the asynchronous deliveries to
47    * the sessions.
48    */

49   private CCDaemon ccDaemon;
50   /** The current consuming request. */
51   private com.scalagent.kjoram.jms.AbstractJmsRequest currentReq = null;
52   /** <code>true</code> if the connection consumer is closed. */
53   private boolean closed = false;
54
55   /** The name of the queue or of the subscription the deliveries come from. */
56   String JavaDoc targetName;
57   /** <code>true</code> if the deliveries come from a queue. */
58   boolean queueMode = true;
59   /**
60    * The FIFO queue where the connection pushes the asynchronous server
61    * deliveries.
62    */

63   com.scalagent.kjoram.util.Queue repliesIn;
64
65
66   /**
67    * Constructs a <code>ConnectionConsumer</code>.
68    *
69    * @param cnx The connection the consumer belongs to.
70    * @param dest The destination where consuming messages.
71    * @param subName The durable consumer name, if any.
72    * @param selector The selector for filtering messages.
73    * @param sessionPool The session pool provided by the application server.
74    * @param maxMessages The maximum number of messages to be passed at once
75    * to a session.
76    *
77    * @exception InvalidSelectorException If the selector syntax is wrong.
78    * @exception InvalidDestinationException If the target destination does not
79    * exist.
80    * @exception JMSSecurityException If the user is not a READER on the
81    * destination.
82    * @exception JMSException If one of the parameters is wrong.
83    */

84   ConnectionConsumer(Connection cnx, Destination dest, String JavaDoc subName,
85                      String JavaDoc selector, ServerSessionPool sessionPool,
86                      int maxMessages) throws JMSException
87   {
88     if (sessionPool == null)
89       throw new JMSException("Invalid ServerSessionPool parameter: "
90                              + sessionPool);
91     if (maxMessages <= 0)
92       throw new JMSException("Invalid maxMessages parameter: " + maxMessages);
93     
94     this.cnx = cnx;
95     this.selector = selector;
96     this.sessionPool = sessionPool;
97     this.maxMessages = maxMessages;
98
99     if (dest instanceof Queue)
100       targetName = dest.getName();
101     else if (subName == null) {
102       queueMode = false;
103       targetName = cnx.nextSubName();
104     }
105     else {
106       queueMode = false;
107       targetName = subName;
108       durable = true;
109     }
110
111     repliesIn = new com.scalagent.kjoram.util.Queue();
112
113     if (cnx.cconsumers == null)
114       cnx.cconsumers = new Vector JavaDoc();
115  
116     cnx.cconsumers.addElement(this);
117
118     ccDaemon = new CCDaemon(this);
119     ccDaemon.setDaemon(true);
120     ccDaemon.start();
121
122     // If the consumer is a subscriber, subscribing to the target topic:
123
if (! queueMode)
124       cnx.syncRequest(new ConsumerSubRequest(dest.getName(), targetName,
125                                              selector, false, durable));
126
127     // Sending a listener request:
128
currentReq = new ConsumerSetListRequest(targetName, selector, queueMode);
129     currentReq.setRequestId(cnx.nextRequestId());
130     cnx.requestsTable.put(currentReq.getKey(), this);
131     cnx.asyncRequest(currentReq);
132
133     if (JoramTracing.dbgClient)
134       JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
135   }
136
137   /**
138    * Constructs a <code>ConnectionConsumer</code>.
139    *
140    * @param cnx The connection the consumer belongs to.
141    * @param dest The destination where consuming messages.
142    * @param selector The selector for filtering messages.
143    * @param sessionPool The session pool provided by the application server.
144    * @param maxMessages The maximum number of messages to be passed at once
145    * to a session.
146    *
147    * @exception InvalidSelectorException If the selector syntax is wrong.
148    * @exception InvalidDestinationException If the target destination does not
149    * exist.
150    * @exception JMSSecurityException If the user is not a READER on the
151    * destination.
152    * @exception JMSException If one of the parameters is wrong.
153    */

154   ConnectionConsumer(Connection cnx, Destination dest, String JavaDoc selector,
155                      ServerSessionPool sessionPool,
156                      int maxMessages) throws JMSException
157   {
158     this(cnx, dest, null, selector, sessionPool, maxMessages);
159   }
160
161   /** Returns a string image of the connection consumer. */
162   public String JavaDoc toString()
163   {
164     return "ConnCons:" + cnx.toString();
165   }
166
167
168   /**
169    * API method.
170    *
171    * @exception IllegalStateException If the ConnectionConsumer is closed.
172    */

173   public ServerSessionPool getServerSessionPool() throws JMSException
174   {
175     if (closed)
176       throw new com.scalagent.kjoram.excepts.
177
        IllegalStateException("Forbidden call on a closed"
178                               + " ConnectionConsumer.");
179     return sessionPool;
180   }
181
182
183   /**
184    * API method.
185    *
186    * @exception JMSException Actually never thrown.
187    */

188   public void close() throws JMSException
189   {
190     cnx.requestsTable.remove(currentReq.getKey());
191     ccDaemon.stop();
192
193     // If the consumer is a subscriber, managing the subscription closing:
194
if (! queueMode) {
195       try {
196         if (durable)
197           cnx.syncRequest(new ConsumerCloseSubRequest(targetName));
198         else
199           cnx.syncRequest(new ConsumerUnsubRequest(targetName));
200       }
201       // A JMSException might be caught if the connection is broken.
202
catch (JMSException jE) {}
203     }
204     cnx.cconsumers.removeElement(this);
205   }
206
207 /**
208  * The <code>CCDaemon</code> distributes the server's asynchronous
209  * deliveries to the application server's sessions.
210  */

211 class CCDaemon extends com.scalagent.kjoram.util.Daemon
212 {
213   /** The connection consumer the daemon belongs to. */
214   private ConnectionConsumer cc;
215
216   /**
217    * Constructs the <code>CCDaemon</code> belonging to this connection
218    * consumer.
219    */

220   CCDaemon(ConnectionConsumer cc)
221   {
222     super(cc.toString());
223     this.cc = cc;
224   }
225
226   /** The daemon's loop. */
227   public void run()
228   {
229     ConsumerMessages reply;
230     Vector JavaDoc deliveries = new Vector JavaDoc();
231     ServerSession serverSess;
232     Session sess;
233     int counter;
234
235     try {
236       while (running) {
237         canStop = true;
238
239         try {
240           // Expecting a reply:
241
repliesIn.get();
242         }
243         catch (Exception JavaDoc iE) {
244           continue;
245         }
246         canStop = false;
247
248         // Processing the delivery:
249
try {
250           if (JoramTracing.dbgClient)
251             JoramTracing.log(JoramTracing.DEBUG, "--- " + cc
252                              + ": got a delivery.");
253
254           // Getting a server's session:
255
serverSess = sessionPool.getServerSession();
256           sess = (Session) serverSess.getSession();
257           sess.connectionConsumer = cc;
258           counter = 1;
259
260           // As long as there are messages to deliver, passing to session(s)
261
// as many messages as possible:
262
while (counter <= maxMessages && repliesIn.size() > 0) {
263             
264             // If the consumer is a queue consumer, sending a new request:
265
if (queueMode) {
266               cnx.requestsTable.remove(currentReq.getKey());
267               currentReq = new ConsumerSetListRequest(targetName, selector,
268                                                       queueMode);
269               currentReq.setRequestId(cnx.nextRequestId());
270               cnx.requestsTable.put(currentReq.getKey(), cc);
271               cnx.asyncRequest(currentReq);
272             }
273
274             reply = (ConsumerMessages) repliesIn.pop();
275             for (Enumeration JavaDoc e = reply.getMessages().elements(); e.hasMoreElements(); ) {
276               deliveries.addElement(e.nextElement());
277             }
278
279             while (! deliveries.isEmpty()) {
280               while (counter <= maxMessages && ! deliveries.isEmpty()) {
281                 if (JoramTracing.dbgClient)
282                   JoramTracing.log(JoramTracing.DEBUG, "Passes a"
283                                    + " message to a session.");
284                 Object JavaDoc obj = deliveries.elementAt(0);
285                 deliveries.removeElementAt(0);
286                 sess.repliesIn.push(obj);
287                 counter++;
288               }
289               if (counter > maxMessages) {
290                 if (JoramTracing.dbgClient)
291                   JoramTracing.log(JoramTracing.DEBUG, "Starts the"
292                                    + " session.");
293                 serverSess.start();
294                 counter = 1;
295
296                 if (! deliveries.isEmpty() || repliesIn.size() > 0) {
297                   serverSess = sessionPool.getServerSession();
298                   sess =
299                     (Session) serverSess.getSession();
300                   sess.connectionConsumer = cc;
301                 }
302               }
303             }
304           }
305           // There is no more message to deliver and no more delivery,
306
// starting the last session to which messages have been passed:
307
if (JoramTracing.dbgClient)
308             JoramTracing.log(JoramTracing.DEBUG, "No more delivery.");
309           if (counter > 1) {
310             if (JoramTracing.dbgClient)
311               JoramTracing.log(JoramTracing.DEBUG, "Starts the"
312                                + " session.");
313             counter = 1;
314             serverSess.start();
315           }
316         }
317         // A JMSException will be caught if the application server failed
318
// to provide a session: closing the consumer.
319
catch (JMSException jE) {
320           canStop = true;
321           try {
322             cc.close();
323           }
324           catch (JMSException jE2) {}
325         }
326       }
327     }
328     finally {
329       finish();
330     }
331   }
332
333   /** Shuts the daemon down. */
334   public void shutdown()
335   {}
336
337   /** Releases the daemon's resources. */
338   public void close()
339   {
340     if (JoramTracing.dbgClient)
341       JoramTracing.log(JoramTracing.DEBUG, "CCDaemon finished.");
342   }
343 }
344 }
345
Popular Tags