1 55 56 package org.jboss.axis.transport.jms; 57 58 import org.jboss.axis.components.jms.JMSVendorAdapter; 59 60 import javax.jms.BytesMessage ; 61 import javax.jms.ConnectionFactory ; 62 import javax.jms.Destination ; 63 import javax.jms.ExceptionListener ; 64 import javax.jms.JMSException ; 65 import javax.jms.Message ; 66 import javax.jms.MessageConsumer ; 67 import javax.jms.MessageProducer ; 68 import javax.jms.Session ; 69 import java.io.ByteArrayOutputStream ; 70 import java.util.HashMap ; 71 import java.util.Iterator ; 72 import java.util.LinkedList ; 73 import java.util.Map ; 74 75 79 90 public abstract class JMSConnector 91 { 92 protected int m_numRetries; 93 protected long m_connectRetryInterval; 94 protected long m_interactRetryInterval; 95 protected long m_timeoutTime; 96 protected long m_poolTimeout; 97 protected AsyncConnection m_receiveConnection; 98 protected SyncConnection m_sendConnection; 99 protected int m_numSessions; 100 protected boolean m_allowReceive; 101 protected JMSVendorAdapter m_adapter; 102 103 public JMSConnector(ConnectionFactory connectionFactory, 104 int numRetries, 105 int numSessions, 106 long connectRetryInterval, 107 long interactRetryInterval, 108 long timeoutTime, 109 boolean allowReceive, 110 String clientID, 111 String username, 112 String password, 113 JMSVendorAdapter adapter) 114 throws JMSException 115 { 116 m_numRetries = numRetries; 117 m_connectRetryInterval = connectRetryInterval; 118 m_interactRetryInterval = interactRetryInterval; 119 m_timeoutTime = timeoutTime; 120 m_poolTimeout = timeoutTime / (long)numRetries; 121 m_numSessions = numSessions; 122 m_allowReceive = allowReceive; 123 m_adapter = adapter; 124 125 javax.jms.Connection sendConnection = createConnectionWithRetry(connectionFactory, 129 username, 130 password); 131 m_sendConnection = createSyncConnection(connectionFactory, sendConnection, 132 m_numSessions, "SendThread", 133 clientID, 134 username, 135 password); 136 137 m_sendConnection.start(); 138 139 if (m_allowReceive) 140 { 141 javax.jms.Connection receiveConnection = createConnectionWithRetry(connectionFactory, 142 username, 143 password); 144 m_receiveConnection = createAsyncConnection(connectionFactory, 145 receiveConnection, 146 "ReceiveThread", 147 clientID, 148 username, 149 password); 150 m_receiveConnection.start(); 151 } 152 } 153 154 protected javax.jms.Connection createConnectionWithRetry(ConnectionFactory connectionFactory, 155 String username, 156 String password) 157 throws JMSException 158 { 159 javax.jms.Connection connection = null; 160 for (int numTries = 1; connection == null; numTries++) 161 { 162 try 163 { 164 connection = internalConnect(connectionFactory, username, password); 165 } 166 catch (JMSException jmse) 167 { 168 if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION) || numTries == m_numRetries) 169 throw jmse; 170 else 171 try 172 { 173 Thread.sleep(m_connectRetryInterval); 174 } 175 catch (InterruptedException ie) 176 { 177 } 178 ; 179 } 180 } 181 return connection; 182 } 183 184 public void stop() 185 { 186 m_sendConnection.stopConnection(); 187 if (m_allowReceive) 188 m_receiveConnection.stopConnection(); 189 } 190 191 public void start() 192 { 193 m_sendConnection.startConnection(); 194 if (m_allowReceive) 195 m_receiveConnection.startConnection(); 196 } 197 198 public void shutdown() 199 { 200 m_sendConnection.shutdown(); 201 if (m_allowReceive) 202 m_receiveConnection.shutdown(); 203 } 204 205 public abstract JMSEndpoint createEndpoint(String destinationName) 206 throws JMSException ; 207 208 public abstract JMSEndpoint createEndpoint(Destination destination) 209 throws JMSException ; 210 211 212 protected abstract javax.jms.Connection internalConnect(ConnectionFactory connectionFactory, 213 String username, 214 String password) 215 throws JMSException ; 216 217 private abstract class Connection extends Thread implements ExceptionListener 218 { 219 private ConnectionFactory m_connectionFactory; 220 protected javax.jms.Connection m_connection; 221 222 protected boolean m_isActive; 223 private boolean m_needsToConnect; 224 private boolean m_startConnection; 225 private String m_clientID; 226 private String m_username; 227 private String m_password; 228 229 private Object m_jmsLock; 230 private Object m_lifecycleLock; 231 232 233 protected Connection(ConnectionFactory connectionFactory, 234 javax.jms.Connection connection, 235 String threadName, 236 String clientID, 237 String username, 238 String password) 239 throws JMSException 240 { 241 super(threadName); 242 m_connectionFactory = connectionFactory; 243 244 m_clientID = clientID; 245 m_username = username; 246 m_password = password; 247 248 m_jmsLock = new Object (); 249 m_lifecycleLock = new Object (); 250 251 if (connection != null) 252 { 253 m_needsToConnect = false; 254 m_connection = connection; 255 m_connection.setExceptionListener(this); 256 if (m_clientID != null) 257 m_connection.setClientID(m_clientID); 258 } 259 else 260 { 261 m_needsToConnect = true; 262 } 263 264 m_isActive = true; 265 } 266 267 270 271 public void run() 272 { 273 while (m_isActive) 276 { 277 if (m_needsToConnect) 278 { 279 m_connection = null; 280 try 281 { 282 m_connection = internalConnect(m_connectionFactory, 283 m_username, m_password); 284 m_connection.setExceptionListener(this); 285 if (m_clientID != null) 286 m_connection.setClientID(m_clientID); 287 } 288 catch (JMSException e) 289 { 290 try 292 { 293 Thread.sleep(m_connectRetryInterval); 294 } 295 catch (InterruptedException ie) 296 { 297 } 298 continue; 299 } 300 } 301 else 302 m_needsToConnect = true; 305 try 307 { 308 internalOnConnect(); 309 } 310 catch (Exception e) 311 { 312 continue; 315 } 316 317 synchronized (m_jmsLock) 318 { 319 try 320 { 321 m_jmsLock.wait(); 322 } 323 catch (InterruptedException ie) 324 { 325 } } 327 } 328 329 internalOnShutdown(); 331 } 332 333 334 void startConnection() 335 { 336 synchronized (m_lifecycleLock) 337 { 338 if (m_startConnection) 339 return; 340 m_startConnection = true; 341 try 342 { 343 m_connection.start(); 344 } 345 catch (Throwable e) 346 { 347 } } 349 } 350 351 void stopConnection() 352 { 353 synchronized (m_lifecycleLock) 354 { 355 if (!m_startConnection) 356 return; 357 m_startConnection = false; 358 try 359 { 360 m_connection.stop(); 361 } 362 catch (Throwable e) 363 { 364 } } 366 } 367 368 void shutdown() 369 { 370 m_isActive = false; 371 synchronized (m_jmsLock) 372 { 373 m_jmsLock.notifyAll(); 374 } 375 } 376 377 378 public void onException(JMSException exception) 379 { 380 if (m_adapter.isRecoverable(exception, 381 JMSVendorAdapter.ON_EXCEPTION_ACTION)) 382 return; 383 onException(); 384 synchronized (m_jmsLock) 385 { 386 m_jmsLock.notifyAll(); 387 } 388 } 389 390 private final void internalOnConnect() 391 throws Exception 392 { 393 onConnect(); 394 synchronized (m_lifecycleLock) 395 { 396 if (m_startConnection) 397 { 398 try 399 { 400 m_connection.start(); 401 } 402 catch (Throwable e) 403 { 404 } } 406 } 407 } 408 409 private final void internalOnShutdown() 410 { 411 stopConnection(); 412 onShutdown(); 413 try 414 { 415 m_connection.close(); 416 } 417 catch (Throwable e) 418 { 419 } } 421 422 protected abstract void onConnect() throws Exception ; 423 424 protected abstract void onShutdown(); 425 426 protected abstract void onException(); 427 } 428 429 protected abstract SyncConnection createSyncConnection(ConnectionFactory factory, 430 javax.jms.Connection connection, 431 int numSessions, 432 String threadName, 433 String clientID, 434 String username, 435 String password) 436 437 throws JMSException ; 438 439 SyncConnection getSendConnection() 440 { 441 return m_sendConnection; 442 } 443 444 protected abstract class SyncConnection extends Connection 445 { 446 LinkedList m_senders; 447 int m_numSessions; 448 Object m_senderLock; 449 450 SyncConnection(ConnectionFactory connectionFactory, 451 javax.jms.Connection connection, 452 int numSessions, 453 String threadName, 454 String clientID, 455 String username, 456 String password) 457 throws JMSException 458 { 459 super(connectionFactory, connection, threadName, 460 clientID, username, password); 461 m_senders = new LinkedList (); 462 m_numSessions = numSessions; 463 m_senderLock = new Object (); 464 } 465 466 protected abstract SendSession createSendSession(javax.jms.Connection connection) 467 throws JMSException ; 468 469 protected void onConnect() 470 throws JMSException 471 { 472 synchronized (m_senderLock) 473 { 474 for (int i = 0; i < m_numSessions; i++) 475 { 476 m_senders.add(createSendSession(m_connection)); 477 } 478 m_senderLock.notifyAll(); 479 } 480 } 481 482 byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties) 483 throws Exception 484 { 485 long timeoutTime = System.currentTimeMillis() + timeout; 486 while (true) 487 { 488 if (System.currentTimeMillis() > timeoutTime) 489 { 490 throw new InvokeTimeoutException("Unable to complete call in time allotted"); 491 } 492 493 SendSession sendSession = null; 494 try 495 { 496 sendSession = getSessionFromPool(m_poolTimeout); 497 byte[] response = sendSession.call(endpoint, 498 message, 499 timeoutTime - System.currentTimeMillis(), 500 properties); 501 returnSessionToPool(sendSession); 502 if (response == null) 503 { 504 throw new InvokeTimeoutException("Unable to complete call in time allotted"); 505 } 506 return response; 507 } 508 catch (JMSException jmse) 509 { 510 if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) 511 { 512 returnSessionToPool(sendSession); 515 throw jmse; 516 } 517 518 Thread.yield(); 522 continue; 523 } 524 catch (NullPointerException npe) 525 { 526 Thread.yield(); 527 continue; 528 } 529 } 530 } 531 532 534 void send(JMSEndpoint endpoint, byte[] message, HashMap properties) 535 throws Exception 536 { 537 long timeoutTime = System.currentTimeMillis() + m_timeoutTime; 538 while (true) 539 { 540 if (System.currentTimeMillis() > timeoutTime) 541 { 542 throw new InvokeTimeoutException("Cannot complete send in time allotted"); 543 } 544 545 SendSession sendSession = null; 546 try 547 { 548 sendSession = getSessionFromPool(m_poolTimeout); 549 sendSession.send(endpoint, message, properties); 550 returnSessionToPool(sendSession); 551 } 552 catch (JMSException jmse) 553 { 554 if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) 555 { 556 returnSessionToPool(sendSession); 559 throw jmse; 560 } 561 Thread.yield(); 565 continue; 566 } 567 catch (NullPointerException npe) 568 { 569 Thread.yield(); 571 continue; 572 } 573 break; 574 } 575 } 576 577 protected void onException() 578 { 579 synchronized (m_senderLock) 580 { 581 m_senders.clear(); 582 } 583 } 584 585 protected void onShutdown() 586 { 587 synchronized (m_senderLock) 588 { 589 Iterator senders = m_senders.iterator(); 590 while (senders.hasNext()) 591 { 592 SendSession session = (SendSession)senders.next(); 593 session.cleanup(); 594 } 595 m_senders.clear(); 596 } 597 } 598 599 private SendSession getSessionFromPool(long timeout) 600 { 601 synchronized (m_senderLock) 602 { 603 while (m_senders.size() == 0) 604 { 605 try 606 { 607 m_senderLock.wait(timeout); 608 if (m_senders.size() == 0) 609 { 610 return null; 611 } 612 } 613 catch (InterruptedException ignore) 614 { 615 return null; 616 } 617 } 618 return (SendSession)m_senders.removeFirst(); 619 } 620 } 621 622 private void returnSessionToPool(SendSession sendSession) 623 { 624 synchronized (m_senderLock) 625 { 626 m_senders.addLast(sendSession); 627 m_senderLock.notifyAll(); 628 } 629 } 630 631 protected abstract class SendSession extends ConnectorSession 632 { 633 MessageProducer m_producer; 634 635 SendSession(Session session, 636 MessageProducer producer) 637 throws JMSException 638 { 639 super(session); 640 m_producer = producer; 641 } 642 643 protected abstract Destination createTemporaryDestination() 644 throws JMSException ; 645 646 protected abstract void deleteTemporaryDestination(Destination destination) 647 throws JMSException ; 648 649 protected abstract MessageConsumer createConsumer(Destination destination) 650 throws JMSException ; 651 652 protected abstract void send(Destination destination, 653 Message message, 654 int deliveryMode, 655 int priority, 656 long timeToLive) 657 throws JMSException ; 658 659 void send(JMSEndpoint endpoint, byte[] message, HashMap properties) 660 throws Exception 661 { 662 BytesMessage jmsMessage = m_session.createBytesMessage(); 663 jmsMessage.writeBytes(message); 664 int deliveryMode = extractDeliveryMode(properties); 665 int priority = extractPriority(properties); 666 long timeToLive = extractTimeToLive(properties); 667 668 if (properties != null && !properties.isEmpty()) 669 setProperties(properties, jmsMessage); 670 671 send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, 672 priority, timeToLive); 673 } 674 675 676 void cleanup() 677 { 678 try 679 { 680 m_producer.close(); 681 } 682 catch (Throwable t) 683 { 684 } 685 try 686 { 687 m_session.close(); 688 } 689 catch (Throwable t) 690 { 691 } 692 } 693 694 byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, 695 HashMap properties) 696 throws Exception 697 { 698 Destination reply = createTemporaryDestination(); 699 MessageConsumer subscriber = createConsumer(reply); 700 BytesMessage jmsMessage = m_session.createBytesMessage(); 701 jmsMessage.writeBytes(message); 702 jmsMessage.setJMSReplyTo(reply); 703 704 int deliveryMode = extractDeliveryMode(properties); 705 int priority = extractPriority(properties); 706 long timeToLive = extractTimeToLive(properties); 707 708 if (properties != null && !properties.isEmpty()) 709 setProperties(properties, jmsMessage); 710 711 send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, 712 priority, timeToLive); 713 BytesMessage response = null; 714 try 715 { 716 response = (BytesMessage )subscriber.receive(timeout); 717 } 718 catch (ClassCastException cce) 719 { 720 throw new InvokeException 721 ("Error: unexpected message type received - expected BytesMessage"); 722 } 723 byte[] respBytes = null; 724 if (response != null) 725 { 726 byte[] buffer = new byte[8 * 1024]; 727 ByteArrayOutputStream out = new ByteArrayOutputStream (); 728 for (int bytesRead = response.readBytes(buffer); 729 bytesRead != -1; bytesRead = response.readBytes(buffer)) 730 { 731 out.write(buffer, 0, bytesRead); 732 } 733 respBytes = out.toByteArray(); 734 } 735 subscriber.close(); 736 deleteTemporaryDestination(reply); 737 return respBytes; 738 } 739 740 private int extractPriority(HashMap properties) 741 { 742 return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY, 743 JMSConstants.DEFAULT_PRIORITY); 744 } 745 746 private int extractDeliveryMode(HashMap properties) 747 { 748 return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE, 749 JMSConstants.DEFAULT_DELIVERY_MODE); 750 } 751 752 private long extractTimeToLive(HashMap properties) 753 { 754 return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE, 755 JMSConstants.DEFAULT_TIME_TO_LIVE); 756 } 757 758 private void setProperties(HashMap properties, Message message) 759 throws JMSException 760 { 761 Iterator propertyIter = properties.entrySet().iterator(); 762 while (propertyIter.hasNext()) 763 { 764 Map.Entry property = (Map.Entry )propertyIter.next(); 765 setProperty((String )property.getKey(), property.getValue(), 766 message); 767 } 768 } 769 770 private void setProperty(String property, Object value, Message message) 771 throws JMSException 772 { 773 if (property == null) 774 return; 775 if (property.equals(JMSConstants.JMS_CORRELATION_ID)) 776 message.setJMSCorrelationID((String )value); 777 else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES)) 778 message.setJMSCorrelationIDAsBytes((byte[])value); 779 else if (property.equals(JMSConstants.JMS_TYPE)) 780 message.setJMSType((String )value); 781 else 782 message.setObjectProperty(property, value); 783 } 784 } 785 } 786 787 AsyncConnection getReceiveConnection() 788 { 789 return m_receiveConnection; 790 } 791 792 protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory, 793 javax.jms.Connection connection, 794 String threadName, 795 String clientID, 796 String username, 797 String password) 798 799 throws JMSException ; 800 801 protected abstract class AsyncConnection extends Connection 802 { 803 HashMap m_subscriptions; 804 Object m_subscriptionLock; 805 806 protected AsyncConnection(ConnectionFactory connectionFactory, 807 javax.jms.Connection connection, 808 String threadName, 809 String clientID, 810 String username, 811 String password) 812 throws JMSException 813 { 814 super(connectionFactory, connection, threadName, 815 clientID, username, password); 816 m_subscriptions = new HashMap (); 817 m_subscriptionLock = new Object (); 818 } 819 820 protected abstract ListenerSession createListenerSession(javax.jms.Connection connection, 821 Subscription subscription) 822 throws Exception ; 823 824 protected void onShutdown() 825 { 826 synchronized (m_subscriptionLock) 827 { 828 Iterator subscriptions = m_subscriptions.keySet().iterator(); 829 while (subscriptions.hasNext()) 830 { 831 Subscription subscription = (Subscription)subscriptions.next(); 832 ListenerSession session = (ListenerSession) 833 m_subscriptions.get(subscription); 834 if (session != null) 835 { 836 session.cleanup(); 837 } 838 839 } 840 m_subscriptions.clear(); 841 } 842 } 843 844 848 void subscribe(Subscription subscription) 849 throws Exception 850 { 851 long timeoutTime = System.currentTimeMillis() + m_timeoutTime; 852 synchronized (m_subscriptionLock) 853 { 854 if (m_subscriptions.containsKey(subscription)) 855 return; 856 while (true) 857 { 858 if (System.currentTimeMillis() > timeoutTime) 859 { 860 throw new InvokeTimeoutException("Cannot subscribe listener"); 861 } 862 863 try 864 { 865 ListenerSession session = createListenerSession(m_connection, 866 subscription); 867 m_subscriptions.put(subscription, session); 868 break; 869 } 870 catch (JMSException jmse) 871 { 872 if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) 873 { 874 throw jmse; 875 } 876 877 try 878 { 879 m_subscriptionLock.wait(m_interactRetryInterval); 880 } 881 catch (InterruptedException ignore) 882 { 883 } 884 Thread.yield(); 886 continue; 887 } 888 catch (NullPointerException jmse) 889 { 890 try 892 { 893 m_subscriptionLock.wait(m_interactRetryInterval); 894 } 895 catch (InterruptedException ignore) 896 { 897 } 898 Thread.yield(); 900 continue; 901 } 902 } 903 } 904 } 905 906 void unsubscribe(Subscription subscription) 907 { 908 long timeoutTime = System.currentTimeMillis() + m_timeoutTime; 909 synchronized (m_subscriptionLock) 910 { 911 if (!m_subscriptions.containsKey(subscription)) 912 return; 913 while (true) 914 { 915 if (System.currentTimeMillis() > timeoutTime) 916 { 917 throw new InvokeTimeoutException("Cannot unsubscribe listener"); 918 } 919 920 Thread.yield(); 922 try 923 { 924 ListenerSession session = (ListenerSession) 925 m_subscriptions.get(subscription); 926 session.cleanup(); 927 m_subscriptions.remove(subscription); 928 break; 929 } 930 catch (NullPointerException jmse) 931 { 932 try 934 { 935 m_subscriptionLock.wait(m_interactRetryInterval); 936 } 937 catch (InterruptedException ignore) 938 { 939 } 940 continue; 941 } 942 } 943 } 944 } 945 946 protected void onConnect() 947 throws Exception 948 { 949 synchronized (m_subscriptionLock) 950 { 951 Iterator subscriptions = m_subscriptions.keySet().iterator(); 952 while (subscriptions.hasNext()) 953 { 954 Subscription subscription = (Subscription)subscriptions.next(); 955 956 if (m_subscriptions.get(subscription) == null) 957 { 958 m_subscriptions.put(subscription, 959 createListenerSession(m_connection, subscription)); 960 } 961 } 962 m_subscriptionLock.notifyAll(); 963 } 964 } 965 966 protected void onException() 967 { 968 synchronized (m_subscriptionLock) 969 { 970 Iterator subscriptions = m_subscriptions.keySet().iterator(); 971 while (subscriptions.hasNext()) 972 { 973 Subscription subscription = (Subscription)subscriptions.next(); 974 m_subscriptions.put(subscription, null); 975 } 976 } 977 } 978 979 980 protected class ListenerSession extends ConnectorSession 981 { 982 protected MessageConsumer m_consumer; 983 protected Subscription m_subscription; 984 985 ListenerSession(Session session, 986 MessageConsumer consumer, 987 Subscription subscription) 988 throws Exception 989 { 990 super(session); 991 m_subscription = subscription; 992 m_consumer = consumer; 993 Destination destination = subscription.m_endpoint.getDestination(m_session); 994 m_consumer.setMessageListener(subscription.m_listener); 995 } 996 997 void cleanup() 998 { 999 try 1000 { 1001 m_consumer.close(); 1002 } 1003 catch (Exception ignore) 1004 { 1005 } 1006 try 1007 { 1008 m_session.close(); 1009 } 1010 catch (Exception ignore) 1011 { 1012 } 1013 } 1014 1015 } 1016 } 1017 1018 1019 private abstract class ConnectorSession 1020 { 1021 Session m_session; 1022 1023 ConnectorSession(Session session) 1024 throws JMSException 1025 { 1026 m_session = session; 1027 } 1028 1029 } 1030 1031} | Popular Tags |