1 25 26 package org.objectweb.jonas_ejb.container; 27 28 29 import java.util.ArrayList ; 30 import java.util.List ; 31 import java.util.ListIterator ; 32 33 import javax.ejb.EJBException ; 34 import javax.ejb.MessageDrivenBean ; 35 import javax.ejb.MessageDrivenContext ; 36 import javax.ejb.Timer ; 37 import javax.ejb.TimerService ; 38 import javax.jms.ConnectionConsumer ; 39 import javax.jms.JMSException ; 40 import javax.jms.MessageListener ; 41 import javax.jms.Queue ; 42 import javax.jms.ServerSession ; 43 import javax.jms.ServerSessionPool ; 44 import javax.jms.Session ; 45 import javax.jms.Topic ; 46 import javax.jms.XAQueueConnection ; 47 import javax.jms.XAQueueConnectionFactory ; 48 import javax.jms.XATopicConnection ; 49 import javax.jms.XATopicConnectionFactory ; 50 import javax.naming.Context ; 51 52 import org.objectweb.jonas_ejb.deployment.api.MessageDrivenDesc; 53 import org.objectweb.jonas_ejb.deployment.api.MethodDesc; 54 55 import org.objectweb.jonas_jms.api.JmsManager; 56 57 import org.objectweb.util.monolog.api.BasicLevel; 58 59 66 public class JMdbFactory extends JFactory implements ServerSessionPool { 67 68 71 private JmsManager jms = null; 72 73 76 ConnectionConsumer cc = null; 77 78 81 private List sspool = new ArrayList (); 82 83 protected int instanceCount = 0; 84 protected int minPoolSize = 0; 86 protected int maxCacheSize = 0; 88 89 93 protected XATopicConnection tconn = null; 94 95 99 protected XAQueueConnection qconn = null; 100 101 107 public JMdbFactory(MessageDrivenDesc dd, JContainer cont) { 108 super(dd, cont); 109 110 txbeanmanaged = dd.isBeanManagedTransaction(); 112 113 jms = cont.getJmsManager(); 115 if (jms == null) { 116 TraceEjb.logger.log(BasicLevel.ERROR, "cannot deploy a message driven bean without the JMS Service"); 117 throw new EJBException ("JMS Service must be run"); 118 } 119 120 String selector = dd.getSelector(); 122 String dest = dd.getDestinationJndiName(); 123 124 int maxMessages = 1; 130 131 if (dest == null) { 132 throw new EJBException ("The destination JNDI name is null in bean " + dd.getEjbName()); 133 } 134 135 try { 136 if (dd.isTopicDestination()) { 137 XATopicConnectionFactory tcf = jms.getXATopicConnectionFactory(); 139 tconn = tcf.createXATopicConnection(); 140 Topic t = jms.getTopic(dest); 141 if (dd.isSubscriptionDurable()) { 142 if (TraceEjb.isDebugJms()) { 143 TraceEjb.mdb.log(BasicLevel.DEBUG, "createDurableConnectionConsumer for "+ejbname); 144 } 145 cc = tconn.createDurableConnectionConsumer(t, ejbname, selector, this, maxMessages); 146 } else { 147 if (TraceEjb.isDebugJms()) { 148 TraceEjb.mdb.log(BasicLevel.DEBUG, "createConnectionConsumer for "+dest); 149 } 150 cc = tconn.createConnectionConsumer(t, selector, this, maxMessages); 151 } 152 tconn.start(); 153 } else { 154 XAQueueConnectionFactory qcf = jms.getXAQueueConnectionFactory(); 156 qconn = qcf.createXAQueueConnection(); 157 Queue q = jms.getQueue(dest); 158 if (TraceEjb.isDebugJms()) { 159 TraceEjb.mdb.log(BasicLevel.DEBUG, "createConnectionConsumer for "+dest); 160 } 161 cc = qconn.createConnectionConsumer(q, selector, this, maxMessages); 162 qconn.start(); 163 } 164 } catch (Exception e) { 165 throw new EJBException ("Cannot create connection consumer in bean " + dd.getEjbName() + " :", e); 166 } 167 168 minPoolSize = dd.getPoolMin(); 169 maxCacheSize = dd.getCacheMax(); 170 if(TraceEjb.isDebugSwapper()) { 171 TraceEjb.swapper.log(BasicLevel.DEBUG," maxCacheSize = "+ maxCacheSize + 172 " minPoolSize = "+minPoolSize); 173 } 174 } 175 176 180 183 public void initInstancePool() { 184 if (minPoolSize != 0) { 185 TraceEjb.mdb.log(BasicLevel.INFO, "pre-allocate a set of " + minPoolSize 186 + " message driven bean instances"); 187 synchronized (sspool) { 189 for (int i = 0; i < minPoolSize; i++) { 190 ServerSession ss = null; 191 try { 192 ss = createNewInstance(); 193 sspool.add(ss); 194 } catch (Exception e) { 195 TraceEjb.mdb.log(BasicLevel.ERROR, "cannot init pool of instances "); 196 throw new EJBException ("cannot init pool of instances ", e); 197 } 198 } 199 } 200 } 201 } 202 203 206 public int getPoolSize() { 207 return sspool.size(); 208 } 209 210 216 public void stop() { 217 if (TraceEjb.isDebugJms()) { 218 TraceEjb.mdb.log(BasicLevel.DEBUG, ""); 219 } 220 try { 221 cc.close(); 222 if (tconn != null) 223 tconn.close(); 224 if (qconn != null) 225 qconn.close(); 226 } catch(javax.jms.JMSException e) { 227 TraceEjb.logger.log(BasicLevel.WARN, "unregister: Cannot close Connection Consumer"); 228 } 229 } 230 231 234 public void sync() { 235 } 236 237 238 241 public JHome getHome() { 242 return null; 243 } 244 245 248 public JLocalHome getLocalHome() { 249 return null; 250 } 251 252 256 263 public ServerSession getServerSession() throws JMSException { 264 if (TraceEjb.isDebugJms()) { 265 TraceEjb.mdb.log(BasicLevel.DEBUG, ""); 266 } 267 268 return getNewInstance(); 269 } 270 271 272 273 277 281 public void releaseServerSession(ServerSession ss) { 282 if (TraceEjb.isDebugJms()) { 283 TraceEjb.mdb.log(BasicLevel.DEBUG, ""); 284 } 285 286 synchronized (sspool) { 287 sspool.add(ss); 288 if(TraceEjb.isDebugSwapper()) { 289 TraceEjb.swapper.log(BasicLevel.DEBUG, "notifyAll " ); 290 } 291 sspool.notifyAll(); 292 } 293 if(TraceEjb.isDebugJms()) { 294 TraceEjb.mdb.log(BasicLevel.DEBUG, "nb instances " + getCacheSize()); 295 } 296 297 } 298 299 303 307 public TimerService getTimerService() { 308 if (myTimerService == null) { 309 myTimerService = new JTimerService(this); 311 } 312 return myTimerService; 313 } 314 315 319 public int getMinPoolSize() { 320 return minPoolSize; 321 } 322 323 327 public int getMaxCacheSize() { 328 return maxCacheSize; 329 } 330 331 335 public int getCacheSize() { 336 return instanceCount; 337 } 338 339 342 public int getTransactionAttribute() { 343 return ((MessageDrivenDesc)dd).getTxAttribute(); 344 } 345 346 350 public void checkTransaction(RequestCtx rctx) { 351 if (rctx.txAttr == MethodDesc.TX_REQUIRED) { 352 try { 353 if (tm.getTransaction() != null) { 354 TraceEjb.logger.log(BasicLevel.ERROR, "Transaction already opened by this thread."); 356 TraceEjb.logger.log(BasicLevel.ERROR, "Transaction status = " + tm.getStatus()); 357 TraceEjb.logger.log(BasicLevel.ERROR, "Transaction = " + tm.getTransaction()); 358 Thread.dumpStack(); 359 return; 360 } 361 tm.begin(); 362 rctx.mustCommit = true; 363 rctx.currTx = tm.getTransaction(); 364 if (TraceEjb.isDebugTx()) { 365 TraceEjb.tx.log(BasicLevel.DEBUG, "Transaction started: " + rctx.currTx); 366 } 367 } catch (Exception e) { 368 TraceEjb.logger.log(BasicLevel.ERROR, "cannot start tx", e); 370 return; 371 } 372 } 373 } 374 375 379 public void reduceCache() { 380 if (TraceEjb.isDebugSwapper()) { 381 TraceEjb.swapper.log(BasicLevel.DEBUG, ""); 382 } 383 int poolsz = minPoolSize; 385 synchronized (sspool) { 386 if(TraceEjb.isDebugSwapper()) { 387 TraceEjb.swapper.log(BasicLevel.DEBUG, "try to reduce " + sspool.size() + 388 " to " + poolsz); 389 } 390 while (sspool.size() > poolsz) { 391 ListIterator i = sspool.listIterator(); 392 if (i.hasNext()) { 393 i.next(); 394 i.remove(); 395 instanceCount--; 396 } 397 } 398 } 399 if (TraceEjb.isDebugSwapper()) { 400 TraceEjb.swapper.log(BasicLevel.DEBUG, "cacheSize= " + getCacheSize()); 401 } 402 403 } 404 405 409 public void notifyTimeout(Timer timer) { 410 if (TraceEjb.isDebugJms()) { 411 TraceEjb.mdb.log(BasicLevel.DEBUG, ""); 412 } 413 414 JMessageDrivenBean jmdb = null; 416 try { 417 jmdb = getNewInstance(); 418 } catch (JMSException e) { 419 TraceEjb.logger.log(BasicLevel.ERROR, "exception:" + e); 420 throw new EJBException ("Cannot deliver the timeout", e); 421 } 422 423 jmdb.deliverTimeout(timer); 425 426 releaseServerSession(jmdb); 428 } 429 430 434 438 private JMessageDrivenBean getNewInstance() throws JMSException { 439 if (TraceEjb.isDebugJms()) { 440 TraceEjb.mdb.log(BasicLevel.DEBUG, ""); 441 } 442 443 JMessageDrivenBean ss = null; 445 446 synchronized(sspool) { 448 if (!sspool.isEmpty()) { 449 try { 450 ss = (JMessageDrivenBean) sspool.remove(0); 451 return ss; 452 } catch(IndexOutOfBoundsException ex) { 453 TraceEjb.logger.log(BasicLevel.ERROR, "exception:"+ex); 455 throw new EJBException ("Cannot get an instance from the pool", ex); 456 } 457 } else { 458 if (TraceEjb.isDebugJms()) { 459 TraceEjb.mdb.log(BasicLevel.DEBUG,"pool is empty"); 460 } 461 if (maxCacheSize == 0 || instanceCount < maxCacheSize) { 462 try { 464 ss = createNewInstance(); 465 } catch (Exception e) { 466 TraceEjb.logger.log(BasicLevel.ERROR, "exception:"+e); 467 throw new EJBException ("Cannot create a new instance", e); 468 } 469 } else { 470 while (sspool.isEmpty()) { 471 if (TraceEjb.isDebugSwapper()) { 472 TraceEjb.swapper.log(BasicLevel.DEBUG,"sspool.isEmpty() = true --> wait()"); 473 } 474 try { 475 sspool.wait(); 476 if (TraceEjb.isDebugSwapper()) { 477 TraceEjb.swapper.log(BasicLevel.DEBUG,"sspool notified"); 478 } 479 } catch (InterruptedException e) { 480 if (TraceEjb.isDebugSwapper()) { 481 TraceEjb.swapper.log(BasicLevel.DEBUG, "sspool waiting interrupted", e); 482 } 483 } catch (Exception e) { 484 throw new EJBException ("synchronization pb", e); 485 } 486 } 487 try { 488 ListIterator i = sspool.listIterator(); 489 if (i.hasNext()) { 490 ss = (JMessageDrivenBean) i.next(); 491 i.remove(); 492 } 493 return ss; 494 } catch(IndexOutOfBoundsException ex) { 495 } 497 } 498 499 } 500 if(TraceEjb.isDebugSwapper()) { 501 TraceEjb.swapper.log(BasicLevel.DEBUG, "nb instances " + getCacheSize()); 502 } 503 return ss; 504 } 505 } 506 507 510 private JMessageDrivenBean createNewInstance() throws Exception { 511 if (TraceEjb.isDebugJms()) { 512 TraceEjb.mdb.log(BasicLevel.DEBUG, ""); 513 } 514 Session sess = null; 515 JMessageDrivenBean ss = null; 516 MessageDrivenDesc mdd = (MessageDrivenDesc) dd; 517 if (tconn != null) { 518 if (mdd.isRequired()) { 519 sess = tconn.createXATopicSession(); 520 } else { 521 sess = tconn.createTopicSession(false, mdd.getAcknowledgeMode()); 522 } 523 } else if (qconn != null) { 524 if (mdd.isRequired()) { 525 sess = qconn.createXAQueueSession(); 526 } else { 527 sess = qconn.createQueueSession(false, mdd.getAcknowledgeMode()); 528 } 529 } else { 530 TraceEjb.mdb.log(BasicLevel.ERROR, "connection not initialized"); 531 throw new Exception ("JMS connection not initialized"); 532 } 533 534 Thread.currentThread().setContextClassLoader(myClassLoader()); 537 538 MessageDrivenBean mdb = null; 540 try { 541 mdb = (MessageDrivenBean ) beanclass.newInstance(); 542 } catch (Exception e) { 543 TraceEjb.logger.log(BasicLevel.ERROR, "failed to create instance:", e); 544 throw new EJBException ("Container failed to create instance of Message Driven Bean", e); 545 } 546 547 ss = new JMessageDrivenBean(this, sess, mdb, wm); 550 sess.setMessageListener((MessageListener )ss); 551 552 Context ctxsave = setComponentContext(); 556 mdb.setMessageDrivenContext((MessageDrivenContext )ss); 557 try { 558 beanclass.getMethod("ejbCreate", (Class []) null).invoke(mdb, (Object []) null); 559 } catch (Exception e) { 560 TraceEjb.logger.log(BasicLevel.ERROR, "cannot call ejbCreate on message driven bean instance ", e); 561 throw new EJBException (" Container fails to call ejbCreate on message driven bean instance", e); 562 } finally { 563 resetComponentContext(ctxsave); 564 } 565 566 synchronized (sspool) { 567 instanceCount++; 568 } 569 return ss; 570 } 571 572 } 573 | Popular Tags |