package org.jboss.mq;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;

import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.transaction.xa.Xid;

import org.jboss.logging.Logger;
import org.jboss.mq.il.ClientILService;
import org.jboss.mq.il.ServerIL;
import org.jboss.util.UnreachableStatementException;

import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;

/**
 * Client side base implementation of {@link javax.jms.Connection}.
 *
 * <p>The connection talks to the server through a {@link ServerIL}, receives
 * asynchronous callbacks (<code>asynchXXX</code> methods) and keeps the local
 * registry of consumers. A periodic ping task, run by a shared
 * {@link ClockDaemon}, reports a failure through {@link #asynchFailure} when
 * no pong was seen since the previous ping.
 *
 * <p>Locking used by this class:
 * <ul>
 * <li><code>subscriptions</code> guards both <code>subscriptions</code> and
 *     <code>destinationSubscriptions</code></li>
 * <li><code>createdSessions</code>, <code>temps</code>, <code>subCountLock</code>
 *     and <code>sb</code> each guard themselves / the state documented on them</li>
 * <li><code>elLock</code> guards <code>elThread</code> and is held while the
 *     exception listener runs</li>
 * </ul>
 */
public abstract class Connection implements Serializable, javax.jms.Connection
{
   /** The serialVersionUID */
   private static final long serialVersionUID = 87938199839407082L;

   /** The thread group for threads created by this class, see {@link #getThreadGroup()} */
   private static ThreadGroup threadGroup = new ThreadGroup("JBossMQ Client Threads");

   /** The log */
   static Logger log = Logger.getLogger(Connection.class);

   /** Whether trace logging is enabled; re-read from the logger on every {@link #pingServer(long)} */
   static boolean trace = log.isTraceEnabled();

   /** The clock daemon that executes the periodic ping tasks */
   static protected ClockDaemon clockDaemon = new ClockDaemon();

   /** Maps a destination to the LinkedList of its SpyConsumers; guarded by <code>subscriptions</code> */
   public HashMap destinationSubscriptions = new HashMap();

   /** Maps a subscription id (Integer) to its SpyConsumer; also the lock for both consumer maps */
   public HashMap subscriptions = new HashMap();

   /** True while the connection is stopped, i.e. before {@link #start()} and after {@link #doStop()} */
   public boolean modeStop;

   /** The invocation layer used to talk to the server */
   protected ServerIL serverIL;

   /** The client id, null until configured, set or obtained from the server */
   protected String clientID;

   /** The token identifying this connection to the server, created in {@link #startILService()} */
   protected ConnectionToken connectionToken;

   /** The client side invocation layer service, created in {@link #startILService()} */
   protected ClientILService clientILService;

   /** The period handed to the clock daemon for the ping task (presumably milliseconds); 0 disables pinging */
   protected long pingPeriod = 1000 * 60;

   /**
    * Whether a pong arrived since the last ping. Written by {@link #asynchPong(long)}
    * and read/reset by the ping task on the clock daemon thread, hence volatile.
    */
   protected volatile boolean ponged = true;

   /** Held by the ping task while it runs; taken (and kept) by {@link #stopPingThread()} */
   Semaphore pingTaskSemaphore = new Semaphore(1);

   /** The handle returned by the clock daemon for the ping task, used to cancel it */
   Object pingTaskId;

   /** Set at the start of {@link #close()}; asynchronous callbacks are ignored once it is true */
   private SynchronizedBoolean closing = new SynchronizedBoolean(false);

   /** True until the first operation that goes through {@link #checkClientID()} */
   private volatile boolean setClientIdAllowed = true;

   /** The sessions that {@link #close()} has to close; guarded by itself */
   HashSet createdSessions;

   /** The next subscription id; guarded by <code>subCountLock</code> */
   int subscriptionCounter = Integer.MIN_VALUE;

   /** The lock for <code>subscriptionCounter</code> */
   Object subCountLock = new Object();

   /** Set at the end of {@link #close()} */
   private SynchronizedBoolean closed = new SynchronizedBoolean(false);

   /** The XA resource manager created for this connection */
   SpyXAResourceManager spyXAResourceManager;

   /** The factory that created this connection and provides the invocation layers */
   GenericConnectionFactory genericConnectionFactory;

   /** The counter appended to generated message ids; guarded by <code>sb</code> */
   private int lastMessageID;

