KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > jms > asf > StdServerSessionPool


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.jms.asf;
23
24 import java.util.ArrayList JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.List JavaDoc;
27
28 import javax.jms.Connection JavaDoc;
29 import javax.jms.Destination JavaDoc;
30 import javax.jms.JMSException JavaDoc;
31 import javax.jms.MessageListener JavaDoc;
32 import javax.jms.Queue JavaDoc;
33 import javax.jms.QueueConnection JavaDoc;
34 import javax.jms.ServerSession JavaDoc;
35 import javax.jms.ServerSessionPool JavaDoc;
36 import javax.jms.Session JavaDoc;
37 import javax.jms.Topic JavaDoc;
38 import javax.jms.TopicConnection JavaDoc;
39 import javax.jms.XAQueueConnection JavaDoc;
40 import javax.jms.XAQueueSession JavaDoc;
41 import javax.jms.XASession JavaDoc;
42 import javax.jms.XATopicConnection JavaDoc;
43 import javax.jms.XATopicSession JavaDoc;
44 import javax.transaction.TransactionManager JavaDoc;
45
46 import org.jboss.logging.Logger;
47 import org.jboss.tm.XidFactoryMBean;
48
49 import EDU.oswego.cs.dl.util.concurrent.Executor;
50 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
51 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
52
53 /**
54  * Implementation of ServerSessionPool.
55  *
56  * @author <a HREF="mailto:peter.antman@tim.se">Peter Antman</a> .
57  * @author <a HREF="mailto:hiram.chirino@jboss.org">Hiram Chirino</a> .
58  * @author <a HREF="mailto:adrian@jboss.com">Adrian Brock</a>
59  * @version $Revision: 38364 $
60  */

