package com.caucho.jms.session;

import com.caucho.jms.AbstractDestination;
import com.caucho.jms.message.BytesMessageImpl;
import com.caucho.jms.message.MapMessageImpl;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.message.ObjectMessageImpl;
import com.caucho.jms.message.StreamMessageImpl;
import com.caucho.jms.message.TextMessageImpl;
import com.caucho.log.Log;
import com.caucho.util.Alarm;
import com.caucho.util.L10N;
import com.caucho.util.ThreadPool;
import com.caucho.util.ThreadTask;

import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A JMS session bound to a single {@link ConnectionImpl}.
 *
 * <p>The session creates messages, consumers, producers and browsers,
 * buffers outgoing messages while transacted, and, once marked
 * asynchronous, schedules itself on the thread pool to push messages
 * from its consumers to their listeners.
 */
public class SessionImpl implements Session, ThreadTask {
  protected static final Logger log = Log.open(SessionImpl.class);
  protected static final L10N L = new L10N(SessionImpl.class);

  /** Upper bound, in milliseconds, that stop() waits for run() to finish. */
  private static final long SHUTDOWN_WAIT_TIME = 10000;

  private boolean _isTransacted;
  private int _acknowledgeMode;

  // Context class loader captured at construction; installed around
  // listener callbacks in run() and released in close().
  private ClassLoader _classLoader;

  private ConnectionImpl _connection;

  // Also serves as the monitor guarding _isRunning and the stop() wait,
  // so the reference must never change.
  private final ArrayList<MessageConsumerImpl> _consumers =
    new ArrayList<MessageConsumerImpl>();

  private MessageListener _messageListener;
  private boolean _isAsynchronous;

  // Only read by checkThread(); never assigned in this class.
  private Thread _thread;

  // Messages sent in a transacted session, held until commit().
  private ArrayList<TransactedMessage> _transactedMessages;

  private volatile boolean _isRunning;
  private volatile boolean _isClosed;
  private volatile boolean _hasMessage;

  /**
   * Creates the session and registers it with the connection.
   *
   * @param connection the owning connection
   * @param isTransacted true for a transacted session; the acknowledge
   *        mode is then forced to SESSION_TRANSACTED
   * @param ackMode the acknowledge mode for a non-transacted session
   * @throws JMSException if ackMode is not one of CLIENT_ACKNOWLEDGE,
   *         DUPS_OK_ACKNOWLEDGE or AUTO_ACKNOWLEDGE on a
   *         non-transacted session
   */
  public SessionImpl(ConnectionImpl connection,
                     boolean isTransacted, int ackMode)
    throws JMSException
  {
    _classLoader = Thread.currentThread().getContextClassLoader();

    _connection = connection;
    _isTransacted = isTransacted;

    if (isTransacted)
      _acknowledgeMode = SESSION_TRANSACTED;
    else {
      switch (ackMode) {
      case CLIENT_ACKNOWLEDGE:
      case DUPS_OK_ACKNOWLEDGE:
      case AUTO_ACKNOWLEDGE:
        _acknowledgeMode = ackMode;
        break;

      default:
        throw new JMSException(L.l("{0} is an illegal acknowledge mode",
                                   ackMode));
      }
    }

    _connection.addSession(this);
  }

  /**
   * Returns the owning connection.
   */
  ConnectionImpl getConnection()
  {
    return _connection;
  }

  /**
   * Returns the client id of the owning connection.
   */
  public String getClientID()
    throws JMSException
  {
    return _connection.getClientID();
  }

  /**
   * Returns true while the session is not closed and the connection
   * reports itself active.
   */
  public boolean isActive()
  {
    return ! _isClosed && _connection.isActive();
  }

  /**
   * Returns true if the connection reports that it is stopping.
   */
  boolean isStopping()
  {
    return _connection.isStopping();
  }

  /**
   * Returns true if the session is transacted.
   *
   * @throws JMSException if the session is closed
   */
  public boolean getTransacted()
    throws JMSException
  {
    checkOpen();

    return _isTransacted;
  }

  /**
   * Returns the acknowledge mode chosen at construction.
   *
   * @throws JMSException if the session is closed
   */
  public int getAcknowledgeMode()
    throws JMSException
  {
    checkOpen();

    return _acknowledgeMode;
  }

  /**
   * Returns the session-level message listener, or null if none is set.
   *
   * @throws JMSException if the session is closed
   */
  public MessageListener getMessageListener()
    throws JMSException
  {
    checkOpen();

    return _messageListener;
  }