   /**
    * The registered exception listener. It is written without holding
    * <code>elLock</code> (see {@link #setExceptionListener} and {@link #close()})
    * but read by {@link #asynchFailure}, hence volatile.
    */
   private volatile ExceptionListener exceptionListener;

   /** The lock for <code>elThread</code>; also held while the listener is invoked */
   private Object elLock = new Object();

   /** The thread currently running the exception listener, or null */
   private Thread elThread;

   /** Scratch buffer for {@link #getNewMessageID()}; also its lock */
   private StringBuffer sb = new StringBuffer();

   /** Scratch digit stack for {@link #getNewMessageID()}; guarded by <code>sb</code> */
   private char[] charStack = new char[22];

   /** The session id returned by the server on authentication */
   String sessionId;

   /** The temporary destinations known to this connection, see {@link #checkTemporary}; guarded by itself */
   protected HashSet temps = new HashSet();

   static
   {
      log.debug("Setting the clockDaemon's thread factory");
      clockDaemon.setThreadFactory(new ThreadFactory()
      {
         public Thread newThread(Runnable r)
         {
            Thread t = new Thread(getThreadGroup(), r, "Connection Monitor Thread");
            t.setDaemon(true);
            return t;
         }
      });
   }

   /**
    * Get the thread group for client threads, replacing it with a fresh group
    * when the current one has been destroyed.
    *
    * @return the thread group
    */
   public static ThreadGroup getThreadGroup()
   {
      if (threadGroup.isDestroyed())
         threadGroup = new ThreadGroup("JBossMQ Client Threads");
      return threadGroup;
   }

   /**
    * Create a new connection: authenticate, optionally look up the configured
    * client id, start the client invocation layer, create the XA resource
    * manager and start pinging.
    *
    * @param userName the user name, may be null
    * @param password the password
    * @param genericConnectionFactory the factory providing the invocation layers
    * @throws JMSException for any failure; the partially established connection
    *         is torn down on a best effort basis before the error is thrown
    */
   Connection(String userName, String password, GenericConnectionFactory genericConnectionFactory) throws JMSException
   {
      createdSessions = new HashSet();
      connectionToken = null;
      lastMessageID = 0;
      modeStop = true;

      if (trace)
         log.trace("Connection Initializing userName=" + userName + " " + this);
      this.genericConnectionFactory = genericConnectionFactory;
      genericConnectionFactory.initialise(this);

      if (trace)
         log.trace("Getting the serverIL " + this);
      serverIL = genericConnectionFactory.createServerIL();
      if (trace)
         log.trace("serverIL=" + serverIL + " " + this);

      // Phase one: establish the identity and the client invocation layer
      try
      {
         authenticate(userName, password);

         if (userName != null)
            askForAnID(userName, password);

         startILService();
      }
      catch (Throwable t)
      {
         // Best effort notification of the server, the original error wins
         try
         {
            serverIL.connectionClosing(null);
         }
         catch (Throwable t2)
         {
            log.debug("Error closing the connection", t2);
         }

         SpyJMSException.rethrowAsJMSException("Failed to create connection", t);
      }

      // Phase two: local services that need the established connection
      try
      {
         if (trace)
            log.trace("Creating XAResourceManager " + this);

         spyXAResourceManager = new SpyXAResourceManager(this);

         if (trace)
            log.trace("Starting the ping thread " + this);
         startPingThread();

         if (trace)
            log.trace("Connection establishment successful " + this);
      }
      catch (Throwable t)
      {
         // Best effort cleanup of what phase one created, the original error wins
         try
         {
            serverIL.connectionClosing(connectionToken);
         }
         catch (Throwable t2)
         {
            log.debug("Error closing the connection", t2);
         }
         try
         {
            stopILService();
         }
         catch (Throwable t2)
         {
            log.debug("Error stopping the client IL", t2);
         }

         SpyJMSException.rethrowAsJMSException("Failed to create connection", t);
      }
   }

   /**
    * Create a new connection without a user name and password.
    *
    * @param genericConnectionFactory the factory providing the invocation layers
    * @throws JMSException for any failure
    */
   Connection(GenericConnectionFactory genericConnectionFactory) throws JMSException
   {
      this(null, null, genericConnectionFactory);
   }

   /**
    * Get the server invocation layer.
    *
    * @return the server invocation layer
    */
   public ServerIL getServerIL()
   {
      return serverIL;
   }

