KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > MultiSessionConsumer


/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
 * Copyright (C) 1996 - 2000 Dyade
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): ScalAgent Distributed Technologies
 */

23 package org.objectweb.joram.client.jms;
24
25 import java.util.Vector JavaDoc;
26
27 import javax.jms.JMSException JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.ServerSession JavaDoc;
30 import javax.jms.ServerSessionPool JavaDoc;
31
32 import org.objectweb.joram.client.jms.connection.RequestMultiplexer;
33 import org.objectweb.joram.shared.client.ConsumerMessages;
34 import org.objectweb.util.monolog.api.BasicLevel;
35 import org.objectweb.util.monolog.api.Logger;
36
37 import fr.dyade.aaa.util.Daemon;
38 import fr.dyade.aaa.util.Debug;
39 import fr.dyade.aaa.util.Queue;
40
41 /**
42  * The MultiSessionConsumer is threaded (see MessageDispatcher)
43  * because the session pool can hang if there is no more
44  * available ServerSession.
45  *
46  */

47 public class MultiSessionConsumer extends MessageConsumerListener
48   implements javax.jms.ConnectionConsumer JavaDoc{
49   
50   private static final Logger logger =
51     Debug.getLogger(MultiSessionConsumer.class.getName());
52   
53   private ServerSessionPool JavaDoc sessPool;
54   
55   private Connection cnx;
56   
57   private int maxMsgs;
58   
59   private Queue repliesIn;
60   
61   /**
62    * Number of simultaneously activated
63    * listeners.
64    */

65   private int nbActivatedListeners;
66   
67   private MessageDispatcher msgDispatcher;
68
69   /**
70    * @param consumer
71    * @param listener
72    * @param ackMode
73    * @param queueMessageReadMax
74    * @param topicActivationThreshold
75    * @param topicPassivationThreshold
76    * @param topicAckBufferMax
77    * @param reqMultiplexer
78    */

79   MultiSessionConsumer(
80       boolean queueMode,
81       boolean durable,
82       String JavaDoc selector,
83       String JavaDoc targetName,
84       ServerSessionPool JavaDoc sessionPool,
85       int queueMessageReadMax,
86       int topicActivationThreshold, int topicPassivationThreshold,
87       int topicAckBufferMax,
88       RequestMultiplexer reqMultiplexer,
89       Connection connection,
90       int maxMessages) {
91     super(queueMode, durable, selector, targetName,
92         null, queueMessageReadMax,
93         topicActivationThreshold,
94         topicPassivationThreshold, topicAckBufferMax,
95         reqMultiplexer);
96     if (logger.isLoggable(BasicLevel.DEBUG))
97       logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.<init>(" +
98           queueMode + ',' + durable + ',' + selector + ',' +
99           targetName + ',' + sessionPool + ',' +
100           queueMessageReadMax + ',' +
101           topicActivationThreshold + ',' + topicPassivationThreshold + ',' +
102           topicAckBufferMax + ',' +
103           reqMultiplexer + ',' + maxMessages + ')');
104     sessPool = sessionPool;
105     cnx = connection;
106     maxMsgs = maxMessages;
107     msgDispatcher = new MessageDispatcher(
108         "MessageDispatcher[" + reqMultiplexer.getDemultiplexerDaemonName() + ']');
109     repliesIn = new Queue();
110     msgDispatcher.setDaemon(true);
111     msgDispatcher.start();
112   }
113
114   /* (non-Javadoc)
115    * @see org.objectweb.joram.client.jms.MessageConsumerListener#pushMessages(org.objectweb.joram.shared.client.ConsumerMessages)
116    */

117   public void pushMessages(ConsumerMessages cm) throws JMSException JavaDoc {
118     if (logger.isLoggable(BasicLevel.DEBUG))
119       logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.pushMessages(" + cm + ')');
120     repliesIn.push(cm);
121   }
122
123   /* (non-Javadoc)
124    * @see javax.jms.ConnectionConsumer#getServerSessionPool()
125    */

126   public ServerSessionPool JavaDoc getServerSessionPool() throws JMSException JavaDoc {
127     return sessPool;
128   }
129   
130   public void close() throws JMSException JavaDoc {
131     if (logger.isLoggable(BasicLevel.DEBUG))
132       logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.close()");
133     msgDispatcher.stop();
134     
135     if (logger.isLoggable(BasicLevel.DEBUG))
136       logger.log(BasicLevel.DEBUG,
137           "MultiSessionConsumer -> dispatcher stopped");
138     
139     super.close();
140     
141     if (logger.isLoggable(BasicLevel.DEBUG))
142       logger.log(BasicLevel.DEBUG,
143           "MultiSessionConsumer -> close connection consumer");
144     
145     cnx.closeConnectionConsumer(this);
146     
147     if (logger.isLoggable(BasicLevel.DEBUG))
148       logger.log(BasicLevel.DEBUG,
149           "MultiSessionConsumer -> connection consumer closed");
150   }
151   
152   public void onMessage(
153       Message msg, MessageListener JavaDoc listener, int ackMode)
154       throws JMSException JavaDoc {
155     if (logger.isLoggable(BasicLevel.DEBUG))
156       logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.onMessage(" + msg + ')');
157     try {
158       synchronized (this) {
159         if (getStatus() == Status.CLOSE) {
160           throw new javax.jms.IllegalStateException JavaDoc("Message listener closed");
161         } else {
162           if (nbActivatedListeners == 0) {
163             setStatus(Status.ON_MSG);
164           }
165           nbActivatedListeners++;
166         }
167       }
168       activateListener(msg, listener, ackMode);
169     } finally {
170       synchronized (this) {
171         nbActivatedListeners--;
172         if (nbActivatedListeners == 0) {
173           setStatus(Status.RUN);
174           // Notify threads trying to close the
175
// MessageConsumerListener.
176
notifyAll();
177         }
178       }
179     }
180   }
181   
182   class MessageDispatcher extends Daemon {
183     
184     MessageDispatcher(String JavaDoc name) {
185       super(name);
186     }
187
188     /* (non-Javadoc)
189      * @see fr.dyade.aaa.util.Daemon#close()
190      */

191     protected void close() {
192       // TODO Auto-generated method stub
193

194     }
195
196     /* (non-Javadoc)
197      * @see fr.dyade.aaa.util.Daemon#shutdown()
198      */

199     protected void shutdown() {
200       // TODO Auto-generated method stub
201

202     }
203     
204     /**
205      * Enables the daemon to stop itself.
206      */

207     public void stop() {
208       if (logger.isLoggable(BasicLevel.DEBUG))
209         logger.log(BasicLevel.DEBUG, "MessageDispatcher.stop()");
210       if (isCurrentThread()) {
211         finish();
212       } else {
213         super.stop();
214       }
215     }
216
217     /* (non-Javadoc)
218      * @see java.lang.Runnable#run()
219      */

220     public void run() {
221       try {
222         while (running) {
223           canStop = true;
224           ConsumerMessages cm = (ConsumerMessages) repliesIn.get();
225           canStop = false;
226
227           Vector JavaDoc msgs = cm.getMessages();
228           int sessionMsgCounter = maxMsgs + 1;
229           ServerSession JavaDoc serverSess = null;
230           Session sess = null;
231           for (int i = 0; i < msgs.size(); i++) {
232             if (sessionMsgCounter > maxMsgs) {
233               if (serverSess != null)
234                 serverSess.start();
235               serverSess = sessPool.getServerSession();
236               // This can hang if there is no more sessions
237
// in the pool
238
Object JavaDoc obj = serverSess.getSession();
239               if (obj instanceof Session) {
240                 sess = (Session) obj;
241               } else if (obj instanceof XASession) {
242                 sess = ((XASession) obj).sess;
243               } else {
244                 throw new Error JavaDoc("Unexpected session type: " + obj);
245               }
246               sess.setMessageConsumerListener(MultiSessionConsumer.this);
247               sessionMsgCounter = 1;
248             }
249             sess.onMessage((org.objectweb.joram.shared.messages.Message) msgs.get(i));
250             sessionMsgCounter++;
251           }
252           serverSess.start();
253           repliesIn.pop();
254         }
255       } catch (InterruptedException JavaDoc exc) {
256         if (logger.isLoggable(BasicLevel.DEBUG)) {
257           logger.log(BasicLevel.DEBUG, "", exc);
258         }
259       } catch (Exception JavaDoc exc) {
260         if (logger.isLoggable(BasicLevel.DEBUG)) {
261           logger.log(BasicLevel.DEBUG, "", exc);
262         }
263         try {
264           MultiSessionConsumer.this.close();
265         } catch (JMSException JavaDoc exc2) {
266         }
267       } finally {
268         finish();
269       }
270     }
271   }
272 }
273
Popular Tags