1 24 package com.scalagent.kjoram; 25 26 import com.scalagent.kjoram.jms.*; 27 import com.scalagent.kjoram.util.TimerTask; 28 29 import java.util.*; 30 31 import com.scalagent.kjoram.excepts.IllegalStateException; 32 import com.scalagent.kjoram.excepts.*; 33 34 35 public class Session 36 { 37 public static final int SESSION_TRANSACTED = 0; 38 public static final int AUTO_ACKNOWLEDGE = 1; 39 public static final int CLIENT_ACKNOWLEDGE = 2; 40 public static final int DUPS_OK_ACKNOWLEDGE = 3; 41 42 43 private TimerTask closingTask = null; 44 45 private boolean scheduled = false; 46 47 48 private com.scalagent.kjoram.util.Timer consumersTimer = null; 49 50 51 protected MessageListener messageListener = null; 52 53 54 String ident; 55 56 Connection cnx; 57 58 boolean transacted; 59 60 int acknowledgeMode; 61 62 boolean closed = false; 63 64 boolean started = false; 65 66 67 boolean autoAck; 68 69 70 Vector consumers; 71 72 Vector producers; 73 74 Vector browsers; 75 76 com.scalagent.kjoram.util.Queue repliesIn; 77 78 SessionDaemon daemon = null; 79 80 int msgListeners = 0; 81 88 Hashtable sendings; 89 96 Hashtable deliveries; 97 98 99 ConnectionConsumer connectionConsumer = null; 100 101 102 111 Session(Connection cnx, boolean transacted, 112 int acknowledgeMode) throws JMSException 113 { 114 if (! transacted 115 && acknowledgeMode != Session.AUTO_ACKNOWLEDGE 116 && acknowledgeMode != Session.CLIENT_ACKNOWLEDGE 117 && acknowledgeMode != Session.DUPS_OK_ACKNOWLEDGE) 118 throw new JMSException("Can't create a non transacted session with an" 119 + " invalid acknowledge mode."); 120 121 this.ident = cnx.nextSessionId(); 122 this.cnx = cnx; 123 this.transacted = transacted; 124 this.acknowledgeMode = acknowledgeMode; 125 126 autoAck = ! transacted 127 && acknowledgeMode != Session.CLIENT_ACKNOWLEDGE; 128 129 consumers = new Vector(); 130 producers = new Vector(); 131 browsers = new Vector(); 132 repliesIn = new com.scalagent.kjoram.util.Queue(); 133 sendings = new Hashtable(); 134 deliveries = new Hashtable(); 135 136 if (transacted && cnx.factoryParameters.txPendingTimer != 0) 139 closingTask = new SessionCloseTask(); 140 141 cnx.sessions.addElement(this); 142 143 if (JoramTracing.dbgClient) 144 JoramTracing.log(JoramTracing.DEBUG,this + ": created."); 145 } 146 147 148 public String toString() 149 { 150 return "Sess:" + ident; 151 } 152 153 154 159 public int getAcknowledgeMode() throws JMSException 160 { 161 return acknowledgeMode; 162 } 163 164 169 public boolean getTransacted() throws JMSException 170 { 171 return transacted; 172 } 173 174 179 public void setMessageListener(MessageListener messageListener) 180 throws JMSException 181 { 182 this.messageListener = messageListener; 183 } 184 185 190 public MessageListener getMessageListener() throws JMSException 191 { 192 return messageListener; 193 } 194 195 200 public Message createMessage() throws JMSException 201 { 202 if (closed) 203 throw new IllegalStateException ("Forbidden call on a closed session."); 204 205 return new Message(); 206 } 207 208 213 public TextMessage createTextMessage() throws JMSException 214 { 215 if (closed) 216 throw new IllegalStateException ("Forbidden call on a closed session."); 217 218 return new TextMessage(); 219 } 220 221 226 public TextMessage createTextMessage(String text) 227 throws JMSException 228 { 229 if (closed) 230 throw new IllegalStateException ("Forbidden call on a closed session."); 231 232 TextMessage message = new TextMessage(); 233 message.setText(text); 234 return message; 235 } 236 237 242 public BytesMessage createBytesMessage() 243 throws JMSException 244 { 245 if (closed) 246 throw new IllegalStateException ("Forbidden call on a closed session."); 247 248 return new BytesMessage(); 249 } 250 251 256 public MapMessage createMapMessage() 257 throws JMSException 258 { 259 if (closed) 260 throw new IllegalStateException ("Forbidden call on a closed session."); 261 262 return new MapMessage(); 263 } 264 265 270 public QueueBrowser 271 createBrowser(Queue queue, String selector) 272 throws JMSException 273 { 274 if (closed) 275 throw new IllegalStateException ("Forbidden call on a closed session."); 276 277 return new QueueBrowser(this, (Queue) queue, selector); 278 } 279 280 285 public QueueBrowser createBrowser(Queue queue) 286 throws JMSException 287 { 288 if (closed) 289 throw new IllegalStateException ("Forbidden call on a closed session."); 290 291 return new QueueBrowser(this, (Queue) queue, null); 292 } 293 294 301 public MessageProducer createProducer(Destination dest) 302 throws JMSException 303 { 304 if (closed) 305 throw new IllegalStateException ("Forbidden call on a closed session."); 306 307 return new MessageProducer(this, (Destination) dest); 308 } 309 310 317 public MessageConsumer 318 createConsumer(Destination dest, String selector, 319 boolean noLocal) throws JMSException 320 { 321 if (closed) 322 throw new IllegalStateException ("Forbidden call on a closed session."); 323 324 return new MessageConsumer(this, (Destination) dest, selector, null, 325 noLocal); 326 } 327 328 335 public MessageConsumer 336 createConsumer(Destination dest, String selector) 337 throws JMSException 338 { 339 if (closed) 340 throw new IllegalStateException ("Forbidden call on a closed session."); 341 342 return new MessageConsumer(this, (Destination) dest, selector); 343 } 344 345 352 public MessageConsumer createConsumer(Destination dest) 353 throws JMSException 354 { 355 if (closed) 356 throw new IllegalStateException ("Forbidden call on a closed session."); 357 358 return new MessageConsumer(this, (Destination) dest, null); 359 } 360 361 368 public TopicSubscriber 369 createDurableSubscriber(Topic topic, String name, 370 String selector, 371 boolean noLocal) throws JMSException 372 { 373 if (closed) 374 throw new IllegalStateException ("Forbidden call on a closed session."); 375 376 return new TopicSubscriber(this, (Topic) topic, name, selector, noLocal); 377 } 378 379 386 public TopicSubscriber 387 createDurableSubscriber(Topic topic, String name) 388 throws JMSException 389 { 390 if (closed) 391 throw new IllegalStateException ("Forbidden call on a closed session."); 392 393 return new TopicSubscriber(this, (Topic) topic, name, null, false); 394 } 395 396 401 public Queue createQueue(String queueName) throws JMSException 402 { 403 if (closed) 404 throw new IllegalStateException ("Forbidden call on a closed session."); 405 406 return new Queue(queueName); 407 } 408 409 415 public Topic createTopic(String topicName) throws JMSException 416 { 417 if (closed) 418 throw new IllegalStateException ("Forbidden call on a closed session."); 419 420 if (topicName.equals("#AdminTopic")) { 422 try { 423 GetAdminTopicReply reply = 424 (GetAdminTopicReply) cnx.syncRequest(new GetAdminTopicRequest()); 425 if (reply.getId() != null) 426 return new Topic(reply.getId()); 427 else 428 throw new JMSException("AdminTopic could not be retrieved."); 429 } 430 catch (JMSException exc) { 431 throw exc; 432 } 433 catch (Exception exc) { 434 throw new JMSException("AdminTopic could not be retrieved: " + exc); 435 } 436 } 437 return new Topic(topicName); 438 } 439 440 447 public TemporaryQueue createTemporaryQueue() throws JMSException 448 { 449 if (closed) 450 throw new IllegalStateException ("Forbidden call on a closed session."); 451 452 SessCreateTDReply reply = 453 (SessCreateTDReply) cnx.syncRequest(new SessCreateTQRequest()); 454 String tempDest = reply.getAgentId(); 455 return new TemporaryQueue(tempDest, cnx); 456 } 457 458 465 public TemporaryTopic createTemporaryTopic() throws JMSException 466 { 467 if (closed) 468 throw new IllegalStateException ("Forbidden call on a closed session."); 469 470 SessCreateTDReply reply = 471 (SessCreateTDReply) cnx.syncRequest(new SessCreateTTRequest()); 472 String tempDest = reply.getAgentId(); 473 return new TemporaryTopic(tempDest, cnx); 474 } 475 476 477 public synchronized void run() 478 { 479 int load = repliesIn.size(); 480 com.scalagent.kjoram.messages.Message momMsg; 481 String msgId; 482 String targetName = connectionConsumer.targetName; 483 boolean queueMode = connectionConsumer.queueMode; 484 485 if (JoramTracing.dbgClient) 486 JoramTracing.log(JoramTracing.DEBUG, "-- " + this 487 + ": loaded with " + load 488 + " message(s) and started."); 489 try { 490 for (int i = 0; i < load; i++) { 492 momMsg = (com.scalagent.kjoram.messages.Message) repliesIn.pop(); 493 msgId = momMsg.getIdentifier(); 494 495 if (messageListener == null) { 498 JoramTracing.log(JoramTracing.ERROR,this + ": an" 499 + " asynchronous delivery arrived for" 500 + " a non existing session listener:" 501 + " denying the message."); 502 503 if (queueMode) 504 cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, true)); 505 else 506 cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,false)); 507 } 508 else { 510 if (! autoAck) 512 prepareAck(targetName, msgId, queueMode); 513 514 try { 516 messageListener.onMessage(Message.wrapMomMessage(this, momMsg)); 517 518 if (autoAck) 520 cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId, 521 queueMode)); 522 } 523 catch (JMSException jE) { 526 JoramTracing.log(JoramTracing.ERROR, this 527 + ": error while processing the" 528 + " received message: " + jE); 529 if (queueMode) 530 cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, 531 queueMode)); 532 else 533 cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId, 534 queueMode)); 535 } 536 catch (RuntimeException rE) { 539 JoramTracing.log(JoramTracing.ERROR,this 540 + ": RuntimeException thrown" 541 + " by the listener: " + rE); 542 if (autoAck && queueMode) 543 cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, 544 queueMode)); 545 else if (autoAck && ! queueMode) 546 cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId, 547 queueMode)); 548 } 549 } 550 } 551 } 552 catch (JMSException e) {} 553 } 554 555 561 public void commit() throws JMSException 562 { 563 if (closed) 564 throw new IllegalStateException ("Forbidden call on a closed session."); 565 566 if (! transacted) 567 throw new IllegalStateException ("Can't commit a non transacted" 568 + " session."); 569 570 if (JoramTracing.dbgClient) 571 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 572 + ": committing..."); 573 574 if (scheduled) { 576 closingTask.cancel(); 577 scheduled = false; 578 } 579 580 try { 582 Enumeration dests = sendings.keys(); 583 String dest; 584 ProducerMessages pM; 585 while (dests.hasMoreElements()) { 586 dest = (String ) dests.nextElement(); 587 pM = (ProducerMessages) sendings.remove(dest); 588 cnx.syncRequest(pM); 589 } 590 acknowledge(); 592 593 if (JoramTracing.dbgClient) 594 JoramTracing.log(JoramTracing.DEBUG, this + ": committed."); 595 } 596 catch (JMSException jE) { 598 TransactionRolledBackException tE = 599 new TransactionRolledBackException("A JMSException was thrown during" 600 + " the commit."); 601 tE.setLinkedException(jE); 602 603 JoramTracing.log(JoramTracing.ERROR, "Exception: " + tE); 604 605 rollback(); 606 throw tE; 607 } 608 } 609 610 616 public void rollback() throws JMSException 617 { 618 if (closed) 619 throw new IllegalStateException ("Forbidden call on a closed session."); 620 621 if (! transacted) 622 throw new IllegalStateException ("Can't rollback a non transacted" 623 + " session."); 624 625 if (JoramTracing.dbgClient) 626 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 627 + ": rolling back..."); 628 629 if (scheduled) { 631 closingTask.cancel(); 632 scheduled = false; 633 } 634 635 deny(); 637 sendings.clear(); 639 640 if (JoramTracing.dbgClient) 641 JoramTracing.log(JoramTracing.DEBUG, this + ": rolled back."); 642 } 643 644 649 public void recover() throws JMSException 650 { 651 if (transacted) 652 throw new IllegalStateException ("Can't recover a transacted session."); 653 654 if (JoramTracing.dbgClient) 655 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 656 + " recovering..."); 657 658 stop(); 660 deny(); 661 start(); 663 664 if (JoramTracing.dbgClient) 665 JoramTracing.log(JoramTracing.DEBUG, this + ": recovered."); 666 } 667 668 669 676 public void unsubscribe(String name) throws JMSException 677 { 678 MessageConsumer cons; 679 for (int i = 0; i < consumers.size(); i++) { 680 cons = (MessageConsumer) consumers.elementAt(i); 681 if (! cons.queueMode && cons.targetName.equals(name)) 682 throw new JMSException("Can't delete durable subscription " + name 683 + " as long as an active subscriber exists."); 684 } 685 cnx.syncRequest(new ConsumerUnsubRequest(name)); 686 } 687 688 693 public synchronized void close() throws JMSException 694 { 695 if (closed) 697 return; 698 699 if (JoramTracing.dbgClient) 700 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 701 + ": closing..."); 702 703 if (consumersTimer != null) 705 consumersTimer.cancel(); 706 707 try { 709 repliesIn.stop(); 710 } 711 catch (InterruptedException iE) {} 712 713 stop(); 715 716 if (transacted) 718 rollback(); 719 else 720 deny(); 721 722 while (! browsers.isEmpty()) 724 ((QueueBrowser) browsers.elementAt(0)).close(); 725 while (! consumers.isEmpty()) 726 ((MessageConsumer) consumers.elementAt(0)).close(); 727 while (! producers.isEmpty()) 728 ((MessageProducer) producers.elementAt(0)).close(); 729 730 cnx.sessions.removeElement(this); 731 732 closed = true; 733 734 if (JoramTracing.dbgClient) 735 JoramTracing.log(JoramTracing.DEBUG, this + ": closed."); 736 } 737 738 739 synchronized void schedule(TimerTask task, long timer) 740 { 741 if (consumersTimer == null) 742 consumersTimer = new com.scalagent.kjoram.util.Timer(); 743 744 try { 745 consumersTimer.schedule(task, timer); 746 } 747 catch (Exception exc) {} 748 } 749 750 763 void start() throws IllegalStateException 764 { 765 if (closed) 766 throw new IllegalStateException ("Forbidden call on a closed session."); 767 768 if (JoramTracing.dbgClient) 769 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 770 + ": starting..."); 771 772 repliesIn.start(); 773 774 if (! started && msgListeners > 0) { 776 daemon = new SessionDaemon(this); 777 daemon.setDaemon(false); 778 daemon.start(); 779 } 780 started = true; 781 782 if (JoramTracing.dbgClient) 783 JoramTracing.log(JoramTracing.DEBUG, this + ": started."); 784 } 785 786 801 void stop() 802 { 803 if (! started) 805 return; 806 807 if (JoramTracing.dbgClient) 808 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 809 + ": stopping..."); 810 811 if (daemon != null) { 813 daemon.stop(); 814 daemon = null; 815 } 816 if (consumers != null) { 818 MessageConsumer consumer; 819 for (int i = 0; i < consumers.size(); i++) { 820 consumer = (MessageConsumer) consumers.elementAt(i); 821 consumer.syncro(); 822 } 823 } 824 825 started = false; 826 827 if (JoramTracing.dbgClient) 828 JoramTracing.log(JoramTracing.DEBUG, this + ": stopped."); 829 } 830 831 838 void prepareSend(Destination dest, com.scalagent.kjoram.messages.Message msg) 839 { 840 if (scheduled) 842 closingTask.cancel(); 843 844 ProducerMessages pM = (ProducerMessages) sendings.get(dest.getName()); 845 if (pM == null) { 846 pM = new ProducerMessages(dest.getName()); 847 sendings.put(dest.getName(), pM); 848 } 849 pM.addMessage(msg); 850 851 if (scheduled) 853 cnx.schedule(closingTask); 854 } 855 856 866 void prepareAck(String name, String id, boolean queueMode) 867 { 868 if (scheduled) 870 closingTask.cancel(); 871 872 MessageAcks acks = (MessageAcks) deliveries.get(name); 873 if (acks == null) { 874 acks = new MessageAcks(queueMode); 875 deliveries.put(name, acks); 876 } 877 acks.addId(id); 878 879 if (closingTask != null) { 881 scheduled = true; 882 cnx.schedule(closingTask); 883 } 884 } 885 886 891 void acknowledge() throws IllegalStateException 892 { 893 String target; 894 MessageAcks acks; 895 896 Enumeration targets = deliveries.keys(); 897 while (targets.hasMoreElements()) { 898 target = (String ) targets.nextElement(); 899 acks = (MessageAcks) deliveries.remove(target); 900 cnx.asyncRequest(new SessAckRequest(target, acks.getIds(), 901 acks.getQueueMode())); 902 } 903 } 904 905 906 void deny() 907 { 908 try { 909 String target; 910 MessageAcks acks; 911 SessDenyRequest deny; 912 913 Enumeration targets = deliveries.keys(); 914 while (targets.hasMoreElements()) { 915 target = (String ) targets.nextElement(); 916 acks = (MessageAcks) deliveries.remove(target); 917 deny = new SessDenyRequest(target, acks.getIds(), acks.getQueueMode()); 918 if (acks.getQueueMode()) 919 cnx.syncRequest(deny); 920 else 921 cnx.asyncRequest(deny); 922 } 923 } 924 catch (JMSException jE) {} 925 } 926 927 931 void distribute(AbstractJmsReply asyncReply) 932 { 933 ConsumerMessages reply = (ConsumerMessages) asyncReply; 935 936 MessageConsumer cons = null; 938 if (reply.getQueueMode()) { 939 cons = 940 (MessageConsumer) cnx.requestsTable.remove(reply.getKey()); 941 } 942 else 943 cons = (MessageConsumer) cnx.requestsTable.get(reply.getKey()); 944 945 if (cons != null) { 947 Vector msgs = reply.getMessages(); 948 for (int i = 0; i < msgs.size(); i++) 949 cons.onMessage((com.scalagent.kjoram.messages.Message) msgs.elementAt(i)); 950 } 951 else { 955 if (JoramTracing.dbgClient) 956 JoramTracing.log(JoramTracing.WARN, this + ": an asynchronous" 957 + " delivery arrived for an improperly" 958 + " closed consumer: denying the" 959 + " messages."); 960 961 Vector msgs = reply.getMessages(); 962 com.scalagent.kjoram.messages.Message msg; 963 Vector ids = new Vector(); 964 for (int i = 0; i < msgs.size(); i++) { 965 msg = (com.scalagent.kjoram.messages.Message) msgs.elementAt(i); 966 ids.addElement(msg.getIdentifier()); 967 } 968 969 if (ids.isEmpty()) 970 return; 971 972 try { 973 cnx.asyncRequest(new SessDenyRequest(reply.comesFrom(), ids, 974 reply.getQueueMode(), true)); 975 } 976 catch (JMSException jE) {} 977 } 978 } 979 980 985 private class SessionCloseTask extends TimerTask 986 { 987 988 public void run() 989 { 990 try { 991 if (JoramTracing.dbgClient) 992 JoramTracing.log(JoramTracing.WARN, "Session closed " 993 + "because of pending transaction"); 994 close(); 995 } 996 catch (Exception e) {} 997 } 998 } 999 } 1000 | Popular Tags |