1 25 package org.objectweb.joram.client.connector; 26 27 import fr.dyade.aaa.agent.AgentServer; 28 import com.scalagent.jmx.JMXServer; 29 import org.objectweb.joram.client.jms.Queue; 30 import org.objectweb.joram.client.jms.Topic; 31 import org.objectweb.joram.client.jms.admin.AdminException; 32 import org.objectweb.joram.client.jms.admin.JoramAdmin; 33 import org.objectweb.joram.client.jms.admin.User; 34 import org.objectweb.joram.client.jms.admin.DeadMQueue; 35 import org.objectweb.joram.client.jms.ha.local.XAHALocalConnectionFactory; 36 import org.objectweb.joram.client.jms.ha.tcp.XAHATcpConnectionFactory; 37 import org.objectweb.joram.client.jms.ha.local.TopicHALocalConnectionFactory; 38 import org.objectweb.joram.client.jms.ha.tcp.TopicHATcpConnectionFactory; 39 40 import org.objectweb.joram.client.jms.local.TopicLocalConnectionFactory; 41 import org.objectweb.joram.client.jms.local.XALocalConnectionFactory; 42 import org.objectweb.joram.client.jms.tcp.TopicTcpConnectionFactory; 43 import org.objectweb.joram.client.jms.tcp.XATcpConnectionFactory; 44 import org.objectweb.joram.client.jms.ConnectionMetaData; 45 46 import java.io.BufferedReader ; 47 import java.io.File ; 48 import java.io.FileReader ; 49 import java.io.IOException ; 50 import java.lang.reflect.Method ; 51 import java.net.ConnectException ; 52 import java.util.Enumeration ; 53 import java.util.Hashtable ; 54 import java.util.List ; 55 import java.util.Properties ; 56 import java.util.StringTokenizer ; 57 import java.util.Vector ; 58 59 import javax.jms.Destination ; 60 import javax.jms.Session ; 61 import javax.jms.TopicConnectionFactory ; 62 import javax.jms.XAConnection ; 63 import javax.jms.XAConnectionFactory ; 64 import javax.management.MBeanServer ; 65 import javax.management.MBeanServerFactory ; 66 import javax.management.ObjectName ; 67 import javax.naming.Context ; 68 import javax.naming.InitialContext ; 69 import javax.resource.NotSupportedException ; 70 import javax.resource.ResourceException ; 71 import javax.resource.spi.ActivationSpec ; 72 import javax.resource.spi.BootstrapContext ; 73 import javax.resource.spi.CommException ; 74 import javax.resource.spi.IllegalStateException ; 75 import javax.resource.spi.ResourceAdapterInternalException ; 76 import javax.resource.spi.endpoint.MessageEndpointFactory ; 77 import javax.resource.spi.work.WorkManager ; 78 import javax.transaction.xa.XAResource ; 79 80 import org.objectweb.util.monolog.api.BasicLevel; 81 82 88 public class JoramAdapter 89 implements javax.resource.spi.ResourceAdapter , 90 java.io.Serializable , JoramAdapterMBean { 91 92 private transient WorkManager workManager; 93 94 101 private transient Hashtable consumers; 102 106 private transient Vector producers; 107 114 private transient Hashtable connections; 115 116 117 private boolean started = false; 118 119 private boolean stopped = false; 120 121 122 boolean collocated = false; 123 124 125 boolean isHa = false; 126 127 128 String hostName = "localhost"; 129 130 int serverPort = 16010; 131 132 133 short serverId = 0; 134 135 136 short clusterId = AgentServer.NULL_ID; 137 138 139 List platformServersIds = null; 140 141 146 private String platformConfigDir; 147 148 private boolean persistentPlatform = false; 149 153 private String adminFile = "joram-admin.cfg"; 154 private String adminFileXML = "joramAdmin.xml"; 155 156 157 160 private String adminFileExportXML = "joramAdminExport.xml"; 161 162 163 private String serverName = "s0"; 164 165 166 private static Vector boundNames = new Vector (); 167 168 private static ObjectName jmsResourceON; 169 170 private static MBeanServer mbs = null; 171 172 177 public int connectingTimer = 0; 178 183 public int txPendingTimer = 0; 184 190 public int cnxPendingTimer = 0; 191 192 199 public int queueMessageReadMax = 2; 200 201 207 public int topicAckBufferMax = 0; 208 209 215 public int topicPassivationThreshold = Integer.MAX_VALUE; 216 217 223 public int topicActivationThreshold = 0; 224 225 230 public boolean asyncSend = false; 231 232 239 public boolean multiThreadSync = false; 240 241 248 public int multiThreadSyncDelay = 1; 249 250 255 public boolean deleteDurableSubscription = false; 256 257 public JMXServer jmxServer; 258 259 private transient JoramAdmin joramAdmin; 260 261 264 public JoramAdapter() { 265 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 266 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 267 "JORAM adapter instantiated."); 268 269 consumers = new Hashtable (); 270 producers = new Vector (); 271 272 java.util.ArrayList array = MBeanServerFactory.findMBeanServer(null); 273 if (!array.isEmpty()) 274 mbs = (MBeanServer ) array.get(0); 275 jmxServer = new JMXServer(mbs,"JoramAdapter"); 276 } 277 278 285 public synchronized void start(BootstrapContext ctx) 286 throws ResourceAdapterInternalException 287 { 288 joramAdmin.setHa(isHa); 290 291 if (started) 292 throw new ResourceAdapterInternalException ("Adapter already started."); 293 if (stopped) 294 throw new ResourceAdapterInternalException ("Adapter has been stopped."); 295 296 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 297 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 298 "JORAM adapter starting deployment..."); 299 300 workManager = ctx.getWorkManager(); 301 302 if (collocated) { 304 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 305 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 306 " - Collocated JORAM server is starting..."); 307 308 if (persistentPlatform) { 309 System.setProperty("Transaction", "fr.dyade.aaa.util.NTransaction"); 310 System.setProperty("NTNoLockFile", "true"); 311 } else { 312 System.setProperty("Transaction", "fr.dyade.aaa.util.NullTransaction"); 313 System.setProperty("NbMaxAgents", "" + Integer.MAX_VALUE); 314 } 315 316 if (platformConfigDir != null) { 317 System.setProperty("fr.dyade.aaa.agent.A3CONF_DIR", platformConfigDir); 318 System.setProperty("fr.dyade.aaa.DEBUG_DIR", platformConfigDir); 319 } 320 321 try { 322 AgentServer.init(serverId, serverName, null, clusterId); 323 AgentServer.start(); 324 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 325 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 326 " - Collocated JORAM server has successfully started."); 327 } catch (Exception exc) { 328 AgentServer.stop(); 329 AgentServer.reset(true); 330 331 throw new ResourceAdapterInternalException ("Could not start " 332 + "collocated JORAM " 333 + " instance: " + exc); 334 } 335 } 336 337 try { 339 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 340 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 341 " - Reading the provided admin file: " + adminFileXML); 342 JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileXML); 343 } catch (Exception exc) { 344 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 345 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 346 "JORAM ADMIN XML not found."); 347 } 348 349 try { 351 adminConnect(); 352 serverId = (short) joramAdmin.getPlatformAdmin().getLocalServerId(); 353 } catch (Exception exc) { 354 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) 355 AdapterTracing.dbgAdapter.log(BasicLevel.WARN, 356 " - JORAM server not administerable: " + exc); 357 } 358 359 if (joramAdmin != null) { 361 joramAdmin.setAdminFileExportXML(adminFileExportXML); 362 363 try { 364 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 365 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 366 " - Reading the provided admin file: " + adminFileExportXML); 367 JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileExportXML); 368 369 adminConnect(); 371 } catch (Exception exc) { 372 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 373 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 374 adminFileExportXML + " not found."); 375 } 376 } 377 378 try { 380 File file = null; 381 382 try { 383 if (platformConfigDir == null) { 384 java.net.URL url = ClassLoader.getSystemResource(adminFile); 385 file = new File (url.getFile()); 386 } 387 else 388 file = new File (platformConfigDir, adminFile); 389 } catch (NullPointerException e) { 390 throw new java.io.FileNotFoundException (); 391 } 392 393 FileReader fileReader = new FileReader (file); 394 BufferedReader reader = new BufferedReader (fileReader); 395 396 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 397 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 398 " - Reading the provided admin file: " + file); 399 400 boolean end = false; 401 String line; 402 StringTokenizer tokenizer; 403 String firstToken; 404 String name = null; 405 406 while (! end) { 407 try { 408 line = reader.readLine(); 409 410 if (line == null) 411 end = true; 412 else { 413 tokenizer = new StringTokenizer (line); 414 415 if (tokenizer.hasMoreTokens()) { 416 firstToken = tokenizer.nextToken(); 417 if (firstToken.equalsIgnoreCase("Host")) { 418 if (tokenizer.hasMoreTokens()) 419 hostName = tokenizer.nextToken(); 420 } 421 else if (firstToken.equalsIgnoreCase("Port")) { 422 if (tokenizer.hasMoreTokens()) 423 serverPort = Integer.parseInt(tokenizer.nextToken()); 424 } 425 else if (firstToken.equalsIgnoreCase("Queue")) { 426 if (tokenizer.hasMoreTokens()) { 427 name = tokenizer.nextToken(); 428 createQueue(name); 429 } 430 } 431 else if (firstToken.equalsIgnoreCase("Topic")) { 432 if (tokenizer.hasMoreTokens()) { 433 name = tokenizer.nextToken(); 434 createTopic(name); 435 } 436 } 437 else if (firstToken.equalsIgnoreCase("User")) { 438 if (tokenizer.hasMoreTokens()) 439 name = tokenizer.nextToken(); 440 if (tokenizer.hasMoreTokens()) { 441 String password = tokenizer.nextToken(); 442 createUser(name, password); 443 } 444 else 445 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 446 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 447 " - Missing password for user [" + name + "]"); 448 } 449 else if (firstToken.equalsIgnoreCase("CF")) { 450 if (tokenizer.hasMoreTokens()) { 451 name = tokenizer.nextToken(); 452 createCF(name); 453 } 454 } 455 else if (firstToken.equalsIgnoreCase("QCF")) { 456 if (tokenizer.hasMoreTokens()) { 457 name = tokenizer.nextToken(); 458 createQCF(name); 459 } 460 } 461 else if (firstToken.equalsIgnoreCase("TCF")) { 462 if (tokenizer.hasMoreTokens()) { 463 name = tokenizer.nextToken(); 464 createTCF(name); 465 } 466 } 467 } 468 } 469 } 470 catch (IOException exc) { 472 } catch (AdminException exc) { 474 AdapterTracing.dbgAdapter.log(BasicLevel.ERROR, 475 "Creation failed",exc); 476 } 477 } 478 } 479 catch (java.io.FileNotFoundException fnfe) { 481 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 482 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 483 " - No administration task requested."); 484 } 485 486 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 487 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 488 "Server port is " + serverPort); 489 490 started = true; 491 492 try { 494 jmxServer.registerMBean(this, 495 "joramClient", 496 "type=JoramAdapter,version=" + 497 ConnectionMetaData.providerVersion); 498 } catch (Exception e) { 499 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) 500 AdapterTracing.dbgAdapter.log(BasicLevel.WARN, 501 " - Could not register JoramAdapterMBean", 502 e); 503 } 504 505 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 506 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 507 "JORAM adapter " + 508 ConnectionMetaData.providerVersion + 509 " successfully deployed."); 510 } 511 512 516 public synchronized void stop() 517 { 518 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 519 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 520 "JORAM adapter stopping..."); 521 522 if (! started || stopped) 523 return; 524 525 while (! boundNames.isEmpty()) 527 unbind((String ) boundNames.remove(0)); 528 529 joramAdmin.getPlatformAdmin().disconnect(); 531 532 while (! producers.isEmpty()) { 534 try { 535 ((ManagedConnectionImpl) producers.remove(0)).destroy(); 536 } 537 catch (Exception exc) {} 538 } 539 540 for (Enumeration keys = consumers.keys(); keys.hasMoreElements();) 542 ((InboundConsumer) consumers.get(keys.nextElement())).close(); 543 544 if (connections != null) { 546 for (Enumeration keys = connections.keys(); keys.hasMoreElements();) { 547 try { 548 ((XAConnection ) connections.get(keys.nextElement())).close(); 549 } 550 catch (Exception exc) {} 551 } 552 } 553 554 if (collocated) { 556 try { 557 AgentServer.stop(); 558 } 559 catch (Exception exc) {} 560 } 561 562 stopped = true; 563 564 try { 565 jmxServer.unregisterMBean("joramClient", 566 "type=JoramAdapter,version=" + 567 ConnectionMetaData.providerVersion); 568 } catch (Exception e) { 569 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) 570 AdapterTracing.dbgAdapter.log(BasicLevel.WARN, 571 "unregisterMBean", 572 e); 573 } 574 575 576 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 577 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 578 "JORAM adapter successfully stopped."); 579 } 580 581 593 public void endpointActivation(MessageEndpointFactory endpointFactory, 594 ActivationSpec spec) 595 throws ResourceException { 596 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 597 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 598 this + " endpointActivation(" + endpointFactory + 599 ", " + spec + ")"); 600 601 if (! started) 602 throw new IllegalStateException ("Non started resource adapter."); 603 if (stopped) 604 throw new IllegalStateException ("Stopped resource adapter."); 605 606 if (! (spec instanceof ActivationSpecImpl)) 607 throw new ResourceException ("Provided ActivationSpec instance is not " 608 + "a JORAM activation spec."); 609 610 ActivationSpecImpl specImpl = (ActivationSpecImpl) spec; 611 612 if (! specImpl.getResourceAdapter().equals(this)) 613 throw new ResourceException ("Supplied ActivationSpec instance " 614 + "associated to an other ResourceAdapter."); 615 616 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 617 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 618 "Activating Endpoint on JORAM adapter."); 619 620 boolean durable = 621 specImpl.getSubscriptionDurability() != null 622 && specImpl.getSubscriptionDurability().equalsIgnoreCase("Durable"); 623 624 boolean transacted = false; 625 try { 626 Class listenerClass = Class.forName("javax.jms.MessageListener"); 627 Class [] parameters = { Class.forName("javax.jms.Message") }; 628 Method meth = listenerClass.getMethod("onMessage", parameters); 629 transacted = endpointFactory.isDeliveryTransacted(meth); 630 } 631 catch (Exception exc) { 632 throw new ResourceException ("Could not determine transactional " 633 + "context: " + exc); 634 } 635 636 int maxWorks = 10; 637 try { 638 maxWorks = Integer.parseInt(specImpl.getMaxNumberOfWorks()); 639 } catch (Exception exc) { 640 throw new ResourceException ("Invalid max number of works instances " 641 + "number: " + exc); 642 } 643 644 int maxMessages = 10; 645 try { 646 maxMessages = Integer.parseInt(specImpl.getMaxMessages()); 647 } catch (Exception exc) { 648 throw new ResourceException ("Invalid max messages " 649 + "number: " + exc); 650 } 651 652 int ackMode; 653 try { 654 if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl 655 .getAcknowledgeMode())) { 656 ackMode = Session.AUTO_ACKNOWLEDGE; 657 } else if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl 658 .getAcknowledgeMode())) { 659 ackMode = Session.DUPS_OK_ACKNOWLEDGE; 660 } else { 661 ackMode = Session.AUTO_ACKNOWLEDGE; 662 } 663 } catch (Exception exc) { 664 throw new ResourceException ("Invalid acknowledge mode: " + exc); 665 } 666 667 String destType = specImpl.getDestinationType(); 668 String destName = specImpl.getDestination(); 669 670 try { 671 Destination dest; 672 673 if (destType.equals("javax.jms.Queue")) 674 dest = createQueue(destName); 675 else if (destType.equals("javax.jms.Topic")) 676 dest = createTopic(destName); 677 else 678 throw new NotSupportedException ("Invalid destination type provided " 679 + "as activation parameter: " 680 + destType); 681 682 String userName = specImpl.getUserName(); 683 String password = specImpl.getPassword(); 684 685 createUser(userName, password); 686 687 XAConnectionFactory connectionFactory = null; 688 689 if (isHa) { 690 if (collocated) 691 connectionFactory = XAHALocalConnectionFactory.create(); 692 else { 693 String urlHa = "hajoram://" + hostName + ":" + serverPort; 694 connectionFactory = XAHATcpConnectionFactory.create(urlHa); 695 } 696 } else { 697 698 if (collocated) 699 connectionFactory = XALocalConnectionFactory.create(); 700 else 701 connectionFactory = XATcpConnectionFactory.create(hostName, serverPort); 702 } 703 704 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().connectingTimer = connectingTimer; 705 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().cnxPendingTimer = cnxPendingTimer; 706 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().txPendingTimer = txPendingTimer; 707 708 if (queueMessageReadMax > 0) { 709 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) 710 .getParameters().queueMessageReadMax = queueMessageReadMax; 711 } 712 713 if (topicAckBufferMax > 0) { 714 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) 715 .getParameters().topicAckBufferMax = topicAckBufferMax; 716 } 717 718 if (topicPassivationThreshold > 0) { 719 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) 720 .getParameters().topicPassivationThreshold = topicPassivationThreshold; 721 } 722 723 if (topicActivationThreshold > 0) { 724 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) 725 .getParameters().topicActivationThreshold = topicActivationThreshold; 726 } 727 728 XAConnection cnx = 729 connectionFactory.createXAConnection(userName, password); 730 731 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 732 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 733 this + " endpointActivation cnx = " + cnx); 734 735 InboundConsumer consumer = 737 new InboundConsumer(workManager, 738 endpointFactory, 739 cnx, 740 dest, 741 specImpl.getMessageSelector(), 742 durable, 743 specImpl.getSubscriptionName(), 744 transacted, 745 maxWorks, 746 maxMessages, 747 ackMode, 748 deleteDurableSubscription); 749 750 consumers.put(specImpl, consumer); 751 } 752 catch (javax.jms.JMSSecurityException exc) { 753 throw new SecurityException ("Invalid user identification: " + exc); 754 } 755 catch (javax.jms.JMSException exc) { 756 throw new CommException ("Could not connect to the JORAM server: " 757 + exc); 758 } 759 catch (AdminException exc) { 760 throw new ResourceException ("Problem when handling the JORAM " 761 + "destinations: " + exc); 762 } 763 } 764 765 768 public void endpointDeactivation(MessageEndpointFactory endpointFactory, 769 ActivationSpec spec) { 770 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 771 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 772 this + " endpointDeactivation(" + endpointFactory + ", " + spec + ")"); 773 if (! started || stopped) 774 return; 775 776 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 777 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 778 "Deactivating Endpoint on JORAM adapter."); 779 780 ((InboundConsumer) consumers.remove(spec)).close(); 781 } 782 783 794 public XAResource [] getXAResources(ActivationSpec [] specs) 795 throws ResourceException { 796 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 797 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 798 this + " getXAResources(" + specs + ")"); 799 800 if (! started) 801 throw new IllegalStateException ("Non started resource adapter."); 802 if (stopped) 803 throw new IllegalStateException ("Stopped resource adapter."); 804 805 ActivationSpecImpl specImpl; 806 String userName; 807 String password; 808 XAConnectionFactory connectionFactory = null; 809 XAConnection connection; 810 Vector resources = new Vector (); 811 812 if (connections == null) 813 connections = new Hashtable (); 814 815 try { 816 for (int i = 0; i < specs.length; i++) { 817 if (! (specs[i] instanceof ActivationSpecImpl)) 818 throw new ResourceException ("Provided ActivationSpec instance is " 819 + "not a JORAM activation spec."); 820 821 specImpl = (ActivationSpecImpl) specs[i]; 822 823 if (! specImpl.getResourceAdapter().equals(this)) 824 throw new ResourceException ("Supplied ActivationSpec instance " 825 + "associated to an other " 826 + "ResourceAdapter."); 827 828 userName = specImpl.getUserName(); 829 830 if (! connections.containsKey(userName)) { 832 password = specImpl.getPassword(); 833 834 if (isHa) { 835 if (collocated) 836 connectionFactory = XAHALocalConnectionFactory.create(); 837 else { 838 String urlHa = "hajoram://" + hostName + ":" + serverPort; 839 connectionFactory = XAHATcpConnectionFactory.create(urlHa); 840 } 841 } else { 842 if (collocated) 843 connectionFactory = XALocalConnectionFactory.create(); 844 else 845 connectionFactory = XATcpConnectionFactory.create(hostName, serverPort); 846 } 847 848 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().connectingTimer = connectingTimer; 849 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().cnxPendingTimer = cnxPendingTimer; 850 ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().txPendingTimer = txPendingTimer; 851 852 connection = 853 connectionFactory.createXAConnection(userName, password); 854 855 connections.put(userName, connection); 856 857 resources.add(connection.createXASession().getXAResource()); 858 } 859 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 860 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 861 this + " getXAResources resources = " + resources); 862 } 863 } 864 catch (javax.jms.JMSSecurityException exc) { 865 throw new SecurityException ("Invalid user identification: " + exc); 866 } 867 catch (javax.jms.JMSException exc) { 868 throw new CommException ("Could not connect to the JORAM server: " 869 + exc); 870 } 871 872 return (XAResource []) resources.toArray(new XAResource [resources.size()]); 873 } 874 875 876 877 public int hashCode() 878 { 879 return (collocated + " " + hostName + " " + serverPort).hashCode(); 880 } 881 882 883 public boolean equals(Object o) 884 { 885 if (! (o instanceof JoramAdapter)) 886 return false; 887 888 JoramAdapter other = (JoramAdapter) o; 889 890 boolean res = 891 collocated == other.collocated 892 && hostName.equals(other.hostName) 893 && serverPort == other.serverPort; 894 895 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 896 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 897 this + " equals = " + res); 898 return res; 899 } 900 901 public List getDestinations() { 902 return joramAdmin.getDestinations(); 903 } 904 905 public List getDestinations(int serverId) { 906 return joramAdmin.getDestinations(serverId); 907 } 908 909 public List getUsers() { 910 return joramAdmin.getUsers(); 911 } 912 913 public List getUsers(int serverId) { 914 return joramAdmin.getUsers(serverId); 915 } 916 917 public List getPlatformServersIds() { 918 return joramAdmin.getPlatformAdmin().getServersIds(); 919 } 920 921 public List getLocalUsers() { 922 return joramAdmin.getUsers(serverId); 923 } 924 925 public void setDefaultDMQ(int serverId, DeadMQueue dmq) 926 throws ConnectException , AdminException { 927 joramAdmin.setDefaultDMQ(serverId,dmq); 928 } 929 930 public DeadMQueue getDefaultDMQ(int serverId) 931 throws ConnectException , AdminException { 932 return joramAdmin.getDefaultDMQ(serverId); 933 } 934 935 public DeadMQueue getDefaultDMQ() 936 throws ConnectException , AdminException { 937 return joramAdmin.getDefaultDMQ(); 938 } 939 940 public void exit() { 941 joramAdmin.exit(); 942 } 943 944 947 public void setTimeOutToAbortRequest(long timeOut) { 948 joramAdmin.setTimeOutToAbortRequest(timeOut); 949 } 950 951 954 public long getTimeOutToAbortRequest() { 955 return joramAdmin.getTimeOutToAbortRequest(); 956 } 957 958 963 public void createUser(String name, String password) 964 throws AdminException { 965 try { 966 User.create(name, password); 967 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 968 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 969 " - User [" + name + "] has been created."); 970 } 971 catch (ConnectException exc) { 972 throw new AdminException("createUser() failed: admin connection " 973 + "has been lost."); 974 } 975 } 976 977 982 public void createUser(String name, String password, int serverId) 983 throws AdminException { 984 try { 985 User.create(name, password,serverId); 986 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 987 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 988 " - User [" + name + "] has been created."); 989 } 990 catch (ConnectException exc) { 991 throw new AdminException("createUser() failed: admin connection " 992 + "has been lost."); 993 } 994 } 995 996 999 public void createCF(String name) { 1000 ManagedConnectionFactoryImpl mcf = 1001 new ManagedConnectionFactoryImpl(); 1002 1003 try { 1004 mcf.setResourceAdapter(this); 1005 mcf.setCollocated(new Boolean (false)); 1006 1007 Object factory = mcf.createConnectionFactory(); 1008 bind(name, factory); 1009 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 1010 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 1011 " - ConnectionFactory [" + name 1012 + "] has been created and bound."); 1013 } catch (Exception exc) {} 1014 } 1015 1016 1019 public void createQCF(String name) { 1020 ManagedConnectionFactoryImpl mcf = 1021 new ManagedQueueConnectionFactoryImpl(); 1022 1023 try { 1024 mcf.setResourceAdapter(this); 1025 mcf.setCollocated(new Boolean (false)); 1026 1027 Object factory = mcf.createConnectionFactory(); 1028 bind(name, factory); 1029 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 1030 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 1031 " - QueueConnectionFactory [" + name 1032 + "] has been created and bound."); 1033 } catch (Exception exc) {} 1034 } 1035 1036 1039 public void createTCF(String name) { 1040 ManagedConnectionFactoryImpl mcf = 1041 new ManagedTopicConnectionFactoryImpl(); 1042 1043 try { 1044 mcf.setResourceAdapter(this); 1045 mcf.setCollocated(new Boolean (false)); 1046 1047 Object factory = mcf.createConnectionFactory(); 1048 bind(name, factory); 1049 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 1050 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 1051 " - TopicConnectionFactory [" + name 1052 + "] has been created and bound."); 1053 } catch (Exception exc) {} 1054 } 1055 1056 1061 void adminConnect() throws AdminException 1062 { 1063 try { 1064 TopicConnectionFactory factory; 1065 1066 if (isHa) { 1067 if (collocated) 1068 factory = TopicHALocalConnectionFactory.create(); 1069 else { 1070 String urlHa = "hajoram://" + hostName + ":" + serverPort; 1071 factory = TopicHATcpConnectionFactory.create(urlHa); 1072 } 1073 } else { 1074 if (collocated) 1075 factory = TopicLocalConnectionFactory.create(); 1076 else 1077 factory = TopicTcpConnectionFactory.create(hostName, serverPort); 1078 } 1079 1080 ((org.objectweb.joram.client.jms.ConnectionFactory) factory) 1081 .getParameters().connectingTimer = 60; 1082 1083 joramAdmin = new JoramAdmin(factory, "root", "root"); 1084 } 1085 catch (ConnectException exc) { 1086 throw new AdminException("Admin connection can't be established: " 1087 + exc.getMessage()); 1088 } 1089 } 1090 1091 1092 void addProducer(ManagedConnectionImpl managedCx) 1093 { 1094 producers.add(managedCx); 1095 } 1096 1097 1098 void removeProducer(ManagedConnectionImpl managedCx) 1099 { 1100 producers.remove(managedCx); 1101 } 1102 1103 1104 private static String removePrefix(String name) { 1105 String PREFIX_NAME = "scn:comp/"; 1106 try { 1107 if (name.startsWith(PREFIX_NAME)) 1108 return name.substring(PREFIX_NAME.length()); 1109 else 1110 return name; 1111 } catch (Exception e) { 1112 return name; 1113 } 1114 } 1115 1116 1127 public Destination createQueue(int serverId, 1128 String name, 1129 String className, 1130 Properties prop) 1131 throws AdminException { 1132 try { 1133 Context ctx = new InitialContext (); 1134 return (Destination ) ctx.lookup(name); 1135 } catch (javax.naming.NamingException exc) { 1136 try { 1137 String shortName = removePrefix(name); 1138 Queue queue = Queue.create(serverId, 1139 shortName, 1140 className, 1141 prop); 1142 queue.setFreeReading(); 1143 queue.setFreeWriting(); 1144 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 1145 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 1146 " - Queue [" + shortName + "] has been created."); 1147 bind(name, queue); 1148 return queue; 1149 } catch (ConnectException exc2) { 1150 throw new AdminException("createQueue() failed: admin connection " 1151 + "has been lost."); 1152 } 1153 } 1154 } 1155 1156 1165 public Destination createQueue(int serverId, String name) 1166 throws AdminException { 1167 return createQueue(serverId, 1168 name, 1169 "org.objectweb.joram.mom.dest.Queue", 1170 null); 1171 } 1172 1173 1181 public Destination createQueue(String name) 1182 throws AdminException { 1183 try { 1184 return createQueue(joramAdmin.getPlatformAdmin().getLocalServerId(), 1185 name, 1186 "org.objectweb.joram.mom.dest.Queue", 1187 null); 1188 } catch (ConnectException exc2) { 1189 throw new AdminException("createQueue() failed: admin connection " 1190 + "has been lost."); 1191 } 1192 } 1193 1194 1205 public Destination createTopic(int serverId, 1206 String name, 1207 String className, 1208 Properties prop) 1209 throws AdminException { 1210 try { 1211 Context ctx = new InitialContext (); 1212 return (Destination ) ctx.lookup(name); 1213 } catch (javax.naming.NamingException exc) { 1214 try { 1215 String shortName = removePrefix(name); 1216 Topic topic = Topic.create(serverId, 1217 shortName, 1218 className, 1219 prop); 1220 topic.setFreeReading(); 1221 topic.setFreeWriting(); 1222 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) 1223 AdapterTracing.dbgAdapter.log(BasicLevel.INFO, 1224 " - Topic [" + shortName + "] has been created."); 1225 bind(name, topic); 1226 return topic; 1227 } catch (ConnectException exc2) { 1228 throw new AdminException("createTopic() failed: admin connection " 1229 + "has been lost."); 1230 } 1231 } 1232 } 1233 1234 1243 public Destination createTopic(int serverId, String name) 1244 throws AdminException { 1245 return createTopic(serverId, 1246 name, 1247 "org.objectweb.joram.mom.dest.Topic", 1248 null); 1249 } 1250 1251 1259 public Destination createTopic(String name) 1260 throws AdminException { 1261 try { 1262 return createTopic(joramAdmin.getPlatformAdmin().getLocalServerId(), 1263 name, 1264 "org.objectweb.joram.mom.dest.Topic", 1265 null); 1266 } catch (ConnectException exc2) { 1267 throw new AdminException("createTopic() failed: admin connection " 1268 + "has been lost."); 1269 } 1270 } 1271 1272 1277 public void removeDestination(String name) 1278 throws AdminException { 1279 try { 1280 Context ctx = new InitialContext (); 1281 Destination dest = (Destination ) ctx.lookup(name); 1282 ctx.close(); 1283 1284 if (dest instanceof org.objectweb.joram.client.jms.Destination) 1285 ((org.objectweb.joram.client.jms.Destination) dest).delete(); 1286 unbind(name); 1287 } catch (Exception exc) { 1288 throw new AdminException("removeDestination(" + name + 1289 ") failed: use Destination.delete()"); 1290 } 1291 } 1292 1293 1294 void bind(String name, Object obj) { 1295 try { 1296 Context ctx = new InitialContext (); 1297 ctx.rebind(name, obj); 1298 if (! boundNames.contains(name)) 1299 boundNames.add(name); 1300 } catch (Exception e) { 1301 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) 1302 AdapterTracing.dbgAdapter.log(BasicLevel.WARN, 1303 "Binding failed: bind(" + name +"," + obj +")", e); 1304 } 1305 } 1306 1307 1308 void unbind(String name) { 1309 try { 1310 Context ctx = new InitialContext (); 1311 ctx.unbind(name); 1312 boundNames.remove(name); 1313 } 1314 catch (Exception exc) {} 1315 } 1316 1317 1318 private void readObject(java.io.ObjectInputStream in) 1319 throws java.io.IOException , ClassNotFoundException 1320 { 1321 in.defaultReadObject(); 1322 consumers = new Hashtable (); 1323 producers = new Vector (); 1324 } 1325 1326 1327 public void setPlatformConfigDir(java.lang.String platformConfigDir) { 1331 this.platformConfigDir = platformConfigDir; 1332 } 1333 1334 public void setPersistentPlatform(java.lang.Boolean persistentPlatform) { 1335 this.persistentPlatform = persistentPlatform.booleanValue(); 1336 } 1337 1338 public void setServerId(java.lang.Short serverId) { 1339 this.serverId = serverId.shortValue(); 1340 } 1341 1342 public void setClusterId(java.lang.Short clusterId) { 1343 this.clusterId = clusterId.shortValue(); 1344 if (this.clusterId != AgentServer.NULL_ID){ 1345 this.isHa = true; 1346 } 1347 } 1348 1349 public void setServerName(java.lang.String serverName) { 1350 this.serverName = serverName; 1351 } 1352 1353 public void setAdminFile(java.lang.String adminFile) { 1354 this.adminFile = adminFile; 1355 } 1356 1357 public void setAdminFileXML(java.lang.String adminFileXML) { 1358 this.adminFileXML = adminFileXML; 1359 } 1360 1361 public void setCollocatedServer(java.lang.Boolean collocatedServer) { 1362 collocated = collocatedServer.booleanValue(); 1363 } 1364 1365 public void setHostName(java.lang.String hostName) { 1366 this.hostName = hostName; 1367 } 1368 1369 public void setServerPort(java.lang.Integer serverPort) { 1370 this.serverPort = serverPort.intValue(); 1371 } 1372 1373 public void setConnectingTimer(java.lang.Integer connectingTimer) { 1374 this.connectingTimer = connectingTimer.intValue(); 1375 } 1376 1377 public void setTxPendingTimer(java.lang.Integer txPendingTimer) { 1378 this.txPendingTimer = txPendingTimer.intValue(); 1379 } 1380 1381 public void setCnxPendingTimer(java.lang.Integer cnxPendingTimer) { 1382 this.cnxPendingTimer = cnxPendingTimer.intValue(); 1383 } 1384 1385 public void setQueueMessageReadMax(java.lang.Integer queueMessageReadMax) { 1386 this.queueMessageReadMax = queueMessageReadMax.intValue(); 1387 } 1388 1389 public void setTopicAckBufferMax(java.lang.Integer topicAckBufferMax) { 1390 this.topicAckBufferMax = topicAckBufferMax.intValue(); 1391 } 1392 1393 public void setTopicPassivationThreshold(java.lang.Integer topicPassivationThreshold) { 1394 this.topicPassivationThreshold = topicPassivationThreshold.intValue(); 1395 } 1396 1397 public void setTopicActivationThreshold(java.lang.Integer topicActivationThreshold) { 1398 this.topicActivationThreshold = topicActivationThreshold.intValue(); 1399 } 1400 1401 public void setAsyncSend(java.lang.Boolean asyncSend) { 1402 this.asyncSend = asyncSend.booleanValue(); 1403 } 1404 1405 public void setMultiThreadSync(java.lang.Boolean multiThreadSync) { 1406 this.multiThreadSync = multiThreadSync.booleanValue(); 1407 } 1408 1409 public void setMultiThreadSyncDelay(java.lang.Integer multiThreadSyncDelay) { 1410 this.multiThreadSyncDelay = multiThreadSyncDelay.intValue(); 1411 } 1412 1413 public java.lang.String getPlatformConfigDir() { 1414 return platformConfigDir; 1415 } 1416 1417 public java.lang.Boolean getPersistentPlatform() { 1418 return new Boolean (persistentPlatform); 1419 } 1420 1421 public Short getServerId() { 1422 return new Short (serverId); 1423 } 1424 1425 public java.lang.String getServerName() { 1426 return serverName; 1427 } 1428 1429 public java.lang.String getAdminFile() { 1430 return adminFile; 1431 } 1432 1433 public java.lang.String getAdminFileXML() { 1434 return adminFileXML; 1435 } 1436 1437 public java.lang.String getAdminFileExportXML() { 1438 return adminFileExportXML; 1439 } 1440 1441 public java.lang.Boolean getCollocatedServer() { 1442 return new Boolean (collocated); 1443 } 1444 1445 public java.lang.String getHostName() { 1446 return hostName; 1447 } 1448 1449 public java.lang.Integer getServerPort() { 1450 return new Integer (serverPort); 1451 } 1452 1453 public java.lang.Integer getConnectingTimer() { 1454 return new Integer (connectingTimer); 1455 } 1456 1457 public java.lang.Integer getTxPendingTimer() { 1458 return new Integer (txPendingTimer); 1459 } 1460 1461 public java.lang.Integer getCnxPendingTimer() { 1462 return new Integer (cnxPendingTimer); 1463 } 1464 1465 public java.lang.Integer getQueueMessageReadMax() { 1466 return new Integer (queueMessageReadMax); 1467 } 1468 1469 public java.lang.Integer getTopicAckBufferMax() { 1470 return new Integer (topicAckBufferMax); 1471 } 1472 1473 public java.lang.Integer getTopicPassivationThreshold() { 1474 return new Integer (topicPassivationThreshold); 1475 } 1476 1477 public java.lang.Integer getTopicActivationThreshold() { 1478 return new Integer (topicActivationThreshold); 1479 } 1480 1481 public java.lang.Boolean getAsyncSend() { 1482 return new Boolean (asyncSend); 1483 } 1484 1485 public java.lang.Boolean getMultiThreadSync() { 1486 return new Boolean (multiThreadSync); 1487 } 1488 1489 public java.lang.Integer getMultiThreadSyncDelay() { 1490 return new Integer (multiThreadSyncDelay); 1491 } 1492 1493 1494 1497 public java.lang.Boolean getDeleteDurableSubscription() { 1498 return new Boolean (deleteDurableSubscription); 1499 } 1500 1501 1505 1506 public void setDeleteDurableSubscription(java.lang.Boolean flg) { 1507 this.deleteDurableSubscription = flg.booleanValue(); 1508 } 1509 1510 1517 public void exportRepositoryToFile(String exportDir) throws AdminException { 1518 joramAdmin.exportRepositoryToFile(exportDir); 1519 } 1520 1521 1526 public boolean executeXMLAdminJMX(String path) 1527 throws Exception { 1528 boolean executeAdmin = joramAdmin.executeXMLAdmin(path); 1529 adminConnect(); 1530 return executeAdmin; 1531 } 1532} 1533 | Popular Tags |