| 1 24 package org.objectweb.joram.client.jms; 25 26 import java.util.*; 27 28 import javax.jms.JMSException ; 29 import javax.jms.TransactionRolledBackException ; 30 import javax.jms.IllegalStateException ; 31 import javax.jms.MessageFormatException ; 32 33 import org.objectweb.joram.client.jms.connection.RequestMultiplexer; 34 import org.objectweb.joram.client.jms.connection.Requestor; 35 36 import org.objectweb.joram.shared.client.*; 37 38 import org.objectweb.util.monolog.api.BasicLevel; 39 import org.objectweb.util.monolog.api.Logger; 40 import fr.dyade.aaa.util.Debug; 41 42 45 public class Session implements javax.jms.Session { 46 47 public static Logger logger = Debug.getLogger(Session.class.getName()); 48 49 50 public static final String RECEIVE_ACK = 51 "org.objectweb.joram.client.jms.receiveAck"; 52 53 public static boolean receiveAck = Boolean.getBoolean(RECEIVE_ACK); 54 55 58 private static class Status { 59 63 public static final int STOP = 0; 64 65 68 public static final int START = 1; 69 70 73 public static final int CLOSE = 2; 74 75 private static final String [] names = { 76 "STOP", "START", "CLOSE"}; 77 78 public static String toString(int status) { 79 return names[status]; 80 } 81 } 82 83 86 private static class SessionMode { 87 91 public static final int NONE = 0; 92 93 96 public static final int RECEIVE = 1; 97 98 101 public static final int LISTENER = 2; 102 103 106 public static final int APP_SERVER = 3; 107 108 private static final String [] names = { 109 "NONE", "RECEIVE", "LISTENER", "APP_SERVER"}; 110 111 public static String toString(int status) { 112 return names[status]; 113 } 114 } 115 116 120 private static class RequestStatus { 121 122 public static final int NONE = 0; 123 124 public static final int RUN = 1; 125 126 public static final int DONE = 2; 127 128 private static final String [] names = { 129 "NONE", "RUN", "DONE"}; 130 131 public static String toString(int status) { 132 return names[status]; 133 } 134 } 135 136 137 private SessionCloseTask closingTask; 138 139 140 private boolean scheduled; 141 142 143 protected javax.jms.MessageListener messageListener; 144 145 146 private String ident; 147 148 149 private Connection cnx; 150 151 152 boolean transacted; 153 154 155 private int acknowledgeMode; 156 157 158 private boolean autoAck; 159 160 161 private Vector consumers; 162 163 164 private Vector producers; 165 166 167 private Vector browsers; 168 169 170 private fr.dyade.aaa.util.Queue repliesIn; 171 172 173 private SessionDaemon daemon; 174 175 176 private int listenerCount; 177 178 185 Hashtable sendings; 186 187 194 Hashtable deliveries; 195 196 199 private RequestMultiplexer mtpx; 200 201 204 private Requestor requestor; 205 206 211 private Requestor receiveRequestor; 212 213 218 private boolean recover; 219 220 223 private int status; 224 225 228 private int sessionMode; 229 230 233 private int requestStatus; 234 235 238 private MessageConsumer pendingMessageConsumer; 239 240 243 private Thread singleThreadOfControl; 244 245 249 private boolean passiveMsgInput; 250 251 254 private Closer closer; 255 256 260 private boolean asyncSend; 261 262 265 private int queueMessageReadMax; 266 267 271 private int topicAckBufferMax; 272 273 277 private int topicPassivationThreshold; 278 279 283 private int topicActivationThreshold; 284 285 private MessageConsumerListener messageConsumerListener; 286 287 296 Session(Connection cnx, 297 boolean transacted, 298 int acknowledgeMode, 299 RequestMultiplexer mtpx) 300 throws JMSException { 301 if (! transacted 302 && acknowledgeMode != javax.jms.Session.AUTO_ACKNOWLEDGE 303 && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE 304 && acknowledgeMode != javax.jms.Session.DUPS_OK_ACKNOWLEDGE 305 && !(cnx instanceof XAQueueConnection) 306 && !(cnx instanceof XATopicConnection) 307 && !(cnx instanceof XAConnection)) 308 throw new JMSException ("Can't create a non transacted session with an" 309 + " invalid acknowledge mode."); 310 311 this.ident = cnx.nextSessionId(); 312 this.cnx = cnx; 313 this.transacted = transacted; 314 this.acknowledgeMode = acknowledgeMode; 315 this.mtpx = mtpx; 316 requestor = new Requestor(mtpx); 317 receiveRequestor = new Requestor(mtpx); 318 319 autoAck = ! transacted 320 && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE; 321 322 consumers = new Vector(); 323 producers = new Vector(); 324 browsers = new Vector(); 325 repliesIn = new fr.dyade.aaa.util.Queue(); 326 sendings = new Hashtable(); 327 deliveries = new Hashtable(); 328 329 closer = new Closer(); 330 331 if (transacted && cnx.getTxPendingTimer() > 0) { 334 closingTask = new SessionCloseTask( 335 cnx.getTxPendingTimer() * 1000); 336 } 337 338 asyncSend = cnx.getAsyncSend(); 339 queueMessageReadMax = cnx.getQueueMessageReadMax(); 340 topicAckBufferMax = cnx.getTopicAckBufferMax(); 341 topicActivationThreshold = cnx.getTopicActivationThreshold(); 342 topicPassivationThreshold = cnx.getTopicPassivationThreshold(); 343 344 setStatus(Status.STOP); 345 setSessionMode(SessionMode.NONE); 346 setRequestStatus(RequestStatus.NONE); 347 } 348 349 352 private void setStatus(int status) { 353 if (logger.isLoggable(BasicLevel.DEBUG)) 354 logger.log( 355 BasicLevel.DEBUG, 356 "Session.setStatus(" + 357 Status.toString(status) + ')'); 358 this.status = status; 359 } 360 361 boolean isStarted() { 362 return (status == Status.START); 363 } 364 365 368 private void setSessionMode(int sessionMode) { 369 if (logger.isLoggable(BasicLevel.DEBUG)) 370 logger.log( 371 BasicLevel.DEBUG, 372 "Session.setSessionMode(" + 373 SessionMode.toString(sessionMode) + ')'); 374 this.sessionMode = sessionMode; 375 } 376 377 380 private void setRequestStatus(int requestStatus) { 381 if (logger.isLoggable(BasicLevel.DEBUG)) 382 logger.log( 383 BasicLevel.DEBUG, 384 "Session.setRequestStatus(" + 385 RequestStatus.toString(requestStatus) + ')'); 386 this.requestStatus = requestStatus; 387 } 388 389 393 protected synchronized void checkClosed() 394 throws IllegalStateException { 395 if (status == Status.CLOSE) 396 throw new IllegalStateException ( 397 "Forbidden call on a closed session."); 398 } 399 400 404 private synchronized void checkThreadOfControl() 405 throws IllegalStateException { 406 if (singleThreadOfControl != null && 407 Thread.currentThread() != singleThreadOfControl) 408 throw new IllegalStateException ("Illegal control thread"); 409 } 410 411 417 private void checkSessionMode( 418 int expectedSessionMode) 419 throws IllegalStateException { 420 if (sessionMode == SessionMode.NONE) { 421 setSessionMode(sessionMode); 422 } else if (sessionMode != expectedSessionMode) { 423 throw new IllegalStateException ("Bad session mode"); 424 } 425 } 426 427 428 public String toString() { 429 return "Sess:" + ident; 430 } 431 432 437 public final int getAcknowledgeMode() throws JMSException { 438 checkClosed(); 439 return getAckMode(); 440 } 441 442 int getAckMode() { 443 if (transacted) 444 return Session.SESSION_TRANSACTED; 445 return acknowledgeMode; 446 } 447 448 453 public synchronized final boolean getTransacted() 454 throws JMSException { 455 checkClosed(); 456 return transacted; 457 } 458 459 463 public void setTransacted(boolean t) { 464 if (status != Status.CLOSE) { 465 transacted = t; 466 } 467 } 470 471 476 public synchronized void setMessageListener( 477 javax.jms.MessageListener messageListener) 478 throws JMSException { 479 checkSessionMode(SessionMode.APP_SERVER); 480 this.messageListener = messageListener; 481 } 482 483 488 public synchronized javax.jms.MessageListener 489 getMessageListener() 490 throws JMSException { 491 return messageListener; 492 } 493 494 500 public synchronized javax.jms.Message createMessage() 501 throws JMSException { 502 checkClosed(); 503 return new Message(); 504 } 505 506 512 public synchronized javax.jms.TextMessage createTextMessage() 513 throws JMSException { 514 checkClosed(); 515 return new TextMessage(); 516 } 517 518 524 public synchronized javax.jms.TextMessage createTextMessage(String text) 525 throws JMSException { 526 checkClosed(); 527 TextMessage message = new TextMessage(); 528 message.setText(text); 529 return message; 530 } 531 532 538 public synchronized javax.jms.BytesMessage createBytesMessage() 539 throws JMSException { 540 checkClosed(); 541 return new BytesMessage(); 542 } 543 544 550 public synchronized javax.jms.MapMessage createMapMessage() 551 throws JMSException { 552 checkClosed(); 553 return new MapMessage(); 554 } 555 556 562 public synchronized javax.jms.ObjectMessage createObjectMessage() 563 throws JMSException { 564 checkClosed(); 565 return new ObjectMessage(); 566 } 567 568 574 public synchronized javax.jms.ObjectMessage createObjectMessage( 575 java.io.Serializable obj) 576 throws JMSException { 577 checkClosed(); 578 ObjectMessage message = new ObjectMessage(); 579 message.setObject(obj); 580 return message; 581 } 582 583 589 public synchronized javax.jms.StreamMessage createStreamMessage() 590 throws JMSException { 591 checkClosed(); 592 return new StreamMessage(); 593 } 594 595 600 public synchronized javax.jms.QueueBrowser  601 createBrowser(javax.jms.Queue queue, 602 String selector) 603 throws JMSException { 604 checkClosed(); 605 checkThreadOfControl(); 606 QueueBrowser qb = new QueueBrowser(this, (Queue) queue, selector); 607 browsers.addElement(qb); 608 return qb; 609 } 610 611 616 public synchronized javax.jms.QueueBrowser 617 createBrowser(javax.jms.Queue queue) 618 throws JMSException { 619 checkClosed(); 620 checkThreadOfControl(); 621 QueueBrowser qb = new QueueBrowser(this, (Queue) queue, null); 622 browsers.addElement(qb); 623 return qb; 624 } 625 626 634 public synchronized javax.jms.MessageProducer createProducer( 635 javax.jms.Destination dest) 636 throws JMSException { 637 checkClosed(); 638 checkThreadOfControl(); 639 MessageProducer mp = new MessageProducer( 640 this, 641 (Destination) dest); 642 addProducer(mp); 643 return mp; 644 } 645 646 655 public synchronized javax.jms.MessageConsumer  656 createConsumer(javax.jms.Destination dest, 657 String selector, 658 boolean noLocal) 659 throws JMSException { 660 checkClosed(); 661 checkThreadOfControl(); 662 MessageConsumer mc = new MessageConsumer( 663 this, (Destination) dest, 664 selector, null, 665 noLocal); 666 addConsumer(mc); 667 return mc; 668 } 669 670 679 public synchronized javax.jms.MessageConsumer  680 createConsumer(javax.jms.Destination dest, 681 String selector) 682 throws JMSException { 683 checkClosed(); 684 checkThreadOfControl(); 685 MessageConsumer mc = new MessageConsumer( 686 this, (Destination) dest, selector); 687 addConsumer(mc); 688 return mc; 689 } 690 691 699 public synchronized javax.jms.MessageConsumer 700 createConsumer(javax.jms.Destination dest) 701 throws JMSException { 702 checkClosed(); 703 checkThreadOfControl(); 704 MessageConsumer mc = new MessageConsumer( 705 this, (Destination) dest, null); 706 addConsumer(mc); 707 return mc; 708 } 709 710 717 public synchronized javax.jms.TopicSubscriber  718 createDurableSubscriber(javax.jms.Topic topic, 719 String name, 720 String selector, 721 boolean noLocal) 722 throws JMSException { 723 if (logger.isLoggable(BasicLevel.DEBUG)) 724 logger.log( 725 BasicLevel.DEBUG, 726 "Session.createDurableSubscriber(" + 727 topic + ',' + name + ',' + 728 selector + ',' + noLocal + ')'); 729 checkClosed(); 730 checkThreadOfControl(); 731 TopicSubscriber ts = new TopicSubscriber( 732 this, (Topic) topic, name, selector, noLocal); 733 addConsumer(ts); 734 return ts; 735 } 736 737 744 public synchronized javax.jms.TopicSubscriber  745 createDurableSubscriber(javax.jms.Topic topic, 746 String name) 747 throws JMSException { 748 if (logger.isLoggable(BasicLevel.DEBUG)) 749 logger.log( 750 BasicLevel.DEBUG, 751 "Session.createDurableSubscriber(" + 752 topic + ',' + name + ')'); 753 checkClosed(); 754 checkThreadOfControl(); 755 TopicSubscriber ts = new TopicSubscriber( 756 this, (Topic) topic, name, null, false); 757 addConsumer(ts); 758 return ts; 759 } 760 761 766 public synchronized javax.jms.Queue createQueue( 767 String queueName) 768 throws JMSException { 769 checkClosed(); 770 return new Queue(queueName); 771 } 772 773 779 public synchronized javax.jms.Topic createTopic( 780 String topicName) 781 throws JMSException { 782 checkClosed(); 783 checkThreadOfControl(); 784 785 if (topicName.equals("#AdminTopic")) { 787 try { 788 GetAdminTopicReply reply = 789 (GetAdminTopicReply) requestor.request(new GetAdminTopicRequest()); 790 if (reply.getId() != null) 791 return new Topic(reply.getId()); 792 else 793 throw new JMSException ("AdminTopic could not be retrieved."); 794 } 795 catch (JMSException exc) { 796 throw exc; 797 } 798 catch (Exception exc) { 799 throw new JMSException ("AdminTopic could not be retrieved: " + exc); 800 } 801 } 802 return new Topic(topicName); 803 } 804 805 812 public synchronized javax.jms.TemporaryQueue createTemporaryQueue() 813 throws JMSException { 814 checkClosed(); 815 checkThreadOfControl(); 816 817 SessCreateTDReply reply = 818 (SessCreateTDReply) requestor.request(new SessCreateTQRequest()); 819 String tempDest = reply.getAgentId(); 820 return new TemporaryQueue(tempDest, cnx); 821 } 822 823 830 public synchronized javax.jms.TemporaryTopic createTemporaryTopic() 831 throws JMSException { 832 checkClosed(); 833 checkThreadOfControl(); 834 835 SessCreateTDReply reply = 836 (SessCreateTDReply) requestor.request(new SessCreateTTRequest()); 837 String tempDest = reply.getAgentId(); 838 return new TemporaryTopic(tempDest, cnx); 839 } 840 841 842 public synchronized void run() { 843 int load = repliesIn.size(); 844 845 if (logger.isLoggable(BasicLevel.DEBUG)) 846 logger.log(BasicLevel.DEBUG, 847 "-- " + this + ": loaded with " + load + 848 " message(s) and started."); 849 850 try { 851 for (int i = 0; i < load; i++) { 853 org.objectweb.joram.shared.messages.Message momMsg = 854 (org.objectweb.joram.shared.messages.Message) repliesIn.pop(); 855 String msgId = momMsg.id; 856 857 onMessage(momMsg, messageConsumerListener); 858 } 859 } catch (Exception exc) { 860 if (logger.isLoggable(BasicLevel.ERROR)) 861 logger.log(BasicLevel.ERROR, "", exc); 862 } 863 } 864 865 869 void setMessageConsumerListener(MessageConsumerListener mcl) { 870 messageConsumerListener = mcl; 871 } 872 873 879 public synchronized void commit() throws JMSException { 880 if (logger.isLoggable(BasicLevel.DEBUG)) 881 logger.log( 882 BasicLevel.DEBUG, 883 "Session.commit()"); 884 885 checkClosed(); 886 checkThreadOfControl(); 887 888 if (! transacted) 889 throw new IllegalStateException ("Can't commit a non transacted" 890 + " session."); 891 892 if (logger.isLoggable(BasicLevel.DEBUG)) 893 logger.log(BasicLevel.DEBUG, "--- " + this 894 + ": committing..."); 895 896 if (scheduled) { 898 closingTask.cancel(); 899 scheduled = false; 900 } 901 902 try { 904 CommitRequest commitReq= new CommitRequest(); 905 906 Enumeration producerMessages = sendings.elements(); 907 while (producerMessages.hasMoreElements()) { 908 ProducerMessages pM = 909 (ProducerMessages) producerMessages.nextElement(); 910 commitReq.addProducerMessages(pM); 911 } 912 sendings.clear(); 913 914 Enumeration targets = deliveries.keys(); 916 while (targets.hasMoreElements()) { 917 String target = (String ) targets.nextElement(); 918 MessageAcks acks = (MessageAcks) deliveries.get(target); 919 commitReq.addAckRequest( 920 new SessAckRequest( 921 target, 922 acks.getIds(), 923 acks.getQueueMode())); 924 } 925 deliveries.clear(); 926 927 if (asyncSend) { 928 commitReq.setAsyncSend(true); 930 mtpx.sendRequest(commitReq); 931 } else { 932 requestor.request(commitReq); 933 } 934 935 if (logger.isLoggable(BasicLevel.DEBUG)) 936 logger.log(BasicLevel.DEBUG, this + ": committed."); 937 } 938 catch (JMSException jE) { 940 if (logger.isLoggable(BasicLevel.ERROR)) 941 logger.log(BasicLevel.ERROR, "", jE); 942 TransactionRolledBackException tE = 943 new TransactionRolledBackException ("A JMSException was thrown during" 944 + " the commit."); 945 tE.setLinkedException(jE); 946 947 if (logger.isLoggable(BasicLevel.ERROR)) 948 logger.log(BasicLevel.ERROR, "Exception: " + tE); 949 950 rollback(); 951 throw tE; 952 } 953 } 954 955 961 public synchronized void rollback() throws JMSException { 962 if (logger.isLoggable(BasicLevel.DEBUG)) 963 logger.log( 964 BasicLevel.DEBUG, 965 "Session.rollback()"); 966 967 checkClosed(); 968 checkThreadOfControl(); 969 970 if (! transacted) 971 |