   /**
    * Notification of an asynchronous close; this implementation does nothing.
    */
   public void asynchClose()
   {
   }

   /**
    * Asynchronous request to delete a temporary destination. Failures are
    * reported through {@link #asynchFailure(String, Throwable)} instead of
    * being thrown.
    *
    * @param dest the temporary destination
    */
   public void asynchDeleteTemporaryDestination(SpyDestination dest)
   {
      if (trace)
         log.trace("Deleting temporary destination " + dest);
      try
      {
         deleteTemporaryDestination(dest);
      }
      catch (Throwable t)
      {
         asynchFailure("Error deleting temporary destination " + dest, t);
      }
   }

   /**
    * Asynchronous delivery of messages. Each message is handed to the consumer
    * registered for its subscription id; a message for an unknown subscription
    * is negatively acknowledged. Ignored once the connection is closing.
    *
    * @param requests the messages to deliver
    */
   public void asynchDeliver(ReceiveRequest requests[])
   {
      // Nothing is delivered once close() has begun
      if (closing.get())
         return;

      if (trace)
         log.trace("Async deliver requests=" + Arrays.asList(requests) + " " + this);

      try
      {
         for (int i = 0; i < requests.length; i++)
         {
            ReceiveRequest r = requests[i];
            if (trace)
               log.trace("Processing request=" + r + " " + this);

            // The map is mutated under this lock by addConsumer/removeConsumerInternal,
            // so the lookup has to take it as well
            SpyConsumer consumer;
            synchronized (subscriptions)
            {
               consumer = (SpyConsumer) subscriptions.get(r.subscriptionId);
            }
            r.message.createAcknowledgementRequest(r.subscriptionId.intValue());

            if (consumer == null)
            {
               // Nobody to deliver to: hand the message back to the server
               send(r.message.getAcknowledgementRequest(false));
               log.debug("WARNING: NACK issued due to non existent subscription " + r.message.header.messageId);
               continue;
            }

            if (trace)
               log.trace("Delivering messageid=" + r.message.header.messageId + " to consumer=" + consumer);

            consumer.addMessage(r.message);
         }
      }
      catch (Throwable t)
      {
         asynchFailure("Error during async delivery", t);
      }
   }

   /**
    * Notification of an asynchronous failure. When an exception listener is
    * registered and not already running, it is invoked on a new non daemon
    * thread; otherwise the failure is only logged. Ignored once the connection
    * is closing.
    *
    * @param reason the description of the failure
    * @param t the failure
    */
   public void asynchFailure(String reason, Throwable t)
   {
      if (trace)
         log.trace("Notified of failure reason=" + reason + " " + this, t);

      // Failures while closing are expected and not reported
      if (closing.get())
         return;

      JMSException excep = SpyJMSException.getAsJMSException(reason, t);

      synchronized (elLock)
      {
         ExceptionListener el = exceptionListener;
         if (el != null && elThread == null)
         {
            try
            {
               Runnable run = new ExceptionListenerRunnable(el, excep);
               elThread = new Thread(getThreadGroup(), run, "ExceptionListener " + this);
               elThread.setDaemon(false);
               elThread.start();
            }
            catch (Throwable t1)
            {
               log.warn("Connection failure: ", excep);
               log.warn("Unable to start exception listener thread: ", t1);
            }
         }
         else if (elThread != null)
            log.warn("Connection failure, already in the exception listener", excep);
         else
            log.warn("Connection failure, use javax.jms.Connection.setExceptionListener() to handle this error and reconnect", excep);
      }
   }

   /**
    * Notification of a pong from the server; records that the server answered.
    *
    * @param serverTime the time reported by the server
    */
   public void asynchPong(long serverTime)
   {
      if (trace)
         log.trace("PONG serverTime=" + serverTime + " " + this);
      ponged = true;
   }

   /**
    * Delete a temporary destination on the server and forget it locally.
    *
    * @param dest the temporary destination
    * @throws JMSException when the connection is closed or the server request fails
    */
   public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
   {
      checkClosed();
      if (trace)
         log.trace("DeleteDestination dest=" + dest + " " + this);
      try
      {
         serverIL.deleteTemporaryDestination(connectionToken, dest);

         // destinationSubscriptions is guarded by the subscriptions lock
         synchronized (subscriptions)
         {
            destinationSubscriptions.remove(dest);
         }

         synchronized (temps)
         {
            temps.remove(dest);
         }
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot delete the TemporaryDestination", t);
      }
   }