  /**
   * Sets the session-level message listener and switches the session
   * to asynchronous delivery. In run() a non-null session listener
   * takes precedence over each consumer's own listener.
   *
   * @throws JMSException if the session is closed
   */
  public void setMessageListener(MessageListener listener)
    throws JMSException
  {
    checkOpen();

    _messageListener = listener;
    setAsynchronous();
  }

  /**
   * Marks the session as asynchronous and triggers a delivery pass.
   */
  void setAsynchronous()
  {
    _isAsynchronous = true;

    notifyListener();
  }

  /**
   * Returns true once the session has been marked asynchronous.
   */
  boolean isAsynchronous()
  {
    return _isAsynchronous;
  }

  /**
   * Creates a new, empty bytes message.
   *
   * @throws JMSException if the session is closed
   */
  public BytesMessage createBytesMessage()
    throws JMSException
  {
    checkOpen();

    return new BytesMessageImpl();
  }

  /**
   * Creates a new, empty map message.
   *
   * @throws JMSException if the session is closed
   */
  public MapMessage createMapMessage()
    throws JMSException
  {
    checkOpen();

    return new MapMessageImpl();
  }

  /**
   * Creates a new message without a body.
   *
   * @throws JMSException if the session is closed
   */
  public Message createMessage()
    throws JMSException
  {
    checkOpen();

    return new MessageImpl();
  }

  /**
   * Creates a new, empty object message.
   *
   * @throws JMSException if the session is closed
   */
  public ObjectMessage createObjectMessage()
    throws JMSException
  {
    checkOpen();

    return new ObjectMessageImpl();
  }

  /**
   * Creates an object message holding the given object.
   *
   * @param obj the serializable payload
   * @throws JMSException if the session is closed
   */
  public ObjectMessage createObjectMessage(Serializable obj)
    throws JMSException
  {
    checkOpen();

    ObjectMessage msg = createObjectMessage();

    msg.setObject(obj);

    return msg;
  }

  /**
   * Creates a new, empty stream message.
   *
   * @throws JMSException if the session is closed
   */
  public StreamMessage createStreamMessage()
    throws JMSException
  {
    checkOpen();

    return new StreamMessageImpl();
  }

  /**
   * Creates a new, empty text message.
   *
   * @throws JMSException if the session is closed
   */
  public TextMessage createTextMessage()
    throws JMSException
  {
    checkOpen();

    return new TextMessageImpl();
  }

  /**
   * Creates a text message holding the given text.
   *
   * @param message the text payload
   * @throws JMSException if the session is closed
   */
  public TextMessage createTextMessage(String message)
    throws JMSException
  {
    checkOpen();

    TextMessage msg = createTextMessage();

    msg.setText(message);

    return msg;
  }

  /**
   * Creates a consumer for the destination with no selector and
   * noLocal set to false.
   *
   * @throws JMSException if the session is closed
   */
  public MessageConsumer createConsumer(Destination destination)
    throws JMSException
  {
    checkOpen();

    return createConsumer(destination, null, false);
  }

  /**
   * Creates a consumer for the destination with the given selector and
   * noLocal set to false.
   *
   * @throws JMSException if the session is closed
   */
  public MessageConsumer createConsumer(Destination destination,
                                        String messageSelector)
    throws JMSException
  {
    checkOpen();

    return createConsumer(destination, messageSelector, false);
  }

  /**
   * Asks the destination to create a consumer and registers it with
   * this session.
   *
   * @param destination the destination; cast to AbstractDestination
   * @param messageSelector the selector passed to the destination
   * @param noLocal the noLocal flag passed to the destination
   * @throws JMSException if the session is closed
   */
  public MessageConsumer createConsumer(Destination destination,
                                        String messageSelector,
                                        boolean noLocal)
    throws JMSException
  {
    checkOpen();

    AbstractDestination dest = (AbstractDestination) destination;

    MessageConsumer consumer;
    consumer = dest.createConsumer(this, messageSelector, noLocal);

    addConsumer((MessageConsumerImpl) consumer);

    return consumer;
  }

  /**
   * Asks the destination to create a producer for this session.
   *
   * @param destination the destination; cast to AbstractDestination
   * @throws JMSException if the session is closed
   */
  public MessageProducer createProducer(Destination destination)
    throws JMSException
  {
    checkOpen();

    AbstractDestination dest = (AbstractDestination) destination;

    return dest.createProducer(this);
  }

  /**
   * Creates a browser for the queue with no selector.
   *
   * @throws JMSException if the session is closed
   */
  public QueueBrowser createBrowser(Queue queue)
    throws JMSException
  {
    checkOpen();

    return createBrowser(queue, null);
  }

