1 45 package org.exolab.jms.server; 46 47 import java.rmi.RemoteException ; 48 import java.util.ArrayList ; 49 import java.util.HashMap ; 50 import java.util.Iterator ; 51 import java.util.List ; 52 import java.util.Vector ; 53 import javax.jms.DeliveryMode ; 54 import javax.jms.InvalidDestinationException ; 55 import javax.jms.JMSException ; 56 import javax.jms.Session ; 57 import javax.transaction.xa.XAException ; 58 import javax.transaction.xa.XAResource ; 59 import javax.transaction.xa.Xid ; 60 61 import org.apache.commons.logging.Log; 62 import org.apache.commons.logging.LogFactory; 63 64 import org.exolab.jms.client.JmsDestination; 65 import org.exolab.jms.client.JmsMessageListener; 66 import org.exolab.jms.client.JmsQueue; 67 import org.exolab.jms.client.JmsTopic; 68 import org.exolab.jms.message.MessageImpl; 69 import org.exolab.jms.messagemgr.ConsumerEndpoint; 70 import org.exolab.jms.messagemgr.ConsumerManager; 71 import org.exolab.jms.messagemgr.DestinationManager; 72 import org.exolab.jms.messagemgr.ConsumerEndpointListener; 73 import org.exolab.jms.messagemgr.MessageHandle; 74 import org.exolab.jms.messagemgr.MessageMgr; 75 import org.exolab.jms.messagemgr.QueueBrowserEndpoint; 76 import org.exolab.jms.messagemgr.ResourceManager; 77 import org.exolab.jms.messagemgr.ResourceManagerException; 78 import org.exolab.jms.server.ServerSession; 79 import org.exolab.jms.server.JmsServerConnection; 80 import org.exolab.jms.server.SentMessageCache; 81 82 83 101 public class JmsServerSession 102 implements ServerSession, ConsumerEndpointListener, XAResource { 103 104 108 private JmsServerConnection _connection = null; 109 110 113 private HashMap _consumers = new HashMap (); 114 115 119 private JmsMessageListener _listener = null; 120 121 124 private int _ackMode = Session.AUTO_ACKNOWLEDGE; 125 126 129 private boolean _transacted = false; 130 131 135 private Xid _xid = null; 136 137 140 private boolean _stopped = true; 141 142 145 private boolean _closed = false; 146 147 150 private SentMessageCache _sentMessageCache; 151 152 155 private static final Log _log = LogFactory.getLog(JmsServerSession.class); 156 157 158 165 public JmsServerSession(JmsServerConnection connection, int ackMode, 166 boolean transacted) { 167 _connection = connection; 168 _ackMode = ackMode; 169 _transacted = transacted; 170 _stopped = true; 171 _sentMessageCache = new SentMessageCache(this); 172 } 173 174 179 public long getConnectionId() { 180 return _connection.getConnectionId(); 181 } 182 183 190 public void acknowledgeMessage(long consumerId, String messageId) 191 throws JMSException { 192 _sentMessageCache.acknowledgeMessage(messageId, consumerId); 193 } 194 195 201 public void send(MessageImpl message) throws JMSException { 202 if (message == null) { 203 throw new JMSException ("Message is null"); 204 } 205 206 try { 207 checkDeliveryMode((MessageImpl) message); 209 210 ((MessageImpl) message).setConnectionId(_connection.getConnectionId()); 213 214 if (_xid != null) { 218 ResourceManager.instance().logPublishedMessage(_xid, 219 (MessageImpl) message); 220 } else { 221 MessageMgr.instance().add((MessageImpl) message); 222 } 223 } catch (JMSException exception) { 224 _log.error("Failed to process message", exception); 225 throw exception; 226 } catch (OutOfMemoryError exception) { 227 String msg = 228 "Failed to process message due to out-of-memory error"; 229 _log.error(msg, exception); 230 throw new JMSException (msg); 231 } catch (Exception exception) { 232 String msg = "Failed to process message"; 233 _log.error(msg, exception); 234 throw new JMSException (msg); 235 } 236 } 237 238 244 public void send(List messages) throws JMSException { 245 if (messages == null) { 246 throw new JMSException ("Argument 'messages' is null"); 247 } 248 249 Iterator iterator = messages.iterator(); 250 while (iterator.hasNext()) { 251 MessageImpl message = (MessageImpl) iterator.next(); 252 send(message); 253 } 254 } 255 256 269 public MessageImpl receive(long consumerId, long wait) 270 throws JMSException { 271 MessageImpl message = null; 272 ConsumerEndpoint consumer = getConsumerEndpoint(consumerId); 273 if (consumer == null) { 274 throw new JMSException ("Can't receive message: no consumer registered with " 275 + "identifier " 276 + consumerId 277 + " on session"); 278 } 279 280 MessageHandle handle = consumer.receive(wait); 282 283 if (handle != null) { 284 MessageImpl orig = handle.getMessage(); 286 if (orig != null) { 287 try { 289 message = (MessageImpl) orig.clone(); 290 message.setJMSRedelivered(handle.getDelivered()); 291 message.setConsumerId(handle.getConsumerId()); 292 } catch (Exception exception) { 293 _log.error(exception); 294 } 295 } 296 } 297 298 if (message != null) { 302 _sentMessageCache.process(handle); 303 304 if (_xid != null) { 305 try { 306 ResourceManager.instance().logReceivedMessage(_xid, 307 consumer.getId(), 308 handle); 309 } catch (Exception exception) { 310 _log.error(exception); 311 JMSException error = new JMSException ("Error in receive"); 312 error.setLinkedException(exception); 313 throw error; 314 } 315 } 316 } 317 318 return message; 319 } 320 321 329 public List browse(long consumerId, int count) throws JMSException { 330 ConsumerEndpoint consumer = getConsumerEndpoint(consumerId); 331 if (consumer == null) { 332 throw new JMSException ("Can't browse messages: no browser registered with " 333 + "identifier " 334 + consumerId 335 + " on session"); 336 } 337 if (!(consumer instanceof QueueBrowserEndpoint)) { 338 throw new JMSException ("Can't browse messages: invalid consumer"); 339 } 340 341 Vector handles = ((QueueBrowserEndpoint) consumer).receiveMessages( 342 count); 343 List messages = new ArrayList (count); 344 345 Iterator iterator = handles.iterator(); 346 while (iterator.hasNext()) { 347 MessageHandle handle = (MessageHandle) iterator.next(); 348 MessageImpl orig = handle.getMessage(); 349 if (orig != null) { 350 try { 352 MessageImpl message = (MessageImpl) orig.clone(); 353 message.setJMSRedelivered(handle.getDelivered()); 354 message.setConsumerId(handle.getConsumerId()); 355 messages.add(message); 356 } catch (Exception exception) { 357 _log.error(exception); 358 } 359 if (messages.size() == count) { 360 break; 361 } 362 } 363 } 364 return messages; 365 } 366 367 379 public long createConsumer(JmsDestination destination, String selector, 380 boolean noLocal) throws JMSException { 381 if (_log.isDebugEnabled()) { 382 _log.debug("createConsumer(destination=" + destination 383 + ", selector=" + selector + ", noLocal=" + noLocal 384 + ") [session=" + this + "]"); 385 } 386 387 if (destination == null) { 388 throw new InvalidDestinationException ( 389 "Cannot create MessageConsumer for null destination"); 390 } 391 392 ConsumerEndpoint consumer = 395 ConsumerManager.instance().createConsumerEndpoint(this, 396 destination, 397 selector, noLocal); 398 final long id = consumer.getId(); 399 consumer.setStopped(_stopped); 400 _consumers.put(new Long (id), consumer); 401 return id; 402 } 403 404 419 public long createDurableConsumer(JmsTopic topic, String name, 420 String selector, boolean noLocal) 421 throws JMSException { 422 if (_log.isDebugEnabled()) { 423 _log.debug("createDurableConsumer(topic=" + topic + ", name=" 424 + name 425 + ", selector=" + selector + ", noLocal=" + noLocal 426 + ") [session=" + this + "]"); 427 } 428 429 if (topic == null || topic.isTemporaryDestination()) { 430 throw new InvalidDestinationException ("Invalid topic: " + topic); 431 } 432 433 if (name == null) { 434 throw new InvalidDestinationException ("Invalid subscription name"); 435 } 436 437 ConsumerManager manager = ConsumerManager.instance(); 438 439 if (manager.durableConsumerExists(name)) { 440 if (!manager.validSubscription(topic.getName(), name)) { 446 unsubscribe(name); 447 manager.createDurableConsumer(topic, name); 448 } 449 } else { 450 manager.createDurableConsumer(topic, name); 453 } 454 455 ConsumerEndpoint consumer = manager.createDurableConsumerEndpoint(this, 459 topic, 460 name, 461 noLocal, 462 selector); 463 final long id = consumer.getId(); 464 consumer.setStopped(_stopped); 465 _consumers.put(new Long (id), consumer); 466 return id; 467 } 468 469 478 public long createBrowser(JmsQueue queue, String selector) 479 throws JMSException { 480 if (_log.isDebugEnabled()) { 481 _log.debug("createBrowser(queue=" + queue + ", selector=" 482 + selector 483 + ") [session=" + this + "]"); 484 } 485 486 if (queue == null) { 487 throw new JMSException ("Cannot create QueueBrowser for null queue"); 488 } 489 490 ConsumerEndpoint consumer = 491 ConsumerManager.instance().createQueueBrowserEndpoint(this, 492 queue, 493 selector); 494 495 final long id = consumer.getId(); 496 consumer.setStopped(_stopped); 497 _consumers.put(new Long (id), consumer); 498 return id; 499 } 500 501 508 public void removeConsumer(long consumerId) throws JMSException { 509 if (_log.isDebugEnabled()) { 510 _log.debug("removeConsumer(consumerId=" + consumerId 511 + ") [session=" 512 + this + "]"); 513 } 514 515 ConsumerEndpoint consumer = 516 (ConsumerEndpoint) _consumers.remove(new Long (consumerId)); 517 if (consumer == null) { 518 throw new JMSException ("No consuemr with id=" + consumerId); 519 } 520 521 ConsumerManager.instance().deleteConsumerEndpoint(consumer); 523 } 524 525 531 public void unsubscribe(String name) throws JMSException { 532 if (_log.isDebugEnabled()) { 533 _log.debug("unsubscribe(name=" + name + ") [session=" + this + "]"); 534 } 535 536 ConsumerManager manager = ConsumerManager.instance(); 537 538 if (!manager.durableConsumerExists(name)) { 541 throw new InvalidDestinationException ( 542 name + " is not a durable subscriber name"); 543 } 544 545 if (!manager.isDurableConsumerActive(name)) { 548 manager.removeDurableConsumer(name); 549 } else { 550 throw new JMSException ("Failed to unsubscribe subscriber " 551 + name + " since is still active"); 552 } 553 } 554 555 558 public void start() { 559 if (_log.isDebugEnabled()) { 560 _log.debug("start() [session=" + this + "]"); 561 } 562 563 if (_stopped) { 564 pause(false); 565 _stopped = false; 566 } 567 } 568 569 572 public void stop() { 573 if (_log.isDebugEnabled()) { 574 _log.debug("stop() [session=" + this + "]"); 575 } 576 if (!_stopped) { 577 pause(true); 578 _stopped = true; 579 } 580 } 581 582 588 public void setMessageListener(JmsMessageListener listener) { 589 _listener = listener; 590 } 591 592 600 public void enableAsynchronousDelivery(long consumerId, boolean enable) 601 throws JMSException { 602 ConsumerEndpoint consumer = getConsumerEndpoint(consumerId); 603 if (consumer == null) { 604 throw new JMSException (consumerId + " is not registered"); 605 } 606 607 if (enable) { 608 consumer.setMessageListener(this); 609 } else { 610 consumer.setMessageListener(null); 611 } 612 } 613 614 619 public void close() throws JMSException { 620 boolean closed = false; 621 622 synchronized (this) { 623 closed = _closed; 624 if (!closed) { 625 _closed = true; 626 } 627 } 628 629 if (!closed) { 630 if (_log.isDebugEnabled()) { 631 _log.debug("close() [session=" + this + "]"); 632 } 633 634 setMessageListener(null); 636 637 Iterator consumers = _consumers.values().iterator(); 640 while (consumers.hasNext()) { 641 ConsumerEndpoint consumer = (ConsumerEndpoint) consumers.next(); 642 ConsumerManager.instance().deleteConsumerEndpoint(consumer); 643 } 644 645 _sentMessageCache.clear(); 647 648 _consumers.clear(); 650 651 _connection.closed(this); 653 } else { 654 if (_log.isDebugEnabled()) { 655 _log.debug("close() [session=" + this + 656 "]: session already closed"); 657 } 658 } 659 } 660 661 668 public void onMessage(MessageHandle handle) throws JMSException , 669 RemoteException { 670 if (_listener != null) { 671 MessageImpl message = handle.getMessage(); 672 MessageImpl m = null; 673 674 if (message != null) { 676 try { 677 m = (MessageImpl) message.clone(); 678 } catch (CloneNotSupportedException exception) { 679 throw new JMSException (exception.toString()); 680 } 681 682 m.setConsumerId(handle.getConsumerId()); 683 m.setJMSRedelivered(handle.getDelivered()); 684 685 if (_transacted || (_ackMode == Session.CLIENT_ACKNOWLEDGE)) { 691 _sentMessageCache.process(handle); 692 } 693 694 try { 695 _listener.onMessage(m); 697 698 if (!_transacted && 702 (_ackMode != Session.CLIENT_ACKNOWLEDGE)) { 703 _sentMessageCache.process(handle); 704 } 705 } catch (RemoteException exception) { 706 close(); 708 throw exception; 709 } 710 } 711 } else { 712 _log.error("Failed to stop async consumer endpoints?"); 713 } 714 } 715 716 722 public void onMessageAvailable(long consumerId) throws RemoteException { 723 _listener.onMessageAvailable(consumerId); 724 } 725 726 734 public void recover() throws JMSException { 735 stop(); 737 738 _sentMessageCache.clear(); 740 741 start(); 743 } 744 745 751 public void commit() throws JMSException { 752 try { 753 _sentMessageCache.acknowledgeAllMessages(); 754 } catch (OutOfMemoryError exception) { 755 String msg = 756 "Failed to commit transaction due to out-of-memory error"; 757 _log.error(msg, exception); 758 throw new JMSException (msg); 759 } 760 } 761 762 768 public void rollback() throws JMSException { 769 _sentMessageCache.clear(); 770 } 771 772 public void start(Xid xid, int flags) throws XAException { 774 try { 775 ResourceManager.instance().start(xid, flags); 776 777 _xid = xid; 779 } catch (ResourceManagerException exception) { 780 throw new XAException ("Failed in start " + exception); 781 } 782 } 783 784 public int prepare(Xid xid) throws XAException { 786 try { 787 return ResourceManager.instance().prepare(xid); 788 } catch (ResourceManagerException exception) { 789 throw new XAException ("Failed in prepare " + exception); 790 } 791 } 792 793 public void commit(Xid xid, boolean onePhase) throws XAException { 795 try { 796 ResourceManager.instance().commit(xid, onePhase); 797 } catch (ResourceManagerException exception) { 798 throw new XAException ("Failed in commit " + exception); 799 } finally { 800 _xid = null; 801 } 802 } 803 804 public void end(Xid xid, int flags) throws XAException { 806 try { 807 ResourceManager.instance().end(xid, flags); 808 } catch (ResourceManagerException exception) { 809 throw new XAException ("Failed in end " + exception); 810 } finally { 811 _xid = null; 812 } 813 } 814 815 public void forget(Xid xid) throws XAException { 817 try { 818 ResourceManager.instance().forget(xid); 819 } catch (ResourceManagerException exception) { 820 throw new XAException ("Failed in forget " + exception); 821 } finally { 822 _xid = null; 823 } 824 } 825 826 public Xid [] recover(int flag) throws XAException { 828 try { 829 return ResourceManager.instance().recover(flag); 830 } catch (ResourceManagerException exception) { 831 throw new XAException ("Failed in recover " + exception); 832 } 833 } 834 835 public void rollback(Xid xid) throws XAException { 837 try { 838 ResourceManager.instance().rollback(xid); 839 } catch (ResourceManagerException exception) { 840 throw new XAException ("Failed in rollback " + exception); 841 } finally { 842 _xid = null; 844 } 845 } 846 847 public int getTransactionTimeout() throws XAException { 849 try { 850 return ResourceManager.instance().getTransactionTimeout(); 851 } catch (ResourceManagerException exception) { 852 throw new XAException ("Failed in getTransactionTimeout " + 853 exception); 854 } 855 } 856 857 public boolean isSameRM(XAResource xares) throws XAException { 859 return true; 860 } 861 862 863 public boolean setTransactionTimeout(int seconds) throws XAException { 865 try { 866 return ResourceManager.instance().setTransactionTimeout(seconds); 867 } catch (ResourceManagerException exception) { 868 throw new XAException ("Failed in setTransactionTimeout " 869 + exception); 870 } 871 } 872 873 879 public Xid getXid() { 880 return _xid; 881 } 882 883 890 public String getResourceManagerId() throws XAException { 891 try { 892 return ResourceManager.instance().getResourceManagerId(); 893 } catch (ResourceManagerException exception) { 894 throw new XAException ("Failed in getResourceManagerId " 895 + exception); 896 } 897 } 898 899 904 public boolean isTransacted() { 905 return _transacted; 906 } 907 908 911 public int getAckMode() { 912 return _ackMode; 913 } 914 915 922 public ConsumerEndpoint getConsumerEndpoint(long consumerId) { 923 return (ConsumerEndpoint) _consumers.get(new Long (consumerId)); 924 } 925 926 932 private void pause(boolean stop) { 933 Iterator iter = _consumers.values().iterator(); 934 while (iter.hasNext()) { 935 ((ConsumerEndpoint) iter.next()).setStopped(stop); 936 } 937 } 938 939 948 private void checkDeliveryMode(MessageImpl message) throws JMSException { 949 if ((message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) 950 && 951 (!DestinationManager.instance() 952 .isMessageForAdministeredDestination(message))) { 953 message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); 954 } 955 } 956 957 } 958 | Popular Tags |