   /**
    * Set the client id. Only allowed while no client id is known and before any
    * operation that went through {@link #checkClientID()}.
    *
    * @param cID the client id, validated by the server
    * @throws JMSException when the connection is closed, a client id is already
    *         present, the call comes too late or the server rejects the id
    */
   public void setClientID(String cID) throws JMSException
   {
      checkClosed();
      if (clientID != null)
         throw new IllegalStateException("The connection has already a clientID");
      if (setClientIdAllowed == false)
         throw new IllegalStateException("SetClientID was not called emediately after creation of connection");

      // Log the requested id: the clientID field is necessarily null at this point
      if (trace)
         log.trace("SetClientID clientID=" + cID + " " + this);

      try
      {
         serverIL.checkID(cID);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot connect to the JMSServer", t);
      }

      clientID = cID;
      connectionToken.setClientID(clientID);
   }

   /**
    * Get the client id.
    *
    * @return the client id, may be null when none has been established yet
    * @throws JMSException when the connection is closed
    */
   public String getClientID() throws JMSException
   {
      checkClosed();
      return clientID;
   }

   /**
    * Get the exception listener.
    *
    * @return the exception listener or null
    * @throws JMSException when the connection is closed or no client id can be obtained
    */
   public ExceptionListener getExceptionListener() throws JMSException
   {
      checkClosed();
      checkClientID();
      return exceptionListener;
   }

   /**
    * Set the exception listener notified by {@link #asynchFailure(String, Throwable)}.
    *
    * @param listener the exception listener, may be null
    * @throws JMSException when the connection is closed or no client id can be obtained
    */
   public void setExceptionListener(ExceptionListener listener) throws JMSException
   {
      checkClosed();
      checkClientID();

      exceptionListener = listener;
   }

   /**
    * Get the connection metadata.
    *
    * @return a new metadata object
    * @throws JMSException when the connection is closed or no client id can be obtained
    */
   public ConnectionMetaData getMetaData() throws JMSException
   {
      checkClosed();
      checkClientID();

      return new SpyConnectionMetaData();
   }

   /**
    * Close the connection: stop delivery, close the sessions, notify the
    * server, stop pinging and stop the client invocation layer. Every step is
    * attempted even when an earlier one fails. Closing a closed connection
    * does nothing.
    *
    * @throws JMSException when the ping task could not be stopped; failures of
    *         the other steps are only logged at trace level
    */
   public synchronized void close() throws JMSException
   {
      if (closed.get())
         return;
      if (trace)
         log.trace("Closing connection " + this);

      // From here on asynchronous callbacks are ignored
      closing.set(true);

      // The listener must not be told about failures caused by the close itself
      exceptionListener = null;

      // The error to report once everything has been attempted
      JMSException exception = null;

      try
      {
         doStop();
      }
      catch (Throwable t)
      {
         log.trace("Error during stop", t);
      }

      if (trace)
         log.trace("Closing sessions " + this);
      // Work on a snapshot: session.close() calls back into sessionClosing()
      Object[] vect = null;
      synchronized (createdSessions)
      {
         vect = createdSessions.toArray();
      }
      for (int i = 0; i < vect.length; i++)
      {
         SpySession session = (SpySession) vect[i];
         try
         {
            session.close();
         }
         catch (Throwable t)
         {
            if (trace)
               log.trace("Error closing session " + session, t);
         }
      }
      if (trace)
         log.trace("Closed sessions " + this);

      if (trace)
         log.trace("Notifying the server of close " + this);
      try
      {
         serverIL.connectionClosing(connectionToken);
      }
      catch (Throwable t)
      {
         log.trace("Cannot close properly the connection", t);
      }

      if (trace)
         log.trace("Stopping ping thread " + this);
      try
      {
         stopPingThread();
      }
      catch (Throwable t)
      {
         if (exception == null)
            exception = SpyJMSException.getAsJMSException("Cannot stop the ping thread", t);
      }

      if (trace)
         log.trace("Stopping the ClientIL service " + this);
      try
      {
         stopILService();
      }
      catch (Throwable t)
      {
         log.trace("Cannot stop the client il service", t);
      }

      closed.set(true);

      if (trace)
         log.trace("Disconnected from server " + this);

      if (exception != null)
         throw exception;
   }