  /**
   * Asks the queue to create a browser with the given selector.
   *
   * @param queue the queue; cast to AbstractDestination
   * @throws JMSException if the session is closed
   */
  public QueueBrowser createBrowser(Queue queue, String messageSelector)
    throws JMSException
  {
    checkOpen();

    return ((AbstractDestination) queue).createBrowser(this, messageSelector);
  }

  /**
   * Delegates queue creation to the connection factory.
   *
   * @throws JMSException if the session is closed
   */
  public Queue createQueue(String queueName)
    throws JMSException
  {
    checkOpen();

    return _connection.getConnectionFactory().createQueue(queueName);
  }

  /**
   * Creates a new temporary queue.
   *
   * @throws JMSException if the session is closed
   */
  public TemporaryQueue createTemporaryQueue()
    throws JMSException
  {
    checkOpen();

    return new TemporaryQueueImpl();
  }

  /**
   * Delegates topic creation to the connection factory.
   *
   * @throws JMSException if the session is closed
   */
  public Topic createTopic(String topicName)
    throws JMSException
  {
    checkOpen();

    return _connection.getConnectionFactory().createTopic(topicName);
  }

  /**
   * Creates a new temporary topic.
   *
   * @throws JMSException if the session is closed
   */
  public TemporaryTopic createTemporaryTopic()
    throws JMSException
  {
    checkOpen();

    return new TemporaryTopicImpl();
  }

  /**
   * Creates a durable subscriber with no selector and noLocal set to
   * false.
   *
   * @throws JMSException if the session is closed or the connection
   *         has no client id
   */
  public TopicSubscriber createDurableSubscriber(Topic topic, String name)
    throws JMSException
  {
    checkOpen();

    if (getClientID() == null)
      throw new JMSException(L.l("connection may not create a durable subscriber because it does not have an assigned ClientID."));

    return createDurableSubscriber(topic, name, null, false);
  }

  /**
   * Creates a durable subscriber, records it on the connection under
   * the given name and registers it with this session.
   *
   * @param topic the topic; cast to AbstractDestination
   * @param name the subscription name
   * @param messageSelector the selector passed to the topic
   * @param noLocal the noLocal flag passed to the topic
   * @throws JMSException if the session is closed or the connection
   *         already has a durable subscriber with that name
   */
  public TopicSubscriber createDurableSubscriber(Topic topic,
                                                 String name,
                                                 String messageSelector,
                                                 boolean noLocal)
    throws JMSException
  {
    checkOpen();

    AbstractDestination topicImpl = (AbstractDestination) topic;

    if (_connection.getDurableSubscriber(name) != null)
      throw new JMSException(L.l("'{0}' is already an active durable subscriber",
                                 name));

    TopicSubscriber consumer;
    consumer = topicImpl.createDurableSubscriber(this, messageSelector,
                                                 noLocal, name);

    _connection.putDurableSubscriber(name, consumer);

    addConsumer((MessageConsumerImpl) consumer);

    return consumer;
  }

  /**
   * Removes the named durable subscriber from the connection.
   *
   * @throws JMSException if the session is closed
   */
  public void unsubscribe(String name)
    throws JMSException
  {
    checkOpen();

    _connection.removeDurableSubscriber(name);
  }

  /**
   * Starts delivery by triggering a delivery pass.
   */
  void start()
  {
    notifyListener();
  }

  /**
   * Waits for an in-progress run() pass to finish, for at most
   * SHUTDOWN_WAIT_TIME milliseconds in total.
   *
   * <p>Returns early if the calling thread is interrupted (the
   * interrupt status is restored) or, after the first wait, when
   * Alarm.isTest() is true.
   */
  void stop()
  {
    synchronized (_consumers) {
      _consumers.notifyAll();

      long deadline = Alarm.getCurrentTime() + SHUTDOWN_WAIT_TIME;

      while (_isRunning) {
        // Wait only for what is left of the budget so that repeated
        // wakeups cannot stretch the total wait past the deadline.
        long remaining = deadline - Alarm.getCurrentTime();

        if (remaining <= 0)
          return;

        try {
          _consumers.wait(remaining);
        } catch (InterruptedException e) {
          // Preserve the interrupt for the caller and stop waiting;
          // re-entering wait() would only throw again.
          Thread.currentThread().interrupt();
          return;
        }

        if (Alarm.isTest())
          return;
      }
    }
  }

