1 45 package org.exolab.jms.client; 46 47 import java.io.Serializable ; 48 import java.util.ArrayList ; 49 import java.util.HashMap ; 50 import java.util.List ; 51 import java.util.Vector ; 52 import javax.jms.BytesMessage ; 53 import javax.jms.Connection ; 54 import javax.jms.Destination ; 55 import javax.jms.IllegalStateException ; 56 import javax.jms.InvalidDestinationException ; 57 import javax.jms.InvalidSelectorException ; 58 import javax.jms.JMSException ; 59 import javax.jms.MapMessage ; 60 import javax.jms.Message ; 61 import javax.jms.MessageConsumer ; 62 import javax.jms.MessageListener ; 63 import javax.jms.MessageProducer ; 64 import javax.jms.ObjectMessage ; 65 import javax.jms.Queue ; 66 import javax.jms.QueueBrowser ; 67 import javax.jms.Session ; 68 import javax.jms.StreamMessage ; 69 import javax.jms.TemporaryQueue ; 70 import javax.jms.TemporaryTopic ; 71 import javax.jms.TextMessage ; 72 import javax.jms.Topic ; 73 import javax.jms.TopicSubscriber ; 74 75 import org.apache.commons.logging.Log; 76 import org.apache.commons.logging.LogFactory; 77 78 import org.exolab.jms.message.BytesMessageImpl; 79 import org.exolab.jms.message.MapMessageImpl; 80 import org.exolab.jms.message.MessageConverter; 81 import org.exolab.jms.message.MessageConverterFactory; 82 import org.exolab.jms.message.MessageImpl; 83 import org.exolab.jms.message.MessageSessionIfc; 84 import org.exolab.jms.message.ObjectMessageImpl; 85 import org.exolab.jms.message.StreamMessageImpl; 86 import org.exolab.jms.message.TextMessageImpl; 87 import org.exolab.jms.server.ServerSession; 88 89 90 97 class JmsSession implements Session , JmsMessageListener, MessageSessionIfc { 98 99 102 private JmsConnection _connection; 103 104 107 private ServerSession _session = null; 108 109 112 private volatile boolean _closed = false; 113 114 117 private volatile boolean _closing = false; 118 119 123 private volatile boolean _stopped = true; 124 125 131 private final boolean _transacted; 132 133 140 private final int _ackMode; 141 142 146 private HashMap _consumers = new HashMap (); 147 148 151 private List _producers = new ArrayList (); 152 153 157 private List _messagesToSend = new ArrayList (); 158 159 163 private MessageListener _listener = null; 164 165 169 private Vector _messageCache = new Vector (); 170 171 175 private final Object _receiveLock = new Object (); 176 177 180 private static final Log _log = LogFactory.getLog(JmsSession.class); 181 182 183 196 public JmsSession(JmsConnection connection, boolean transacted, 197 int ackMode) throws JMSException { 198 if (connection == null) { 199 throw new IllegalArgumentException ("Argument 'connection' is null"); 200 } 201 202 _connection = connection; 203 _transacted = transacted; 204 _ackMode = ackMode; 205 206 _session = connection.getServerConnection().createSession(_ackMode, 208 transacted); 209 210 _session.setMessageListener(this); 212 213 if (!connection.isStopped()) { 215 start(); 216 } 217 } 218 219 227 public BytesMessage createBytesMessage() throws JMSException { 228 ensureOpen(); 229 return new BytesMessageImpl(); 230 } 231 232 241 public MapMessage createMapMessage() throws JMSException { 242 ensureOpen(); 243 return new MapMessageImpl(); 244 } 245 246 255 public Message createMessage() throws JMSException { 256 ensureOpen(); 257 return new MessageImpl(); 258 } 259 260 268 public ObjectMessage createObjectMessage() throws JMSException { 269 ensureOpen(); 270 return new ObjectMessageImpl(); 271 } 272 273 282 public ObjectMessage createObjectMessage(Serializable object) 283 throws JMSException { 284 ensureOpen(); 285 ObjectMessageImpl result = new ObjectMessageImpl(); 286 result.setObject(object); 287 return result; 288 } 289 290 298 public StreamMessage createStreamMessage() throws JMSException { 299 ensureOpen(); 300 return new StreamMessageImpl(); 301 } 302 303 311 public TextMessage createTextMessage() throws JMSException { 312 ensureOpen(); 313 return new TextMessageImpl(); 314 } 315 316 325 public TextMessage createTextMessage(String text) throws JMSException { 326 ensureOpen(); 327 TextMessageImpl result = new TextMessageImpl(); 328 result.setText(text); 329 return result; 330 } 331 332 338 public boolean getTransacted() throws JMSException { 339 ensureOpen(); 340 return _transacted; 341 } 342 343 355 public int getAcknowledgeMode() throws JMSException { 356 ensureOpen(); 357 return _ackMode; 358 } 359 360 373 public MessageProducer createProducer(Destination destination) 374 throws JMSException { 375 return new JmsMessageProducer(this, destination); 376 } 377 378 387 388 public MessageConsumer createConsumer(Destination destination) 389 throws JMSException { 390 return createConsumer(destination, null); 391 } 392 393 409 public MessageConsumer createConsumer(Destination destination, 410 String messageSelector) 411 throws JMSException { 412 return createConsumer(destination, messageSelector, false); 413 } 414 415 442 public MessageConsumer createConsumer(Destination destination, 443 String messageSelector, 444 boolean noLocal) throws JMSException { 445 long consumerId = allocateConsumer(destination, messageSelector, 446 noLocal); 447 JmsMessageConsumer consumer = new JmsMessageConsumer(this, consumerId, 448 destination, 449 messageSelector); 450 addConsumer(consumer); 451 return consumer; 452 } 453 454 473 public Queue createQueue(String queueName) throws JMSException { 474 ensureOpen(); 475 476 JmsQueue queue = null; 477 478 if (queueName != null && queueName.length() > 0) { 479 queue = new JmsQueue(queueName); 480 } else { 481 throw new JMSException ( 482 "Cannot create a queue with null or empty name"); 483 } 484 485 return queue; 486 } 487 488 507 public Topic createTopic(String topicName) throws JMSException { 508 ensureOpen(); 509 510 JmsTopic topic = null; 511 512 if (topicName != null && topicName.length() > 0) { 513 topic = new JmsTopic(topicName); 514 } else { 515 throw new JMSException ("Invalid or null topic name specified"); 516 } 517 518 return topic; 519 } 520 521 554 public TopicSubscriber createDurableSubscriber(Topic topic, String name) 555 throws JMSException { 556 return createDurableSubscriber(topic, name, null, false); 557 } 558 559 599 public TopicSubscriber createDurableSubscriber(Topic topic, String name, 600 String messageSelector, 601 boolean noLocal) 602 throws JMSException { 603 ensureOpen(); 604 605 if (topic == null) { 606 throw new InvalidDestinationException ( 607 "Cannot create durable subscriber: argument 'topic' is " 608 + " null"); 609 } 610 if (name == null || name.trim().length() == 0) { 611 throw new JMSException ("Invalid subscription name specified"); 612 } 613 614 if (((JmsTopic) topic).isTemporaryDestination()) { 617 throw new InvalidDestinationException ( 618 "Cannot create a durable subscriber for a temporary topic"); 619 } 620 621 long consumerId = _session.createDurableConsumer((JmsTopic) topic, name, 622 messageSelector, 623 noLocal); 624 JmsTopicSubscriber subscriber = new JmsTopicSubscriber(this, 625 consumerId, 626 topic, 627 messageSelector, 628 noLocal); 629 addConsumer(subscriber); 630 631 return subscriber; 632 } 633 634 644 public QueueBrowser createBrowser(Queue queue) throws JMSException { 645 return createBrowser(queue, null); 646 } 647 648 663 public QueueBrowser createBrowser(Queue queue, String messageSelector) 664 throws JMSException { 665 ensureOpen(); 666 if (!(queue instanceof JmsQueue)) { 667 throw new InvalidDestinationException ("Cannot create QueueBrowser for destination=" 668 + queue); 669 } 670 671 JmsQueue dest = (JmsQueue) queue; 672 if (!checkForValidTemporaryDestination(dest)) { 675 throw new InvalidDestinationException ( 676 "Cannot create a queue browser for a temporary queue " 677 + "that is not bound to this connection"); 678 } 679 680 long consumerId = _session.createBrowser(dest, messageSelector); 681 JmsQueueBrowser browser = new JmsQueueBrowser(this, consumerId, queue, 682 messageSelector); 683 addConsumer(browser); 684 return browser; 685 } 686 687 695 public TemporaryQueue createTemporaryQueue() throws JMSException { 696 ensureOpen(); 697 698 JmsTemporaryQueue queue = new JmsTemporaryQueue(); 699 queue.setOwningConnection(getConnection()); 700 return queue; 701 } 702 703 711 public TemporaryTopic createTemporaryTopic() throws JMSException { 712 ensureOpen(); 713 714 JmsTemporaryTopic topic = new JmsTemporaryTopic(); 715 topic.setOwningConnection(getConnection()); 716 return topic; 717 } 718 719 738 public void unsubscribe(String name) throws JMSException { 739 ensureOpen(); 740 _session.unsubscribe(name); 741 } 742 743 748 public synchronized void commit() throws JMSException { 749 ensureOpen(); 750 ensureTransactional(); 751 752 getServerSession().send(_messagesToSend); 754 _messagesToSend.clear(); 755 756 getServerSession().commit(); 758 } 759 760 765 public synchronized void rollback() throws JMSException { 766 ensureOpen(); 767 ensureTransactional(); 768 769 _messagesToSend.clear(); 771 772 getServerSession().rollback(); 774 } 775 776 783 public synchronized void close() throws JMSException { 784 if (!_closed) { 785 _closing = true; 786 787 stop(); 789 790 notifyConsumers(); 792 793 JmsMessageProducer[] producers = 796 (JmsMessageProducer[]) _producers.toArray( 797 new JmsMessageProducer[0]); 798 for (int i = 0; i < producers.length; ++i) { 799 JmsMessageProducer producer = producers[i]; 800 producer.close(); 801 } 802 803 JmsMessageConsumer[] consumers = 806 (JmsMessageConsumer[]) _consumers.values().toArray( 807 new JmsMessageConsumer[0]); 808 for (int i = 0; i < consumers.length; ++i) { 809 JmsMessageConsumer consumer = consumers[i]; 810 consumer.close(); 811 } 812 813 _connection.removeSession(this); 815 _connection = null; 816 817 _messagesToSend.clear(); 819 820 getServerSession().close(); 823 _session = null; 824 825 _closed = true; 827 _closing = false; 828 } 829 } 830 831 837 public synchronized void recover() throws JMSException { 838 ensureOpen(); 839 if (!_transacted) { 840 getServerSession().recover(); 842 } else { 843 throw new IllegalStateException ( 844 "Cannot recover from a transacted session"); 845 } 846 } 847 848 855 public MessageListener getMessageListener() throws JMSException { 856 ensureOpen(); 857 return _listener; 858 } 859 860 866 public void setMessageListener(MessageListener listener) 867 throws JMSException { 868 ensureOpen(); 869 _listener = listener; 870 } 871 872 876 public void run() { 877 try { 878 while (!_messageCache.isEmpty()) { 879 Message message = (Message ) _messageCache.remove(0); 880 _listener.onMessage(message); 881 } 882 } catch (Exception exception) { 883 _log.error("Error in the Session.run()", exception); 884 } finally { 885 _messageCache.clear(); 887 } 888 } 889 890 899 public void setMessageListener(JmsMessageConsumer listener) 900 throws JMSException { 901 ensureOpen(); 902 enableAsynchronousDelivery(listener.getConsumerId(), true); 903 } 904 905 911 public void removeMessageListener(JmsMessageConsumer listener) 912 throws JMSException { 913 914 ensureOpen(); 915 enableAsynchronousDelivery(listener.getConsumerId(), false); 916 } 917 918 924 public void start() throws JMSException { 925 ensureOpen(); 926 if (_stopped) { 927 getServerSession().start(); 928 _stopped = false; 929 930 notifyConsumers(); 932 } 933 } 934 935 941 public void stop() throws JMSException { 942 ensureOpen(); 943 if (!_stopped) { 944 getServerSession().stop(); 945 _stopped = true; 946 947 notifyConsumers(); 949 } 950 } 951 952 962 public void acknowledgeMessage(Message message) throws JMSException { 963 ensureOpen(); 964 if (_ackMode == Session.CLIENT_ACKNOWLEDGE) { 965 MessageImpl impl = (MessageImpl) message; 966 getServerSession().acknowledgeMessage(impl.getConsumerId(), 967 impl.getAckMessageID()); 968 } 969 } 970 971 980 public void enableAsynchronousDelivery(long consumerId, boolean enable) 981 throws JMSException { 982 983 ensureOpen(); 984 getServerSession().enableAsynchronousDelivery(consumerId, enable); 985 } 986 987 992 public void onMessage(Message message) { 993 if (message != null) { 994 MessageImpl impl = (MessageImpl) message; 995 impl.setJMSXRcvTimestamp(System.currentTimeMillis()); 996 997 execute(message); 999 } 1000 } 1001 1002 1008 public void onMessageAvailable(long consumerId) { 1009 notifyConsumers(); 1011 } 1012 1013 1025 public synchronized void execute(Object object) { 1026 if (_closed) { 1028 _log.error("Received a message for a closed session"); 1029 return; 1030 } 1031 1032 MessageImpl message = (MessageImpl) object; 1033 long consumerId = message.getConsumerId(); 1034 JmsMessageConsumer consumer = 1035 (JmsMessageConsumer) _consumers.get(new Long (consumerId)); 1036 1037 message.setSession(this); 1039 if (consumer != null) { 1040 if (_listener != null) { 1045 _listener.onMessage(message); 1046 } else { 1047 consumer.onMessage(message); 1049 } 1050 } else { 1051 _log.error("Received a message for an inactive consumer"); 1053 } 1054 } 1055 1056 1070 public Message retrieveMessage(long consumerId, long wait) 1071 throws JMSException { 1072 1073 ensureOpen(); 1074 1075 boolean breakOnNextRead = false; 1076 long start = System.currentTimeMillis(); 1077 long end = start + wait; 1078 MessageImpl message = null; 1079 while (true) { 1080 synchronized (_receiveLock) { 1081 if (_closing || _closed) { 1082 break; 1085 } else if (_stopped) { 1086 } else { 1089 message = (MessageImpl) getServerSession().receive( 1091 consumerId, wait); 1092 } 1093 if (message != null) { 1094 message.setSession(this); 1095 break; 1096 } else { 1097 if (breakOnNextRead) { 1099 break; 1100 } 1101 1102 if (wait >= 0) { 1108 try { 1109 if (wait > 0) { 1110 _receiveLock.wait(wait); 1112 long current = System.currentTimeMillis(); 1113 if (current >= end) { 1114 breakOnNextRead = true; 1115 } else { 1116 wait = end - current; 1119 if (wait == 0) { 1120 breakOnNextRead = true; 1121 } 1122 } 1123 } else { 1124 _receiveLock.wait(); 1126 } 1127 } catch (InterruptedException ignore) { 1128 } 1130 } else { 1131 break; 1134 } 1135 } 1136 } 1137 } 1138 1139 return message; 1140 } 1141 1142 1150 public synchronized List browse(long consumerId, int count) 1151 throws JMSException { 1152 ensureOpen(); 1153 return getServerSession().browse(consumerId, count); 1154 } 1155 1156 1162 protected synchronized void sendMessage(Message message) 1163 throws JMSException { 1164 1165 if (_transacted) { 1166 if (message instanceof MessageImpl) { 1169 try { 1170 message = (Message ) ((MessageImpl) message).clone(); 1171 } catch (CloneNotSupportedException error) { 1172 throw new JMSException (error.getMessage()); 1173 } 1174 } else { 1175 message = convert(message); 1176 } 1177 _messagesToSend.add(message); 1178 } else { 1179 if (!(message instanceof MessageImpl)) { 1180 message = convert(message); 1181 } 1182 getServerSession().send((MessageImpl) message); 1183 } 1184 } 1185 1186 1191 protected ServerSession getServerSession() { 1192 return _session; 1193 } 1194 1195 1200 protected JmsConnection getConnection() { 1201 return _connection; 1202 } 1203 1204 1220 protected long allocateConsumer(Destination destination, 1221 String selector, boolean noLocal) 1222 throws JMSException { 1223 ensureOpen(); 1224 1225 if (!(destination instanceof JmsDestination)) { 1226 throw new InvalidDestinationException ( 1227 "Cannot create MessageConsumer for destination=" 1228 + destination); 1229 } 1230 JmsDestination dest = (JmsDestination) destination; 1231 1232 if (!checkForValidTemporaryDestination(dest)) { 1235 throw new InvalidDestinationException ( 1236 "Trying to create a MessageConsumer for a temporary " 1237 + "destination that is not bound to this connection"); 1238 } 1239 1240 long consumerId = _session.createConsumer(dest, selector, noLocal); 1241 return consumerId; 1242 } 1243 1244 1253 protected boolean checkForValidTemporaryDestination( 1254 JmsDestination destination) { 1255 boolean result = false; 1256 1257 if (destination.isTemporaryDestination()) { 1258 JmsTemporaryDestination temp = 1259 (JmsTemporaryDestination) destination; 1260 1261 if (temp.validForConnection(getConnection())) { 1264 result = true; 1265 } 1266 } else { 1267 result = true; 1268 } 1269 1270 return result; 1271 } 1272 1273 1278 protected void addConsumer(JmsMessageConsumer consumer) { 1279 _consumers.put(new Long (consumer.getConsumerId()), consumer); 1280 } 1281 1282 1288 protected void removeConsumer(JmsMessageConsumer consumer) 1289 throws JMSException { 1290 long consumerId = consumer.getConsumerId(); 1291 try { 1292 if (!(consumer instanceof JmsQueueBrowser)) { 1293 removeMessageListener(consumer); 1294 } 1295 _session.removeConsumer(consumerId); 1296 } finally { 1297 _consumers.remove(new Long (consumerId)); 1298 } 1299 } 1300 1301 1306 protected void addProducer(JmsMessageProducer producer) { 1307 _producers.add(producer); 1308 } 1309 1310 1315 protected void removeProducer(JmsMessageProducer producer) { 1316 _producers.remove(producer); 1317 } 1318 1319 1324 protected final boolean isClosed() { 1325 return _closed; 1326 } 1327 1328 1334 protected void addMessage(Message message) { 1335 _messageCache.add(message); 1336 } 1337 1338 1343 protected void ensureOpen() throws IllegalStateException { 1344 if (_closed) { 1345 throw new IllegalStateException ( 1346 "Cannot perform operation - session has been closed"); 1347 } 1348 } 1349 1350 1355 private void ensureTransactional() throws IllegalStateException { 1356 if (!_transacted) { 1357 throw new IllegalStateException ( 1358 "Cannot perform operatiorn - session is not transactional"); 1359 } 1360 } 1361 1362 1365 private void notifyConsumers() { 1366 synchronized (_receiveLock) { 1367 _receiveLock.notifyAll(); 1368 } 1369 } 1370 1371 1378 private Message convert(Message message) throws JMSException { 1379 MessageConverter converter = 1380 MessageConverterFactory.create(message); 1381 return converter.convert(message); 1382 } 1383 1384} 1385 1386 | Popular Tags |