   /**
    * Start (or restart) delivery of incoming messages. Does nothing when the
    * connection is already started.
    *
    * @throws JMSException when the connection is closed, no client id can be
    *         obtained or the server request fails
    */
   public void start() throws JMSException
   {
      checkClosed();
      checkClientID();

      if (modeStop == false)
         return;
      modeStop = false;

      if (trace)
         log.trace("Starting connection " + this);

      try
      {
         serverIL.setEnabled(connectionToken, true);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot enable the connection with the JMS server", t);
      }
   }

   /**
    * Stop delivery of incoming messages, see {@link #doStop()}.
    *
    * @throws JMSException when the connection is closed, no client id can be
    *         obtained or the server request fails
    */
   public void stop() throws JMSException
   {
      checkClosed();
      checkClientID();
      doStop();
   }

   /**
    * Describe the connection: identity hash, connection token (or the client id
    * while there is no token), close state and receive state.
    *
    * @return the description
    */
   public String toString()
   {
      StringBuffer buffer = new StringBuffer();
      buffer.append("Connection@").append(System.identityHashCode(this));
      buffer.append('[');
      if (connectionToken != null)
         buffer.append("token=").append(connectionToken);
      else
         buffer.append("clientID=").append(clientID);
      if (closed.get())
         buffer.append(" CLOSED");
      else if (closing.get())
         buffer.append(" CLOSING");
      buffer.append(" rcvstate=");
      if (modeStop)
         buffer.append("STOPPED");
      else
         buffer.append("STARTED");
      buffer.append(']');
      return buffer.toString();
   }

   /**
    * Generate a new message id of the form
    * <code>clientID + '-' + currentTimeMillis + counter</code>, where the
    * counter is incremented on every call and wraps to 0 on overflow.
    *
    * @return the new message id
    * @throws JMSException when the connection is closed
    */
   String getNewMessageID() throws JMSException
   {
      checkClosed();
      synchronized (sb)
      {
         sb.setLength(0);
         sb.append(clientID);
         sb.append('-');
         appendDecimal(System.currentTimeMillis());

         ++lastMessageID;
         // Wrap around instead of going negative
         if (lastMessageID < 0)
         {
            lastMessageID = 0;
         }
         appendDecimal(lastMessageID);

         return sb.toString();
      }
   }

   /**
    * Append the decimal digits of a non negative value to <code>sb</code>
    * without allocating. The caller must hold the lock on <code>sb</code>.
    *
    * @param value the value to append
    */
   private void appendDecimal(long value)
   {
      // Digits come out least significant first, so stack them and pop in reverse
      int count = 0;
      do
      {
         charStack[count] = (char) ('0' + (value % 10));
         value = value / 10;
         ++count;
      }
      while (value != 0);
      for (--count; count >= 0; --count)
      {
         sb.append(charStack[count]);
      }
   }

   /**
    * Register a consumer: assign it a subscription id, record it locally and
    * subscribe on the server. The local registration is rolled back when the
    * subscription fails.
    *
    * @param consumer the consumer
    * @throws JMSException when the connection is closed or the subscription
    *         fails; a {@link JMSSecurityException} is rethrown unchanged
    */
   void addConsumer(SpyConsumer consumer) throws JMSException
   {
      checkClosed();
      Subscription req = consumer.getSubscription();
      synchronized (subCountLock)
      {
         req.subscriptionId = subscriptionCounter++;
      }
      req.connectionToken = connectionToken;
      if (trace)
         log.trace("addConsumer sub=" + req);

      try
      {
         // Register before subscribing so that deliveries find the consumer
         synchronized (subscriptions)
         {
            subscriptions.put(new Integer(req.subscriptionId), consumer);

            LinkedList ll = (LinkedList) destinationSubscriptions.get(req.destination);
            if (ll == null)
            {
               ll = new LinkedList();
               destinationSubscriptions.put(req.destination, ll);
            }

            ll.add(consumer);
         }

         serverIL.subscribe(connectionToken, req);
      }
      catch (JMSSecurityException ex)
      {
         removeConsumerInternal(consumer);
         throw ex;
      }
      catch (Throwable t)
      {
         // Do not leave a consumer registered that the server does not know
         removeConsumerInternal(consumer);
         SpyJMSException.rethrowAsJMSException("Cannot subscribe to this Destination: ", t);
      }
   }

