1 16 17 package org.apache.axis.transport.jms; 18 19 import org.apache.axis.components.jms.JMSVendorAdapter; 20 21 import javax.jms.BytesMessage ; 22 import javax.jms.ConnectionFactory ; 23 import javax.jms.Destination ; 24 import javax.jms.ExceptionListener ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Session ; 30 import java.io.ByteArrayOutputStream ; 31 import java.util.HashMap ; 32 import java.util.Iterator ; 33 import java.util.LinkedList ; 34 import java.util.Map ; 35 36 40 52 public abstract class JMSConnector 53 { 54 protected int m_numRetries; 55 protected long m_connectRetryInterval; 56 protected long m_interactRetryInterval; 57 protected long m_timeoutTime; 58 protected long m_poolTimeout; 59 protected AsyncConnection m_receiveConnection; 60 protected SyncConnection m_sendConnection; 61 protected int m_numSessions; 62 protected boolean m_allowReceive; 63 protected JMSVendorAdapter m_adapter; 64 protected JMSURLHelper m_jmsurl; 65 66 public JMSConnector(ConnectionFactory connectionFactory, 67 int numRetries, 68 int numSessions, 69 long connectRetryInterval, 70 long interactRetryInterval, 71 long timeoutTime, 72 boolean allowReceive, 73 String clientID, 74 String username, 75 String password, 76 JMSVendorAdapter adapter, 77 JMSURLHelper jmsurl) 78 throws JMSException 79 { 80 m_numRetries = numRetries; 81 m_connectRetryInterval = connectRetryInterval; 82 m_interactRetryInterval = interactRetryInterval; 83 m_timeoutTime = timeoutTime; 84 m_poolTimeout = timeoutTime/(long)numRetries; 85 m_numSessions = numSessions; 86 m_allowReceive = allowReceive; 87 m_adapter = adapter; 88 m_jmsurl = jmsurl; 89 90 javax.jms.Connection sendConnection = createConnectionWithRetry( 94 connectionFactory, 95 username, 96 password); 97 m_sendConnection = createSyncConnection(connectionFactory, sendConnection, 98 m_numSessions, "SendThread", 99 clientID, 100 username, 101 password); 102 103 m_sendConnection.start(); 104 105 if(m_allowReceive) 106 { 107 javax.jms.Connection receiveConnection = createConnectionWithRetry( 108 connectionFactory, 109 username, 110 password); 111 m_receiveConnection = createAsyncConnection(connectionFactory, 112 receiveConnection, 113 "ReceiveThread", 114 clientID, 115 username, 116 password); 117 m_receiveConnection.start(); 118 } 119 } 120 121 public int getNumRetries() 122 { 123 return m_numRetries; 124 } 125 126 public int numSessions() 127 { 128 return m_numSessions; 129 } 130 131 public ConnectionFactory getConnectionFactory() 132 { 133 return getSendConnection().getConnectionFactory(); 135 } 136 137 public String getClientID() 138 { 139 return getSendConnection().getClientID(); 140 } 141 142 public String getUsername() 143 { 144 return getSendConnection().getUsername(); 145 } 146 147 public String getPassword() 148 { 149 return getSendConnection().getPassword(); 150 } 151 152 public JMSVendorAdapter getVendorAdapter() 153 { 154 return m_adapter; 155 } 156 157 public JMSURLHelper getJMSURL() 158 { 159 return m_jmsurl; 160 } 161 162 protected javax.jms.Connection createConnectionWithRetry( 163 ConnectionFactory connectionFactory, 164 String username, 165 String password) 166 throws JMSException 167 { 168 javax.jms.Connection connection = null; 169 for(int numTries = 1; connection == null; numTries++) 170 { 171 try 172 { 173 connection = internalConnect(connectionFactory, username, password); 174 } 175 catch(JMSException jmse) 176 { 177 if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION) || numTries == m_numRetries) 178 throw jmse; 179 else 180 try{Thread.sleep(m_connectRetryInterval);}catch(InterruptedException ie){}; 181 } 182 } 183 return connection; 184 } 185 186 public void stop() 187 { 188 JMSConnectorManager.getInstance().removeConnectorFromPool(this); 189 190 m_sendConnection.stopConnection(); 191 if(m_allowReceive) 192 m_receiveConnection.stopConnection(); 193 } 194 195 public void start() 196 { 197 m_sendConnection.startConnection(); 198 if(m_allowReceive) 199 m_receiveConnection.startConnection(); 200 201 JMSConnectorManager.getInstance().addConnectorToPool(this); 202 } 203 204 public void shutdown() 205 { 206 m_sendConnection.shutdown(); 207 if(m_allowReceive) 208 m_receiveConnection.shutdown(); 209 } 210 211 public abstract JMSEndpoint createEndpoint(String destinationName) 212 throws JMSException ; 213 214 public abstract JMSEndpoint createEndpoint(Destination destination) 215 throws JMSException ; 216 217 218 protected abstract javax.jms.Connection internalConnect( 219 ConnectionFactory connectionFactory, 220 String username, 221 String password) 222 throws JMSException ; 223 224 private abstract class Connection extends Thread implements ExceptionListener 225 { 226 private ConnectionFactory m_connectionFactory; 227 protected javax.jms.Connection m_connection; 228 229 protected boolean m_isActive; 230 private boolean m_needsToConnect; 231 private boolean m_startConnection; 232 private String m_clientID; 233 private String m_username; 234 private String m_password; 235 236 private Object m_jmsLock; 237 private Object m_lifecycleLock; 238 239 protected Connection(ConnectionFactory connectionFactory, 240 javax.jms.Connection connection, 241 String threadName, 242 String clientID, 243 String username, 244 String password) 245 throws JMSException 246 { 247 super(threadName); 248 m_connectionFactory = connectionFactory; 249 250 m_clientID = clientID; 251 m_username = username; 252 m_password = password; 253 254 m_jmsLock = new Object (); 255 m_lifecycleLock = new Object (); 256 257 if (connection != null) 258 { 259 m_needsToConnect = false; 260 m_connection = connection; 261 m_connection.setExceptionListener(this); 262 if(m_clientID != null) 263 m_connection.setClientID(m_clientID); 264 } 265 else 266 { 267 m_needsToConnect = true; 268 } 269 270 m_isActive = true; 271 } 272 273 public ConnectionFactory getConnectionFactory() 274 { 275 return m_connectionFactory; 276 } 277 278 public String getClientID() 279 { 280 return m_clientID; 281 } 282 public String getUsername() 283 { 284 return m_username; 285 } 286 public String getPassword() 287 { 288 return m_password; 289 } 290 291 294 295 public void run() 296 { 297 while (m_isActive) 300 { 301 if (m_needsToConnect) 302 { 303 m_connection = null; 304 try 305 { 306 m_connection = internalConnect(m_connectionFactory, 307 m_username, m_password); 308 m_connection.setExceptionListener(this); 309 if(m_clientID != null) 310 m_connection.setClientID(m_clientID); 311 } 312 catch(JMSException e) 313 { 314 try { Thread.sleep(m_connectRetryInterval); } catch(InterruptedException ie) { } 316 continue; 317 } 318 } 319 else 320 m_needsToConnect = true; 323 try 325 { 326 internalOnConnect(); 327 } 328 catch(Exception e) 329 { 330 continue; 333 } 334 335 synchronized(m_jmsLock) 336 { 337 try { m_jmsLock.wait(); } catch(InterruptedException ie) { } } 339 } 340 341 internalOnShutdown(); 343 } 344 345 346 347 void startConnection() 348 { 349 synchronized(m_lifecycleLock) 350 { 351 if(m_startConnection) 352 return; 353 m_startConnection = true; 354 try {m_connection.start();}catch(Throwable e) { } } 356 } 357 358 void stopConnection() 359 { 360 synchronized(m_lifecycleLock) 361 { 362 if(!m_startConnection) 363 return; 364 m_startConnection = false; 365 try {m_connection.stop();}catch(Throwable e) { } } 367 } 368 369 void shutdown() 370 { 371 m_isActive = false; 372 synchronized(m_jmsLock) 373 { 374 m_jmsLock.notifyAll(); 375 } 376 } 377 378 379 380 public void onException(JMSException exception) 381 { 382 if(m_adapter.isRecoverable(exception, 383 JMSVendorAdapter.ON_EXCEPTION_ACTION)) 384 return; 385 onException(); 386 synchronized(m_jmsLock) 387 { 388 m_jmsLock.notifyAll(); 389 } 390 } 391 392 private final void internalOnConnect() 393 throws Exception 394 { 395 onConnect(); 396 synchronized(m_lifecycleLock) 397 { 398 if(m_startConnection) 399 { 400 try {m_connection.start();}catch(Throwable e) { } } 402 } 403 } 404 405 private final void internalOnShutdown() 406 { 407 stopConnection(); 408 onShutdown(); 409 try { m_connection.close(); } catch(Throwable e) { } } 411 412 protected abstract void onConnect()throws Exception ; 413 protected abstract void onShutdown(); 414 protected abstract void onException(); 415 } 416 417 protected abstract SyncConnection createSyncConnection(ConnectionFactory factory, 418 javax.jms.Connection connection, 419 int numSessions, 420 String threadName, 421 String clientID, 422 String username, 423 String password) 424 425 throws JMSException ; 426 427 SyncConnection getSendConnection() 428 { 429 return m_sendConnection; 430 } 431 432 protected abstract class SyncConnection extends Connection 433 { 434 LinkedList m_senders; 435 int m_numSessions; 436 Object m_senderLock; 437 438 SyncConnection(ConnectionFactory connectionFactory, 439 javax.jms.Connection connection, 440 int numSessions, 441 String threadName, 442 String clientID, 443 String username, 444 String password) 445 throws JMSException 446 { 447 super(connectionFactory, connection, threadName, 448 clientID, username, password); 449 m_senders = new LinkedList (); 450 m_numSessions = numSessions; 451 m_senderLock = new Object (); 452 } 453 454 protected abstract SendSession createSendSession(javax.jms.Connection connection) 455 throws JMSException ; 456 457 protected void onConnect() 458 throws JMSException 459 { 460 synchronized(m_senderLock) 461 { 462 for(int i = 0; i < m_numSessions; i++) 463 { 464 m_senders.add(createSendSession(m_connection)); 465 } 466 m_senderLock.notifyAll(); 467 } 468 } 469 470 byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties) 471 throws Exception 472 { 473 long timeoutTime = System.currentTimeMillis() + timeout; 474 while(true) 475 { 476 if(System.currentTimeMillis() > timeoutTime) 477 { 478 throw new InvokeTimeoutException("Unable to complete call in time allotted"); 479 } 480 481 SendSession sendSession = null; 482 try 483 { 484 sendSession = getSessionFromPool(m_poolTimeout); 485 byte[] response = sendSession.call(endpoint, 486 message, 487 timeoutTime - System.currentTimeMillis(), 488 properties); 489 returnSessionToPool(sendSession); 490 if(response == null) 491 { 492 throw new InvokeTimeoutException("Unable to complete call in time allotted"); 493 } 494 return response; 495 } 496 catch(JMSException jmse) 497 { 498 if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) 499 { 500 returnSessionToPool(sendSession); 503 throw jmse; 504 } 505 506 Thread.yield(); 510 continue; 511 } 512 catch(NullPointerException npe) 513 { 514 Thread.yield(); 515 continue; 516 } 517 } 518 } 519 520 522 void send(JMSEndpoint endpoint, byte[] message, HashMap properties) 523 throws Exception 524 { 525 long timeoutTime = System.currentTimeMillis() + m_timeoutTime; 526 while(true) 527 { 528 if(System.currentTimeMillis() > timeoutTime) 529 { 530 throw new InvokeTimeoutException("Cannot complete send in time allotted"); 531 } 532 533 SendSession sendSession = null; 534 try 535 { 536 sendSession = getSessionFromPool(m_poolTimeout); 537 sendSession.send(endpoint, message, properties); 538 returnSessionToPool(sendSession); 539 } 540 catch(JMSException jmse) 541 { 542 if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) 543 { 544 returnSessionToPool(sendSession); 547 throw jmse; 548 } 549 Thread.yield(); 553 continue; 554 } 555 catch(NullPointerException npe) 556 { 557 Thread.yield(); 559 continue; 560 } 561 break; 562 } 563 } 564 565 protected void onException() 566 { 567 synchronized(m_senderLock) 568 { 569 m_senders.clear(); 570 } 571 } 572 573 protected void onShutdown() 574 { 575 synchronized(m_senderLock) 576 { 577 Iterator senders = m_senders.iterator(); 578 while(senders.hasNext()) 579 { 580 SendSession session = (SendSession)senders.next(); 581 session.cleanup(); 582 } 583 m_senders.clear(); 584 } 585 } 586 587 private SendSession getSessionFromPool(long timeout) 588 { 589 synchronized(m_senderLock) 590 { 591 while(m_senders.size() == 0) 592 { 593 try 594 { 595 m_senderLock.wait(timeout); 596 if(m_senders.size() == 0) 597 { 598 return null; 599 } 600 } 601 catch(InterruptedException ignore) 602 { 603 return null; 604 } 605 } 606 return (SendSession)m_senders.removeFirst(); 607 } 608 } 609 610 private void returnSessionToPool(SendSession sendSession) 611 { 612 synchronized(m_senderLock) 613 { 614 m_senders.addLast(sendSession); 615 m_senderLock.notifyAll(); 616 } 617 } 618 619 protected abstract class SendSession extends ConnectorSession 620 { 621 MessageProducer m_producer; 622 623 SendSession(Session session, 624 MessageProducer producer) 625 throws JMSException 626 { 627 super(session); 628 m_producer = producer; 629 } 630 631 protected abstract Destination createTemporaryDestination() 632 throws JMSException ; 633 634 protected abstract void deleteTemporaryDestination(Destination destination) 635 throws JMSException ; 636 637 protected abstract MessageConsumer createConsumer(Destination destination) 638 throws JMSException ; 639 640 protected abstract void send(Destination destination, 641 Message message, 642 int deliveryMode, 643 int priority, 644 long timeToLive) 645 throws JMSException ; 646 647 void send(JMSEndpoint endpoint, byte[] message, HashMap properties) 648 throws Exception 649 { 650 BytesMessage jmsMessage = m_session.createBytesMessage(); 651 jmsMessage.writeBytes(message); 652 int deliveryMode = extractDeliveryMode(properties); 653 int priority = extractPriority(properties); 654 long timeToLive = extractTimeToLive(properties); 655 656 if(properties != null && !properties.isEmpty()) 657 setProperties(properties, jmsMessage); 658 659 send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, 660 priority, timeToLive); 661 } 662 663 664 void cleanup() 665 { 666 try{m_producer.close();}catch(Throwable t){} 667 try{m_session.close();}catch(Throwable t){} 668 } 669 670 byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, 671 HashMap properties) 672 throws Exception 673 { 674 Destination reply = createTemporaryDestination(); 675 MessageConsumer subscriber = createConsumer(reply); 676 BytesMessage jmsMessage = m_session.createBytesMessage(); 677 jmsMessage.writeBytes(message); 678 jmsMessage.setJMSReplyTo(reply); 679 680 int deliveryMode = extractDeliveryMode(properties); 681 int priority = extractPriority(properties); 682 long timeToLive = extractTimeToLive(properties); 683 684 if(properties != null && !properties.isEmpty()) 685 setProperties(properties, jmsMessage); 686 687 send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, 688 priority, timeToLive); 689 BytesMessage response = null; 690 try { 691 response = (BytesMessage )subscriber.receive(timeout); 692 } catch (ClassCastException cce) { 693 throw new InvokeException 694 ("Error: unexpected message type received - expected BytesMessage"); 695 } 696 byte[] respBytes = null; 697 if(response != null) 698 { 699 byte[] buffer = new byte[8 * 1024]; 700 ByteArrayOutputStream out = new ByteArrayOutputStream (); 701 for(int bytesRead = response.readBytes(buffer); 702 bytesRead != -1; bytesRead = response.readBytes(buffer)) 703 { 704 out.write(buffer, 0, bytesRead); 705 } 706 respBytes = out.toByteArray(); 707 } 708 subscriber.close(); 709 deleteTemporaryDestination(reply); 710 return respBytes; 711 } 712 713 private int extractPriority(HashMap properties) 714 { 715 return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY, 716 JMSConstants.DEFAULT_PRIORITY); 717 } 718 719 private int extractDeliveryMode(HashMap properties) 720 { 721 return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE, 722 JMSConstants.DEFAULT_DELIVERY_MODE); 723 } 724 725 private long extractTimeToLive(HashMap properties) 726 { 727 return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE, 728 JMSConstants.DEFAULT_TIME_TO_LIVE); 729 } 730 731 private void setProperties(HashMap properties, Message message) 732 throws JMSException 733 { 734 Iterator propertyIter = properties.entrySet().iterator(); 735 while(propertyIter.hasNext()) 736 { 737 Map.Entry property = (Map.Entry )propertyIter.next(); 738 setProperty((String )property.getKey(), property.getValue(), 739 message); 740 } 741 } 742 743 private void setProperty(String property, Object value, Message message) 744 throws JMSException 745 { 746 if(property == null) 747 return; 748 if(property.equals(JMSConstants.JMS_CORRELATION_ID)) 749 message.setJMSCorrelationID((String )value); 750 else if(property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES)) 751 message.setJMSCorrelationIDAsBytes((byte[])value); 752 else if(property.equals(JMSConstants.JMS_TYPE)) 753 message.setJMSType((String )value); 754 else 755 message.setObjectProperty(property, value); 756 } 757 } 758 } 759 760 AsyncConnection getReceiveConnection() 761 { 762 return m_receiveConnection; 763 } 764 765 protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory, 766 javax.jms.Connection connection, 767 String threadName, 768 String clientID, 769 String username, 770 String password) 771 772 throws JMSException ; 773 774 protected abstract class AsyncConnection extends Connection 775 { 776 HashMap m_subscriptions; 777 Object m_subscriptionLock; 778 779 protected AsyncConnection(ConnectionFactory connectionFactory, 780 javax.jms.Connection connection, 781 String threadName, 782 String clientID, 783 String username, 784 String password) 785 throws JMSException 786 { 787 super(connectionFactory, connection, threadName, 788 clientID, username, password); 789 m_subscriptions = new HashMap (); 790 m_subscriptionLock = new Object (); 791 } 792 793 protected abstract ListenerSession createListenerSession( 794 javax.jms.Connection connection, 795 Subscription subscription) 796 throws Exception ; 797 798 protected void onShutdown() 799 { 800 synchronized(m_subscriptionLock) 801 { 802 Iterator subscriptions = m_subscriptions.keySet().iterator(); 803 while(subscriptions.hasNext()) 804 { 805 Subscription subscription = (Subscription)subscriptions.next(); 806 ListenerSession session = (ListenerSession) 807 m_subscriptions.get(subscription); 808 if(session != null) 809 { 810 session.cleanup(); 811 } 812 813 } 814 m_subscriptions.clear(); 815 } 816 } 817 818 822 void subscribe(Subscription subscription) 823 throws Exception 824 { 825 long timeoutTime = System.currentTimeMillis() + m_timeoutTime; 826 synchronized(m_subscriptionLock) 827 { 828 if(m_subscriptions.containsKey(subscription)) 829 return; 830 while(true) 831 { 832 if(System.currentTimeMillis() > timeoutTime) 833 { 834 throw new InvokeTimeoutException("Cannot subscribe listener"); 835 } 836 837 try 838 { 839 ListenerSession session = createListenerSession(m_connection, 840 subscription); 841 m_subscriptions.put(subscription, session); 842 break; 843 } 844 catch(JMSException jmse) 845 { 846 if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) 847 { 848 throw jmse; 849 } 850 851 try{m_subscriptionLock.wait(m_interactRetryInterval);} 852 catch(InterruptedException ignore){} 853 Thread.yield(); 855 continue; 856 } 857 catch(NullPointerException jmse) 858 { 859 try{m_subscriptionLock.wait(m_interactRetryInterval);} 861 catch(InterruptedException ignore){} 862 Thread.yield(); 864 continue; 865 } 866 } 867 } 868 } 869 870 void unsubscribe(Subscription subscription) 871 { 872 long timeoutTime = System.currentTimeMillis() + m_timeoutTime; 873 synchronized(m_subscriptionLock) 874 { 875 if(!m_subscriptions.containsKey(subscription)) 876 return; 877 while(true) 878 { 879 if(System.currentTimeMillis() > timeoutTime) 880 { 881 throw new InvokeTimeoutException("Cannot unsubscribe listener"); 882 } 883 884 Thread.yield(); 886 try 887 { 888 ListenerSession session = (ListenerSession) 889 m_subscriptions.get(subscription); 890 session.cleanup(); 891 m_subscriptions.remove(subscription); 892 break; 893 } 894 catch(NullPointerException jmse) 895 { 896 try{m_subscriptionLock.wait(m_interactRetryInterval);} 898 catch(InterruptedException ignore){} 899 continue; 900 } 901 } 902 } 903 } 904 905 protected void onConnect() 906 throws Exception 907 { 908 synchronized(m_subscriptionLock) 909 { 910 Iterator subscriptions = m_subscriptions.keySet().iterator(); 911 while(subscriptions.hasNext()) 912 { 913 Subscription subscription = (Subscription)subscriptions.next(); 914 915 if(m_subscriptions.get(subscription) == null) 916 { 917 m_subscriptions.put(subscription, 918 createListenerSession(m_connection, subscription)); 919 } 920 } 921 m_subscriptionLock.notifyAll(); 922 } 923 } 924 925 protected void onException() 926 { 927 synchronized(m_subscriptionLock) 928 { 929 Iterator subscriptions = m_subscriptions.keySet().iterator(); 930 while(subscriptions.hasNext()) 931 { 932 Subscription subscription = (Subscription)subscriptions.next(); 933 m_subscriptions.put(subscription, null); 934 } 935 } 936 } 937 938 939 940 protected class ListenerSession extends ConnectorSession 941 { 942 protected MessageConsumer m_consumer; 943 protected Subscription m_subscription; 944 945 ListenerSession(Session session, 946 MessageConsumer consumer, 947 Subscription subscription) 948 throws Exception 949 { 950 super(session); 951 m_subscription = subscription; 952 m_consumer = consumer; 953 Destination destination = subscription.m_endpoint.getDestination(m_session); 954 m_consumer.setMessageListener(subscription.m_listener); 955 } 956 957 void cleanup() 958 { 959 try{m_consumer.close();}catch(Exception ignore){} 960 try{m_session.close();}catch(Exception ignore){} 961 } 962 963 } 964 } 965 966 private abstract class ConnectorSession 967 { 968 Session m_session; 969 970 ConnectorSession(Session session) 971 throws JMSException 972 { 973 m_session = session; 974 } 975 } 976 } | Popular Tags |