KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > jonas_ejb > container > JMdbFactory


/**
 * JOnAS: Java(TM) Open Application Server
 * Copyright (C) 1999-2004 Bull S.A.
 * Contact: jonas-team@objectweb.org
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA
 *
 * --------------------------------------------------------------------------
 * $Id: JMdbFactory.java,v 1.34 2005/04/28 16:52:59 benoitf Exp $
 * --------------------------------------------------------------------------
 */

package org.objectweb.jonas_ejb.container;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

import javax.ejb.EJBException;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.ejb.Timer;
import javax.ejb.TimerService;
import javax.jms.ConnectionConsumer;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueConnectionFactory;
import javax.jms.XATopicConnection;
import javax.jms.XATopicConnectionFactory;
import javax.naming.Context;

import org.objectweb.jonas_ejb.deployment.api.MessageDrivenDesc;
import org.objectweb.jonas_ejb.deployment.api.MethodDesc;

import org.objectweb.jonas_jms.api.JmsManager;

import org.objectweb.util.monolog.api.BasicLevel;

59 /**
60  * This class is a factory for a Message Driven Bean
61  * There is one such class per MDB class.
62  * Contains all information related to the bean and set up all JMS environment for the bean
63  * It manages a ServerSession pool to server MDB requests.
64  * @author Philippe Coq, Philippe Durieux
65  */