  /**
   * Sends every buffered transacted message, clears the buffer even
   * if a send fails, then acknowledges all consumers.
   *
   * @throws JMSException if the session is closed, is not transacted,
   *         or a buffered send fails
   */
  public void commit()
    throws JMSException
  {
    checkOpen();

    if (! _isTransacted)
      throw new IllegalStateException(L.l("commit() can only be called on a transacted session."));

    ArrayList<TransactedMessage> messages = _transactedMessages;

    if (messages != null) {
      try {
        for (int i = 0; i < messages.size(); i++) {
          messages.get(i).send();
        }
      } finally {
        messages.clear();
      }
    }

    acknowledge();
  }

  /**
   * Acknowledges every consumer of this session. A failure in one
   * consumer is logged and does not stop the others.
   *
   * @throws JMSException if the session is closed
   */
  public void acknowledge()
    throws JMSException
  {
    checkOpen();

    for (int i = 0; i < _consumers.size(); i++) {
      MessageConsumerImpl consumer = _consumers.get(i);

      try {
        consumer.acknowledge();
      } catch (Throwable e) {
        log.log(Level.WARNING, e.toString(), e);
      }
    }
  }

  /**
   * Discards buffered transacted messages and rolls back every
   * consumer. A failure in one consumer is logged and does not stop
   * the others.
   *
   * @throws JMSException if the session is closed or not transacted
   */
  public void rollback()
    throws JMSException
  {
    checkOpen();

    if (! _isTransacted)
      throw new IllegalStateException(L.l("rollback() can only be called on a transacted session."));

    if (_transactedMessages != null)
      _transactedMessages.clear();

    for (int i = 0; i < _consumers.size(); i++) {
      MessageConsumerImpl consumer = _consumers.get(i);

      try {
        consumer.rollback();
      } catch (Throwable e) {
        log.log(Level.WARNING, e.toString(), e);
      }
    }
  }

  /**
   * Rolls back every consumer of a non-transacted session. A failure
   * in one consumer is logged and does not stop the others.
   *
   * @throws JMSException if the session is closed or is transacted
   */
  public void recover()
    throws JMSException
  {
    checkOpen();

    if (_isTransacted)
      throw new IllegalStateException(L.l("recover() may not be called on a transacted session."));

    for (int i = 0; i < _consumers.size(); i++) {
      MessageConsumerImpl consumer = _consumers.get(i);

      try {
        consumer.rollback();
      } catch (Throwable e) {
        log.log(Level.WARNING, e.toString(), e);
      }
    }
  }

  /**
   * Closes the session: waits for delivery to stop, rolls back and
   * closes every consumer, and unregisters from the connection.
   * Failures along the way are logged. Calling close() on a closed
   * session does nothing.
   */
  public void close()
    throws JMSException
  {
    if (_isClosed)
      return;

    try {
      stop();
    } catch (Throwable e) {
      log.log(Level.WARNING, e.toString(), e);
    }

    // Iterate over a snapshot: if closing a consumer removes it from
    // _consumers, an index loop over the live list would skip entries.
    ArrayList<MessageConsumerImpl> consumers =
      new ArrayList<MessageConsumerImpl>(_consumers);

    for (int i = 0; i < consumers.size(); i++) {
      MessageConsumerImpl consumer = consumers.get(i);

      try {
        consumer.rollback();
      } catch (Throwable e) {
        log.log(Level.WARNING, e.toString(), e);
      }

      try {
        consumer.close();
      } catch (Throwable e) {
        log.log(Level.WARNING, e.toString(), e);
      }
    }

    try {
      _connection.removeSession(this);
    } finally {
      _isClosed = true;
    }

    _classLoader = null;
  }

  /**
   * Registers a consumer with the session and triggers a delivery
   * pass.
   */
  protected void addConsumer(MessageConsumerImpl consumer)
  {
    _consumers.add(consumer);

    notifyListener();
  }

  /**
   * Unregisters a consumer from the session.
   */
  protected void removeConsumer(MessageConsumerImpl consumer)
  {
    _consumers.remove(consumer);
  }

  /**
   * Flags that a message may be available, wakes any thread waiting
   * on the consumer monitor and, when the session is asynchronous,
   * schedules this session on the thread pool.
   */
  void notifyListener()
  {
    _hasMessage = true;

    synchronized (_consumers) {
      _consumers.notifyAll();
    }

    if (_isAsynchronous) {
      ThreadPool.getThreadPool().schedule(this);
      Thread.yield();
    }
  }

