1 7 package org.jboss.jms.serverless.client; 8 9 10 import org.jboss.logging.Logger; 11 import javax.jms.ConnectionFactory ; 12 import javax.jms.Connection ; 13 import java.util.List ; 14 import java.util.ArrayList ; 15 import java.util.Iterator ; 16 import javax.naming.Context ; 17 import javax.naming.InitialContext ; 18 import javax.jms.Session ; 19 import java.util.Map ; 20 import java.util.HashMap ; 21 import javax.jms.Destination ; 22 import javax.jms.MessageProducer ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.MessageListener ; 25 import javax.jms.Message ; 26 import javax.jms.TextMessage ; 27 import java.util.Set ; 28 29 37 public class Interactive { 38 39 private static final Logger log = Logger.getLogger(Interactive.class); 40 41 43 private Context initialContext; 44 45 49 private Map connectionFactories; private Map destinations; private List connectionHolders; 53 57 public Interactive() throws Exception { 58 59 connectionFactories = new HashMap (); 60 destinations = new HashMap (); 61 connectionHolders = new ArrayList (); 62 initJNDI(); 63 } 64 65 69 public void exit() { 70 71 int exitValue = 0; 73 for(Iterator i = connectionHolders.iterator(); i.hasNext(); ) { 74 Connection c = ((ConnectionHolder)i.next()).getConnection(); 75 try { 76 c.close(); 77 } 78 catch(Exception e) { 79 exitValue ++; 80 log.warn("Trouble closing connection "+c, e); 81 } 82 } 83 System.exit(exitValue); 84 } 85 86 89 public void runtime() throws Exception { 90 91 System.out.println(); 92 System.out.println("JMS Runtime: "); 93 System.out.println(); 94 95 System.out.print("Connection Factories: "); 96 if (connectionFactories.size() == 0) { 97 System.out.println("No Known ConnectionFactories"); 98 } 99 else { 100 System.out.println(); 101 103 for(Iterator i = connectionFactories.keySet().iterator(); i.hasNext(); ) { 104 String jndiName = (String )i.next(); 105 ConnectionFactory cf = (ConnectionFactory )connectionFactories.get(jndiName); 106 System.out.println("\t"+jndiName+" - "+cf); 107 } 108 } 109 System.out.print("Destinations: "); 110 if (destinations.size() == 0) { 111 System.out.println("No Known Destinations"); 112 } 113 else { 114 System.out.println(); 115 for(Iterator i = destinations.keySet().iterator(); i.hasNext(); ) { 116 String jndiName = (String )i.next(); 117 Destination d = (Destination )destinations.get(jndiName); 118 System.out.println("\t"+jndiName+" - "+d.getClass().getName()); 119 } 120 } 121 System.out.println(); 122 System.out.print("Connections"); 123 if (connectionHolders.size() == 0) { 124 System.out.println(": No Active Connections"); 125 } 126 else { 127 System.out.println(": "); 128 int idx = 0; 129 for(Iterator ci = connectionHolders.iterator(); ci.hasNext(); idx++) { 130 ConnectionHolder ch = (ConnectionHolder)ci.next(); 131 Connection c = ch.getConnection(); 132 ConnectionFactory cf = ch.getConnectionFactory(); 133 String cfJNDIName = getConnectionFactoryJNDIName(cf); 134 List sessionHolders = ch.getSessionHolders(); 135 System.out.println("\t" + idx + " " + c + " produced by '" + cfJNDIName + "'"); 136 System.out.print("\t\tSessions: "); 137 if (sessionHolders.isEmpty()) { 138 System.out.println("No Active Sessions"); 139 } 140 else { 141 System.out.println(); 142 int sidx = 0; 143 for(Iterator i = sessionHolders.iterator(); i.hasNext(); sidx++) { 144 SessionHolder h = (SessionHolder)i.next(); 145 Session s = h.getSession(); 146 System.out.println("\t\tSession "+idx+"."+sidx+" ("+ 147 transactedToString(s.getTransacted())+", "+ 148 acknowledgeModeToString(s.getAcknowledgeMode())+"): "); 149 List producers = h.getProducers(); 150 if (producers.size() == 0) { 151 System.out.println("\t\t\tNo Producers"); 152 } 153 else { 154 int pidx = 0; 155 for(Iterator j = producers.iterator(); j.hasNext(); pidx++) { 156 MessageProducer p = (MessageProducer )j.next(); 157 System.out.println("\t\t\tProducer "+idx+"."+sidx+"."+pidx+" for "+ 158 getDestinationJNDIName(p.getDestination())); 159 } 160 } 161 List consumers = h.getConsumers(); 162 if (consumers.size() == 0) { 163 System.out.println("\t\t\tNo Consumers"); 164 } 165 else { 166 int cidx = 0; 167 for(Iterator j = consumers.iterator(); j.hasNext(); cidx++) { 168 MessageConsumer mc = (MessageConsumer )j.next(); 169 System.out.print("\t\t\tConsumer " +idx+"."+sidx+"."+cidx+" "+mc); 170 if (mc.getMessageListener() != null) { 171 System.out.println(", MessageListener ON"); 172 } 173 else { 174 System.out.println(", MessageListener OFF"); 175 } 176 } 177 } 178 } 179 } 180 } 181 } 182 System.out.println(); 183 System.out.println(); 184 } 185 186 189 public void lookupConnectionFactory(String name) throws Exception { 190 191 ConnectionFactory cf = (ConnectionFactory )initialContext.lookup(name); 192 connectionFactories.put(name, cf); 193 } 194 195 199 public void lookupDestination(String destinationJNDIName) throws Exception { 200 201 Destination d = (Destination )initialContext.lookup(destinationJNDIName); 202 destinations.put(destinationJNDIName, d); 203 } 204 205 public void createConnection(String connectionFactoryJNDIName) throws Exception { 206 207 lookupConnectionFactory(connectionFactoryJNDIName); 208 ConnectionFactory cf = 209 (ConnectionFactory )connectionFactories.get(connectionFactoryJNDIName); 210 Connection c = cf.createConnection(); 211 ConnectionHolder ch = new ConnectionHolder(c, cf, new ArrayList ()); 212 connectionHolders.add(ch); 213 } 214 215 public void createConnection() throws Exception { 217 218 Set names = connectionFactories.keySet(); 219 if (names.isEmpty()) { 220 log.error("No ConnectionFactory has been looked up yet!"); 221 return; 222 } 223 if (names.size() > 1) { 224 String msg = 225 "There is more than one ConnectionFactory available. Specify the JNDI name when "+ 226 "creating a connection"; 227 log.error(msg); 228 return; 229 } 230 createConnection((String )(names.toArray()[0])); 231 } 232 233 234 public void start(int index) throws Exception { 235 236 try { 237 connectionOK(index); 238 } 239 catch(Exception e) { 240 log.error(e.getMessage()); 241 return; 242 } 243 Connection c = ((ConnectionHolder)connectionHolders.get(index)).getConnection(); 244 c.start(); 245 } 246 247 public void start() throws Exception { 249 if (connectionHolders.size() == 0) { 250 log.error("No Connection has been created yet."); 251 return; 252 } 253 if (connectionHolders.size() > 1) { 254 log.error("There are more than one active Connections. Use start(index)."); 255 return; 256 } 257 start(0); 258 } 259 260 261 public void stop(int index) throws Exception { 262 263 try { 264 connectionOK(index); 265 } 266 catch(Exception e) { 267 log.error(e.getMessage()); 268 return; 269 } 270 Connection c = ((ConnectionHolder)connectionHolders.get(index)).getConnection(); 271 c.stop(); 272 } 273 274 public void close(int index) throws Exception { 275 276 try { 277 connectionOK(index); 278 } 279 catch(Exception e) { 280 log.error(e.getMessage()); 281 return; 282 } 283 ConnectionHolder ch = (ConnectionHolder)connectionHolders.get(index); 284 Connection c = ch.getConnection(); 285 c.close(); 286 ch.destroy(); 287 connectionHolders.remove(index); 288 289 } 290 291 292 293 294 295 305 public void createSession(int index, boolean transacted, String acknowledgeModeString) 306 throws Exception { 307 308 try { 309 connectionOK(index); 310 } 311 catch(Exception e) { 312 log.error(e.getMessage()); 313 return; 314 } 315 316 int acknowledgeMode = -1; 317 318 try { 319 acknowledgeMode = parseAcknowledgeModeString(acknowledgeModeString); 320 } 321 catch(Exception e) { 322 return; 324 } 325 326 ConnectionHolder ch = (ConnectionHolder)connectionHolders.get(index); 327 List sessionHolders = ch.getSessionHolders(); 328 Session s = ch.getConnection().createSession(transacted, acknowledgeMode); 329 sessionHolders.add(new SessionHolder(s, new ArrayList (), new ArrayList ())); 330 331 } 332 333 339 public void createProducer(String sessionID, String destinationJNDIName) throws Exception { 340 341 int[] indices = parseCompositeID2(sessionID); 342 int connIdx = indices[0]; 343 int sessionIdx = indices[1]; 344 345 try { 346 connectionOK(connIdx); 347 } 348 catch(Exception e) { 349 log.error(e.getMessage()); 350 return; 351 } 352 353 List sessionHolders = 354 ((ConnectionHolder)connectionHolders.get(connIdx)).getSessionHolders(); 355 356 if (sessionIdx >= sessionHolders.size()) { 357 String msg = 358 "There is no Session with the index "+sessionIdx+". Currently there are "+ 359 sessionHolders.size()+" active Sessions for this Connection."; 360 log.error(msg); 361 return; 362 } 363 364 SessionHolder h = (SessionHolder)sessionHolders.get(sessionIdx); 365 Session s = h.getSession(); 366 Destination d = getDestination(destinationJNDIName); 367 MessageProducer p = s.createProducer(d); 368 h.getProducers().add(p); 369 } 370 371 377 public void createConsumer(String sessionID, String destinationJNDIName) throws Exception { 378 379 int[] indices = parseCompositeID2(sessionID); 380 int connIdx = indices[0]; 381 int sessionIdx = indices[1]; 382 383 try { 384 connectionOK(connIdx); 385 } 386 catch(Exception e) { 387 log.error(e.getMessage()); 388 return; 389 } 390 391 List sessionHolders = 392 ((ConnectionHolder)connectionHolders.get(connIdx)).getSessionHolders(); 393 394 if (sessionIdx >= sessionHolders.size()) { 395 String msg = 396 "There is no Session with the index "+sessionIdx+". Currently there are "+ 397 sessionHolders.size()+" active Sessions for this Connection."; 398 log.error(msg); 399 return; 400 } 401 402 SessionHolder h = (SessionHolder)sessionHolders.get(sessionIdx); 403 Session s = h.getSession(); 404 Destination d = getDestination(destinationJNDIName); 405 MessageConsumer c = s.createConsumer(d); 406 h.getConsumers().add(c); 407 } 408 409 414 public void closeConsumer(String consumerID) throws Exception { 415 416 MessageConsumer c = null; 417 try { 418 c = (MessageConsumer )getSessionChild(consumerID, false); 419 } 420 catch(Exception e) { 421 log.error(e.getMessage()); 422 return; 423 } 424 c.close(); 425 getSessionHolder(consumerID).getConsumers().remove(c); 426 } 427 428 429 435 public void setMessageListener(String consumerID) throws Exception { 436 437 MessageConsumer c = null; 438 try { 439 c = (MessageConsumer )getSessionChild(consumerID, false); 440 } 441 catch(Exception e) { 442 log.error(e.getMessage()); 443 return; 444 } 445 446 final MessageConsumer myConsumer = c; 449 c.setMessageListener(new MessageListener () { 450 public void onMessage(Message message) { 451 try { 452 String myConsumersID = getSessionChildID(myConsumer); 453 String output = "Consumer "+myConsumersID+": "; 454 if (message instanceof TextMessage ) { 455 output += ((TextMessage )message).getText(); 456 } 457 else { 458 output += message.toString(); 459 } 460 System.out.println(output); 461 } 462 catch(Exception e) { 463 log.error("Failed to process message", e); 464 } 465 } 466 }); 467 } 468 469 470 473 public void send(String producerID, String payload) throws Exception { 474 475 TextMessage tm = getSession(producerID).createTextMessage(); 476 tm.setText(payload); 477 MessageProducer p = (MessageProducer )getSessionChild(producerID, true); 478 p.send(tm); 479 } 480 481 485 490 public void forward(String consumerID, String producerID) throws Exception { 491 492 final MessageConsumer c = (MessageConsumer )getSessionChild(consumerID, false); 493 final MessageProducer p = (MessageProducer )getSessionChild(producerID, true); 494 MessageListener l = new MessageListener () { 495 public void onMessage(Message message) { 496 try { 497 String consumerID = getSessionChildID(c); 498 String producerID = getSessionChildID(p); 499 p.send(message); 500 String msg = 501 "Consumer "+consumerID+" forwarded message to producer "+producerID; 502 System.out.println(msg); 503 } 504 catch(Exception e) { 505 log.error("Failed to process message", e); 506 } 507 } 508 }; 509 c.setMessageListener(l); 510 } 511 512 516 517 521 private void initJNDI() throws Exception { 522 523 initialContext = new InitialContext (); 524 } 525 526 529 private void connectionOK(int index) throws Exception { 530 531 int size = connectionHolders.size(); 532 String msg = null; 533 if (size == 0) { 534 msg = "No active Connection created yet!"; 535 } 536 else if (index < 0 || index >= size) { 537 msg = 538 "No such Connection index. Valid indexes are 0"+ 539 (size == 0 ? "":" ... "+(size - 1))+"."; 540 } 541 542 if (msg != null) { 543 throw new Exception (msg); 544 } 545 } 546 547 548 private int parseAcknowledgeModeString(String s) throws Exception { 549 550 s = s.toUpperCase(); 551 if ("AUTO_ACKNOWLEDGE".equals(s)) { 552 return Session.AUTO_ACKNOWLEDGE; 553 } 554 else if ("CLIENT_ACKNOWLEDGE".equals(s)) { 555 return Session.CLIENT_ACKNOWLEDGE; 556 } 557 else if ("DUPS_OK_ACKNOWLEDGE".equals(s)) { 558 return Session.DUPS_OK_ACKNOWLEDGE; 559 } 560 else { 561 log.error("Unknow session acknowledment type: "+s); 562 throw new Exception (); 563 } 564 } 565 566 567 private String acknowledgeModeToString(int a) { 568 if (a == Session.AUTO_ACKNOWLEDGE) { 569 return "AUTO_ACKNOWLEDGE"; 570 } 571 else if (a == Session.CLIENT_ACKNOWLEDGE) { 572 return "CLIENT_ACKNOWLEDGE"; 573 } 574 else if (a == Session.DUPS_OK_ACKNOWLEDGE) { 575 return "DUPS_OK_ACKNOWLEDGE"; 576 } 577 else { 578 return "UNKNOWN_ACKNOWLEDGE_TYPE"; 579 } 580 } 581 582 583 private String transactedToString(boolean t) { 584 if (t) { 585 return "TRANSACTED"; 586 } 587 return "NON TRANSACTED"; 588 } 589 590 591 592 596 private Destination getDestination(String destinationJNDIName) throws Exception { 597 598 Destination d = (Destination )destinations.get(destinationJNDIName); 599 if (d == null) { 600 lookupDestination(destinationJNDIName); 601 d = (Destination )destinations.get(destinationJNDIName); 602 } 603 return d; 604 } 605 606 607 608 612 private String getDestinationJNDIName(Destination d) throws Exception { 613 614 for(Iterator i = destinations.keySet().iterator(); i.hasNext(); ) { 615 String name = (String )i.next(); 616 if (d.equals(destinations.get(name))) { 617 return name; 618 } 619 } 620 return null; 621 } 622 623 627 private String getConnectionFactoryJNDIName(ConnectionFactory cf) throws Exception { 628 for(Iterator i = connectionFactories.keySet().iterator(); i.hasNext(); ) { 629 String name = (String )i.next(); 630 if (cf.equals(connectionFactories.get(name))) { 631 return name; 632 } 633 } 634 return null; 635 } 636 637 638 639 644 private int[] parseCompositeID2(String compositeID) throws Exception { 645 646 try { 647 int first, last; 648 int i = compositeID.indexOf('.'); 649 first = Integer.parseInt(compositeID.substring(0, i)); 650 last = Integer.parseInt(compositeID.substring(i+1)); 651 return new int[] { first, last }; 652 } 653 catch(Exception e) { 654 String msg = "Invalid ID format: "+compositeID; 655 throw new Exception (msg); 656 } 657 } 658 659 664 private int[] parseCompositeID3(String compositeID) throws Exception { 665 666 try { 667 int i1; 668 int i = compositeID.indexOf('.'); 669 i1 = Integer.parseInt(compositeID.substring(0, i)); 670 int[] c = parseCompositeID2(compositeID.substring(i+1)); 671 return new int[] { i1, c[0], c[1] }; 672 } 673 catch(Exception e) { 674 String msg = "Invalid ID format: "+compositeID; 675 throw new Exception (msg); 676 } 677 } 678 679 680 686 private SessionHolder getSessionHolder(String compositeID) throws Exception { 687 688 int[] indices = parseCompositeID3(compositeID); 689 int connIdx = indices[0]; 690 int sessionIdx = indices[1]; 691 692 connectionOK(connIdx); 693 694 List sHolders = ((ConnectionHolder)connectionHolders.get(connIdx)).getSessionHolders(); 695 if (sessionIdx < 0 || sessionIdx >= sHolders.size()) { 696 String msg = "Invalid Session index: "+sessionIdx; 697 throw new Exception (msg); 698 } 699 return (SessionHolder)sHolders.get(sessionIdx); 700 } 701 702 703 709 private Session getSession(String compositeID) throws Exception { 710 711 return getSessionHolder(compositeID).getSession(); 712 } 713 714 723 private Object getSessionChild(String compositeID, boolean isProducer) throws Exception { 724 725 SessionHolder h = getSessionHolder(compositeID); 726 int[] indices = parseCompositeID3(compositeID); 727 int childIdx = indices[2]; 728 List l = isProducer ? h.getProducers() : h.getConsumers(); 729 if (childIdx < 0 || childIdx >= l.size()) { 730 String msg = "Invalid "+(isProducer?"producer":"consumer")+" index: "+childIdx; 731 throw new Exception (msg); 732 } 733 return l.get(childIdx); 734 } 735 736 739 private String getSessionChildID(Object sessionChild) { 740 741 String id = null; 742 int cidx = 0; 743 for(Iterator ci = connectionHolders.iterator(); ci.hasNext(); cidx++) { 744 List sh = ((ConnectionHolder)ci.next()).getSessionHolders(); 745 int sidx = 0; 746 for(Iterator i = sh.iterator(); i.hasNext(); sidx++) { 747 SessionHolder h = (SessionHolder)i.next(); 748 int idx = h.getIndex(sessionChild); 749 if (idx == -1) { 750 continue; 751 } 752 return 753 Integer.toString(cidx)+"."+Integer.toString(sidx)+"."+Integer.toString(idx); 754 } 755 } 756 return id; 757 } 758 759 763 766 private class ConnectionHolder { 767 768 private Connection c; 769 private ConnectionFactory cf; 770 private List sessionHolders; 771 772 public ConnectionHolder(Connection c, ConnectionFactory cf, List sessionHolders) { 773 774 this.c = c; 775 this.cf = cf; 776 this.sessionHolders = sessionHolders; 777 } 778 779 public Connection getConnection() { 780 return c; 781 } 782 783 public ConnectionFactory getConnectionFactory() { 784 return cf; 785 } 786 787 790 public List getSessionHolders() { 791 return sessionHolders; 792 } 793 794 798 public void destroy() { 799 c = null; 800 cf = null; 801 for(Iterator i = sessionHolders.iterator(); i.hasNext(); ) { 802 SessionHolder h = (SessionHolder)i.next(); 803 h.destroy(); 804 } 805 sessionHolders.clear(); 806 sessionHolders = null; 807 } 808 } 809 810 811 814 private class SessionHolder { 815 816 private Session s; 817 private List producers; private List consumers; 820 public SessionHolder(Session s, List producers, List consumers) { 821 822 this.s = s; 823 this.producers = producers; 824 this.consumers = consumers; 825 } 826 827 public Session getSession() { 828 return s; 829 } 830 831 834 public List getProducers() { 835 return producers; 836 } 837 838 841 public List getConsumers() { 842 return consumers; 843 } 844 845 849 public List getChildren(Object likeThis) { 850 if (likeThis instanceof MessageProducer ) { 851 return producers; 852 } 853 else if (likeThis instanceof MessageConsumer ) { 854 return consumers; 855 } 856 return null; 857 } 858 859 863 public int getIndex(Object sessionChild) { 864 865 List l = getChildren(sessionChild); 866 if (l == null) { 867 return -1; 869 } 870 return l.indexOf(sessionChild); 871 } 872 873 877 public void destroy() { 878 s = null; 879 producers.clear(); 880 consumers.clear(); 881 producers = null; 882 consumers = null; 883 } 884 885 886 } 887 888 892 896 897 898 } 899 900 901 902 | Popular Tags |