66 public class JMdbFactory extends JFactory implements ServerSessionPool JavaDoc {
67
68     /**
69      * JMS Manager
70      */

71     private JmsManager jms = null;
72
73     /**
74      * Connection Consumer for this message driven bean
75      */

76     ConnectionConsumer JavaDoc cc = null;
77
78     /**
79      * pool of ServerSession objects
80      */

81     private List JavaDoc sspool = new ArrayList JavaDoc();
82
83     protected int instanceCount = 0;
84     // initial value for pool size
85
protected int minPoolSize = 0;
86     // nb max of instances in pool
87
protected int maxCacheSize = 0;
88
89     /**
90      * JMS Topic Connection
91      * always use XA Connections for transactions.
92      */

93     protected XATopicConnection JavaDoc tconn = null;
94
95     /**
96      * JMS Queue Connection (Topic or Queue)
97      * always use XA Connections for transactions.
98      */

99     protected XAQueueConnection JavaDoc qconn = null;
100
101     /**
102      * Constructor
103      * @param dd Message Driven Descriptor
104      * @param cont Container where this bean is defined
105      * @param thp Thread pool that will be used to process MDB requests
106      */

107     public JMdbFactory(MessageDrivenDesc dd, JContainer cont) {
108         super(dd, cont);
109
110         // Check if tx managed by the bean or the container
111
txbeanmanaged = dd.isBeanManagedTransaction();
112
113         // Check that JMS Service has been run in the server
114
jms = cont.getJmsManager();
115         if (jms == null) {
116             TraceEjb.logger.log(BasicLevel.ERROR, "cannot deploy a message driven bean without the JMS Service");
117             throw new EJBException JavaDoc("JMS Service must be run");
118         }
119
120         // Create a Connection Consumer, depending on Deployment Descriptor
121
String JavaDoc selector = dd.getSelector();
122         String JavaDoc dest = dd.getDestinationJndiName();
123
124         // Get the number max of messages sent on a Session at one time.
125
// There is a bug in Joram: If the number is greater than 1, Joram
126
// will wait until this nb is reached!
127
// We set the number max to 1 to work around this bug.
128
// LATER: maxMessage = dd.getMaxMessages(), with this value configurable.
129
int maxMessages = 1;
130
131         if (dest == null) {
132             throw new EJBException JavaDoc("The destination JNDI name is null in bean " + dd.getEjbName());
133         }
134
135         try {
136             if (dd.isTopicDestination()) {
137                 // topic
138
XATopicConnectionFactory JavaDoc tcf = jms.getXATopicConnectionFactory();
139                 tconn = tcf.createXATopicConnection();
140                 Topic JavaDoc t = jms.getTopic(dest);
141                 if (dd.isSubscriptionDurable()) {
142                     if (TraceEjb.isDebugJms()) {
143                         TraceEjb.mdb.log(BasicLevel.DEBUG, "createDurableConnectionConsumer for "+ejbname);
144                     }
145                     cc = tconn.createDurableConnectionConsumer(t, ejbname, selector, this, maxMessages);
146                 } else {
147                     if (TraceEjb.isDebugJms()) {
148                         TraceEjb.mdb.log(BasicLevel.DEBUG, "createConnectionConsumer for "+dest);
149                     }
150                     cc = tconn.createConnectionConsumer(t, selector, this, maxMessages);
151                 }
152                 tconn.start();
153             } else {
154                 // queue
155
XAQueueConnectionFactory JavaDoc qcf = jms.getXAQueueConnectionFactory();
156                 qconn = qcf.createXAQueueConnection();
157                 Queue JavaDoc q = jms.getQueue(dest);
158                 if (TraceEjb.isDebugJms()) {
159                     TraceEjb.mdb.log(BasicLevel.DEBUG, "createConnectionConsumer for "+dest);
160                 }
161                 cc = qconn.createConnectionConsumer(q, selector, this, maxMessages);
162                 qconn.start();
163             }
164         } catch (Exception JavaDoc e) {
165             throw new EJBException JavaDoc("Cannot create connection consumer in bean " + dd.getEjbName() + " :", e);
166         }
167
168         minPoolSize = dd.getPoolMin();
169         maxCacheSize = dd.getCacheMax();
170         if(TraceEjb.isDebugSwapper()) {
171             TraceEjb.swapper.log(BasicLevel.DEBUG," maxCacheSize = "+ maxCacheSize +
172                                  " minPoolSize = "+minPoolSize);
173         }
174     }
175
176     // ---------------------------------------------------------------
177
// Specific BeanFactory implementation
178
// ---------------------------------------------------------------
179

180     /**
181      * Init pool of instances
182      */

183     public void initInstancePool() {
184         if (minPoolSize != 0) {
185             TraceEjb.mdb.log(BasicLevel.INFO, "pre-allocate a set of " + minPoolSize
186                     + " message driven bean instances");
187             // pre-allocate a set of ServerSession
188
synchronized (sspool) {
189                 for (int i = 0; i < minPoolSize; i++) {
190                     ServerSession JavaDoc ss = null;
191                     try {
192                         ss = createNewInstance();
193                         sspool.add(ss);
194                     } catch (Exception JavaDoc e) {
195                         TraceEjb.mdb.log(BasicLevel.ERROR, "cannot init pool of instances ");
196                         throw new EJBException JavaDoc("cannot init pool of instances ", e);
197                     }
198                 }
199             }
200         }
201     }
202
203     /**
204      * @return the size of the ServerSessionPool
205      */

206     public int getPoolSize() {
207         return sspool.size();
208     }
209
210     /**
211      * stop this EJB.
212      * call ejbRemove on all MDB
213      * close the connection consumer
214      * Stop the threads and remove the beans
215      */

216     public void stop() {
217         if (TraceEjb.isDebugJms()) {
218             TraceEjb.mdb.log(BasicLevel.DEBUG, "");
219         }
220         try {
221             cc.close();
222             if (tconn != null)
223                 tconn.close();
224             if (qconn != null)
225                 qconn.close();
226         } catch(javax.jms.JMSException JavaDoc e) {
227             TraceEjb.logger.log(BasicLevel.WARN, "unregister: Cannot close Connection Consumer");
228         }
229     }
230
231     /**
232      * synchronize bean instances if needed
233      */

234     public void sync() {
235     }
236
237
238     /**
239      * @return the home if exist
240      */

241     public JHome getHome() {
242         return null;
243     }
244
245     /**
246      * @return the local home if exist
247      */

248     public JLocalHome getLocalHome() {
249         return null;
250     }
251
252     // ---------------------------------------------------------------
253
// ServerSessionPool Implementation
254
// ---------------------------------------------------------------
255

256     /**
257      * Return a server session from the pool. If pool is empty, creates a new one.
258      *
259      * @return a server session from the pool.
260      * @exception JMSException - if an application server fails to return a Server Session
261      * out of its server session pool.
262      */

263     public ServerSession JavaDoc getServerSession() throws JMSException JavaDoc {
264         if (TraceEjb.isDebugJms()) {
265             TraceEjb.mdb.log(BasicLevel.DEBUG, "");
266         }
267
268         return getNewInstance();
269     }
270
271
272
273     // ---------------------------------------------------------------
274
// Other methods
275
// ---------------------------------------------------------------
276

277     /**
278      * put the ServerSession back to the pool
279      * @param ss The ServerSession
280      */

281     public void releaseServerSession(ServerSession JavaDoc ss) {
282         if (TraceEjb.isDebugJms()) {
283             TraceEjb.mdb.log(BasicLevel.DEBUG, "");
284         }
285
286         synchronized (sspool) {
287             sspool.add(ss);
288             if(TraceEjb.isDebugSwapper()) {
289                     TraceEjb.swapper.log(BasicLevel.DEBUG, "notifyAll " );
290             }
291             sspool.notifyAll();
292         }
293         if(TraceEjb.isDebugJms()) {
294             TraceEjb.mdb.log(BasicLevel.DEBUG, "nb instances " + getCacheSize());
295         }
296
297     }
298
299     // ---------------------------------------------------------------
300
// other public methods
301
// ---------------------------------------------------------------
302

303     /**
304      * Obtains the TimerService associated for this Bean
305      * @return a JTimerService instance.
306      */

307     public TimerService JavaDoc getTimerService() {
308         if (myTimerService == null) {
309             // TODO : Check that instance implements TimedObject ?
310
myTimerService = new JTimerService(this);
311         }
312         return myTimerService;
313     }
314
315     /**
316      * @return min pool size
317      * for Jmx
318      */

319     public int getMinPoolSize() {
320         return minPoolSize;
321     }
322
323     /**
324      * @return max cache size
325      * for Jmx
326      */

327     public int getMaxCacheSize() {
328         return maxCacheSize;
329     }
330
331     /**
332      * @return current cache size ( = nb of instance created)
333      * for Jmx
334      */

335     public int getCacheSize() {
336         return instanceCount;
337     }
338
339     /**
340      * @return the Transaction Attribute
341      */

342     public int getTransactionAttribute() {
343         return ((MessageDrivenDesc)dd).getTxAttribute();
344     }
345
346     /**
347      * For Message Driven Beans, only 2 cases are possible.
348      * @param rctx The Request Context
349      */

350     public void checkTransaction(RequestCtx rctx) {
351         if (rctx.txAttr == MethodDesc.TX_REQUIRED) {
352             try {
353                 if (tm.getTransaction() != null) {
354                     // This should not occur (DEBUG)
355
TraceEjb.logger.log(BasicLevel.ERROR, "Transaction already opened by this thread.");
356                     TraceEjb.logger.log(BasicLevel.ERROR, "Transaction status = " + tm.getStatus());
357                     TraceEjb.logger.log(BasicLevel.ERROR, "Transaction = " + tm.getTransaction());
358                     Thread.dumpStack();
359                     return;
360                 }
361                 tm.begin();
362                 rctx.mustCommit = true;
363                 rctx.currTx = tm.getTransaction();
364                 if (TraceEjb.isDebugTx()) {
365                     TraceEjb.tx.log(BasicLevel.DEBUG, "Transaction started: " + rctx.currTx);
366                 }
367             } catch (Exception JavaDoc e) {
368                 // No exception raised in case of MDB
369
TraceEjb.logger.log(BasicLevel.ERROR, "cannot start tx", e);
370                 return;
371             }
372         }
373     }
374
375     /**
376      * Reduce number of instances in memory in the free list
377      * we reduce to the minPoolSize
378      */

379     public void reduceCache() {
380         if (TraceEjb.isDebugSwapper()) {
381             TraceEjb.swapper.log(BasicLevel.DEBUG, "");
382         }
383         // reduce the pool to the minPoolSize
384
int poolsz = minPoolSize;
385         synchronized (sspool) {
386             if(TraceEjb.isDebugSwapper()) {
387                 TraceEjb.swapper.log(BasicLevel.DEBUG, "try to reduce " + sspool.size() +
388                                      " to " + poolsz);
389             }
390             while (sspool.size() > poolsz) {
391                 ListIterator JavaDoc i = sspool.listIterator();
392                 if (i.hasNext()) {
393                     i.next();
394                     i.remove();
395                     instanceCount--;
396                 }
397             }
398         }
399         if (TraceEjb.isDebugSwapper()) {
400             TraceEjb.swapper.log(BasicLevel.DEBUG, "cacheSize= " + getCacheSize());
401         }
402
403     }
404
405     /**
406      * Notify a timeout for this bean
407      * @param timer timer whose expiration caused this notification.
408      */

409     public void notifyTimeout(Timer JavaDoc timer) {
410         if (TraceEjb.isDebugJms()) {
411             TraceEjb.mdb.log(BasicLevel.DEBUG, "");
412         }
413
414         // We need an instance from the pool to process the timeout.
415
JMessageDrivenBean jmdb = null;
416         try {
417             jmdb = getNewInstance();
418         } catch (JMSException JavaDoc e) {
419             TraceEjb.logger.log(BasicLevel.ERROR, "exception:" + e);
420             throw new EJBException JavaDoc("Cannot deliver the timeout", e);
421         }
422
423         // deliver the timeout to the bean
424
jmdb.deliverTimeout(timer);
425
426         // release the instance
427
releaseServerSession(jmdb);
428     }
429
430     // ---------------------------------------------------------------
431
// private methods
432
// ---------------------------------------------------------------
433

434     /**
435      * return a new instance of the bean. Try to get one from the pool,
436      * and create a new one if the pool is empty.
437      */

438     private JMessageDrivenBean getNewInstance() throws JMSException JavaDoc {
439         if (TraceEjb.isDebugJms()) {
440             TraceEjb.mdb.log(BasicLevel.DEBUG, "");
441         }
442
443         // try to get one from the Pool
444
JMessageDrivenBean ss = null;
445
446         // try to find a free context in the pool
447
synchronized(sspool) {
448             if (!sspool.isEmpty()) {
449                 try {
450                     ss = (JMessageDrivenBean) sspool.remove(0);
451                     return ss;
452                 } catch(IndexOutOfBoundsException JavaDoc ex) {
453                     // This should never happen
454
TraceEjb.logger.log(BasicLevel.ERROR, "exception:"+ex);
455                     throw new EJBException JavaDoc("Cannot get an instance from the pool", ex);
456                 }
457             } else {
458                 if (TraceEjb.isDebugJms()) {
459                     TraceEjb.mdb.log(BasicLevel.DEBUG,"pool is empty");
460                 }
461                 if (maxCacheSize == 0 || instanceCount < maxCacheSize) {
462                     // Pool is empty creates the ServerSession object
463
try {
464                         ss = createNewInstance();
465                     } catch (Exception JavaDoc e) {
466                         TraceEjb.logger.log(BasicLevel.ERROR, "exception:"+e);
467                         throw new EJBException JavaDoc("Cannot create a new instance", e);
468                     }
469                 } else {
470                     while (sspool.isEmpty()) {
471                         if (TraceEjb.isDebugSwapper()) {
472                             TraceEjb.swapper.log(BasicLevel.DEBUG,"sspool.isEmpty() = true --> wait()");
473                         }
474                         try {
475                             sspool.wait();
476                             if (TraceEjb.isDebugSwapper()) {
477                                 TraceEjb.swapper.log(BasicLevel.DEBUG,"sspool notified");
478                             }
479                         } catch (InterruptedException JavaDoc e) {
480                             if (TraceEjb.isDebugSwapper()) {
481                                 TraceEjb.swapper.log(BasicLevel.DEBUG, "sspool waiting interrupted", e);
482                             }
483                         } catch (Exception JavaDoc e) {
484                             throw new EJBException JavaDoc("synchronization pb", e);
485                         }
486                     }
487                     try {
488                         ListIterator JavaDoc i = sspool.listIterator();
489                         if (i.hasNext()) {
490                             ss = (JMessageDrivenBean) i.next();
491                             i.remove();
492                         }
493                         return ss;
494                     } catch(IndexOutOfBoundsException JavaDoc ex) {
495                         // pool is empty
496
}
497                 }
498
499             }
500             if(TraceEjb.isDebugSwapper()) {
501                 TraceEjb.swapper.log(BasicLevel.DEBUG, "nb instances " + getCacheSize());
502             }
503             return ss;
504         }
505     }
506
507     /**
508      * Create a new instance of the bean
509      */

510     private JMessageDrivenBean createNewInstance() throws Exception JavaDoc {
511         if (TraceEjb.isDebugJms()) {
512             TraceEjb.mdb.log(BasicLevel.DEBUG, "");
513         }
514         Session JavaDoc sess = null;
515         JMessageDrivenBean ss = null;
516         MessageDrivenDesc mdd = (MessageDrivenDesc) dd;
517         if (tconn != null) {
518             if (mdd.isRequired()) {
519                 sess = tconn.createXATopicSession();
520             } else {
521                 sess = tconn.createTopicSession(false, mdd.getAcknowledgeMode());
522             }
523         } else if (qconn != null) {
524             if (mdd.isRequired()) {
525                 sess = qconn.createXAQueueSession();
526             } else {
527                 sess = qconn.createQueueSession(false, mdd.getAcknowledgeMode());
528             }
529         } else {
530             TraceEjb.mdb.log(BasicLevel.ERROR, "connection not initialized");
531             throw new Exception JavaDoc("JMS connection not initialized");
532         }
533
534         // Set ContextClassLoader with the ejbclassloader.
535
// This is necessary in case ejbCreate calls another bean in the same jar.
536
Thread.currentThread().setContextClassLoader(myClassLoader());
537
538         // Creates the new instance
539
MessageDrivenBean JavaDoc mdb = null;
540         try {
541             mdb = (MessageDrivenBean JavaDoc) beanclass.newInstance();
542         } catch (Exception JavaDoc e) {
543             TraceEjb.logger.log(BasicLevel.ERROR, "failed to create instance:", e);
544             throw new EJBException JavaDoc("Container failed to create instance of Message Driven Bean", e);
545         }
546
547         // Instanciates a new JMessageDrivenBean object
548
// and set it as the MessageListener for this Session.
549
ss = new JMessageDrivenBean(this, sess, mdb, wm);
550         sess.setMessageListener((MessageListener JavaDoc)ss);
551
552         // starts the bean instance: setMessageDrivenContext() + ejbCreate()
553
// see EJB spec. 2.0 page 322.
554
// Both operations must be called with the correct ComponentContext
555
Context JavaDoc ctxsave = setComponentContext();
556         mdb.setMessageDrivenContext((MessageDrivenContext JavaDoc)ss);
557         try {
558             beanclass.getMethod("ejbCreate", (Class JavaDoc[]) null).invoke(mdb, (Object JavaDoc[]) null);
559         } catch (Exception JavaDoc e) {
560             TraceEjb.logger.log(BasicLevel.ERROR, "cannot call ejbCreate on message driven bean instance ", e);
561             throw new EJBException JavaDoc(" Container fails to call ejbCreate on message driven bean instance", e);
562         } finally {
563             resetComponentContext(ctxsave);
564         }
565
566         synchronized (sspool) {
567             instanceCount++;
568         }
569         return ss;
570     }
571
572 }
573
Popular Tags