61 public class StdServerSessionPool implements ServerSessionPool JavaDoc
62 {
63    /** The thread group which session workers will run. */
64    private static ThreadGroup JavaDoc threadGroup = new ThreadGroup JavaDoc("ASF Session Pool Threads");
65
66    /** Instance logger. */
67    private final Logger log = Logger.getLogger(this.getClass());
68
69    /** The minimum size of the pool */
70    private int minSize;
71
72    /** The size of the pool. */
73    private int poolSize;
74
75    /** The message acknowledgment mode. */
76    private int ack;
77
78    /** Is the bean container managed? */
79    private boolean useLocalTX;
80
81    /** True if this is a transacted session. */
82    private boolean transacted;
83
84    /** The destination. */
85    private Destination JavaDoc destination;
86
87    /** The session connection. */
88    private Connection JavaDoc con;
89
90    /** The message listener for the session. */
91    private MessageListener JavaDoc listener;
92
93    /** The list of ServerSessions. */
94    private List JavaDoc sessionPool;
95
96    /** The executor for processing messages? */
97    private PooledExecutor executor;
98
99    /** Used to signal when the Pool is being closed down */
100    private boolean closing = false;
101
102    /** Used during close down to wait for all server sessions to be returned and closed. */
103    private int numServerSessions = 0;
104
105    private XidFactoryMBean xidFactory;
106
107    private TransactionManager JavaDoc tm;
108
109    /**
110     * Construct a <tt>StdServerSessionPool</tt> using the default pool size.
111     *
112     * @param destination the destination
113     * @param con connection to get sessions from
114     * @param transacted transaction mode when not XA (
115     * @param ack ackmode when not XA
116     * @param listener the listener the sessions will call
117     * @param minSession minumum number of sessions in the pool
118     * @param maxSession maximum number of sessions in the pool
119     * @param keepAlive the time to keep sessions alive
120     * @param xidFactory the xid factory
121     * @param tm the transaction manager
122     * @exception JMSException Description of Exception
123     */

124    public StdServerSessionPool(final Destination JavaDoc destination,
125                                final Connection JavaDoc con,
126                                final boolean transacted,
127                                final int ack,
128                                final boolean useLocalTX,
129                                final MessageListener JavaDoc listener,
130                                final int minSession,
131                                final int maxSession,
132                                final long keepAlive,
133                                final XidFactoryMBean xidFactory,
134                                final TransactionManager JavaDoc tm)
135       throws JMSException JavaDoc
136    {
137       this.destination = destination;
138       this.con = con;
139       this.ack = ack;
140       this.listener = listener;
141       this.transacted = transacted;
142       this.minSize = minSession;
143       this.poolSize = maxSession;
144       this.sessionPool = new ArrayList JavaDoc(maxSession);
145       this.useLocalTX = useLocalTX;
146       this.xidFactory = xidFactory;
147       this.tm = tm;
148       // setup the worker pool
149
executor = new MyPooledExecutor(poolSize);
150       executor.setMinimumPoolSize(minSize);
151       executor.setKeepAliveTime(keepAlive);
152       executor.waitWhenBlocked();
153       executor.setThreadFactory(new DefaultThreadFactory());
154
155       // finish initializing the session
156
create();
157       log.debug("Server Session pool set up");
158    }
159
160    /**
161     * Get a server session.
162     *
163     * @return A server session.
164     * @throws JMSException Failed to get a server session.
165     */

166    public ServerSession JavaDoc getServerSession() throws JMSException JavaDoc
167    {
168       if( log.isTraceEnabled() )
169          log.trace("getting a server session");
170       ServerSession JavaDoc session = null;
171
172       try
173       {
174          while (true)
175          {
176             synchronized (sessionPool)
177             {
178                if (closing)
179                {
180                   throw new JMSException JavaDoc("Cannot get session after pool has been closed down.");
181                }
182                else if (sessionPool.size() > 0)
183                {
184                   session = (ServerSession JavaDoc)sessionPool.remove(0);
185                   break;
186                }
187                else
188                {
189                   try
190                   {
191                      sessionPool.wait();
192                   }
193                   catch (InterruptedException JavaDoc ignore)
194                   {
195                   }
196                }
197             }
198          }
199       }
200       catch (Exception JavaDoc e)
201       {
202          throw new JMSException JavaDoc("Failed to get a server session: " + e);
203       }
204
205       if( log.isTraceEnabled() )
206          log.trace("using server session: " + session);
207       return session;
208    }
209
210    /**
211     * Clear the pool, clear out both threads and ServerSessions,
212     * connection.stop() should be run before this method.
213     */

214    public void clear()
215    {
216       synchronized (sessionPool)
217       {
218          // FIXME - is there a runaway condition here. What if a
219
// ServerSession are taken by a ConnecionConsumer? Should we set
220
// a flag somehow so that no ServerSessions are recycled and the
221
// ThreadPool won't leave any more threads out.
222
closing = true;
223
224          log.debug("Clearing " + sessionPool.size() + " from ServerSessionPool");
225
226          Iterator JavaDoc iter = sessionPool.iterator();
227          while (iter.hasNext())
228          {
229             StdServerSession ses = (StdServerSession)iter.next();
230             // Should we do anything to the server session?
231
ses.close();
232             numServerSessions--;
233          }
234
235          sessionPool.clear();
236          sessionPool.notifyAll();
237       }
238
239       //Must be outside synchronized block because of recycle method.
240
executor.shutdownAfterProcessingCurrentlyQueuedTasks();
241
242       //wait for all server sessions to be returned.
243
synchronized (sessionPool)
244       {
245          while (numServerSessions > 0)
246          {
247             try
248             {
249                sessionPool.wait();
250             }
251             catch (InterruptedException JavaDoc ignore)
252             {
253             }
254          }
255       }
256    }
257
258    /**
259     * Get the executor we are using.
260     *
261     * @return The Executor value
262     */

263    Executor getExecutor()
264    {
265       return executor;
266    }
267
268    // --- Protected messages for StdServerSession to use
269

270    /**
271     * Returns true if this server session is transacted.
272     *
273     * @return The Transacted value
274     */

275    boolean isTransacted()
276    {
277       return transacted;
278    }
279
280    /**
281     * Recycle a server session.
282     *
283     * @param session Description of Parameter
284     */

285    void recycle(StdServerSession session)
286    {
287       synchronized (sessionPool)
288       {
289          if (closing)
290          {
291             session.close();
292             numServerSessions--;
293             if (numServerSessions == 0)
294             {
295                //notify clear thread.
296
sessionPool.notifyAll();
297             }
298          }
299          else
300          {
301             sessionPool.add(session);
302             sessionPool.notifyAll();
303             if( log.isTraceEnabled() )
304                log.trace("recycled server session: " + session);
305          }
306       }
307    }
308
309    private void create() throws JMSException JavaDoc
310    {
311       for (int index = 0; index < poolSize; index++)
312       {
313          // Here is the meat, that MUST follow the spec
314
Session JavaDoc ses = null;
315          XASession JavaDoc xaSes = null;
316
317          log.debug("initializing with connection: " + con);
318
319          if (destination instanceof Topic JavaDoc && con instanceof XATopicConnection JavaDoc)
320          {
321             xaSes = ((XATopicConnection JavaDoc)con).createXATopicSession();
322             ses = ((XATopicSession JavaDoc)xaSes).getTopicSession();
323          }
324          else if (destination instanceof Queue JavaDoc && con instanceof XAQueueConnection JavaDoc)
325          {
326             xaSes = ((XAQueueConnection JavaDoc)con).createXAQueueSession();
327             ses = ((XAQueueSession JavaDoc)xaSes).getQueueSession();
328          }
329          else if (destination instanceof Topic JavaDoc && con instanceof TopicConnection JavaDoc)
330          {
331             ses = ((TopicConnection JavaDoc)con).createTopicSession(transacted, ack);
332             log.warn("Using a non-XA TopicConnection. " +
333                   "It will not be able to participate in a Global UOW");
334          }
335          else if (destination instanceof Queue JavaDoc && con instanceof QueueConnection JavaDoc)
336          {
337             ses = ((QueueConnection JavaDoc)con).createQueueSession(transacted, ack);
338             log.warn("Using a non-XA QueueConnection. " +
339                   "It will not be able to participate in a Global UOW");
340          }
341          else
342          {
343             throw new JMSException JavaDoc("Connection was not reconizable: " + con + " for destination " + destination);
344          }
345
346          // create the server session and add it to the pool - it is up to the
347
// server session to set the listener
348
StdServerSession serverSession = new StdServerSession(this, ses, xaSes,
349             listener, useLocalTX, xidFactory, tm);
350
351          sessionPool.add(serverSession);
352          numServerSessions++;
353
354          log.debug("added server session to the pool: " + serverSession);
355       }
356    }
357
358    /**
359     * A pooled executor where the minimum pool size
360     * threads are kept alive
361     */

362    private static class MyPooledExecutor extends PooledExecutor
363    {
364       public MyPooledExecutor(int poolSize)
365       {
366          super(poolSize);
367       }
368       
369       protected Runnable JavaDoc getTask() throws InterruptedException JavaDoc
370       {
371          Runnable JavaDoc task = null;
372          while ((task = super.getTask()) == null && keepRunning());
373          return task;
374       }
375       
376       /**
377        * We keep running unless we are told to shutdown
378        * or there are more than minimumPoolSize_ threads in the pool
379        *
380        * @return whether to keep running
381        */

382       protected synchronized boolean keepRunning()
383       {
384          if (shutdown_)
385             return false;
386          
387          return poolSize_ <= minimumPoolSize_;
388       }
389    }
390
391    private static class DefaultThreadFactory implements ThreadFactory
392    {
393       private static int count = 0;
394       private static synchronized int nextCount()
395       {
396          return count ++;
397       }
398
399       /**
400        * Create a new Thread for the given Runnable
401        *
402        * @param command The Runnable to pass to Thread
403        * @return The newly created Thread
404        */

405       public Thread JavaDoc newThread(final Runnable JavaDoc command)
406       {
407          String JavaDoc name = "JMS SessionPool Worker-" + nextCount();
408          Thread JavaDoc thread = new Thread JavaDoc(threadGroup, command, name);
409          thread.setDaemon(true);
410          return thread;
411       }
412    }
413 }
414
Popular Tags