  /**
   * Stamps the message headers and sends a copy of the message to the
   * destination. In a transacted session the copy is buffered until
   * commit().
   *
   * @param queue the destination
   * @param message the caller's message; its headers are updated in
   *        place, but only a copy is delivered
   * @param deliveryMode the JMS delivery mode header value
   * @param priority the JMS priority header value
   * @param expiration the JMS expiration header value
   * @throws JMSException if the session is closed
   */
  public void send(AbstractDestination queue,
                   MessageImpl message,
                   int deliveryMode,
                   int priority,
                   long expiration)
    throws JMSException
  {
    checkOpen();

    message.setJMSMessageID(queue.generateMessageID());
    message.setJMSDestination(queue);
    message.setJMSDeliveryMode(deliveryMode);
    message.setJMSTimestamp(Alarm.getCurrentTime());
    message.setJMSExpiration(expiration);
    message.setJMSPriority(priority);

    // Deliver a copy so later changes to the caller's message cannot
    // affect what the destination receives.
    MessageImpl destMessage = message.copy();
    destMessage.setSession(this);
    destMessage.setReceive();

    if (_isTransacted) {
      if (_transactedMessages == null)
        _transactedMessages = new ArrayList<TransactedMessage>();

      // Buffer the copy, not the caller's instance, so the transacted
      // path delivers the same object state as the direct path below.
      _transactedMessages.add(new TransactedMessage(queue, destMessage));
    }
    else
      queue.send(destMessage);
  }

  /**
   * Session-level receive is not implemented.
   *
   * @throws UnsupportedOperationException always
   */
  protected Message receive(MessageConsumerImpl consumer,
                            long timeout)
    throws JMSException
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Delivery pass: polls each consumer without blocking and hands any
   * message to the session listener or, if none is set, to the
   * consumer's own listener. Passes repeat while the previous pass
   * delivered something and the session is active and not stopping.
   *
   * <p>Only one thread delivers at a time; a second caller returns
   * immediately. The listener runs with the class loader captured at
   * construction as the context class loader.
   */
  public void run()
  {
    _hasMessage = true;
    Thread thread = Thread.currentThread();

    while (_hasMessage && isActive() && ! isStopping()) {
      synchronized (_consumers) {
        if (_isRunning)
          return;

        _isRunning = true;
      }

      try {
        _hasMessage = false;

        for (int i = 0; i < _consumers.size(); i++) {
          MessageConsumerImpl consumer = _consumers.get(i);
          MessageListener listener = consumer.getMessageListener();

          // The session-level listener overrides the consumer's.
          if (_messageListener != null)
            listener = _messageListener;

          if (consumer.isActive() && ! isStopping() && listener != null) {
            try {
              Message msg = consumer.receiveNoWait();

              if (msg != null) {
                _hasMessage = true;

                if (log.isLoggable(Level.FINE))
                  log.fine("JMS " + msg + " delivered to " + listener);

                ClassLoader oldLoader = thread.getContextClassLoader();
                try {
                  thread.setContextClassLoader(_classLoader);
                  listener.onMessage(msg);
                } finally {
                  thread.setContextClassLoader(oldLoader);
                }
              }
            } catch (Throwable e) {
              log.log(Level.WARNING, e.toString(), e);
            }
          }
        }
      } finally {
        // Release the running flag and wake a thread blocked in stop().
        synchronized (_consumers) {
          _isRunning = false;

          _consumers.notifyAll();
        }
      }
    }
  }

  /**
   * Verifies that the session has not been closed.
   *
   * @throws IllegalStateException if the session is closed
   */
  public void checkOpen()
    throws IllegalStateException
  {
    if (_isClosed)
      throw new IllegalStateException(L.l("session is closed"));
  }

  /**
   * Logs a warning when the calling thread differs from the recorded
   * session thread. Nothing is thrown.
   *
   * <p>NOTE(review): _thread is never assigned in this class, so the
   * check cannot currently fire -- confirm whether that is intended.
   */
  void checkThread()
    throws JMSException
  {
    Thread thread = _thread;

    if (thread != Thread.currentThread() && thread != null) {
      Exception e = new IllegalStateException(L.l("Can't use session from concurrent threads."));
      log.log(Level.WARNING, e.toString(), e);
    }
  }

  /**
   * A message buffered by a transacted session together with the
   * destination it will be sent to on commit.
   */
  static class TransactedMessage {
    private final AbstractDestination _queue;
    private final MessageImpl _message;

    TransactedMessage(AbstractDestination queue, MessageImpl message)
    {
      _queue = queue;
      _message = message;
    }

    /**
     * Sends the buffered message to its destination.
     */
    void send()
      throws JMSException
    {
      _queue.send(_message);
    }
  }
}