   /**
    * Browse a queue.
    *
    * @param queue the queue
    * @param selector the message selector
    * @return the messages returned by the server
    * @throws JMSException when the connection is closed or the server request fails
    */
   SpyMessage[] browse(Queue queue, String selector) throws JMSException
   {
      checkClosed();
      if (trace)
         log.trace("Browsing queue=" + queue + " selector=" + selector + " " + this);

      try
      {
         return serverIL.browse(connectionToken, queue, selector);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot browse the Queue", t);
         throw new UnreachableStatementException();
      }
   }

   /**
    * Ping the server. Also refreshes the cached trace flag from the logger.
    *
    * @param clientTime the client time sent with the ping
    * @throws JMSException when the connection is closed or the server request fails
    */
   void pingServer(long clientTime) throws JMSException
   {
      checkClosed();
      trace = log.isTraceEnabled();
      if (trace)
         log.trace("PING " + clientTime + " " + this);

      try
      {
         serverIL.ping(connectionToken, clientTime);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot ping the JMS server", t);
      }
   }

   /**
    * Synchronously receive a message for a subscription. An acknowledgement
    * request is prepared on any message returned.
    *
    * @param sub the subscription
    * @param wait the wait value passed to the server
    * @return the message or null when the server returned none
    * @throws JMSException when the connection is closed or the server request fails
    */
   SpyMessage receive(Subscription sub, long wait) throws JMSException
   {
      checkClosed();
      if (trace)
         log.trace("Receive subscription=" + sub + " wait=" + wait);

      try
      {
         SpyMessage message = serverIL.receive(connectionToken, sub.subscriptionId, wait);
         if (message != null)
            message.createAcknowledgementRequest(sub.subscriptionId);
         return message;
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot receive ", t);
         throw new UnreachableStatementException();
      }
   }

   /**
    * Unsubscribe a consumer on the server and, when that succeeded, remove it
    * from the local registry.
    *
    * @param consumer the consumer
    * @throws JMSException when the connection is closed or the server request fails
    */
   void removeConsumer(SpyConsumer consumer) throws JMSException
   {
      checkClosed();
      Subscription req = consumer.getSubscription();
      if (trace)
         log.trace("removeConsumer req=" + req);

      try
      {
         serverIL.unsubscribe(connectionToken, req.subscriptionId);

         removeConsumerInternal(consumer);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot unsubscribe to this destination", t);
      }
   }

   /**
    * Send a message to the server.
    *
    * @param mes the message
    * @throws JMSException when the connection is closed or the server request fails
    */
   void sendToServer(SpyMessage mes) throws JMSException
   {
      checkClosed();
      if (trace)
         log.trace("SendToServer message=" + mes.header.jmsMessageID + " " + this);

      try
      {
         serverIL.addMessage(connectionToken, mes);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot send a message to the JMS server", t);
      }
   }

   /**
    * Notification that a session is closing; the session is removed from the
    * set of sessions closed by {@link #close()}.
    *
    * @param who the session
    */
   void sessionClosing(SpySession who)
   {
      if (trace)
         log.trace("Closing session " + who);

      synchronized (createdSessions)
      {
         createdSessions.remove(who);
      }
   }

   /**
    * Destroy a durable subscription on the server.
    *
    * @param id the durable subscription id
    * @throws JMSException when the server request fails
    */
   void unsubscribe(DurableSubscriptionID id) throws JMSException
   {
      if (trace)
         log.trace("Unsubscribe id=" + id + " " + this);

      try
      {
         serverIL.destroySubscription(connectionToken, id);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot destroy durable subscription " + id, t);
      }
   }

   /**
    * Check that a temporary destination is known to this connection.
    * Non temporary destinations always pass.
    *
    * @param destination the destination
    * @throws JMSException when the destination is temporary and not in <code>temps</code>
    */
   void checkTemporary(Destination destination) throws JMSException
   {
      if (destination instanceof TemporaryQueue || destination instanceof TemporaryTopic)
      {
         synchronized (temps)
         {
            if (temps.contains(destination) == false)
               throw new JMSException("Cannot create a consumer for a temporary destination from a different session. " + destination);
         }
      }
   }

