1 24 package org.objectweb.joram.client.jms; 25 26 import java.util.*; 27 28 import javax.jms.JMSException ; 29 import javax.jms.TransactionRolledBackException ; 30 import javax.jms.IllegalStateException ; 31 import javax.jms.MessageFormatException ; 32 33 import org.objectweb.joram.client.jms.connection.RequestMultiplexer; 34 import org.objectweb.joram.client.jms.connection.Requestor; 35 36 import org.objectweb.joram.shared.client.*; 37 38 import org.objectweb.util.monolog.api.BasicLevel; 39 import org.objectweb.util.monolog.api.Logger; 40 import fr.dyade.aaa.util.Debug; 41 42 45 public class Session implements javax.jms.Session { 46 47 public static Logger logger = Debug.getLogger(Session.class.getName()); 48 49 50 public static final String RECEIVE_ACK = 51 "org.objectweb.joram.client.jms.receiveAck"; 52 53 public static boolean receiveAck = Boolean.getBoolean(RECEIVE_ACK); 54 55 58 private static class Status { 59 63 public static final int STOP = 0; 64 65 68 public static final int START = 1; 69 70 73 public static final int CLOSE = 2; 74 75 private static final String [] names = { 76 "STOP", "START", "CLOSE"}; 77 78 public static String toString(int status) { 79 return names[status]; 80 } 81 } 82 83 86 private static class SessionMode { 87 91 public static final int NONE = 0; 92 93 96 public static final int RECEIVE = 1; 97 98 101 public static final int LISTENER = 2; 102 103 106 public static final int APP_SERVER = 3; 107 108 private static final String [] names = { 109 "NONE", "RECEIVE", "LISTENER", "APP_SERVER"}; 110 111 public static String toString(int status) { 112 return names[status]; 113 } 114 } 115 116 120 private static class RequestStatus { 121 122 public static final int NONE = 0; 123 124 public static final int RUN = 1; 125 126 public static final int DONE = 2; 127 128 private static final String [] names = { 129 "NONE", "RUN", "DONE"}; 130 131 public static String toString(int status) { 132 return names[status]; 133 } 134 } 135 136 137 private SessionCloseTask closingTask; 138 139 140 private boolean scheduled; 141 142 143 protected javax.jms.MessageListener messageListener; 144 145 146 private String ident; 147 148 149 private Connection cnx; 150 151 152 boolean transacted; 153 154 155 private int acknowledgeMode; 156 157 158 private boolean autoAck; 159 160 161 private Vector consumers; 162 163 164 private Vector producers; 165 166 167 private Vector browsers; 168 169 170 private fr.dyade.aaa.util.Queue repliesIn; 171 172 173 private SessionDaemon daemon; 174 175 176 private int listenerCount; 177 178 185 Hashtable sendings; 186 187 194 Hashtable deliveries; 195 196 199 private RequestMultiplexer mtpx; 200 201 204 private Requestor requestor; 205 206 211 private Requestor receiveRequestor; 212 213 218 private boolean recover; 219 220 223 private int status; 224 225 228 private int sessionMode; 229 230 233 private int requestStatus; 234 235 238 private MessageConsumer pendingMessageConsumer; 239 240 243 private Thread singleThreadOfControl; 244 245 249 private boolean passiveMsgInput; 250 251 254 private Closer closer; 255 256 260 private boolean asyncSend; 261 262 265 private int queueMessageReadMax; 266 267 271 private int topicAckBufferMax; 272 273 277 private int topicPassivationThreshold; 278 279 283 private int topicActivationThreshold; 284 285 private MessageConsumerListener messageConsumerListener; 286 287 296 Session(Connection cnx, 297 boolean transacted, 298 int acknowledgeMode, 299 RequestMultiplexer mtpx) 300 throws JMSException { 301 if (! transacted 302 && acknowledgeMode != javax.jms.Session.AUTO_ACKNOWLEDGE 303 && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE 304 && acknowledgeMode != javax.jms.Session.DUPS_OK_ACKNOWLEDGE 305 && !(cnx instanceof XAQueueConnection) 306 && !(cnx instanceof XATopicConnection) 307 && !(cnx instanceof XAConnection)) 308 throw new JMSException ("Can't create a non transacted session with an" 309 + " invalid acknowledge mode."); 310 311 this.ident = cnx.nextSessionId(); 312 this.cnx = cnx; 313 this.transacted = transacted; 314 this.acknowledgeMode = acknowledgeMode; 315 this.mtpx = mtpx; 316 requestor = new Requestor(mtpx); 317 receiveRequestor = new Requestor(mtpx); 318 319 autoAck = ! transacted 320 && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE; 321 322 consumers = new Vector(); 323 producers = new Vector(); 324 browsers = new Vector(); 325 repliesIn = new fr.dyade.aaa.util.Queue(); 326 sendings = new Hashtable(); 327 deliveries = new Hashtable(); 328 329 closer = new Closer(); 330 331 if (transacted && cnx.getTxPendingTimer() > 0) { 334 closingTask = new SessionCloseTask( 335 cnx.getTxPendingTimer() * 1000); 336 } 337 338 asyncSend = cnx.getAsyncSend(); 339 queueMessageReadMax = cnx.getQueueMessageReadMax(); 340 topicAckBufferMax = cnx.getTopicAckBufferMax(); 341 topicActivationThreshold = cnx.getTopicActivationThreshold(); 342 topicPassivationThreshold = cnx.getTopicPassivationThreshold(); 343 344 setStatus(Status.STOP); 345 setSessionMode(SessionMode.NONE); 346 setRequestStatus(RequestStatus.NONE); 347 } 348 349 352 private void setStatus(int status) { 353 if (logger.isLoggable(BasicLevel.DEBUG)) 354 logger.log( 355 BasicLevel.DEBUG, 356 "Session.setStatus(" + 357 Status.toString(status) + ')'); 358 this.status = status; 359 } 360 361 boolean isStarted() { 362 return (status == Status.START); 363 } 364 365 368 private void setSessionMode(int sessionMode) { 369 if (logger.isLoggable(BasicLevel.DEBUG)) 370 logger.log( 371 BasicLevel.DEBUG, 372 "Session.setSessionMode(" + 373 SessionMode.toString(sessionMode) + ')'); 374 this.sessionMode = sessionMode; 375 } 376 377 380 private void setRequestStatus(int requestStatus) { 381 if (logger.isLoggable(BasicLevel.DEBUG)) 382 logger.log( 383 BasicLevel.DEBUG, 384 "Session.setRequestStatus(" + 385 RequestStatus.toString(requestStatus) + ')'); 386 this.requestStatus = requestStatus; 387 } 388 389 393 protected synchronized void checkClosed() 394 throws IllegalStateException { 395 if (status == Status.CLOSE) 396 throw new IllegalStateException ( 397 "Forbidden call on a closed session."); 398 } 399 400 404 private synchronized void checkThreadOfControl() 405 throws IllegalStateException { 406 if (singleThreadOfControl != null && 407 Thread.currentThread() != singleThreadOfControl) 408 throw new IllegalStateException ("Illegal control thread"); 409 } 410 411 417 private void checkSessionMode( 418 int expectedSessionMode) 419 throws IllegalStateException { 420 if (sessionMode == SessionMode.NONE) { 421 setSessionMode(sessionMode); 422 } else if (sessionMode != expectedSessionMode) { 423 throw new IllegalStateException ("Bad session mode"); 424 } 425 } 426 427 428 public String toString() { 429 return "Sess:" + ident; 430 } 431 432 437 public final int getAcknowledgeMode() throws JMSException { 438 checkClosed(); 439 return getAckMode(); 440 } 441 442 int getAckMode() { 443 if (transacted) 444 return Session.SESSION_TRANSACTED; 445 return acknowledgeMode; 446 } 447 448 453 public synchronized final boolean getTransacted() 454 throws JMSException { 455 checkClosed(); 456 return transacted; 457 } 458 459 463 public void setTransacted(boolean t) { 464 if (status != Status.CLOSE) { 465 transacted = t; 466 } 467 } 470 471 476 public synchronized void setMessageListener( 477 javax.jms.MessageListener messageListener) 478 throws JMSException { 479 checkSessionMode(SessionMode.APP_SERVER); 480 this.messageListener = messageListener; 481 } 482 483 488 public synchronized javax.jms.MessageListener 489 getMessageListener() 490 throws JMSException { 491 return messageListener; 492 } 493 494 500 public synchronized javax.jms.Message createMessage() 501 throws JMSException { 502 checkClosed(); 503 return new Message(); 504 } 505 506 512 public synchronized javax.jms.TextMessage createTextMessage() 513 throws JMSException { 514 checkClosed(); 515 return new TextMessage(); 516 } 517 518 524 public synchronized javax.jms.TextMessage createTextMessage(String text) 525 throws JMSException { 526 checkClosed(); 527 TextMessage message = new TextMessage(); 528 message.setText(text); 529 return message; 530 } 531 532 538 public synchronized javax.jms.BytesMessage createBytesMessage() 539 throws JMSException { 540 checkClosed(); 541 return new BytesMessage(); 542 } 543 544 550 public synchronized javax.jms.MapMessage createMapMessage() 551 throws JMSException { 552 checkClosed(); 553 return new MapMessage(); 554 } 555 556 562 public synchronized javax.jms.ObjectMessage createObjectMessage() 563 throws JMSException { 564 checkClosed(); 565 return new ObjectMessage(); 566 } 567 568 574 public synchronized javax.jms.ObjectMessage createObjectMessage( 575 java.io.Serializable obj) 576 throws JMSException { 577 checkClosed(); 578 ObjectMessage message = new ObjectMessage(); 579 message.setObject(obj); 580 return message; 581 } 582 583 589 public synchronized javax.jms.StreamMessage createStreamMessage() 590 throws JMSException { 591 checkClosed(); 592 return new StreamMessage(); 593 } 594 595 600 public synchronized javax.jms.QueueBrowser 601 createBrowser(javax.jms.Queue queue, 602 String selector) 603 throws JMSException { 604 checkClosed(); 605 checkThreadOfControl(); 606 QueueBrowser qb = new QueueBrowser(this, (Queue) queue, selector); 607 browsers.addElement(qb); 608 return qb; 609 } 610 611 616 public synchronized javax.jms.QueueBrowser 617 createBrowser(javax.jms.Queue queue) 618 throws JMSException { 619 checkClosed(); 620 checkThreadOfControl(); 621 QueueBrowser qb = new QueueBrowser(this, (Queue) queue, null); 622 browsers.addElement(qb); 623 return qb; 624 } 625 626 634 public synchronized javax.jms.MessageProducer createProducer( 635 javax.jms.Destination dest) 636 throws JMSException { 637 checkClosed(); 638 checkThreadOfControl(); 639 MessageProducer mp = new MessageProducer( 640 this, 641 (Destination) dest); 642 addProducer(mp); 643 return mp; 644 } 645 646 655 public synchronized javax.jms.MessageConsumer 656 createConsumer(javax.jms.Destination dest, 657 String selector, 658 boolean noLocal) 659 throws JMSException { 660 checkClosed(); 661 checkThreadOfControl(); 662 MessageConsumer mc = new MessageConsumer( 663 this, (Destination) dest, 664 selector, null, 665 noLocal); 666 addConsumer(mc); 667 return mc; 668 } 669 670 679 public synchronized javax.jms.MessageConsumer 680 createConsumer(javax.jms.Destination dest, 681 String selector) 682 throws JMSException { 683 checkClosed(); 684 checkThreadOfControl(); 685 MessageConsumer mc = new MessageConsumer( 686 this, (Destination) dest, selector); 687 addConsumer(mc); 688 return mc; 689 } 690 691 699 public synchronized javax.jms.MessageConsumer 700 createConsumer(javax.jms.Destination dest) 701 throws JMSException { 702 checkClosed(); 703 checkThreadOfControl(); 704 MessageConsumer mc = new MessageConsumer( 705 this, (Destination) dest, null); 706 addConsumer(mc); 707 return mc; 708 } 709 710 717 public synchronized javax.jms.TopicSubscriber 718 createDurableSubscriber(javax.jms.Topic topic, 719 String name, 720 String selector, 721 boolean noLocal) 722 throws JMSException { 723 if (logger.isLoggable(BasicLevel.DEBUG)) 724 logger.log( 725 BasicLevel.DEBUG, 726 "Session.createDurableSubscriber(" + 727 topic + ',' + name + ',' + 728 selector + ',' + noLocal + ')'); 729 checkClosed(); 730 checkThreadOfControl(); 731 TopicSubscriber ts = new TopicSubscriber( 732 this, (Topic) topic, name, selector, noLocal); 733 addConsumer(ts); 734 return ts; 735 } 736 737 744 public synchronized javax.jms.TopicSubscriber 745 createDurableSubscriber(javax.jms.Topic topic, 746 String name) 747 throws JMSException { 748 if (logger.isLoggable(BasicLevel.DEBUG)) 749 logger.log( 750 BasicLevel.DEBUG, 751 "Session.createDurableSubscriber(" + 752 topic + ',' + name + ')'); 753 checkClosed(); 754 checkThreadOfControl(); 755 TopicSubscriber ts = new TopicSubscriber( 756 this, (Topic) topic, name, null, false); 757 addConsumer(ts); 758 return ts; 759 } 760 761 766 public synchronized javax.jms.Queue createQueue( 767 String queueName) 768 throws JMSException { 769 checkClosed(); 770 return new Queue(queueName); 771 } 772 773 779 public synchronized javax.jms.Topic createTopic( 780 String topicName) 781 throws JMSException { 782 checkClosed(); 783 checkThreadOfControl(); 784 785 if (topicName.equals("#AdminTopic")) { 787 try { 788 GetAdminTopicReply reply = 789 (GetAdminTopicReply) requestor.request(new GetAdminTopicRequest()); 790 if (reply.getId() != null) 791 return new Topic(reply.getId()); 792 else 793 throw new JMSException ("AdminTopic could not be retrieved."); 794 } 795 catch (JMSException exc) { 796 throw exc; 797 } 798 catch (Exception exc) { 799 throw new JMSException ("AdminTopic could not be retrieved: " + exc); 800 } 801 } 802 return new Topic(topicName); 803 } 804 805 812 public synchronized javax.jms.TemporaryQueue createTemporaryQueue() 813 throws JMSException { 814 checkClosed(); 815 checkThreadOfControl(); 816 817 SessCreateTDReply reply = 818 (SessCreateTDReply) requestor.request(new SessCreateTQRequest()); 819 String tempDest = reply.getAgentId(); 820 return new TemporaryQueue(tempDest, cnx); 821 } 822 823 830 public synchronized javax.jms.TemporaryTopic createTemporaryTopic() 831 throws JMSException { 832 checkClosed(); 833 checkThreadOfControl(); 834 835 SessCreateTDReply reply = 836 (SessCreateTDReply) requestor.request(new SessCreateTTRequest()); 837 String tempDest = reply.getAgentId(); 838 return new TemporaryTopic(tempDest, cnx); 839 } 840 841 842 public synchronized void run() { 843 int load = repliesIn.size(); 844 845 if (logger.isLoggable(BasicLevel.DEBUG)) 846 logger.log(BasicLevel.DEBUG, 847 "-- " + this + ": loaded with " + load + 848 " message(s) and started."); 849 850 try { 851 for (int i = 0; i < load; i++) { 853 org.objectweb.joram.shared.messages.Message momMsg = 854 (org.objectweb.joram.shared.messages.Message) repliesIn.pop(); 855 String msgId = momMsg.id; 856 857 onMessage(momMsg, messageConsumerListener); 858 } 859 } catch (Exception exc) { 860 if (logger.isLoggable(BasicLevel.ERROR)) 861 logger.log(BasicLevel.ERROR, "", exc); 862 } 863 } 864 865 869 void setMessageConsumerListener(MessageConsumerListener mcl) { 870 messageConsumerListener = mcl; 871 } 872 873 879 public synchronized void commit() throws JMSException { 880 if (logger.isLoggable(BasicLevel.DEBUG)) 881 logger.log( 882 BasicLevel.DEBUG, 883 "Session.commit()"); 884 885 checkClosed(); 886 checkThreadOfControl(); 887 888 if (! transacted) 889 throw new IllegalStateException ("Can't commit a non transacted" 890 + " session."); 891 892 if (logger.isLoggable(BasicLevel.DEBUG)) 893 logger.log(BasicLevel.DEBUG, "--- " + this 894 + ": committing..."); 895 896 if (scheduled) { 898 closingTask.cancel(); 899 scheduled = false; 900 } 901 902 try { 904 CommitRequest commitReq= new CommitRequest(); 905 906 Enumeration producerMessages = sendings.elements(); 907 while (producerMessages.hasMoreElements()) { 908 ProducerMessages pM = 909 (ProducerMessages) producerMessages.nextElement(); 910 commitReq.addProducerMessages(pM); 911 } 912 sendings.clear(); 913 914 Enumeration targets = deliveries.keys(); 916 while (targets.hasMoreElements()) { 917 String target = (String ) targets.nextElement(); 918 MessageAcks acks = (MessageAcks) deliveries.get(target); 919 commitReq.addAckRequest( 920 new SessAckRequest( 921 target, 922 acks.getIds(), 923 acks.getQueueMode())); 924 } 925 deliveries.clear(); 926 927 if (asyncSend) { 928 commitReq.setAsyncSend(true); 930 mtpx.sendRequest(commitReq); 931 } else { 932 requestor.request(commitReq); 933 } 934 935 if (logger.isLoggable(BasicLevel.DEBUG)) 936 logger.log(BasicLevel.DEBUG, this + ": committed."); 937 } 938 catch (JMSException jE) { 940 if (logger.isLoggable(BasicLevel.ERROR)) 941 logger.log(BasicLevel.ERROR, "", jE); 942 TransactionRolledBackException tE = 943 new TransactionRolledBackException ("A JMSException was thrown during" 944 + " the commit."); 945 tE.setLinkedException(jE); 946 947 if (logger.isLoggable(BasicLevel.ERROR)) 948 logger.log(BasicLevel.ERROR, "Exception: " + tE); 949 950 rollback(); 951 throw tE; 952 } 953 } 954 955 961 public synchronized void rollback() throws JMSException { 962 if (logger.isLoggable(BasicLevel.DEBUG)) 963 logger.log( 964 BasicLevel.DEBUG, 965 "Session.rollback()"); 966 967 checkClosed(); 968 checkThreadOfControl(); 969 970 if (! transacted) 971 throw new IllegalStateException ("Can't rollback a non transacted" 972 + " session."); 973 974 if (logger.isLoggable(BasicLevel.DEBUG)) 975 logger.log(BasicLevel.DEBUG, "--- " + this 976 + ": rolling back..."); 977 978 if (scheduled) { 980 closingTask.cancel(); 981 scheduled = false; 982 } 983 984 deny(); 986 sendings.clear(); 988 989 if (logger.isLoggable(BasicLevel.DEBUG)) 990 logger.log(BasicLevel.DEBUG, this + ": rolled back."); 991 } 992 993 998 public synchronized void recover() throws JMSException { 999 if (logger.isLoggable(BasicLevel.DEBUG)) 1000 logger.log( 1001 BasicLevel.DEBUG, 1002 "Session.recover()"); 1003 1004 checkClosed(); 1005 checkThreadOfControl(); 1006 1007 if (transacted) 1008 throw new IllegalStateException ("Can't recover a transacted session."); 1009 1010 if (logger.isLoggable(BasicLevel.DEBUG)) 1011 logger.log(BasicLevel.DEBUG, "--- " + this 1012 + " recovering..."); 1013 1014 if (daemon != null && 1015 daemon.isCurrentThread()) { 1016 recover = true; 1017 } else { 1018 doRecover(); 1019 } 1020 1021 if (logger.isLoggable(BasicLevel.DEBUG)) 1022 logger.log(BasicLevel.DEBUG, this + ": recovered."); 1023 } 1024 1025 private void doRecover() throws JMSException { 1026 if (logger.isLoggable(BasicLevel.DEBUG)) 1027 logger.log(BasicLevel.DEBUG, "Session.doRecover()"); 1028 deny(); 1029 } 1030 1031 1040 public synchronized void unsubscribe(String name) throws JMSException { 1041 if (logger.isLoggable(BasicLevel.DEBUG)) 1042 logger.log(BasicLevel.DEBUG, "Session.unsubscribe(" + name + ')'); 1043 1044 if (name == null) 1045 throw new JMSException ("Bad subscription name: null"); 1046 1047 checkClosed(); 1048 checkThreadOfControl(); 1049 1050 MessageConsumer cons; 1051 if (consumers != null) { 1052 for (int i = 0; i < consumers.size(); i++) { 1053 cons = (MessageConsumer) consumers.get(i); 1054 if (! cons.queueMode && cons.targetName.equals(name)) 1055 throw new JMSException ("Can't delete durable subscription " + name 1056 + " as long as an active subscriber exists."); 1057 } 1058 } 1059 syncRequest(new ConsumerUnsubRequest(name)); 1060 } 1061 1062 1068 public void close() throws JMSException { 1069 if (logger.isLoggable(BasicLevel.DEBUG)) 1070 logger.log( 1071 BasicLevel.DEBUG, 1072 "Session.close()"); 1073 closer.close(); 1074 } 1075 1076 1081 class Closer { 1082 synchronized void close() 1083 throws JMSException { 1084 doClose(); 1085 } 1086 } 1087 1088 void doClose() throws JMSException { 1089 synchronized (this) { 1090 if (status == Status.CLOSE) return; 1091 } 1092 1093 1097 Vector consumersToClose = (Vector)consumers.clone(); 1098 consumers.clear(); 1099 for (int i = 0; i < consumersToClose.size(); i++) { 1100 MessageConsumer mc = 1101 (MessageConsumer)consumersToClose.elementAt(i); 1102 try { 1103 mc.close(); 1104 } catch (JMSException exc) { 1105 if (logger.isLoggable(BasicLevel.DEBUG)) 1106 logger.log( 1107 BasicLevel.DEBUG, "", exc); 1108 } 1109 } 1110 1111 Vector browsersToClose = (Vector)browsers.clone(); 1112 browsers.clear(); 1113 for (int i = 0; i < browsersToClose.size(); i++) { 1114 QueueBrowser qb = 1115 (QueueBrowser)browsersToClose.elementAt(i); 1116 try { 1117 qb.close(); 1118 } catch (JMSException exc) { 1119 if (logger.isLoggable(BasicLevel.DEBUG)) 1120 logger.log( 1121 BasicLevel.DEBUG, "", exc); 1122 } 1123 } 1124 1125 Vector producersToClose = (Vector)producers.clone(); 1126 producers.clear(); 1127 for (int i = 0; i < producersToClose.size(); i++) { 1128 MessageProducer mp = 1129 (MessageProducer)producersToClose.elementAt(i); 1130 try { 1131 mp.close(); 1132 } catch (JMSException exc) { 1133 if (logger.isLoggable(BasicLevel.DEBUG)) 1134 logger.log( 1135 BasicLevel.DEBUG, "", exc); 1136 } 1137 } 1138 1139 1146 stop(); 1147 1148 receiveRequestor.close(); 1152 1153 if (transacted) { 1155 rollback(); 1156 } else { 1157 deny(); 1158 } 1159 1160 cnx.closeSession(this); 1161 1162 synchronized (this) { 1163 setStatus(Status.CLOSE); 1164 } 1165 } 1166 1167 1172 synchronized void start() { 1173 if (logger.isLoggable(BasicLevel.DEBUG)) 1174 logger.log( 1175 BasicLevel.DEBUG, 1176 "Session.start()"); 1177 1178 if (status == Status.CLOSE) return; 1179 if (status == Status.START) return; 1180 if (listenerCount > 0) { 1181 doStart(); 1182 } 1183 1184 setStatus(Status.START); 1185 } 1186 1187 private void doStart() { 1188 if (logger.isLoggable(BasicLevel.DEBUG)) 1189 logger.log( 1190 BasicLevel.DEBUG, 1191 "Session.doStart()"); 1192 repliesIn.start(); 1193 daemon = new SessionDaemon(); 1194 daemon.setDaemon(false); 1195 daemon.start(); 1196 singleThreadOfControl = daemon.getThread(); 1197 } 1198 1199 1213 synchronized void stop() { 1214 if (logger.isLoggable(BasicLevel.DEBUG)) 1215 logger.log( 1216 BasicLevel.DEBUG, 1217 "Session.stop()"); 1218 if (status == Status.STOP || 1219 status == Status.CLOSE) return; 1220 1221 1227 1233 doStop(); 1234 1235 setStatus(Status.STOP); 1236 } 1237 1238 private void doStop() { 1239 if (daemon != null) { 1240 daemon.stop(); 1241 daemon = null; 1242 singleThreadOfControl = null; 1243 } 1244 } 1245 1246 1253 private void prepareSend(Destination dest, 1254 org.objectweb.joram.shared.messages.Message msg) throws JMSException { 1255 if (logger.isLoggable(BasicLevel.DEBUG)) 1256 logger.log(BasicLevel.DEBUG, 1257 "Session.prepareSend(" + dest + ',' + msg + ')'); 1258 1259 checkClosed(); 1260 checkThreadOfControl(); 1261 1262 if (scheduled) closingTask.cancel(); 1264 1265 ProducerMessages pM = (ProducerMessages) sendings.get(dest.getName()); 1266 if (pM == null) { 1267 pM = new ProducerMessages(dest.getName()); 1268 sendings.put(dest.getName(), pM); 1269 } 1270 pM.addMessage(msg); 1271 1272 if (scheduled) closingTask.start(); 1274 } 1275 1276 1286 private void prepareAck(String name, 1287 String id, 1288 boolean queueMode) { 1289 if (logger.isLoggable(BasicLevel.DEBUG)) 1290 logger.log( 1291 BasicLevel.DEBUG, 1292 "Session.prepareAck(" + 1293 name + ',' + id + ',' + queueMode + ')'); 1294 1295 if (scheduled) 1297 closingTask.cancel(); 1298 1299 MessageAcks acks = (MessageAcks) deliveries.get(name); 1300 if (acks == null) { 1301 acks = new MessageAcks(queueMode); 1302 deliveries.put(name, acks); 1303 } 1304 acks.addId(id); 1305 1306 if (logger.isLoggable(BasicLevel.DEBUG)) 1307 logger.log( 1308 BasicLevel.DEBUG, " -> acks = " + acks); 1309 1310 if (closingTask != null) { 1312 scheduled = true; 1313 closingTask.start(); 1314 } 1315 } 1316 1317 1321 synchronized void acknowledge() throws JMSException { 1322 checkClosed(); 1323 if (transacted || 1324 acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE) { 1325 return; 1326 } 1327 doAcknowledge(); 1328 } 1329 1330 1333 private void doAcknowledge() throws JMSException { 1334 Enumeration targets = deliveries.keys(); 1335 while (targets.hasMoreElements()) { 1336 String target = (String ) targets.nextElement(); 1337 MessageAcks acks = (MessageAcks) deliveries.remove(target); 1338 mtpx.sendRequest( 1339 new SessAckRequest( 1340 target, 1341 acks.getIds(), 1342 acks.getQueueMode())); 1343 } 1344 } 1345 1346 1365 private void deny() throws JMSException { 1366 if (logger.isLoggable(BasicLevel.DEBUG)) 1367 logger.log( 1368 BasicLevel.DEBUG, 1369 "Session.deny()"); 1370 Enumeration targets = deliveries.keys(); 1371 while (targets.hasMoreElements()) { 1372 String target = (String ) targets.nextElement(); 1373 MessageAcks acks = (MessageAcks) deliveries.remove(target); 1374 if (logger.isLoggable(BasicLevel.DEBUG)) 1375 logger.log( 1376 BasicLevel.DEBUG, 1377 " -> acks = " + acks + ')'); 1378 SessDenyRequest deny = new SessDenyRequest( 1379 target, 1380 acks.getIds(), 1381 acks.getQueueMode()); 1382 if (acks.getQueueMode()) { 1383 requestor.request(deny); 1384 } else { 1385 mtpx.sendRequest(deny); 1386 } 1387 } 1388 } 1389 1390 1396 javax.jms.Message receive( 1397 long requestTimeToLive, 1398 long waitTimeOut, 1399 MessageConsumer mc, 1400 String targetName, 1401 String selector, 1402 boolean queueMode) 1403 throws JMSException { 1404 if (logger.isLoggable(BasicLevel.DEBUG)) 1405 logger.log(BasicLevel.DEBUG, 1406 "Session.receive(" + requestTimeToLive + ',' + 1407 waitTimeOut + ',' + targetName + ',' + 1408 selector + ',' + queueMode + ')'); 1409 preReceive(mc); 1410 try { 1411 ConsumerMessages reply = null; 1412 ConsumerReceiveRequest request = 1413 new ConsumerReceiveRequest(targetName, selector, 1414 requestTimeToLive, queueMode); 1415 if (receiveAck) request.setReceiveAck(true); 1416 reply = (ConsumerMessages) receiveRequestor.request(request, waitTimeOut); 1417 1418 if (logger.isLoggable(BasicLevel.DEBUG)) 1419 logger.log( 1420 BasicLevel.DEBUG, 1421 " -> reply = " + reply); 1422 1423 synchronized (this) { 1424 if (status == Status.CLOSE) { 1427 if (reply != null) { 1428 mtpx.deny(reply); 1429 } 1430 return null; 1431 } 1432 1433 if (reply != null) { 1434 Vector msgs = reply.getMessages(); 1435 if (msgs != null && ! msgs.isEmpty()) { 1436 Message msg = Message.wrapMomMessage(this, (org.objectweb.joram.shared.messages.Message) msgs.get(0)); 1437 String msgId = msg.getJMSMessageID();; 1438 1439 if (autoAck && ! receiveAck) { 1441 ConsumerAckRequest req = new ConsumerAckRequest(targetName, queueMode); 1442 req.addId(msgId); 1443 mtpx.sendRequest(req); 1444 } else { 1445 prepareAck(targetName, msgId, queueMode); 1446 } 1447 msg.session = this; 1448 return msg; 1449 } else { 1450 return null; 1451 } 1452 } else { 1453 return null; 1454 } 1455 } 1456 } finally { 1457 postReceive(); 1458 } 1459 } 1460 1461 1467 private synchronized void preReceive( 1468 MessageConsumer mc) throws JMSException { 1469 if (logger.isLoggable(BasicLevel.DEBUG)) 1470 logger.log( 1471 BasicLevel.DEBUG, 1472 "Session.preReceive(" + mc + ')'); 1473 mc.checkClosed(); 1477 1478 checkClosed(); 1479 checkThreadOfControl(); 1480 1481 switch (sessionMode) { 1485 case SessionMode.NONE: 1486 setSessionMode(SessionMode.RECEIVE); 1487 break; 1488 default: 1489 throw new IllegalStateException ("Illegal session mode"); 1490 } 1491 1492 if (requestStatus != RequestStatus.NONE) 1493 throw new IllegalStateException ("Illegal request status"); 1494 1495 singleThreadOfControl = Thread.currentThread(); 1496 pendingMessageConsumer = mc; 1497 1498 setRequestStatus(RequestStatus.RUN); 1499 } 1500 1501 1506 private synchronized void postReceive() { 1507 if (logger.isLoggable(BasicLevel.DEBUG)) 1508 logger.log( 1509 BasicLevel.DEBUG, 1510 "Session.postReceive()"); 1511 1512 singleThreadOfControl = null; 1513 pendingMessageConsumer = null; 1514 setRequestStatus(RequestStatus.NONE); 1515 setSessionMode(SessionMode.NONE); 1516 notifyAll(); 1517 } 1518 1519 1522 protected synchronized void addConsumer( 1523 MessageConsumer mc) { 1524 consumers.addElement(mc); 1525 } 1526 1527 1530 synchronized void closeConsumer(MessageConsumer mc) { 1531 if (logger.isLoggable(BasicLevel.DEBUG)) 1532 logger.log( 1533 BasicLevel.DEBUG, 1534 "Session.closeConsumer(" + mc + ')'); 1535 consumers.removeElement(mc); 1536 1537 if (pendingMessageConsumer == mc) { 1538 if (requestStatus == RequestStatus.RUN) { 1539 receiveRequestor.close(); 1544 1545 try { 1547 while (requestStatus != RequestStatus.NONE) { 1548 wait(); 1549 } 1550 } catch (InterruptedException exc) {} 1551 1552 receiveRequestor = new Requestor(mtpx); 1554 } 1555 } 1556 } 1557 1558 1561 synchronized void checkConsumers(String agentId) 1562 throws JMSException { 1563 for (int j = 0; j < consumers.size(); j++) { 1564 MessageConsumer cons = 1565 (MessageConsumer) consumers.elementAt(j); 1566 if (agentId.equals(cons.dest.agentId)) { 1567 throw new JMSException ( 1568 "Consumers still exist for this temp queue."); 1569 } 1570 } 1571 } 1572 1573 1576 protected void addProducer(MessageProducer mp) { 1577 producers.addElement(mp); 1578 } 1579 1580 1583 synchronized void closeProducer(MessageProducer mp) { 1584 producers.removeElement(mp); 1585 } 1586 1587 1590 synchronized void closeBrowser(QueueBrowser qb) { 1591 browsers.removeElement(qb); 1592 } 1593 1594 1597 synchronized MessageConsumerListener addMessageListener( 1598 MessageConsumerListener mcl) throws JMSException { 1599 if (logger.isLoggable(BasicLevel.DEBUG)) 1600 logger.log( 1601 BasicLevel.DEBUG, 1602 "Session.addMessageListener(" + mcl + ')'); 1603 checkClosed(); 1604 checkThreadOfControl(); 1605 1606 checkSessionMode(SessionMode.LISTENER); 1607 1608 mcl.start(); 1609 1610 if (status == Status.START && 1611 listenerCount == 0) { 1612 doStart(); 1613 } 1614 1615 listenerCount++; 1616 return mcl; 1617 } 1618 1619 1624 void removeMessageListener( 1625 MessageConsumerListener mcl, 1626 boolean check) throws JMSException { 1627 if (logger.isLoggable(BasicLevel.DEBUG)) 1628 logger.log( 1629 BasicLevel.DEBUG, 1630 "Session.removeMessageListener(" + 1631 mcl + ',' + check + ')'); 1632 1633 if (check) { 1634 checkClosed(); 1635 checkThreadOfControl(); 1636 } 1637 1638 mcl.close(); 1642 1643 synchronized (this) { 1644 listenerCount--; 1645 if (status == Status.START && listenerCount == 0) { 1646 try { 1647 repliesIn.stop(); 1648 } catch (InterruptedException iE) { 1649 } 1650 doStop(); 1654 } 1655 } 1656 } 1657 1658 1667 void pushMessages(SingleSessionConsumer consumerListener, 1668 ConsumerMessages messages) { 1669 if (logger.isLoggable(BasicLevel.DEBUG)) 1670 logger.log( 1671 BasicLevel.DEBUG, 1672 "Session.pushMessages(" + 1673 consumerListener + ',' + messages + ')'); 1674 repliesIn.push( 1675 new MessageListenerContext( 1676 consumerListener, messages)); 1677 } 1678 1679 1683 void onMessage(org.objectweb.joram.shared.messages.Message msg) { 1684 if (logger.isLoggable(BasicLevel.DEBUG)) 1685 logger.log(BasicLevel.DEBUG, "Session.onMessage(" + msg + ')'); 1686 1687 repliesIn.push(msg); 1688 } 1689 1690 1694 private void ackMessage(String targetName, 1695 String msgId, 1696 boolean queueMode) 1697 throws JMSException { 1698 ConsumerAckRequest ack = new ConsumerAckRequest(targetName, queueMode); 1699 ack.addId(msgId); 1700 mtpx.sendRequest(ack); 1701 } 1702 1703 1710 private void denyMessage(String targetName, 1711 String msgId, 1712 boolean queueMode) 1713 throws JMSException { 1714 if (logger.isLoggable(BasicLevel.DEBUG)) 1715 logger.log( 1716 BasicLevel.DEBUG, 1717 "Session.denyMessage(" + 1718 targetName + ',' + 1719 msgId + ',' + 1720 queueMode + ')'); 1721 ConsumerDenyRequest cdr = new ConsumerDenyRequest( 1722 targetName, msgId, queueMode); 1723 if (queueMode) { 1724 requestor.request(cdr); 1725 } else { 1726 mtpx.sendRequest(cdr, null); 1727 } 1728 } 1729 1730 1735 private void onMessages(MessageListenerContext ctx) throws JMSException { 1736 Vector msgs = ctx.messages.getMessages(); 1737 for (int i = 0; i < msgs.size(); i++) { 1738 onMessage((org.objectweb.joram.shared.messages.Message) msgs.elementAt(i), 1739 ctx.consumerListener); 1740 } 1741 } 1742 1743 1746 void onMessage(org.objectweb.joram.shared.messages.Message momMsg, 1747 MessageConsumerListener mcl) throws JMSException { 1748 String msgId = momMsg.id; 1749 1750 if (! autoAck) 1751 prepareAck(mcl.getTargetName(), msgId, mcl.getQueueMode()); 1752 1753 Message msg = null; 1754 try { 1755 msg = Message.wrapMomMessage(this, momMsg); 1756 } catch (JMSException jE) { 1757 if (autoAck) 1760 denyMessage(mcl.getTargetName(), msgId, mcl.getQueueMode()); 1761 return; 1762 } 1763 msg.session = this; 1764 1765 try { 1766 if (messageListener == null) { 1767 mcl.onMessage(msg, acknowledgeMode); 1769 } else { 1770 mcl.onMessage(msg, messageListener, acknowledgeMode); 1772 } 1773 } catch (JMSException exc) { 1774 if (logger.isLoggable(BasicLevel.DEBUG)) 1775 logger.log(BasicLevel.DEBUG, "", exc); 1776 1777 if (autoAck || mcl.isClosed()) { 1778 denyMessage(mcl.getTargetName(), msgId, mcl.getQueueMode()); 1779 } 1780 return; 1781 } 1782 1783 if (recover) { 1784 if (autoAck) { 1787 denyMessage(mcl.getTargetName(), msgId, mcl.getQueueMode()); 1788 } else { 1789 doRecover(); 1790 recover = false; 1791 } 1792 } else { 1793 if (autoAck) { 1794 mcl.ack(msgId, acknowledgeMode); 1795 } 1796 } 1797 } 1798 1799 1802 synchronized void send(Destination dest, 1803 javax.jms.Message msg, 1804 int deliveryMode, 1805 int priority, 1806 long timeToLive, 1807 boolean timestampDisabled) throws JMSException { 1808 if (logger.isLoggable(BasicLevel.DEBUG)) 1809 logger.log(BasicLevel.DEBUG, 1810 "Session.send(" + dest + ',' + msg + ',' + deliveryMode + ',' + 1811 priority + ',' + timeToLive + ',' + timestampDisabled + ')'); 1812 1813 checkClosed(); 1814 checkThreadOfControl(); 1815 1816 String msgID = cnx.nextMessageId(); 1818 msg.setJMSMessageID(msgID); 1819 msg.setJMSDeliveryMode(deliveryMode); 1820 msg.setJMSDestination(dest); 1821 if (timeToLive == 0) { 1822 msg.setJMSExpiration(0); 1823 } else { 1824 msg.setJMSExpiration(System.currentTimeMillis() + timeToLive); 1825 } 1826 msg.setJMSPriority(priority); 1827 if (! timestampDisabled) { 1828 msg.setJMSTimestamp(System.currentTimeMillis()); 1829 } 1830 1831 Message joramMsg = null; 1832 try { 1833 joramMsg = (Message) msg; 1834 } catch (ClassCastException exc) { 1835 try { 1836 joramMsg = Message.convertJMSMessage(msg); 1839 } catch (JMSException jE) { 1840 MessageFormatException mE = 1841 new MessageFormatException ("Message to send is invalid."); 1842 mE.setLinkedException(jE); 1843 throw mE; 1844 } 1845 } 1846 joramMsg.prepare(); 1847 1848 if (transacted) { 1849 if (logger.isLoggable(BasicLevel.DEBUG)) 1850 logger.log(BasicLevel.DEBUG, "Buffering the message."); 1851 prepareSend(dest, (org.objectweb.joram.shared.messages.Message) joramMsg.momMsg.clone()); 1853 } else { 1854 ProducerMessages pM = new ProducerMessages(dest.getName(), 1855 (org.objectweb.joram.shared.messages.Message) joramMsg.momMsg.clone()); 1856 1857 if (logger.isLoggable(BasicLevel.DEBUG)) 1858 logger.log(BasicLevel.DEBUG, "Sending " + joramMsg); 1859 1860 if (asyncSend || (! joramMsg.momMsg.persistent)) { 1861 pM.setAsyncSend(true); 1863 mtpx.sendRequest(pM); 1864 } else { 1865 requestor.request(pM); 1866 } 1867 } 1868 } 1869 1870 1877 synchronized AbstractJmsReply syncRequest( 1878 AbstractJmsRequest request) 1879 throws JMSException { 1880 return requestor.request(request); 1881 } 1882 1883 final Connection getConnection() { 1884 return cnx; 1885 } 1886 1887 final String getId() { 1888 return ident; 1889 } 1890 1891 final RequestMultiplexer getRequestMultiplexer() { 1892 return mtpx; 1893 } 1894 1895 public final boolean isAutoAck() { 1896 return autoAck; 1897 } 1898 1899 private void activateMessageInput() throws JMSException { 1900 for (int i = 0; i < consumers.size(); i++) { 1901 MessageConsumer cons = 1902 (MessageConsumer) consumers.elementAt(i); 1903 cons.activateMessageInput(); 1904 } 1905 passiveMsgInput = false; 1906 } 1907 1908 private void passivateMessageInput() throws JMSException { 1909 for (int i = 0; i < consumers.size(); i++) { 1910 MessageConsumer cons = 1911 (MessageConsumer) consumers.elementAt(i); 1912 cons.passivateMessageInput(); 1913 } 1914 passiveMsgInput = true; 1915 } 1916 1917 1922 public void setAsyncSend(boolean b) { 1923 asyncSend = b; 1924 } 1925 1926 1931 public void setQueueMessageReadMax(int i) { 1932 queueMessageReadMax = i; 1933 } 1934 1935 public final int getQueueMessageReadMax() { 1936 return queueMessageReadMax; 1937 } 1938 1939 public final int getTopicAckBufferMax() { 1940 return topicAckBufferMax; 1941 } 1942 1943 public void setTopicAckBufferMax(int i) { 1944 topicAckBufferMax = i; 1945 } 1946 1947 public final int getTopicActivationThreshold() { 1948 return topicActivationThreshold; 1949 } 1950 1951 public void setTopicActivationThreshold(int i) { 1952 topicActivationThreshold = i; 1953 } 1954 1955 public final int getTopicPassivationThreshold() { 1956 return topicPassivationThreshold; 1957 } 1958 1959 public void setTopicPassivationThreshold(int i) { 1960 topicPassivationThreshold = i; 1961 } 1962 1963 1968 private class SessionCloseTask extends TimerTask { 1969 private long txPendingTimer; 1970 1971 SessionCloseTask(long txPendingTimer) { 1972 this.txPendingTimer = txPendingTimer; 1973 } 1974 1975 1976 public void run() { 1977 try { 1978 if (logger.isLoggable(BasicLevel.WARN)) 1979 logger.log(BasicLevel.WARN, "Session closed " 1980 + "because of pending transaction"); 1981 close(); 1982 } catch (Exception e) {} 1983 } 1984 1985 public void start() { 1986 try { 1987 mtpx.schedule(this, txPendingTimer); 1988 } catch (Exception e) {} 1989 } 1990 } 1991 1992 1995 private class SessionDaemon extends fr.dyade.aaa.util.Daemon { 1996 SessionDaemon() { 1997 super("Connection#" + cnx + " - Session#" + ident); 1998 } 1999 2000 public void run() { 2001 while (running) { 2002 canStop = true; 2003 MessageListenerContext ctx; 2004 try { 2005 ctx = (MessageListenerContext)repliesIn.get(); 2006 repliesIn.pop(); 2007 } catch (InterruptedException exc) { 2008 if (logger.isLoggable(BasicLevel.DEBUG)) 2009 logger.log(BasicLevel.DEBUG, "", exc); 2010 return; 2011 } 2012 2013 canStop = false; 2014 try { 2015 onMessages(ctx); 2016 } catch (JMSException exc) { 2017 if (logger.isLoggable(BasicLevel.DEBUG)) 2018 logger.log(BasicLevel.DEBUG, "", exc); 2019 } 2020 } 2021 } 2022 2023 Thread getThread() { 2024 return thread; 2025 } 2026 2027 protected void shutdown() {} 2028 2029 protected void close() {} 2030 } 2031 2032 2036 private static class MessageListenerContext { 2037 SingleSessionConsumer consumerListener; 2038 ConsumerMessages messages; 2039 2040 MessageListenerContext( 2041 SingleSessionConsumer consumerListener, 2042 ConsumerMessages messages) { 2043 this.consumerListener = consumerListener; 2044 this.messages = messages; 2045 } 2046 } 2047} 2048 | Popular Tags |