KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > ConnectionConsumer


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (INRIA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  */

24 package org.objectweb.joram.client.jms;
25
26 import org.objectweb.joram.shared.client.*;
27 import org.objectweb.joram.client.jms.connection.ReplyListener;
28 import org.objectweb.joram.client.jms.connection.RequestMultiplexer;
29 import org.objectweb.joram.client.jms.connection.Requestor;
30
31 import java.util.Vector JavaDoc;
32
33 import javax.jms.IllegalStateException JavaDoc;
34 import javax.jms.InvalidSelectorException JavaDoc;
35 import javax.jms.JMSException JavaDoc;
36
37 import org.objectweb.util.monolog.api.BasicLevel;
38
39 /**
40  * Implements the <code>javax.jms.ConnectionConsumer</code> interface.
41  */

42 public class ConnectionConsumer implements javax.jms.ConnectionConsumer JavaDoc {
43
44   private static class Status {
45     public static final int OPEN = 0;
46     public static final int CLOSE = 1;
47
48     private static final String JavaDoc[] names = {
49       "OPEN", "CLOSE"};
50
51     public static String JavaDoc toString(int status) {
52       return names[status];
53     }
54   }
55
56   /** The connection the consumer belongs to. */
57   private Connection cnx;
58
59   /** <code>true</code> if the consumer is a durable topic subscriber. */
60   private boolean durable = false;
61
62   /** The selector for filtering messages. */
63   private String JavaDoc selector;
64
65   /** The session pool provided by the application server. */
66   private javax.jms.ServerSessionPool JavaDoc sessionPool;
67
68   /** The maximum number of messages a session may process at once. */
69   private int maxMessages;
70
71   /**
72    * The daemon taking care of distributing the asynchronous deliveries to
73    * the sessions.
74    */

75   private CCDaemon ccDaemon;
76
77   /** The name of the queue or of the subscription the deliveries come from. */
78   private String JavaDoc targetName;
79
80   /** <code>true</code> if the deliveries come from a queue. */
81   private boolean queueMode;
82
83   /**
84    * The FIFO queue where the connection pushes the asynchronous server
85    * deliveries.
86    */

87   fr.dyade.aaa.util.Queue repliesIn;
88
89   private RequestMultiplexer mtpx;
90
91   private Requestor requestor;
92
93   private int requestId;
94
95   private int status;
96
97   /**
98    * Constructs a <code>ConnectionConsumer</code>.
99    *
100    * @param cnx The connection the consumer belongs to.
101    * @param dest The destination where consuming messages.
102    * @param subName The durable consumer name, if any.
103    * @param selector The selector for filtering messages.
104    * @param sessionPool The session pool provided by the application server.
105    * @param maxMessages The maximum number of messages to be passed at once
106    * to a session.
107    *
108    * @exception InvalidSelectorException If the selector syntax is wrong.
109    * @exception InvalidDestinationException If the target destination does not
110    * exist.
111    * @exception JMSSecurityException If the user is not a READER on the
112    * destination.
113    * @exception JMSException If one of the parameters is wrong.
114    */

115   ConnectionConsumer(Connection cnx,
116                      Destination dest,
117                      String JavaDoc subName,
118                      String JavaDoc selector,
119                      javax.jms.ServerSessionPool JavaDoc sessionPool,
120                      int maxMessages,
121                      RequestMultiplexer mtpx) throws JMSException JavaDoc {
122     try {
123       org.objectweb.joram.shared.selectors.Selector.checks(selector);
124     }
125     catch (org.objectweb.joram.shared.excepts.SelectorException sE) {
126       throw new InvalidSelectorException JavaDoc("Invalid selector syntax: " + sE);
127     }
128
129     if (sessionPool == null)
130       throw new JMSException JavaDoc("Invalid ServerSessionPool parameter: "
131                              + sessionPool);
132     if (maxMessages <= 0)
133       throw new JMSException JavaDoc("Invalid maxMessages parameter: " + maxMessages);
134     
135     this.cnx = cnx;
136     this.selector = selector;
137     this.sessionPool = sessionPool;
138     this.maxMessages = maxMessages;
139
140     this.mtpx = mtpx;
141     this.requestor = new Requestor(mtpx);
142
143     setStatus(Status.OPEN);
144
145     if (dest instanceof Queue) {
146       queueMode = true;
147       targetName = dest.getName();
148     } else if (subName == null) {
149       queueMode = false;
150       targetName = cnx.nextSubName();
151     } else {
152       queueMode = false;
153       targetName = subName;
154       durable = true;
155     }
156
157     repliesIn = new fr.dyade.aaa.util.Queue();
158
159     ccDaemon = new CCDaemon(toString());
160     ccDaemon.setDaemon(true);
161     ccDaemon.start();
162
163     // If the consumer is a subscriber, subscribing to the target topic:
164
if (! queueMode) {
165       requestor.request(
166         new ConsumerSubRequest(
167           dest.getName(), targetName,
168           selector, false, durable));
169     }
170
171     // Sending a listener request:
172
subscribe();
173   }
174
175   /**
176    * Constructs a <code>ConnectionConsumer</code>.
177    *
178    * @param cnx The connection the consumer belongs to.
179    * @param dest The destination where consuming messages.
180    * @param selector The selector for filtering messages.
181    * @param sessionPool The session pool provided by the application server.
182    * @param maxMessages The maximum number of messages to be passed at once
183    * to a session.
184    *
185    * @exception InvalidSelectorException If the selector syntax is wrong.
186    * @exception InvalidDestinationException If the target destination does not
187    * exist.
188    * @exception JMSSecurityException If the user is not a READER on the
189    * destination.
190    * @exception JMSException If one of the parameters is wrong.
191    */

192   ConnectionConsumer(Connection cnx,
193                      Destination dest,
194                      String JavaDoc selector,
195                      javax.jms.ServerSessionPool JavaDoc sessionPool,
196                      int maxMessages,
197                      RequestMultiplexer mtpx)
198     throws JMSException JavaDoc {
199     this(cnx, dest, null, selector,
200          sessionPool, maxMessages, mtpx);
201   }
202
203   public final String JavaDoc getTargetName() {
204     return targetName;
205   }
206
207   public final boolean getQueueMode() {
208     return queueMode;
209   }
210
211   /** Returns a string image of the connection consumer. */
212   public String JavaDoc toString()
213   {
214     return "ConnCons:" + cnx.toString();
215   }
216
217   private void setStatus(int status) {
218     this.status = status;
219   }
220
221   private synchronized void checkClosed()
222     throws IllegalStateException JavaDoc {
223     if (status == Status.CLOSE)
224       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
225   }
226
227   /**
228    * API method.
229    *
230    * @exception IllegalStateException If the ConnectionConsumer is closed.
231    */

232   public javax.jms.ServerSessionPool JavaDoc getServerSessionPool() throws JMSException JavaDoc {
233     checkClosed();
234     return sessionPool;
235   }
236
237   /**
238    * API method.
239    *
240    * @exception JMSException Actually never thrown.
241    */

242   public synchronized void close() throws JMSException JavaDoc {
243     if (status == Status.CLOSE) return;
244
245     mtpx.abortRequest(requestId);
246
247     ccDaemon.stop();
248
249     // If the consumer is a subscriber, managing the subscription closing:
250
if (! queueMode) {
251       if (durable) {
252         requestor.request(new ConsumerCloseSubRequest(targetName));
253       } else {
254         requestor.request(new ConsumerUnsubRequest(targetName));
255       }
256     }
257
258     cnx.closeConnectionConsumer(this);
259
260     setStatus(Status.CLOSE);
261   }
262
263   private void subscribe() throws JMSException JavaDoc {
264     ConsumerSetListRequest req =
265       new ConsumerSetListRequest(
266         targetName, selector, queueMode, null, 1);
267     mtpx.sendRequest(req, new ReplyListener() {
268         public boolean replyReceived(AbstractJmsReply reply) {
269           repliesIn.push(reply);
270           return queueMode;
271         }
272         
273         public void replyAborted(int requestId) {}
274       });
275     requestId = req.getRequestId();
276   }
277
278   /**
279    * The <code>CCDaemon</code> distributes the server's asynchronous
280    * deliveries to the application server's sessions.
281    */

282   class CCDaemon extends fr.dyade.aaa.util.Daemon {
283
284     /**
285      * Constructs the <code>CCDaemon</code> belonging to this connection
286      * consumer.
287      */

288     CCDaemon(String JavaDoc name){
289       super(name);
290     }
291
292     /** The daemon's loop. */
293     public void run() {
294       Vector JavaDoc deliveries = new Vector JavaDoc();
295       try {
296         while (running) {
297           canStop = true;
298           try {
299             repliesIn.get();
300           } catch (Exception JavaDoc iE) {
301             continue;
302           }
303           canStop = false;
304           
305           // Processing the delivery:
306
try {
307             javax.jms.ServerSession JavaDoc serverSess = sessionPool.getServerSession();
308
309             Session sess;
310             Object JavaDoc obj = serverSess.getSession();
311             if (obj instanceof Session) {
312               sess = (Session)obj;
313             } else if (obj instanceof XASession) {
314               sess = ((XASession)obj).sess;
315             } else throw new Error JavaDoc("Unexpected session type: " + obj);
316
317             sess.setConnectionConsumer(ConnectionConsumer.this);
318             int counter = 1;
319             
320             // As long as there are messages to deliver, passing to session(s)
321
// as many messages as possible:
322
while (counter <= maxMessages && repliesIn.size() > 0) {
323               
324               // If the consumer is a queue consumer, sending a new request:
325
if (queueMode) {
326                 subscribe();
327               }
328               
329               ConsumerMessages reply = (ConsumerMessages) repliesIn.pop();
330               Vector JavaDoc msgs = reply.getMessages();
331               for (int i = 0; i < msgs.size(); i++) {
332                 deliveries.add(
333                   (org.objectweb.joram.shared.messages.Message) msgs.get(i));
334               }
335
336               while (! deliveries.isEmpty()) {
337                 while (counter <= maxMessages && ! deliveries.isEmpty()) {
338                   if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
339                     JoramTracing.dbgClient.log(BasicLevel.DEBUG, "Passes a"
340                                                + " message to a session.");
341                   sess.onMessage(
342                     (org.objectweb.joram.shared.messages.Message) deliveries.remove(0));
343                   counter++;
344                 }
345
346                 if (counter > maxMessages) {
347                   if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
348                     JoramTracing.dbgClient.log(BasicLevel.DEBUG, "Starts the"
349                                                + " session.");
350                   serverSess.start();
351                   counter = 1;
352
353                   if (! deliveries.isEmpty() || repliesIn.size() > 0) {
354                     serverSess = sessionPool.getServerSession();
355                     obj = serverSess.getSession();
356                     if (obj instanceof Session) {
357                       sess = (Session)obj;
358                     } else if (obj instanceof XASession) {
359                       sess = ((XASession)obj).sess;
360                     } else throw new Error JavaDoc("Unexpected session type: " + obj);
361                     sess.setConnectionConsumer(ConnectionConsumer.this);
362                   }
363                 }
364               }
365             }
366             // There is no more message to deliver and no more delivery,
367
// starting the last session to which messages have been passed:
368
if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
369               JoramTracing.dbgClient.log(BasicLevel.DEBUG, "No more delivery.");
370             if (counter > 1) {
371               if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
372                 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "Starts the"
373                                            + " session.");
374               counter = 1;
375               serverSess.start();
376             }
377           }
378           // A JMSException will be caught if the application server failed
379
// to provide a session: closing the consumer.
380
catch (JMSException JavaDoc jE) {
381             canStop = true;
382             try {
383               ConnectionConsumer.this.close();
384             }
385             catch (JMSException JavaDoc jE2) {}
386           }
387         }
388       }
389       finally {
390         finish();
391       }
392     }
393
394     protected void shutdown() {}
395
396     protected void close() {}
397   }
398 }
399
Popular Tags