   /**
    * Make sure the connection has a client id. The first call disables
    * {@link #setClientID(String)} and, when no client id is known, asks the
    * server for one; later calls return immediately.
    *
    * @throws JMSException when no client id could be obtained
    */
   synchronized protected void checkClientID() throws JMSException
   {
      if (setClientIdAllowed == false)
         return;

      setClientIdAllowed = false;
      if (trace)
         log.trace("Checking clientID=" + clientID + " " + this);
      if (clientID == null)
      {
         askForAnID();
         if (clientID == null)
            throw new JMSException("Could not get a clientID");
         connectionToken.setClientID(clientID);

         if (trace)
            log.trace("ClientID established " + this);
      }
   }

   /**
    * Ask the server for a generated client id, unless one is already known.
    *
    * @throws JMSException when the server request fails
    */
   protected void askForAnID() throws JMSException
   {
      if (trace)
         log.trace("Ask for an id " + this);

      try
      {
         if (clientID == null)
            clientID = serverIL.getID();
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot get a client ID", t);
      }
   }

   /**
    * Check the user with the server and adopt the client id it returns, if any.
    *
    * @param userName the user name
    * @param password the password
    * @throws JMSException when the server request fails
    */
   protected void askForAnID(String userName, String password) throws JMSException
   {
      if (trace)
         log.trace("Ask for an id user=" + userName + " " + this);

      try
      {
         String configuredClientID = serverIL.checkUser(userName, password);
         if (configuredClientID != null)
            clientID = configuredClientID;
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot get a client ID", t);
      }
   }

   /**
    * Authenticate with the server and remember the returned session id.
    *
    * @param userName the user name
    * @param password the password
    * @throws JMSException when the server request fails
    */
   protected void authenticate(String userName, String password) throws JMSException
   {
      if (trace)
         log.trace("Authenticating user " + userName + " " + this);
      try
      {
         sessionId = serverIL.authenticate(userName, password);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot authenticate user", t);
      }
   }

   /**
    * Send an acknowledgement (or negative acknowledgement) to the server.
    *
    * @param item the acknowledgement request
    * @throws JMSException when the connection is closed or the server request fails
    */
   protected void send(AcknowledgementRequest item) throws JMSException
   {
      checkClosed();
      if (trace)
         log.trace("Acknowledge item=" + item + " " + this);

      try
      {
         serverIL.acknowledge(connectionToken, item);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot acknowlege a message", t);
      }
   }

   /**
    * Send a transaction request to the server.
    *
    * @param transaction the transaction request
    * @throws JMSException when the connection is closed or the server request fails
    */
   protected void send(TransactionRequest transaction) throws JMSException
   {
      checkClosed();
      if (trace)
         log.trace("Transact request=" + transaction + " " + this);

      try
      {
         serverIL.transact(connectionToken, transaction);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot process a transaction", t);
      }
   }

   /**
    * Ask the server for the transactions to recover. When the server
    * invocation layer is not {@link Recoverable} a warning is logged and an
    * empty array is returned.
    *
    * @param flags the flags passed to the server
    * @return the xids, never null
    * @throws JMSException when the connection is closed or the server request fails
    */
   protected Xid[] recover(int flags) throws JMSException
   {
      checkClosed();
      if (trace)
         log.trace("Recover flags=" + flags + " " + this);

      try
      {
         if (serverIL instanceof Recoverable)
         {
            Recoverable recoverableIL = (Recoverable) serverIL;
            return recoverableIL.recover(connectionToken, flags);
         }
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot recover", t);
      }

      log.warn(serverIL + " does not implement " + Recoverable.class.getName());
      return new Xid[0];
   }

   /**
    * Create and start the client invocation layer service, then create the
    * connection token and hand it to the server invocation layer.
    *
    * @throws JMSException for any failure
    */
   protected void startILService() throws JMSException
   {
      if (trace)
         log.trace("Starting the client il " + this);
      try
      {
         clientILService = genericConnectionFactory.createClientILService(this);
         clientILService.start();
         if (trace)
            log.trace("Using client id " + clientILService + " " + this);
         connectionToken = new ConnectionToken(clientID, clientILService.getClientIL(), sessionId);
         serverIL.setConnectionToken(connectionToken);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot start a the client IL service", t);
      }
   }

