| 1 25 package org.objectweb.joram.client.connector; 26 27 import fr.dyade.aaa.agent.AgentServer; 28 import com.scalagent.jmx.JMXServer; 29 import org.objectweb.joram.client.jms.Queue; 30 import org.objectweb.joram.client.jms.Topic; 31 import org.objectweb.joram.client.jms.admin.AdminException; 32 import org.objectweb.joram.client.jms.admin.JoramAdmin; 33 import org.objectweb.joram.client.jms.admin.User; 34 import org.objectweb.joram.client.jms.admin.DeadMQueue; 35 import org.objectweb.joram.client.jms.ha.local.XAHALocalConnectionFactory; 36 import org.objectweb.joram.client.jms.ha.tcp.XAHATcpConnectionFactory; 37 import org.objectweb.joram.client.jms.ha.local.TopicHALocalConnectionFactory; 38 import org.objectweb.joram.client.jms.ha.tcp.TopicHATcpConnectionFactory; 39 40 import org.objectweb.joram.client.jms.local.TopicLocalConnectionFactory; 41 import org.objectweb.joram.client.jms.local.XALocalConnectionFactory; 42 import org.objectweb.joram.client.jms.tcp.TopicTcpConnectionFactory; 43 import org.objectweb.joram.client.jms.tcp.XATcpConnectionFactory; 44 import org.objectweb.joram.client.jms.ConnectionMetaData; 45 46 import java.io.BufferedReader ; 47 import java.io.File ; 48 import java.io.FileReader ; 49 import java.io.IOException ; 50 import java.lang.reflect.Method ; 51 import java.net.ConnectException ; 52 import java.util.Enumeration ; 53 import java.util.Hashtable ; 54 import java.util.List ; 55 import java.util.Properties ; 56 import java.util.StringTokenizer ; 57 import java.util.Vector ; 58 59 import javax.jms.Destination ; 60 import javax.jms.Session ; 61 import javax.jms.TopicConnectionFactory ; 62 import javax.jms.XAConnection ; 63 import javax.jms.XAConnectionFactory ; 64 import javax.management.MBeanServer ; 65 import javax.management.MBeanServerFactory ; 66 import javax.management.ObjectName ; 67 import javax.naming.Context ; 68 import javax.naming.InitialContext ; 69 import javax.resource.NotSupportedException ; 70 import javax.resource.ResourceException ; 71 import javax.resource.spi.ActivationSpec ; 72 import javax.resource.spi.BootstrapContext ; 73 import javax.resource.spi.CommException ; 74 import javax.resource.spi.IllegalStateException ; 75 import javax.resource.spi.ResourceAdapterInternalException ; 76 import javax.resource.spi.endpoint.MessageEndpointFactory ; 77 import javax.resource.spi.work.WorkManager ; 78 import javax.transaction.xa.XAResource ; 79 80 import org.objectweb.util.monolog.api.BasicLevel; 81 82 88 public class JoramAdapter 89 implements javax.resource.spi.ResourceAdapter , 90 java.io.Serializable , JoramAdapterMBean { 91 92 private transient WorkManager workManager; 93 94 101 private transient Hashtable consumers; 102 106 private transient Vector producers; 107 114 private transient Hashtable connections; 115 116 117 private boolean started = false; 118 119 private boolean stopped = false; 120 121 122 boolean collocated = false; 123 124 125 boolean isHa = false; 126 127 128 String hostName = "localhost"; 129 130 int serverPort = 16010; 131 132 133 short serverId = 0; 134 135 136 short clusterId = AgentServer.NULL_ID; 137 138 139 List platformServersIds = null; 140 141 146 private String platformConfigDir; 147 148 private boolean persistentPlatform = false; 149 153 private String adminFile = "joram-admin.cfg"; 154 private String adminFileXML = "joramAdmin.xml"; 155 156 157 160 private String adminFileExportXML = "joramAdminExport.xml"; 161 162 163 private String serverName = "s0"; 164 165 166 private static Vector boundNames = new Vector (); 167 168 private static ObjectName jmsResourceON; 169 170 private static MBeanServer mbs = null; 171 172 177 public int connectingTimer = 0; 178 183 public int txPendingTimer = 0; 184 190 public int cnxPendingTimer = 0; 191 192 199 public int queueMessageReadMax = 2; 200 201 207 public int topicAckBufferMax = 0; 208 209 215 public int topicPassivationThreshold = Integer.MAX_VALUE; 216 217 223 public int topicActivationThreshold = 0; 224 225 230 public boolean asyncSend = false; 231 232 239 public boolean multiThreadSync = false; 240 241 248 public int multiThreadSyncDelay = 1; 249 250 255 public boolean deleteDurableSubscription = false; 256 257 public JMXServer jmxServer; 258 259 private transient JoramAdmin joramAdmin; 260 261 264 public JoramAdapter() { 265 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 266 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 267 "JORAM adapter instantiated."); 268 269 consumers = new Hashtable (); 270 producers = new Vector (); 271 272 java.util.ArrayList array = MBeanServerFactory.findMBeanServer(null); 273 if (!array.isEmpty()) 274 mbs = (MBeanServer ) array.get(0); 275 jmxServer = new JMXServer(mbs,"JoramAdapter"); 276 } 277 278 285 public synchronized void start(BootstrapContext ctx) 286 throws ResourceAdapterInternalException  287 { 288 joramAdmin.setHa(isHa); 290 291 if (started) 292 throw new ResourceAdapterInternalException ("Adapter already started."); 293 if (stopped) 294 throw new ResourceAdapterInternalException ("Adapter has been stopped."); 295 296 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 297 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 298 "JORAM adapter starting deployment..."); 299 300 workManager = ctx.getWorkManager(); 301 302 if (collocated) { 304 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 305 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 306 " - Collocated JORAM server is starting..."); 307 308 if (persistentPlatform) { 309 System.setProperty("Transaction", "fr.dyade.aaa.util.NTransaction"); 310 System.setProperty("NTNoLockFile", "true"); 311 } else { 312 System.setProperty("Transaction", "fr.dyade.aaa.util.NullTransaction"); 313 System.setProperty("NbMaxAgents", "" + Integer.MAX_VALUE); 314 } 315 316 if (platformConfigDir != null) { 317 System.setProperty("fr.dyade.aaa.agent.A3CONF_DIR", platformConfigDir); 318 System.setProperty("fr.dyade.aaa.DEBUG_DIR", platformConfigDir); 319 } 320 321 try { 322 AgentServer.init(serverId, serverName, null, clusterId); 323 AgentServer.start(); 324 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 325 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 326 " - Collocated JORAM server has successfully started."); 327 } catch (Exception exc) { 328 AgentServer.stop(); 329 AgentServer.reset(true); 330 331 throw new ResourceAdapterInternalException ("Could not start " 332 + "collocated JORAM " 333 + " instance: " + exc); 334 } 335 } 336 337 try { 339 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 340 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 341 " - Reading the provided admin file: " + adminFileXML); 342 JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileXML); 343 } catch (Exception exc) { 344 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 345 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 346 "JORAM ADMIN XML not found."); 347 } 348 349 try { 351 adminConnect(); 352 serverId = (short) joramAdmin.getPlatformAdmin().getLocalServerId(); 353 } catch (Exception exc) { 354 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) 355 AdapterTracing.dbgAdapter.log(BasicLevel.WARN, 356 " - JORAM server not administerable: " + exc); 357 } 358 359 if (joramAdmin != null) { 361 joramAdmin.setAdminFileExportXML(adminFileExportXML); 362 363 try { 364 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 365 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 366 " - Reading the provided admin file: " + adminFileExportXML); 367 JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileExportXML); 368 369 adminConnect(); 371 } catch (Exception exc) { 372 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 373 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 374 adminFileExportXML + " not found."); 375 } 376 } 377 378 try { 380 File file = null; 381 382 try { 383 if (platformConfigDir == null) { 384 java.net.URL url = ClassLoader.getSystemResource(adminFile); 385 file = new File (url.getFile()); 386 } 387 else 388 file = new File (platformConfigDir, adminFile); 389 } catch (NullPointerException e) { 390 throw new java.io.FileNotFoundException (); 391 } 392 393 FileReader fileReader = new FileReader (file); 394 BufferedReader reader = new BufferedReader (fileReader); 395 396 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 397 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 398 " - Reading the provided admin file: " + file); 399 400 boolean end = false; 401 String line; 402 StringTokenizer tokenizer; 403 String firstToken; 404 String name = null; 405 406 while (! end) { 407 try { 408 line = reader.readLine(); 409 410 if (line == null) 411 end = true; 412 else { 413 tokenizer = new StringTokenizer (line); 414 415 if (tokenizer.hasMoreTokens()) { 416 firstToken = tokenizer.nextToken(); 417 if (firstToken.equalsIgnoreCase("Host")) { 418 if (tokenizer.hasMoreTokens()) 419 hostName = tokenizer.nextToken(); 420 } 421 else if (firstToken.equalsIgnoreCase("Port")) { 422 if (tokenizer.hasMoreTokens()) 423 serverPort = Integer.parseInt(tokenizer.nextToken()); 424 } 425 else if (firstToken.equalsIgnoreCase("Queue")) { 426 if (tokenizer.hasMoreTokens()) { 427 name = tokenizer.nextToken(); 428 createQueue(name); 429 } 430 } 431 else if (firstToken.equalsIgnoreCase("Topic")) { 432 if (tokenizer.hasMoreTokens()) { 433 name = tokenizer.nextToken(); 434 createTopic(name); 435 } 436 } 437 else if (firstToken.equalsIgnoreCase("User")) { 438 if (tokenizer.hasMoreTokens()) 439 name = tokenizer.nextToken(); 440 if (tokenizer.hasMoreTokens()) { 441 String password = tokenizer.nextToken(); 442 createUser(name, password); 443 } 444 else 445 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 446 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 447 " - Missing password for user [" + name + "]"); 448 } 449 else if (firstToken.equalsIgnoreCase("CF")) { 450 if (tokenizer.hasMoreTokens()) { 451 name = tokenizer.nextToken(); 452 createCF(name); 453 } 454 } 455 else if (firstToken.equalsIgnoreCase("QCF")) { 456 if (tokenizer.hasMoreTokens()) { 457 name = tokenizer.nextToken(); 458 createQCF(name); 459 } 460 } 461 else if (firstToken.equalsIgnoreCase("TCF")) { 462 if (tokenizer.hasMoreTokens()) { 463 name = tokenizer.nextToken(); 464 createTCF(name); 465 } 466 } 467 } 468 } 469 } 470 catch (IOException exc) { 472 } catch (AdminException exc) { 474 AdapterTracing.dbgAdapter.log(BasicLevel.ERROR, 475 "Creation failed",exc); 476 } 477 } 478 } 479 catch (java.io.FileNotFoundException fnfe) { 481 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 482 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 483 " - No administration task requested."); 484 } 485 486 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 487 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 488 "Server port is " + serverPort); 489 490 started = true; 491 492 try { 494 jmxServer.registerMBean(this, 495 "joramClient", 496 "type=JoramAdapter,version=" + 497 ConnectionMetaData.providerVersion); 498 } catch (Exception e) { 499 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) 500 AdapterTracing.dbgAdapter.log(BasicLevel.WARN, 501 " - Could not register JoramAdapterMBean", 502 e); 503 } 504 505 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 506 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 507 "JORAM adapter " + 508 ConnectionMetaData.providerVersion + 509 " successfully deployed."); 510 } 511 512 516 public synchronized void stop() 517 { 518 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 519 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 520 "JORAM adapter stopping..."); 521 522 if (! started || stopped) 523 return; 524 525 while (! boundNames.isEmpty()) 527 unbind((String ) boundNames.remove(0)); 528 529 joramAdmin.getPlatformAdmin().disconnect(); 531 532 while (! producers.isEmpty()) { 534 try { 535 ((ManagedConnectionImpl) producers.remove(0)).destroy(); 536 } 537 catch (Exception exc) {} 538 } 539 540 for (Enumeration keys = consumers.keys(); keys.hasMoreElements();) 542 ((InboundConsumer) consumers.get(keys.nextElement())).close(); 543 544 if (connections != null) { 546 for (Enumeration keys = connections.keys(); keys.hasMoreElements();) { 547 try { 548 ((XAConnection ) connections.get(keys.nextElement())).close(); 549 } 550 catch (Exception exc) {} 551 } 552 } 553 554 if (collocated) { 556 try { 557 AgentServer.stop(); 558 } 559 catch (Exception exc) {} 560 } 561 562 stopped = true; 563 564 try { 565 jmxServer.unregisterMBean("joramClient", 566 "type=JoramAdapter,version=" + 567 ConnectionMetaData.providerVersion); 568 } catch (Exception e) { 569 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) 570 AdapterTracing.dbgAdapter.log(BasicLevel.WARN, 571 "unregisterMBean", 572 e); 573 } 574 575 576 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 577 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 578 "JORAM adapter successfully stopped."); 579 } 580 581 593 public void endpointActivation(MessageEndpointFactory endpointFactory, 594 ActivationSpec spec) 595 throws ResourceException { 596 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 597 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 598 this + " endpointActivation(" + endpointFactory + 599 ", " + spec + ")"); 600 601 if (! started) 602 throw new IllegalStateException ("Non started resource adapter."); 603 if (stopped) 604 throw new IllegalStateException ("Stopped resource adapter."); 605 606 if (! (spec instanceof ActivationSpecImpl)) 607 throw new ResourceException ("Provided ActivationSpec instance is not " 608 + "a JORAM activation spec."); 609 610 ActivationSpecImpl specImpl = (ActivationSpecImpl) spec; 611 612 if (! specImpl.getResourceAdapter().equals(this)) 613 throw new ResourceException ("Supplied ActivationSpec instance " 614 + "associated to an other ResourceAdapter."); 615 616 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 617 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 618 "Activating Endpoint on JORAM adapter."); 619 620 boolean durable = 621 specImpl.getSubscriptionDurability() != null 622 && specImpl.getSubscriptionDurability().equalsIgnoreCase("Durable"); 623 624 boolean transacted = false; 625 try { 626 Class listenerClass = Class.forName("javax.jms.MessageListener"); 627 Class [] parameters = { Class.forName("javax.jms.Message") }; 628 Method meth = listenerClass.getMethod("onMessage", parameters); 629 transacted = endpointFactory.isDeliveryTransacted(meth); 630 } 631 catch (Exception exc) { 632 throw new ResourceException ("Could not determine transactional " 633 + "context: " + exc); 634 } 635 636 int maxWorks = 10; 637 try { 638 maxWorks = Integer.parseInt(specImpl.getMaxNumberOfWorks()); 639 } catch (Exception exc) { 640 throw new ResourceException ("Invalid max number of works instances " 641 + "number: " + exc); 642 } 643 644 int maxMessages = 10; 645 try { 646 maxMessages = Integer.parseInt(specImpl.getMaxMessages()); 647 } catch (Exception exc) { 648 throw new ResourceException ("Invalid max messages " 649 + "number: " + exc); 650 } 651 652 int ackMode; 653 try { 654 if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl 655 .getAcknowledgeMode())) { 656 ackMode = Session.AUTO_ACKNOWLEDGE; 657 } else if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl 658 .getAcknowledgeMode())) { 659 ackMode = Session.DUPS_OK_ACKNOWLEDGE; 660 } else { 661 ackMode = Session.AUTO_ACKNOWLEDGE; 662 } 663 } catch (Exception exc) { 664 throw new ResourceException ("Invalid acknowledge mode: " + exc); 665 } 666 667 String destType = specImpl.getDestinationType(); 668 String destName = specImpl.getDestination(); 669 670 try { 671 Destination dest; 672 673 if (destType.equals("javax.jms.Queue")) 674 dest = createQueue(destName); 675 else if (destType.equals("javax.jms.Topic")) 676 dest = createTopic(destName); 677 else 678 throw new NotSupportedException ("Invalid destination type provided " 679 + "as activation parameter: " 680 + destType); 681 682 String userName = specImpl.getUserName(); 683 String password = specImpl.getPassword(); 684 685 createUser(userName, password); 686 687 XAConnectionFactory connectionFactory = null; 688 689 if (isHa) { 690 if (collocated) 691 connectionFactory = XAHALocalConnectionFactory.create(); 692 else { 693 String urlHa = "hajoram://" + hostName + ":" + serverPort; 694 connectionFactory = XAHATcpConnectionFactory.create(urlHa); 695 } 696 } else { 697 698 if (collocated) 699 connectionFactory = XALocalConnectionFactory.create(); 700 else 701 connectionFactory = XATcpConnectionFactory.create(hostName, serverPort); 702 } 703 704 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().connectingTimer = connectingTimer; 705 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().cnxPendingTimer = cnxPendingTimer; 706 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().txPendingTimer = txPendingTimer; 707 708 if (queueMessageReadMax > 0) { 709 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) 710 .getParameters().queueMessageReadMax = queueMessageReadMax; 711 } 712 713 if (topicAckBufferMax > 0) { 714 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) 715 .getParameters().topicAckBufferMax = topicAckBufferMax; 716 } 717 718 if (topicPassivationThreshold > 0) { 719 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) 720 .getParameters().topicPassivationThreshold = topicPassivationThreshold; 721 } 722 723 if (topicActivationThreshold > 0) { 724 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) 725 .getParameters().topicActivationThreshold = topicActivationThreshold; 726 } 727 728 XAConnection cnx = 729 connectionFactory.createXAConnection(userName, password); 730 731 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 732 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 733 this + " endpointActivation cnx = " + cnx); 734 735 InboundConsumer consumer = 737 new InboundConsumer(workManager, 738 endpointFactory, 739 cnx, 740 dest, 741 specImpl.getMessageSelector(), 742 durable, 743 specImpl.getSubscriptionName(), 744 transacted, 745 maxWorks, 746 maxMessages, 747 ackMode, 748 deleteDurableSubscription); 749 750 consumers.put(specImpl, consumer); 751 } 752 catch (javax.jms.JMSSecurityException exc) { 753 throw new SecurityException ("Invalid user identification: " + exc); 754 } 755 catch (javax.jms.JMSException exc) { 756 throw new CommException ("Could not connect to the JORAM server: " 757 + exc); 758 } 759 catch (AdminException exc) { 760 throw new ResourceException ("Problem when handling the JORAM " 761 + "destinations: " + exc); 762 } 763 } 764 765 768 public void endpointDeactivation(MessageEndpointFactory endpointFactory, 769 ActivationSpec spec) { 770 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 771 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 772 this + " endpointDeactivation(" + endpointFactory + ", " + spec + ")"); 773 &
|