1 25 26 package org.objectweb.jonas_ejb.container; 27 28 import java.lang.reflect.Method ; 29 import java.lang.reflect.Proxy ; 30 import java.util.ArrayList ; 31 import java.util.List ; 32 import java.util.LinkedList ; 33 import java.util.ListIterator ; 34 35 import javax.ejb.EJBException ; 36 import javax.ejb.MessageDrivenBean ; 37 import javax.ejb.MessageDrivenContext ; 38 import javax.ejb.Timer ; 39 import javax.ejb.TimerService ; 40 import javax.naming.Context ; 41 import javax.resource.spi.ActivationSpec ; 42 import javax.resource.spi.UnavailableException ; 43 import javax.resource.spi.endpoint.MessageEndpoint ; 44 import javax.resource.spi.endpoint.MessageEndpointFactory ; 45 import javax.transaction.SystemException ; 46 import javax.transaction.xa.XAResource ; 47 48 import org.objectweb.jonas_ejb.deployment.api.MessageDrivenDesc; 49 import org.objectweb.jonas_ejb.deployment.api.MethodDesc; 50 import org.objectweb.jonas_ejb.deployment.api.ActivationConfigPropertyDesc; 51 52 import org.objectweb.jonas.resource.Rar; 53 import org.objectweb.jotm.Current; 54 55 import org.objectweb.util.monolog.api.BasicLevel; 56 57 62 public class JMdbEndpointFactory extends JFactory implements MessageEndpointFactory { 63 64 67 private List endpool = new ArrayList (); 68 69 72 protected int instanceCount = 0; 73 74 77 protected int minPoolSize = 0; 78 79 82 protected int maxCacheSize = 0; 83 84 87 private ActivationSpec as = null; 88 89 92 private String msglistenType = null; 93 94 97 private Rar rar = null; 98 99 102 private boolean activated = false; 103 104 107 private static String JMESSAGEENDPOINT_CLASS = "org.objectweb.jonas_ejb.container.JMessageEndpointIntf"; 108 109 112 private static String MESSAGEENDPOINT_CLASS = "javax.resource.spi.endpoint.MessageEndpoint"; 113 114 117 private static String MESSAGEDRIVENCONTEXT_CLASS = "javax.ejb.MessageDrivenContext"; 118 119 125 public JMdbEndpointFactory(MessageDrivenDesc dd, JContainer cont, ActivationSpec as) { 126 super(dd, cont); 127 String dest = dd.getDestinationJndiName(); 128 List acpl = null; 130 if (dd.getMdActivationConfigDesc() != null) { 131 acpl = dd.getMdActivationConfigDesc().getActivationConfigPropertyList(); 132 } 133 List jAcpl = null; 134 if (dd.getJonasMdActivationConfigDesc() != null) { 135 jAcpl = dd.getJonasMdActivationConfigDesc().getActivationConfigPropertyList(); 136 } 137 mdbEFInit(dd, dest, acpl, jAcpl, as); 138 } 139 140 147 public JMdbEndpointFactory(MessageDrivenDesc dd, String destination, JContainer cont, 148 ActivationSpec as) { 149 super(dd, cont); 150 151 153 List jAcpl = new LinkedList (); 154 List acpl = null; 155 if (dd.getMdActivationConfigDesc() != null) { 156 acpl = dd.getMdActivationConfigDesc().getActivationConfigPropertyList(); 157 } 158 buildACL(dd, jAcpl); 159 160 mdbEFInit(dd, destination, acpl, jAcpl, as); 161 } 162 163 168 private void buildACL(MessageDrivenDesc dd, List jacl) { 169 170 String [] aclNames = {"destination", "destinationType", "messageSelector", 171 "acknowledgeMode", "subscriptionDurability" }; 172 173 ActivationConfigPropertyDesc acp = null; 174 175 acp = new ActivationConfigPropertyDesc(); 176 acp.setActivationConfigPropertyName("destination"); 177 acp.setActivationConfigPropertyValue(dd.getDestinationJndiName()); 178 jacl.add(acp); 179 180 acp = new ActivationConfigPropertyDesc(); 181 acp.setActivationConfigPropertyName("destinationType"); 182 if (dd.isTopicDestination()) { 183 acp.setActivationConfigPropertyValue("javax.jms.Topic"); 184 } else { 185 acp.setActivationConfigPropertyValue("javax.jms.Queue"); 186 } 187 jacl.add(acp); 188 189 acp = new ActivationConfigPropertyDesc(); 190 acp.setActivationConfigPropertyName("messageSelector"); 191 acp.setActivationConfigPropertyValue(dd.getSelector()); 192 jacl.add(acp); 193 194 acp = new ActivationConfigPropertyDesc(); 195 acp.setActivationConfigPropertyName("acknowledgeMode"); 196 if (dd.getAcknowledgeMode() == MessageDrivenDesc.AUTO_ACKNOWLEDGE) { 197 acp.setActivationConfigPropertyValue("Auto-acknowledge"); 198 } else { 199 acp.setActivationConfigPropertyValue("Dups-ok-acknowledge"); 200 } 201 jacl.add(acp); 202 203 acp = new ActivationConfigPropertyDesc(); 204 acp.setActivationConfigPropertyName("subscriptionDurability"); 205 if (dd.getSubscriptionDurability() == MessageDrivenDesc.SUBS_DURABLE) { 206 acp.setActivationConfigPropertyValue("Durable"); 207 } else { 208 acp.setActivationConfigPropertyValue("NonDurable"); 209 } 210 jacl.add(acp); 211 212 if (dd.getSubscriptionDurability() == MessageDrivenDesc.SUBS_DURABLE) { 213 acp = new ActivationConfigPropertyDesc(); 214 acp.setActivationConfigPropertyName("subscriptionName"); 215 acp.setActivationConfigPropertyValue(dd.getEjbName()); 216 jacl.add(acp); 217 } 218 219 List acpl = null; 221 if (dd.getJonasMdActivationConfigDesc() != null) { 222 acpl = dd.getJonasMdActivationConfigDesc().getActivationConfigPropertyList(); 223 224 String acpName = null; 225 boolean found = false; 226 for (int i = 0; i < acpl.size(); i++) { 227 acp = (ActivationConfigPropertyDesc) acpl.get(i); 228 acpName = acp.getActivationConfigPropertyName(); 229 found = false; 230 for (int j = 0; j < aclNames.length; j++) { 231 if (acpName.equals(aclNames[j])) { 232 found = true; 233 break; 234 } 235 } 236 if (!found) { 237 jacl.add(acp); 238 } 239 } 240 241 } 242 } 243 244 252 private void mdbEFInit(MessageDrivenDesc dd, String dest, List acpl, 253 List jAcpl, ActivationSpec aSpec) { 254 255 String methName = "mdbEFInit: "; 256 String ejbName = dd.getEjbName(); 257 258 txbeanmanaged = dd.isBeanManagedTransaction(); 260 261 as = aSpec; 263 264 minPoolSize = dd.getPoolMin(); 265 maxCacheSize = dd.getCacheMax(); 266 if (TraceEjb.isDebugJms()) { 267 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "maxCacheSize = " + maxCacheSize + " minPoolSize = " + minPoolSize); 268 } 269 270 rar = Rar.getRar(dest); 272 if (rar == null) { 273 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "cannot retrieve associated Resource Adapter "); 274 throw new EJBException ("cannot retrieve associated Resource Adapter "); 275 } 276 msglistenType = rar.getInterface(dest); 277 278 try { 279 rar.configureAS(as, acpl, jAcpl, dest, ejbName); 280 } catch (Exception ex) { 281 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "cannot configure activationspec " + ex); 282 ex.printStackTrace(); 283 throw new EJBException ("cannot configure activationspec ", ex); 284 } 285 286 synchronized (endpool) { 288 for (int i = 0; i < minPoolSize; i++) { 289 JMessageEndpoint ep = null; 290 try { 291 ep = createNewInstance(); 292 endpool.add(ep); 293 } catch (Exception e) { 294 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "cannot init pool of instances "); 295 throw new EJBException ("cannot init pool of instances ", e); 296 } 297 } 298 } 299 if (minPoolSize != 0) { 300 TraceEjb.mdb.log(BasicLevel.INFO, methName + "pre-allocate a set of " + minPoolSize 301 + " message driven bean instances"); 302 } 303 304 try { 306 ActivationSpec [] asArray = new ActivationSpec [1]; 307 asArray[0] = as; 308 XAResource [] xar = rar.getResourceAdapter().getXAResources(asArray); 309 if (xar != null && xar.length > 0) { 310 ((Current) tm).getTransactionRecovery(). 311 registerResourceManager(ejbName+msglistenType, 312 xar[0], "", null); 313 } 314 } catch (Exception ex) { 315 TraceEjb.mdb.log(BasicLevel.ERROR, ex.getMessage(), ex); 316 } 317 318 activated = true; 319 try { 321 rar.getResourceAdapter().endpointActivation(this, as); 322 } catch (Exception ex) { 323 activated = false; 324 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "cannot activate endpoint ", ex); 325 throw new EJBException ("cannot activate endpoint ", ex); 326 } 327 } 328 329 333 336 public void initInstancePool() { 337 } 338 339 342 public int getPoolSize() { 343 return endpool.size(); 344 } 345 346 350 public void stop() { 351 String methName = "stop: "; 352 if (TraceEjb.isDebugJms()) { 353 TraceEjb.mdb.log(BasicLevel.DEBUG, methName); 354 } 355 try { 356 rar.getResourceAdapter().endpointDeactivation(this, as); 358 synchronized (endpool) { 360 if (TraceEjb.isDebugJms()) { 361 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "stopping " + this); 362 } 363 JMessageEndpoint ep = null; 364 while (endpool.size() > 0) { 365 ep = (JMessageEndpoint) endpool.remove(0); 366 instanceCount--; 367 try { 368 ep.mdb.ejbRemove(); 369 } catch (Exception e) { 370 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "Cannot remove mdb: " 371 + ep.mdb, e); 372 } 373 } 374 } 375 } catch (Exception e) { 376 TraceEjb.mdb.log(BasicLevel.WARN, methName + "Cannot deactivate the endpoint", e); 377 } 378 } 379 380 383 public void sync() { 384 } 385 386 389 public JHome getHome() { 390 return null; 391 } 392 393 396 public JLocalHome getLocalHome() { 397 return null; 398 } 399 400 404 411 public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException { 412 String methName = "createEndpoint: "; 413 if (TraceEjb.isDebugJms()) { 414 TraceEjb.mdb.log(BasicLevel.DEBUG, methName); 415 } 416 417 if (!activated) { 418 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "mdb is not usuable either initial deployment or undeployment "); 419 throw new UnavailableException ("mdb is not usuable either initial deployment or undeployment "); 420 } 421 422 JMessageEndpoint ep = null; 423 try { 424 ep = getNewInstance(xaResource); 425 } catch (Exception ex) { 426 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "cannot create an endpoint "); 427 throw new UnavailableException ("cannot create an endpoint ", ex); 428 } 429 return ep.mep; 430 } 431 432 439 public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { 440 int txAttribute = 0; 441 try { 442 txAttribute = dd.getMethodDesc(method).getTxAttribute(); 443 } catch (Exception ex) { 444 TraceEjb.mdb.log(BasicLevel.ERROR, "isDeliveryTransacted: No such method exists. " + method.getName(), ex); 445 throw new NoSuchMethodException ("No such method exists. " + method.getName()); 446 } 447 return txAttribute == MethodDesc.TX_REQUIRED ? true : false; 448 } 449 450 454 461 public JMessageEndpoint getMessageEndpoint() throws Exception { 462 if (TraceEjb.isDebugJms()) { 463 TraceEjb.mdb.log(BasicLevel.DEBUG, "getMessageEndpoint: "); 464 } 465 466 return getNewInstance(null); 467 } 468 469 473 public void releaseEndpoint(JMessageEndpoint ep) { 474 String methName = "releaseEndpoint: "; 475 if (TraceEjb.isDebugJms()) { 476 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + ep); 477 } 478 479 ep.setReleasedState(true); 480 synchronized (endpool) { 481 endpool.add(ep); 482 if (TraceEjb.isDebugJms()) { 483 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "notifyAll " ); 484 } 485 endpool.notifyAll(); 486 } 487 if (TraceEjb.isDebugJms()) { 488 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "nb instances " + getCacheSize()); 489 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "nb free cached instances " + getPoolSize()); 490 } 491 492 } 493 494 498 502 public TimerService getTimerService() { 503 if (myTimerService == null) { 504 myTimerService = new JTimerService(this); 506 } 507 return myTimerService; 508 } 509 510 513 public int getMinPoolSize() { 514 return minPoolSize; 515 } 516 517 520 public int getMaxCacheSize() { 521 return maxCacheSize; 522 } 523 524 527 public int getCacheSize() { 528 return instanceCount; 529 } 530 531 534 public int getTransactionAttribute() { 535 return ((MessageDrivenDesc) dd).getTxAttribute(); 536 } 537 538 543 public void checkTransaction(RequestCtx rctx) { 544 String methName = "checkTransaction: "; 545 if (rctx.txAttr == MethodDesc.TX_REQUIRED) { 546 try { 547 if (txbeanmanaged) { 548 if (tm.getTransaction() == null) { 549 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "No transaction and need one"); 550 return; 551 } 552 } else { 553 if (tm.getTransaction() == null) { 554 tm.begin(); 555 } 556 } 557 rctx.mustCommit = true; 558 rctx.currTx = tm.getTransaction(); 559 } catch (Exception e) { 560 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "cannot start tx:", e); 562 return; 563 } 564 } else { 565 if (rctx.txAttr != MethodDesc.TX_NOT_SUPPORTED) { 566 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "Bad transaction attribute: " + rctx.txAttr); 567 } 568 try { 569 rctx.currTx = tm.getTransaction(); 570 if (rctx.currTx != null) { 571 if (TraceEjb.isDebugJms()) { 572 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "Suspending client tx"); 573 } 574 rctx.clientTx = tm.suspend(); 575 rctx.currTx = null; 576 } 577 } catch (SystemException e) { 578 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "cannot suspend transaction", e); 579 return; 580 } 581 } 582 } 583 584 588 public void reduceCache() { 589 String methName = "reduceCache: "; 590 if (TraceEjb.isDebugJms()) { 591 TraceEjb.mdb.log(BasicLevel.DEBUG, methName); 592 } 593 int poolsz = minPoolSize; 595 synchronized (endpool) { 596 if (TraceEjb.isDebugJms()) { 597 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "try to reduce " + endpool.size() + " to " + poolsz); 598 } 599 while (endpool.size() > poolsz) { 600 ListIterator i = endpool.listIterator(); 601 if (i.hasNext()) { 602 i.next(); 603 i.remove(); 604 instanceCount--; 605 } 606 } 607 } 608 if (TraceEjb.isDebugJms()) { 609 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "cacheSize= " + getCacheSize()); 610 } 611 612 } 613 614 618 public void notifyTimeout(Timer timer) { 619 String methName = "notifyTimeout: "; 620 if (TraceEjb.isDebugJms()) { 621 TraceEjb.mdb.log(BasicLevel.DEBUG, methName); 622 } 623 624 JMessageEndpoint ep = null; 626 try { 627 ep = getNewInstance(null); 628 } catch (Exception e) { 629 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "exception:" + e); 630 throw new EJBException ("Cannot deliver the timeout", e); 631 } 632 633 ep.deliverTimeout(timer); 635 636 releaseEndpoint(ep); 638 } 639 640 644 652 private JMessageEndpoint getNewInstance(XAResource xaResource) throws Exception { 653 String methName = "getNewInstance: "; 654 if (TraceEjb.isDebugJms()) { 655 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "Factory: " + this + " XAResource: " + xaResource); 656 } 657 658 JMessageEndpoint ep = null; 660 661 synchronized (endpool) { 663 if (!endpool.isEmpty()) { 664 try { 665 ep = (JMessageEndpoint) endpool.remove(0); 666 } catch (Exception ex) { 667 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "Exception:" + ex); 669 throw new EJBException ("Cannot get an instance from the pool", ex); 670 } 671 } else { 672 if (TraceEjb.isDebugJms()) { 673 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "pool is empty"); 674 } 675 if (maxCacheSize == 0 || instanceCount < maxCacheSize) { 676 try { 678 ep = createNewInstance(); 679 } catch (Exception e) { 680 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "exception:" + e); 681 throw new EJBException ("Cannot create a new instance", e); 682 } 683 } else { 684 while (endpool.isEmpty()) { 685 if (TraceEjb.isDebugJms()) { 686 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "endpool.isEmpty() = true --> wait()"); 687 } 688 try { 689 endpool.wait(); 690 if (TraceEjb.isDebugJms()) { 691 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "endpool notified"); 692 } 693 } catch (InterruptedException e) { 694 if (TraceEjb.isDebugJms()) { 695 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "endpool waiting interrupted", e); 696 } 697 } catch (Exception e) { 698 throw new EJBException ("synchronization pb", e); 699 } 700 } 701 try { 702 ep = (JMessageEndpoint) endpool.remove(0); 703 } catch (Exception ex) { 704 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "Exception:" + ex); 706 throw new EJBException ("Cannot get an instance from the pool", ex); 707 } 708 } 709 710 } 711 if (TraceEjb.isDebugJms()) { 712 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "nb instances " + getCacheSize()); 713 } 714 ep.setXAResource(xaResource); 715 ep.setReleasedState(false); 716 if (TraceEjb.isDebugJms()) { 717 TraceEjb.mdb.log(BasicLevel.DEBUG, methName + "Returning " + ep); 718 } 719 return ep; 720 } 721 } 722 723 729 private JMessageEndpoint createNewInstance() throws Exception { 730 String methName = "createNewInstance: "; 731 if (TraceEjb.isDebugJms()) { 732 TraceEjb.mdb.log(BasicLevel.DEBUG, methName); 733 } 734 JMessageEndpointProxy epProxy = null; 735 MessageEndpoint ep = null; 736 JMessageEndpoint jep = null; 737 MessageDrivenDesc mdd = (MessageDrivenDesc) dd; 738 ClassLoader cls = myClassLoader(); 739 740 Thread.currentThread().setContextClassLoader(cls); 744 745 MessageDrivenBean mdb = null; 747 try { 748 mdb = (MessageDrivenBean ) beanclass.newInstance(); 749 } catch (Exception e) { 750 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "failed to create instance:", e); 751 throw new EJBException ("Container failed to create instance of Message Driven Bean", e); 752 } 753 754 jep = new JMessageEndpoint(this, mdb); 756 epProxy = new JMessageEndpointProxy(this, mdb, jep); 757 Class endpointClass = cls.loadClass(MESSAGEENDPOINT_CLASS); 758 Class msgListenerClass = cls.loadClass(msglistenType); 759 ep = (MessageEndpoint ) Proxy.newProxyInstance(cls, new Class [] {endpointClass, msgListenerClass}, epProxy); 760 761 jep.setProxy(ep); 762 Context ctxsave = setComponentContext(); 766 mdb.setMessageDrivenContext((MessageDrivenContext ) jep); 767 try { 768 beanclass.getMethod("ejbCreate", (Class []) null).invoke(mdb, (Object []) null); 769 } catch (Exception e) { 770 TraceEjb.mdb.log(BasicLevel.ERROR, methName + "cannot call ejbCreate on message driven bean instance ", e); 771 throw new EJBException (" Container fails to call ejbCreate on message driven bean instance", e); 772 } finally { 773 resetComponentContext(ctxsave); 774 } 775 776 synchronized (endpool) { 777 instanceCount++; 778 } 779 return jep; 780 } 781 782 } 783 | Popular Tags |