   /**
    * Stop the client invocation layer service.
    *
    * @throws JMSException for any failure, including a service that was never created
    */
   protected void stopILService() throws JMSException
   {
      try
      {
         clientILService.stop();
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot stop a the client IL service", t);
      }
   }

   /**
    * Stop delivery of incoming messages without the closed and client id
    * checks done by {@link #stop()}. Does nothing when already stopped.
    *
    * @throws JMSException when the server request fails
    */
   public void doStop() throws JMSException
   {
      if (modeStop)
         return;
      modeStop = true;

      if (trace)
         log.trace("Stopping connection " + this);

      try
      {
         serverIL.setEnabled(connectionToken, false);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot disable the connection with the JMS server", t);
      }
   }

   /**
    * Remove a consumer from the local registry only; the server is not contacted.
    *
    * @param consumer the consumer
    */
   private void removeConsumerInternal(SpyConsumer consumer)
   {
      synchronized (subscriptions)
      {
         Subscription req = consumer.getSubscription();
         subscriptions.remove(new Integer(req.subscriptionId));

         LinkedList ll = (LinkedList) destinationSubscriptions.get(req.destination);
         if (ll != null)
         {
            ll.remove(consumer);
            // Drop the per destination list together with its last consumer
            if (ll.size() == 0)
            {
               destinationSubscriptions.remove(req.destination);
            }
         }
      }
   }

   /**
    * Check whether the connection is closed.
    *
    * @throws IllegalStateException when {@link #close()} has completed
    */
   protected void checkClosed() throws IllegalStateException
   {
      if (closed.get())
         throw new IllegalStateException("The connection is closed");
   }

   /**
    * Schedule the periodic ping task, unless pinging is disabled.
    */
   private void startPingThread()
   {
      // A ping period of zero disables pinging
      if (pingPeriod == 0)
         return;
      pingTaskId = clockDaemon.executePeriodically(pingPeriod, new PingTask(), true);
   }

   /**
    * Cancel the periodic ping task, unless pinging is disabled, and wait a
    * bounded time for a ping in progress to finish.
    */
   private void stopPingThread()
   {
      // A ping period of zero disables pinging
      if (pingPeriod == 0)
         return;

      ClockDaemon.cancel(pingTaskId);

      // The ping task holds the semaphore while it runs, so attempting to take it
      // waits for a ping in progress; the permit is deliberately not given back
      try
      {
         pingTaskSemaphore.attempt(1000 * 10);
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
      }
   }

   /**
    * The periodic ping task: reports a failure when the previous ping was not
    * answered, otherwise sends the next ping.
    */
   class PingTask implements Runnable
   {
      /**
       * Run one ping cycle while holding the ping semaphore.
       */
      public void run()
      {
         try
         {
            pingTaskSemaphore.acquire();
         }
         catch (InterruptedException e)
         {
            log.debug("Interrupted requesting ping semaphore");
            return;
         }
         try
         {
            if (ponged == false)
            {
               // The server did not answer the previous ping
               throw new SpyJMSException("No pong received", new IOException("ping timeout."));
            }

            ponged = false;
            pingServer(System.currentTimeMillis());
         }
         catch (Throwable t)
         {
            asynchFailure("Unexpected ping failure", t);
         }
         finally
         {
            pingTaskSemaphore.release();
         }
      }
   }

   /**
    * Invokes the exception listener on its own thread and clears
    * <code>elThread</code> when the listener returns.
    */
   class ExceptionListenerRunnable implements Runnable
   {
      /** The listener to notify */
      ExceptionListener el;

      /** The failure to report */
      JMSException excep;

      /**
       * Create a new runnable.
       *
       * @param el the listener to notify
       * @param excep the failure to report
       */
      public ExceptionListenerRunnable(ExceptionListener el, JMSException excep)
      {
         this.el = el;
         this.excep = excep;
      }

      /**
       * Notify the listener while holding <code>elLock</code>; an error thrown
       * by the listener is logged and does not prevent the cleanup.
       */
      public void run()
      {
         try
         {
            synchronized (elLock)
            {
               el.onException(excep);
            }
         }
         catch (Throwable t)
         {
            log.warn("Connection failure: ", excep);
            log.warn("Exception listener ended abnormally: ", t);
         }

         synchronized (elLock)
         {
            elThread = null;
         }
      }
   }
}