1 22 package org.jboss.mq; 23 24 import java.io.Serializable ; 25 import java.util.ArrayList ; 26 import java.util.HashSet ; 27 import java.util.Iterator ; 28 import java.util.LinkedList ; 29 30 import javax.jms.BytesMessage ; 31 import javax.jms.Destination ; 32 import javax.jms.IllegalStateException ; 33 import javax.jms.InvalidDestinationException ; 34 import javax.jms.JMSException ; 35 import javax.jms.JMSSecurityException ; 36 import javax.jms.MapMessage ; 37 import javax.jms.Message ; 38 import javax.jms.MessageConsumer ; 39 import javax.jms.MessageListener ; 40 import javax.jms.MessageProducer ; 41 import javax.jms.ObjectMessage ; 42 import javax.jms.Queue ; 43 import javax.jms.QueueBrowser ; 44 import javax.jms.QueueReceiver ; 45 import javax.jms.QueueSender ; 46 import javax.jms.Session ; 47 import javax.jms.StreamMessage ; 48 import javax.jms.TemporaryQueue ; 49 import javax.jms.TemporaryTopic ; 50 import javax.jms.TextMessage ; 51 import javax.jms.Topic ; 52 import javax.jms.TopicPublisher ; 53 import javax.jms.TopicSubscriber ; 54 import javax.jms.XASession ; 55 import javax.transaction.xa.XAResource ; 56 57 import org.jboss.logging.Logger; 58 59 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 60 61 68 public class SpySession implements Session , XASession 69 { 70 71 static Logger log = Logger.getLogger(SpySession.class); 72 73 74 static boolean trace = log.isTraceEnabled(); 75 76 77 public Connection connection; 78 79 80 public boolean running; 81 82 protected boolean transacted; 83 84 protected int acknowledgeMode; 85 86 protected HashSet consumers; 87 88 protected HashSet producers; 89 90 protected Object deliveryLock = new Object (); 91 92 protected boolean inDelivery = false; 93 94 99 SpyMessageConsumer sessionConsumer; 100 101 102 SynchronizedBoolean closed = new SynchronizedBoolean(false); 103 104 105 Object runLock = new Object (); 106 107 111 private Object currentTransactionId; 112 113 114 SpyXAResource spyXAResource; 115 116 117 LinkedList messages = new LinkedList (); 118 119 120 ArrayList unacknowledgedMessages = new ArrayList (); 121 122 130 SpySession(Connection conn, boolean trans, int acknowledge, boolean xaSession) 131 { 132 trace = log.isTraceEnabled(); 133 134 connection = conn; 135 transacted = trans; 136 acknowledgeMode = acknowledge; 137 if (xaSession) 138 spyXAResource = new SpyXAResource(this); 139 140 running = true; 141 consumers = new HashSet (); 142 producers = new HashSet (); 143 144 if (spyXAResource == null && transacted) 146 currentTransactionId = connection.spyXAResourceManager.startTx(); 147 148 if (trace) 149 log.trace("New session " + this); 150 } 151 152 173 public void doAcknowledge(Message message, AcknowledgementRequest ack) throws JMSException 174 { 175 checkClosed(); 176 if (ack.isAck()) 178 { 179 synchronized (unacknowledgedMessages) 180 { 181 if (trace) 182 log.trace("Acknowledging message " + ack); 183 184 connection.send(((SpyMessage) message).getAcknowledgementRequest(true)); 186 unacknowledgedMessages.remove(message); 187 188 Iterator i = unacknowledgedMessages.iterator(); 190 while (i.hasNext()) 191 { 192 Message mess = (Message ) i.next(); 193 i.remove(); 194 connection.send(((SpyMessage) mess).getAcknowledgementRequest(true)); 195 } 196 } 197 } 198 else 200 { 201 if (trace) 202 log.trace("Nacking message " + message.getJMSMessageID()); 203 204 unacknowledgedMessages.remove(message); 206 connection.send(ack); 207 } 208 } 209 210 215 public SpyXAResourceManager getXAResourceManager() 216 { 217 return connection.spyXAResourceManager; 218 } 219 220 public void setMessageListener(MessageListener listener) throws JMSException 221 { 222 checkClosed(); 223 224 if (trace) 225 log.trace("Set message listener " + listener + " " + this); 226 227 sessionConsumer = new SpyMessageConsumer(this, true); 228 sessionConsumer.setMessageListener(listener); 229 } 230 231 public boolean getTransacted() throws JMSException 232 { 233 checkClosed(); 234 return transacted; 235 } 236 237 public MessageListener getMessageListener() throws JMSException 238 { 239 checkClosed(); 240 if (sessionConsumer == null) 241 return null; 242 243 return sessionConsumer.getMessageListener(); 244 } 245 246 public BytesMessage createBytesMessage() throws JMSException 247 { 248 checkClosed(); 249 SpyBytesMessage message = MessagePool.getBytesMessage(); 250 message.header.producerClientId = connection.getClientID(); 251 return message; 252 } 253 254 public MapMessage createMapMessage() throws JMSException 255 { 256 checkClosed(); 257 SpyMapMessage message = MessagePool.getMapMessage(); 258 message.header.producerClientId = connection.getClientID(); 259 return message; 260 } 261 262 public Message createMessage() throws JMSException 263 { 264 checkClosed(); 265 SpyMessage message = MessagePool.getMessage(); 266 message.header.producerClientId = connection.getClientID(); 267 return message; 268 } 269 270 public ObjectMessage createObjectMessage() throws JMSException 271 { 272 checkClosed(); 273 SpyObjectMessage message = MessagePool.getObjectMessage(); 274 message.header.producerClientId = connection.getClientID(); 275 return message; 276 } 277 278 public ObjectMessage createObjectMessage(Serializable object) throws JMSException 279 { 280 checkClosed(); 281 SpyObjectMessage message = MessagePool.getObjectMessage(); 282 message.setObject(object); 283 message.header.producerClientId = connection.getClientID(); 284 return message; 285 } 286 287 public StreamMessage createStreamMessage() throws JMSException 288 { 289 checkClosed(); 290 SpyStreamMessage message = MessagePool.getStreamMessage(); 291 message.header.producerClientId = connection.getClientID(); 292 return message; 293 } 294 295 public TextMessage createTextMessage() throws JMSException 296 { 297 checkClosed(); 298 SpyTextMessage message = MessagePool.getTextMessage(); 299 message.header.producerClientId = connection.getClientID(); 300 return message; 301 } 302 303 public void run() 305 { 306 synchronized (messages) 307 { 308 if (trace) 309 log.trace("Run messages=" + messages.size() + " " + this); 310 while (messages.size() > 0) 311 { 312 SpyMessage message = (SpyMessage) messages.removeFirst(); 313 try 314 { 315 if (sessionConsumer == null) 316 { 317 log.warn("Session has no message listener set, cannot process message. " + this); 318 connection.send(message.getAcknowledgementRequest(false)); 320 } 321 else 322 { 323 sessionConsumer.addMessage(message); 324 } 325 } 326 catch (Throwable ignore) 327 { 328 if (trace) 329 log.trace("Ignored error from session consumer", ignore); 330 } 331 } 332 } 333 } 334 335 public void close() throws JMSException 336 { 337 if (closed.set(true)) 338 return; 339 340 if (trace) 341 log.trace("Session closing " + this); 342 343 JMSException exception = null; 344 345 if (trace) 346 log.trace("Closing consumers " + this); 347 348 Iterator i; 349 synchronized (consumers) 350 { 351 if (sessionConsumer != null) 353 { 354 try 355 { 356 sessionConsumer.close(); 357 } 358 catch (Throwable t) 359 { 360 log.trace("Error closing session consumer", t); 361 } 362 } 363 364 i = new ArrayList (consumers).iterator(); 365 } 366 367 while (i.hasNext()) 368 { 369 SpyMessageConsumer messageConsumer = (SpyMessageConsumer) i.next(); 370 try 371 { 372 messageConsumer.close(); 373 } 374 catch (Throwable t) 375 { 376 log.trace("Error closing message consumer", t); 377 } 378 } 379 380 synchronized (producers) 381 { 382 i = new ArrayList (producers).iterator(); 383 } 384 385 while (i.hasNext()) 386 { 387 SpyMessageProducer messageProducer = (SpyMessageProducer) i.next(); 388 try 389 { 390 messageProducer.close(); 391 } 392 catch (InvalidDestinationException ignored) 393 { 394 log.warn(ignored.getMessage(), ignored); 395 } 396 catch (Throwable t) 397 { 398 log.trace("Error closing message producer", t); 399 } 400 } 401 402 if (trace) 403 log.trace("Close handling unacknowledged messages " + this); 404 try 405 { 406 if (spyXAResource == null) 407 { 408 if (transacted) 409 internalRollback(); 410 else 411 { 412 i = unacknowledgedMessages.iterator(); 413 while (i.hasNext()) 414 { 415 SpyMessage message = (SpyMessage) i.next(); 416 connection.send(message.getAcknowledgementRequest(false)); 417 i.remove(); 418 } 419 } 420 } 421 } 422 catch (Throwable t) 423 { 424 if (exception == null) 425 exception = SpyJMSException.getAsJMSException("Error nacking message", t); 426 } 427 428 if (trace) 429 log.trace("Informing connection of close " + this); 430 connection.sessionClosing(this); 431 432 if (exception != null) 434 throw exception; 435 } 436 437 public void commit() throws JMSException 439 { 440 checkClosed(); 441 trace = log.isTraceEnabled(); 442 443 synchronized (runLock) 445 { 446 if (spyXAResource != null) 447 throw new javax.jms.TransactionInProgressException ("Should not be call from a XASession"); 448 if (!transacted) 449 throw new IllegalStateException ("The session is not transacted"); 450 451 if (trace) 452 log.trace("Committing transaction " + this); 453 try 454 { 455 connection.spyXAResourceManager.endTx(currentTransactionId, true); 456 connection.spyXAResourceManager.commit(currentTransactionId, true); 457 } 458 catch (Throwable t) 459 { 460 SpyJMSException.rethrowAsJMSException("Could not commit", t); 461 } 462 finally 463 { 464 unacknowledgedMessages.clear(); 465 try 466 { 467 currentTransactionId = connection.spyXAResourceManager.startTx(); 468 469 if (trace) 470 log.trace("Current transaction id: " + currentTransactionId + " " + this); 471 } 472 catch (Throwable ignore) 473 { 474 if (trace) 475 log.trace("Failed to start tx " + this, ignore); 476 } 477 } 478 } 479 } 480 481 public void rollback() throws JMSException 482 { 483 checkClosed(); 484 trace = log.isTraceEnabled(); 485 486 synchronized (runLock) 487 { 488 internalRollback(); 489 } 490 } 491 492 public void recover() throws JMSException 493 { 494 checkClosed(); 495 boolean stopped = connection.modeStop; 496 497 synchronized (runLock) 498 { 499 if (currentTransactionId != null) 500 throw new IllegalStateException ("The session is transacted"); 501 502 if (trace) 503 log.trace("Session recovery stopping delivery " + this); 504 try 505 { 506 connection.stop(); 507 running = false; 508 } 509 catch (Throwable t) 510 { 511 SpyJMSException.rethrowAsJMSException("Could not stop message delivery", t); 512 } 513 514 try 517 { 518 synchronized (messages) 519 { 520 if (stopped == false) 521 { 522 if (trace) 523 log.trace("Recovering: unacknowledged messages=" + unacknowledgedMessages + " " + this); 524 Iterator i = consumers.iterator(); 525 while (i.hasNext()) 526 { 527 SpyMessageConsumer consumer = (SpyMessageConsumer) i.next(); 528 529 Iterator ii = unacknowledgedMessages.iterator(); 530 while (ii.hasNext()) 531 { 532 SpyMessage message = (SpyMessage) ii.next(); 533 534 if (consumer.getSubscription().accepts(message.header)) 535 { 536 message.setJMSRedelivered(true); 537 consumer.messages.addLast(message); 538 ii.remove(); 539 if (trace) 540 log.trace("Recovered: message=" + message + " consumer=" + consumer); 541 } 542 } 543 } 544 } 545 546 Iterator i = unacknowledgedMessages.iterator(); 548 while (i.hasNext()) 549 { 550 SpyMessage message = (SpyMessage) i.next(); 551 connection.send(message.getAcknowledgementRequest(false)); 552 i.remove(); 553 if (trace) 554 log.trace("Recovered: nacked with no consumer message=" + message + " " + this); 555 } 556 } 557 } 558 catch (Throwable t) 559 { 560 SpyJMSException.rethrowAsJMSException("Unable to recover session ", t); 561 } 562 568 if (stopped == false) 569 { 570 if (trace) 571 log.trace("Recovery restarting message delivery " + this); 572 try 573 { 574 running = true; 575 connection.start(); 576 577 Iterator i = consumers.iterator(); 578 while (i.hasNext()) 579 ((SpyMessageConsumer) i.next()).restartProcessing(); 580 } 581 catch (Throwable t) 582 { 583 SpyJMSException.rethrowAsJMSException("Could not resume message delivery", t); 584 } 585 } 586 } 587 } 588 589 public TextMessage createTextMessage(String string) throws JMSException 590 { 591 checkClosed(); 592 SpyTextMessage message = new SpyTextMessage(); 593 message.setText(string); 594 message.header.producerClientId = connection.getClientID(); 595 return message; 596 } 597 598 public int getAcknowledgeMode() throws JMSException 599 { 600 return acknowledgeMode; 601 } 602 603 public MessageConsumer createConsumer(Destination destination) throws JMSException 604 { 605 return createConsumer(destination, null, false); 606 } 607 608 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException 609 { 610 return createConsumer(destination, messageSelector, false); 611 } 612 613 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) 614 throws JMSException 615 { 616 if (destination instanceof Topic ) 617 return createSubscriber((Topic ) destination, messageSelector, noLocal); 618 else 619 return createReceiver((Queue ) destination, messageSelector); 620 } 621 622 public MessageProducer createProducer(Destination destination) throws JMSException 623 { 624 if (destination instanceof Topic ) 625 return createPublisher((Topic ) destination); 626 else 627 return createSender((Queue ) destination); 628 } 629 630 public QueueBrowser createBrowser(Queue queue) throws JMSException 631 { 632 return createBrowser(queue, null); 633 } 634 635 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException 636 { 637 checkClosed(); 638 if (this instanceof SpyTopicSession) 639 throw new IllegalStateException ("Not allowed for a TopicSession"); 640 if (queue == null) 641 throw new InvalidDestinationException ("Cannot browse a null queue."); 642 return new SpyQueueBrowser(this, queue, messageSelector); 643 } 644 645 public QueueReceiver createReceiver(Queue queue) throws JMSException 646 { 647 return createReceiver(queue, null); 648 } 649 650 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException 651 { 652 checkClosed(); 653 if (queue == null) 654 throw new InvalidDestinationException ("Queue cannot be null."); 655 656 connection.checkTemporary(queue); 657 SpyQueueReceiver receiver = new SpyQueueReceiver(this, queue, messageSelector); 658 addConsumer(receiver); 659 660 return receiver; 661 } 662 663 public QueueSender createSender(Queue queue) throws JMSException 664 { 665 checkClosed(); 666 SpyQueueSender producer = new SpyQueueSender(this, queue); 667 addProducer(producer); 668 return producer; 669 } 670 671 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException 672 { 673 return createDurableSubscriber(topic, name, null, false); 674 } 675 676 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) 677 throws JMSException 678 { 679 checkClosed(); 680 if (this instanceof SpyQueueSession) 681 throw new IllegalStateException ("Not allowed for a QueueSession"); 682 if (topic == null) 683 throw new InvalidDestinationException ("Topic cannot be null"); 684 if (topic instanceof TemporaryTopic ) 685 throw new InvalidDestinationException ("Attempt to create a durable subscription for a temporary topic"); 686 687 if (name == null || name.trim().length() == 0) 688 throw new JMSException ("Null or empty subscription"); 689 690 SpyTopic t = new SpyTopic((SpyTopic) topic, connection.getClientID(), name, messageSelector); 691 SpyTopicSubscriber sub = new SpyTopicSubscriber(this, t, noLocal, messageSelector); 692 addConsumer(sub); 693 694 return sub; 695 } 696 697 public TopicSubscriber createSubscriber(Topic topic) throws JMSException 698 { 699 return createSubscriber(topic, null, false); 700 } 701 702 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException 703 { 704 checkClosed(); 705 if (topic == null) 706 throw new InvalidDestinationException ("Topic cannot be null"); 707 708 connection.checkTemporary(topic); 709 SpyTopicSubscriber sub = new SpyTopicSubscriber(this, (SpyTopic) topic, noLocal, messageSelector); 710 addConsumer(sub); 711 712 return sub; 713 } 714 715 public TopicPublisher createPublisher(Topic topic) throws JMSException 716 { 717 checkClosed(); 718 SpyTopicPublisher producer = new SpyTopicPublisher(this, topic); 719 addProducer(producer); 720 return producer; 721 } 722 723 public Queue createQueue(String queueName) throws JMSException 724 { 725 checkClosed(); 726 if (this instanceof SpyTopicSession) 727 throw new IllegalStateException ("Not allowed for a TopicSession"); 728 if (queueName == null) 729 throw new InvalidDestinationException ("Queue name cannot be null."); 730 return ((SpyConnection) connection).createQueue(queueName); 731 } 732 733 public Topic createTopic(String topicName) throws JMSException 734 { 735 checkClosed(); 736 if (this instanceof SpyQueueSession) 737 throw new IllegalStateException ("Not allowed for a QueueSession"); 738 if (topicName == null) 739 throw new InvalidDestinationException ("The topic name cannot be null"); 740 741 return ((SpyConnection) connection).createTopic(topicName); 742 } 743 744 public TemporaryQueue createTemporaryQueue() throws JMSException 745 { 746 checkClosed(); 747 if (this instanceof SpyTopicSession) 748 throw new IllegalStateException ("Not allowed for a TopicSession"); 749 750 return ((SpyConnection) connection).getTemporaryQueue(); 751 } 752 753 public TemporaryTopic createTemporaryTopic() throws JMSException 754 { 755 checkClosed(); 756 if (this instanceof SpyQueueSession) 757 throw new IllegalStateException ("Not allowed for a QueueSession"); 758 return ((SpyConnection) connection).getTemporaryTopic(); 759 } 760 761 public void unsubscribe(String name) throws JMSException 762 { 763 checkClosed(); 764 if (this instanceof SpyQueueSession) 765 throw new IllegalStateException ("Not allowed for a QueueSession"); 766 767 DurableSubscriptionID id = new DurableSubscriptionID(connection.getClientID(), name, null); 769 connection.unsubscribe(id); 770 } 771 772 public XAResource getXAResource() 773 { 774 return spyXAResource; 775 } 776 777 public Session getSession() throws JMSException 778 { 779 checkClosed(); 780 return this; 781 } 782 783 public String toString() 784 { 785 StringBuffer buffer = new StringBuffer (100); 786 buffer.append("SpySession@").append(System.identityHashCode(this)); 787 buffer.append('['); 788 buffer.append("tx=").append(transacted); 789 if (transacted == false) 790 { 791 if (acknowledgeMode == AUTO_ACKNOWLEDGE) 792 buffer.append(" ack=").append("AUTO"); 793 else if (acknowledgeMode == CLIENT_ACKNOWLEDGE) 794 buffer.append(" ack=").append("CLIENT"); 795 else if (acknowledgeMode == DUPS_OK_ACKNOWLEDGE) 796 buffer.append(" ack=").append("DUPSOK"); 797 } 798 buffer.append(" txid=" + currentTransactionId); 799 if (spyXAResource != null) 800 buffer.append(" XA"); 801 if (running) 802 buffer.append(" RUNNING"); 803 if (closed.get()) 804 buffer.append(" CLOSED"); 805 buffer.append(" connection=").append(connection); 806 buffer.append(']'); 807 return buffer.toString(); 808 } 809 810 815 void setCurrentTransactionId(final Object xid) 816 { 817 if (xid == null) 818 throw new org.jboss.util.NullArgumentException("xid"); 819 820 if (trace) 821 log.trace("Setting current tx xid=" + xid + " previous: " + currentTransactionId + " " + this); 822 823 this.currentTransactionId = xid; 824 } 825 826 831 void unsetCurrentTransactionId(final Object xid) 832 { 833 if (xid == null) 834 throw new org.jboss.util.NullArgumentException("xid"); 835 836 if (trace) 837 log.trace("Unsetting current tx xid=" + xid + " previous: " + currentTransactionId + " " + this); 838 839 if (xid.equals(currentTransactionId)) 842 this.currentTransactionId = null; 843 } 844 845 850 Object getCurrentTransactionId() 851 { 852 return currentTransactionId; 853 } 854 855 861 String getNewMessageID() throws JMSException 862 { 863 checkClosed(); 864 return connection.getNewMessageID(); 865 } 866 867 872 void addMessage(SpyMessage message) 873 { 874 synchronized (messages) 875 { 876 if (trace) 877 log.trace("Add message msgid=" + message.header.jmsMessageID + " " + this); 878 messages.addLast(message); 879 } 880 } 881 882 887 void addUnacknowlegedMessage(SpyMessage message) 888 { 889 if (!transacted) 890 { 891 synchronized (unacknowledgedMessages) 892 { 893 if (trace) 894 log.trace("Add unacked message msgid=" + message.header.jmsMessageID + " " + this); 895 896 unacknowledgedMessages.add(message); 897 } 898 } 899 } 900 901 907 void sendMessage(SpyMessage m) throws JMSException 908 { 909 checkClosed(); 910 911 m.header.producerClientId = connection.getClientID(); 913 914 if (transacted) 915 { 916 if (trace) 917 log.trace("Adding message to transaction " + m.header.jmsMessageID + " " + this); 918 connection.spyXAResourceManager.addMessage(currentTransactionId, m.myClone()); 919 } 920 else 921 { 922 if (trace) 923 log.trace("Sending message to server " + m.header.jmsMessageID + " " + this); 924 connection.sendToServer(m); 925 } 926 } 927 928 934 void addConsumer(SpyMessageConsumer who) throws JMSException 935 { 936 checkClosed(); 937 938 synchronized (consumers) 939 { 940 if (trace) 941 log.trace("Adding consumer " + who); 942 943 consumers.add(who); 944 } 945 try 946 { 947 connection.addConsumer(who); 948 } 949 catch (JMSSecurityException ex) 950 { 951 removeConsumerInternal(who); 952 throw ex; 953 } 954 catch (Throwable t) 955 { 956 SpyJMSException.rethrowAsJMSException("Error adding consumer", t); 957 } 958 } 959 960 966 void removeConsumer(SpyMessageConsumer who) throws JMSException 967 { 968 connection.removeConsumer(who); 969 removeConsumerInternal(who); 970 } 971 972 978 void addProducer(SpyMessageProducer who) throws JMSException 979 { 980 checkClosed(); 981 982 synchronized (producers) 983 { 984 if (trace) 985 log.trace("Adding producer " + who); 986 987 producers.add(who); 988 } 989 } 990 991 997 void removeProducer(SpyMessageProducer who) throws JMSException 998 { 999 removeProducerInternal(who); 1000 } 1001 1002 1007 boolean tryDeliveryLock() 1008 { 1009 synchronized (deliveryLock) 1010 { 1011 if (inDelivery) 1012 { 1013 try 1014 { 1015 deliveryLock.wait(); 1016 } 1017 catch (InterruptedException e) 1018 { 1019 log.trace("Ignored interruption waiting for delivery lock"); 1020 } 1021 } 1022 if (inDelivery == false) 1024 { 1025 inDelivery = true; 1026 return true; 1027 } 1028 } 1029 return false; 1030 } 1031 1032 1035 void releaseDeliveryLock() 1036 { 1037 synchronized (deliveryLock) 1038 { 1039 inDelivery = false; 1040 deliveryLock.notifyAll(); 1041 } 1042 } 1043 1044 1047 void interruptDeliveryLockWaiters() 1048 { 1049 synchronized (deliveryLock) 1050 { 1051 deliveryLock.notifyAll(); 1052 } 1053 } 1054 1055 1061 void asynchFailure(String message, Throwable t) 1062 { 1063 connection.asynchFailure(message, t); 1064 } 1065 1066 1071 private void internalRollback() throws JMSException 1072 { 1073 synchronized (runLock) 1074 { 1075 if (spyXAResource != null) 1076 throw new javax.jms.TransactionInProgressException ("Should not be call from a XASession"); 1077 if (!transacted) 1078 throw new IllegalStateException ("The session is not transacted"); 1079 1080 if (trace) 1081 log.trace("Rollback transaction " + this); 1082 try 1083 { 1084 connection.spyXAResourceManager.endTx(currentTransactionId, true); 1085 connection.spyXAResourceManager.rollback(currentTransactionId); 1086 } 1087 catch (Throwable t) 1088 { 1089 SpyJMSException.rethrowAsJMSException("Could not rollback", t); 1090 } 1091 finally 1092 { 1093 unacknowledgedMessages.clear(); 1094 try 1095 { 1096 currentTransactionId = connection.spyXAResourceManager.startTx(); 1097 if (trace) 1098 log.trace("Current transaction id: " + currentTransactionId + " " + this); 1099 } 1100 catch (Throwable ignore) 1101 { 1102 if (trace) 1103 log.trace("Failed to start tx " + this, ignore); 1104 } 1105 } 1106 } 1107 } 1108 1109 1114 private void removeConsumerInternal(SpyMessageConsumer who) 1115 { 1116 synchronized (consumers) 1117 { 1118 if (trace) 1119 log.trace("Remove consumer " + who); 1120 1121 consumers.remove(who); 1122 } 1123 } 1124 1125 1130 private void removeProducerInternal(SpyMessageProducer who) 1131 { 1132 synchronized (producers) 1133 { 1134 if (trace) 1135 log.trace("Remove producer " + who); 1136 1137 producers.remove(who); 1138 } 1139 } 1140 1141 1146 private void checkClosed() throws IllegalStateException 1147 { 1148 if (closed.get()) 1149 throw new IllegalStateException ("The session is closed"); 1150 } 1151} 1152 | Popular Tags |