1 18 package org.apache.activemq; 19 20 import java.io.File ; 21 import java.io.InputStream ; 22 import java.io.Serializable ; 23 import java.net.URL ; 24 import java.util.Collections ; 25 import java.util.Iterator ; 26 import java.util.List ; 27 import java.util.concurrent.CopyOnWriteArrayList ; 28 import java.util.concurrent.atomic.AtomicBoolean ; 29 30 import javax.jms.BytesMessage ; 31 import javax.jms.Destination ; 32 import javax.jms.IllegalStateException ; 33 import javax.jms.InvalidDestinationException ; 34 import javax.jms.InvalidSelectorException ; 35 import javax.jms.JMSException ; 36 import javax.jms.MapMessage ; 37 import javax.jms.Message ; 38 import javax.jms.MessageConsumer ; 39 import javax.jms.MessageListener ; 40 import javax.jms.MessageProducer ; 41 import javax.jms.ObjectMessage ; 42 import javax.jms.Queue ; 43 import javax.jms.QueueBrowser ; 44 import javax.jms.QueueReceiver ; 45 import javax.jms.QueueSender ; 46 import javax.jms.QueueSession ; 47 import javax.jms.Session ; 48 import javax.jms.StreamMessage ; 49 import javax.jms.TemporaryQueue ; 50 import javax.jms.TemporaryTopic ; 51 import javax.jms.TextMessage ; 52 import javax.jms.Topic ; 53 import javax.jms.TopicPublisher ; 54 import javax.jms.TopicSession ; 55 import javax.jms.TopicSubscriber ; 56 import javax.jms.TransactionRolledBackException ; 57 58 import org.apache.activemq.blob.BlobTransferPolicy; 59 import org.apache.activemq.blob.BlobUploader; 60 import org.apache.activemq.command.ActiveMQBlobMessage; 61 import org.apache.activemq.command.ActiveMQBytesMessage; 62 import org.apache.activemq.command.ActiveMQDestination; 63 import org.apache.activemq.command.ActiveMQMapMessage; 64 import org.apache.activemq.command.ActiveMQMessage; 65 import org.apache.activemq.command.ActiveMQObjectMessage; 66 import org.apache.activemq.command.ActiveMQQueue; 67 import org.apache.activemq.command.ActiveMQStreamMessage; 68 import org.apache.activemq.command.ActiveMQTempDestination; 69 import org.apache.activemq.command.ActiveMQTextMessage; 70 import org.apache.activemq.command.ActiveMQTopic; 71 import org.apache.activemq.command.Command; 72 import org.apache.activemq.command.ConsumerId; 73 import org.apache.activemq.command.MessageAck; 74 import org.apache.activemq.command.MessageDispatch; 75 import org.apache.activemq.command.MessageId; 76 import org.apache.activemq.command.ProducerId; 77 import org.apache.activemq.command.Response; 78 import org.apache.activemq.command.SessionId; 79 import org.apache.activemq.command.SessionInfo; 80 import org.apache.activemq.command.TransactionId; 81 import org.apache.activemq.management.JMSSessionStatsImpl; 82 import org.apache.activemq.management.StatsCapable; 83 import org.apache.activemq.management.StatsImpl; 84 import org.apache.activemq.memory.UsageManager; 85 import org.apache.activemq.thread.Scheduler; 86 import org.apache.activemq.transaction.Synchronization; 87 import org.apache.activemq.util.Callback; 88 import org.apache.activemq.util.LongSequenceGenerator; 89 import org.apache.commons.logging.Log; 90 import org.apache.commons.logging.LogFactory; 91 92 178 public class ActiveMQSession implements Session , QueueSession , TopicSession , StatsCapable, ActiveMQDispatcher { 179 180 public static interface DeliveryListener { 181 public void beforeDelivery(ActiveMQSession session, Message msg); 182 public void afterDelivery(ActiveMQSession session, Message msg); 183 } 184 185 private static final Log log = LogFactory.getLog(ActiveMQSession.class); 186 187 protected int acknowledgementMode; 188 189 private MessageListener messageListener; 190 private JMSSessionStatsImpl stats; 191 private TransactionContext transactionContext; 192 private DeliveryListener deliveryListener; 193 private MessageTransformer transformer; 194 private BlobTransferPolicy blobTransferPolicy; 195 196 protected final ActiveMQConnection connection; 197 protected final SessionInfo info; 198 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 199 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 200 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 201 protected final ActiveMQSessionExecutor executor = new ActiveMQSessionExecutor(this); 202 protected final AtomicBoolean started = new AtomicBoolean (false); 203 204 protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList (); 205 protected final CopyOnWriteArrayList producers = new CopyOnWriteArrayList (); 206 207 protected boolean closed; 208 protected boolean asyncDispatch; 209 protected boolean sessionAsyncDispatch; 210 protected final boolean debug; 211 protected Object sendMutex = new Object (); 212 213 226 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch,boolean sessionAsyncDispatch) 227 throws JMSException { 228 this.debug = log.isDebugEnabled(); 229 this.connection = connection; 230 this.acknowledgementMode = acknowledgeMode; 231 this.asyncDispatch=asyncDispatch; 232 this.sessionAsyncDispatch = sessionAsyncDispatch; 233 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 234 setTransactionContext(new TransactionContext(connection)); 235 connection.addSession(this); 236 stats = new JMSSessionStatsImpl(producers, consumers); 237 this.connection.asyncSendPacket(info); 238 setTransformer(connection.getTransformer()); 239 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 240 241 if( connection.isStarted() ) 242 start(); 243 244 } 245 246 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch)throws JMSException { 247 this(connection,sessionId,acknowledgeMode,asyncDispatch,true); 248 } 249 250 256 public void setTransactionContext(TransactionContext transactionContext) { 257 this.transactionContext = transactionContext; 258 } 259 260 265 public TransactionContext getTransactionContext() { 266 return transactionContext; 267 } 268 269 274 public StatsImpl getStats() { 275 return stats; 276 } 277 278 283 public JMSSessionStatsImpl getSessionStats() { 284 return stats; 285 } 286 287 297 public BytesMessage createBytesMessage() throws JMSException { 298 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 299 configureMessage(message); 300 return message; 301 } 302 303 314 public MapMessage createMapMessage() throws JMSException { 315 ActiveMQMapMessage message = new ActiveMQMapMessage(); 316 configureMessage(message); 317 return message; 318 } 319 320 331 public Message createMessage() throws JMSException { 332 ActiveMQMessage message = new ActiveMQMessage(); 333 configureMessage(message); 334 return message; 335 } 336 337 347 public ObjectMessage createObjectMessage() throws JMSException { 348 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 349 configureMessage(message); 350 return message; 351 } 352 353 365 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 366 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 367 configureMessage(message); 368 message.setObject(object); 369 return message; 370 } 371 372 382 public StreamMessage createStreamMessage() throws JMSException { 383 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 384 configureMessage(message); 385 return message; 386 } 387 388 398 public TextMessage createTextMessage() throws JMSException { 399 ActiveMQTextMessage message = new ActiveMQTextMessage(); 400 configureMessage(message); 401 return message; 402 } 403 404 415 public TextMessage createTextMessage(String text) throws JMSException { 416 ActiveMQTextMessage message = new ActiveMQTextMessage(); 417 message.setText(text); 418 configureMessage(message); 419 return message; 420 } 421 422 434 public BlobMessage createBlobMessage(URL url) throws JMSException { 435 return createBlobMessage(url, false); 436 } 437 438 439 454 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 455 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 456 configureMessage(message); 457 message.setURL(url); 458 message.setDeletedByBroker(deletedByBroker); 459 return message; 460 } 461 462 476 public BlobMessage createBlobMessage(File file) throws JMSException { 477 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 478 configureMessage(message); 479 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 480 message.setDeletedByBroker(true); 481 message.setName(file.getName()); 482 return message; 483 } 484 485 486 500 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 501 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 502 configureMessage(message); 503 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 504 message.setDeletedByBroker(true); 505 return message; 506 } 507 508 509 516 public boolean getTransacted() throws JMSException { 517 checkClosed(); 518 return ((acknowledgementMode == Session.SESSION_TRANSACTED) || (transactionContext.isInXATransaction())); 519 } 520 521 533 public int getAcknowledgeMode() throws JMSException { 534 checkClosed(); 535 return this.acknowledgementMode; 536 } 537 538 551 public void commit() throws JMSException { 552 checkClosed(); 553 if (!getTransacted()) { 554 throw new javax.jms.IllegalStateException ("Not a transacted session"); 555 } 556 transactionContext.commit(); 557 } 558 559 569 public void rollback() throws JMSException { 570 checkClosed(); 571 if (!getTransacted()) { 572 throw new javax.jms.IllegalStateException ("Not a transacted session"); 573 } 574 transactionContext.rollback(); 575 } 576 577 605 public void close() throws JMSException { 606 if (!closed) { 607 dispose(); 608 connection.asyncSendPacket(info.createRemoveCommand()); 609 } 610 } 611 612 void clearMessagesInProgress(){ 613 executor.clearMessagesInProgress(); 614 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 615 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next(); 616 consumer.clearMessagesInProgress(); 617 } 618 } 619 620 void deliverAcks(){ 621 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 622 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next(); 623 consumer.deliverAcks(); 624 } 625 } 626 627 synchronized public void dispose() throws JMSException { 628 if (!closed) { 629 630 try { 631 executor.stop(); 632 633 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 634 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next(); 635 consumer.dispose(); 636 } 637 consumers.clear(); 638 639 for (Iterator iter = producers.iterator(); iter.hasNext();) { 640 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) iter.next(); 641 producer.dispose(); 642 } 643 producers.clear(); 644 645 try { 646 if (getTransactionContext().isInLocalTransaction()) { 647 rollback(); 648 } 649 } 650 catch (JMSException e) { 651 } 652 653 654 } 655 finally { 656 connection.removeSession(this); 657 this.transactionContext = null; 658 closed = true; 659 } 660 } 661 } 662 663 666 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 667 checkClosed(); 668 message.setConnection(connection); 669 } 670 671 672 679 protected void checkClosed() throws IllegalStateException { 680 if (closed) { 681 throw new IllegalStateException ("The Session is closed"); 682 } 683 } 684 685 709 public void recover() throws JMSException { 710 711 checkClosed(); 712 if (getTransacted()) { 713 throw new IllegalStateException ("This session is transacted"); 714 } 715 716 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 717 ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next(); 718 c.rollback(); 719 } 720 721 } 722 723 734 public MessageListener getMessageListener() throws JMSException { 735 checkClosed(); 736 return this.messageListener; 737 } 738 739 757 public void setMessageListener(MessageListener listener) throws JMSException { 758 checkClosed(); 759 this.messageListener = listener; 760 761 if (listener != null) { 762 executor.setDispatchedBySessionPool(true); 763 } 764 } 765 766 772 public void run() { 773 MessageDispatch messageDispatch; 774 while ((messageDispatch = executor.dequeueNoWait()) != null) { 775 final MessageDispatch md = messageDispatch; 776 ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 777 if( message.isExpired() ) { 778 continue; 780 } 781 782 if( isClientAcknowledge() ) { 783 message.setAcknowledgeCallback(new Callback() { 784 public void execute() throws Exception { 785 } 786 }); 787 } 788 789 if (deliveryListener != null) { 790 deliveryListener.beforeDelivery(this, message); 791 } 792 793 md.setDeliverySequenceId(getNextDeliveryId()); 794 795 try { 796 messageListener.onMessage(message); 797 } catch ( Throwable e ) { 798 log.error("error dispatching message: ",e); 800 connection.onAsyncException(e); 801 } 802 803 try { 804 MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1); 805 ack.setFirstMessageId(md.getMessage().getMessageId()); 806 doStartTransaction(); 807 ack.setTransactionId(getTransactionContext().getTransactionId()); 808 if( ack.getTransactionId()!=null ) { 809 getTransactionContext().addSynchronization(new Synchronization(){ 810 public void afterRollback() throws Exception { 811 812 md.getMessage().onMessageRolledBack(); 813 814 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); 815 int redeliveryCounter = md.getMessage().getRedeliveryCounter(); 816 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 817 && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) { 818 819 822 MessageAck ack = new MessageAck(md,MessageAck.POSION_ACK_TYPE,1); 824 ack.setFirstMessageId(md.getMessage().getMessageId()); 825 asyncSendPacket(ack); 826 827 } else { 828 829 long redeliveryDelay=0; 831 for( int i=0; i < redeliveryCounter; i++) { 832 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); 833 } 834 835 Scheduler.executeAfterDelay(new Runnable () { 836 public void run() { 837 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); 838 } 839 }, redeliveryDelay); 840 841 } 842 } 843 }); 844 } 845 asyncSendPacket(ack); 846 } catch ( Throwable e ) { 847 connection.onAsyncException(e); 848 } 849 850 if (deliveryListener != null) { 851 deliveryListener.afterDelivery(this, message); 852 } 853 } 854 } 855 856 876 public MessageProducer createProducer(Destination destination) throws JMSException { 877 checkClosed(); 878 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation 879 .transformDestination(destination)); 880 } 881 882 898 public MessageConsumer createConsumer(Destination destination) throws JMSException { 899 checkClosed(); 900 return createConsumer(destination, null); 901 } 902 903 929 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 930 checkClosed(); 931 int prefetch = 0; 932 933 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 934 if (destination instanceof Topic ) { 935 prefetch = prefetchPolicy.getTopicPrefetch(); 936 } else { 937 prefetch = prefetchPolicy.getQueuePrefetch(); 938 } 939 940 return new ActiveMQMessageConsumer(this, getNextConsumerId(), 941 ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetch, 942 prefetchPolicy.getMaximumPendingMessageLimit(), false, false, asyncDispatch); 943 } 944 945 948 protected ConsumerId getNextConsumerId() { 949 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 950 } 951 952 955 protected ProducerId getNextProducerId() { 956 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 957 } 958 959 1000 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) 1001 throws JMSException { 1002 checkClosed(); 1003 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1004 return new ActiveMQMessageConsumer(this, getNextConsumerId(), 1005 ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, 1006 prefetchPolicy.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), NoLocal, false, asyncDispatch); 1007 } 1008 1009 1031 public Queue createQueue(String queueName) throws JMSException { 1032 checkClosed(); 1033 return new ActiveMQQueue(queueName); 1034 } 1035 1036 1058 public Topic createTopic(String topicName) throws JMSException { 1059 checkClosed(); 1060 return new ActiveMQTopic(topicName); 1061 } 1062 1063 1073 1111 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1112 checkClosed(); 1113 return createDurableSubscriber(topic, name, null, false); 1114 } 1115 1116 1163 public TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal) 1164 throws JMSException { 1165 checkClosed(); 1166 connection.checkClientIDWasManuallySpecified(); 1167 ActiveMQPrefetchPolicy prefetchPolicy=this.connection.getPrefetchPolicy(); 1168 int prefetch=isAutoAcknowledge()&&connection.isOptimizedMessageDispatch()?prefetchPolicy 1169 .getOptimizeDurableTopicPrefetch():prefetchPolicy.getDurableTopicPrefetch(); 1170 int maxPrendingLimit=prefetchPolicy.getMaximumPendingMessageLimit(); 1171 return new ActiveMQTopicSubscriber(this,getNextConsumerId(),ActiveMQMessageTransformation 1172 .transformDestination(topic),name,messageSelector,prefetch,maxPrendingLimit,noLocal,false, 1173 asyncDispatch); 1174 } 1175 1176 1190 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1191 checkClosed(); 1192 return createBrowser(queue, null); 1193 } 1194 1195 1216 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1217 checkClosed(); 1218 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation 1219 .transformDestination(queue), messageSelector, asyncDispatch); 1220 } 1221 1222 1232 public TemporaryQueue createTemporaryQueue() throws JMSException { 1233 checkClosed(); 1234 return (TemporaryQueue ) connection.createTempDestination(false); 1235 } 1236 1237 1247 public TemporaryTopic createTemporaryTopic() throws JMSException { 1248 checkClosed(); 1249 return (TemporaryTopic )connection.createTempDestination(true); 1250 } 1251 1252 1266 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1267 checkClosed(); 1268 return createReceiver(queue, null); 1269 } 1270 1271 1291 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1292 checkClosed(); 1293 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1294 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation 1295 .transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1296 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1297 } 1298 1299 1313 public QueueSender createSender(Queue queue) throws JMSException { 1314 checkClosed(); 1315 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue)); 1316 } 1317 1318 1341 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1342 checkClosed(); 1343 return createSubscriber(topic, null, false); 1344 } 1345 1346 1385 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1386 checkClosed(); 1387 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1388 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation 1389 .transformDestination(topic), null, messageSelector, prefetchPolicy.getTopicPrefetch(), 1390 prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1391 } 1392 1393 1411 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1412 checkClosed(); 1413 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic)); 1414 } 1415 1416 1436 public void unsubscribe(String name) throws JMSException { 1437 checkClosed(); 1438 connection.unsubscribe(name); 1439 } 1440 1441 1442 public void dispatch(MessageDispatch messageDispatch) { 1443 try { 1444 executor.execute(messageDispatch); 1445 } catch (InterruptedException e) { 1446 Thread.currentThread().interrupt(); 1447 connection.onAsyncException(e); 1448 } 1449 } 1450 1451 1452 1453 1480 public void acknowledge() throws JMSException { 1481 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 1482 ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next(); 1483 c.acknowledge(); 1484 } 1485 } 1486 1487 1488 1495 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1496 this.consumers.add(consumer); 1497 if (consumer.isDurableSubscriber()) { 1498 stats.onCreateDurableSubscriber(); 1499 } 1500 this.connection.addDispatcher(consumer.getConsumerId(), this); 1501 } 1502 1503 1510 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1511 this.connection.removeDispatcher(consumer.getConsumerId()); 1512 if (consumer.isDurableSubscriber()) { 1513 stats.onRemoveDurableSubscriber(); 1514 } 1515 this.consumers.remove(consumer); 1516 } 1517 1518 1525 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1526 this.producers.add(producer); 1527 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1528 } 1529 1530 1537 protected void removeProducer(ActiveMQMessageProducer producer) { 1538 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1539 this.producers.remove(producer); 1540 } 1541 1542 1547 protected void start() throws JMSException { 1548 started.set(true); 1549 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 1550 ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next(); 1551 c.start(); 1552 } 1553 executor.start(); 1554 } 1555 1556 1560 protected void stop() throws JMSException { 1561 1562 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 1563 ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next(); 1564 c.stop(); 1565 } 1566 1567 started.set(false); 1568 executor.stop(); 1569 } 1570 1571 1576 protected SessionId getSessionId() { 1577 return info.getSessionId(); 1578 } 1579 1580 1598 protected void send(ActiveMQMessageProducer producer, 1599 ActiveMQDestination destination,Message message,int deliveryMode, 1600 int priority,long timeToLive, UsageManager producerWindow) throws JMSException { 1601 1602 checkClosed(); 1603 if(destination.isTemporary()&&connection.isDeleted(destination)){ 1604 throw new JMSException ("Cannot publish to a deleted Destination: " 1605 +destination); 1606 } 1607 synchronized(sendMutex){ 1608 doStartTransaction(); 1610 TransactionId txid=transactionContext.getTransactionId(); 1611 message.setJMSDestination(destination); 1612 message.setJMSDeliveryMode(deliveryMode); 1613 long expiration=0L; 1614 if(!producer.getDisableMessageTimestamp()){ 1615 long timeStamp=System.currentTimeMillis(); 1616 message.setJMSTimestamp(timeStamp); 1617 if(timeToLive>0){ 1618 expiration=timeToLive+timeStamp; 1619 } 1620 } 1621 message.setJMSExpiration(expiration); 1622 message.setJMSPriority(priority); 1623 long sequenceNumber=producer.getMessageSequence(); 1624 message.setJMSRedelivered(false); 1625 ActiveMQMessage msg=ActiveMQMessageTransformation.transformMessage( 1627 message,connection); 1628 if(msg==message){ 1630 msg.setMessageId(new MessageId(producer.getProducerInfo() 1631 .getProducerId(),sequenceNumber)); 1632 }else{ 1633 msg.setMessageId(new MessageId(producer.getProducerInfo() 1634 .getProducerId(),sequenceNumber)); 1635 message.setJMSMessageID(msg.getMessageId().toString()); 1636 } 1637 msg.setTransactionId(txid); 1638 if(connection.isCopyMessageOnSend()){ 1639 msg=(ActiveMQMessage)msg.copy(); 1640 } 1641 msg.setConnection(connection); 1642 msg.onSend(); 1643 msg.setProducerId(msg.getMessageId().getProducerId()); 1644 if(this.debug){ 1645 log.debug("Sending message: "+msg); 1646 } 1647 if(!connection.isAlwaysSyncSend()&&(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null)){ 1648 this.connection.asyncSendPacket(msg); 1649 if( producerWindow!=null ) { 1650 int size = msg.getSize(); 1655 producerWindow.increaseUsage(size); 1656 } 1657 }else{ 1658 this.connection.syncSendPacket(msg); 1659 } 1660 1661 } 1662 } 1663 1664 1670 protected void doStartTransaction() throws JMSException { 1671 if(getTransacted()&&!transactionContext.isInXATransaction()){ 1672 transactionContext.begin(); 1673 } 1674 } 1675 1676 1681 public boolean hasUncomsumedMessages() { 1682 return executor.hasUncomsumedMessages(); 1683 } 1684 1685 1690 public boolean isTransacted() { 1691 return this.acknowledgementMode == Session.SESSION_TRANSACTED; 1692 } 1693 1694 1699 protected boolean isClientAcknowledge() { 1700 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 1701 } 1702 1703 1708 public boolean isAutoAcknowledge() { 1709 return acknowledgementMode==Session.AUTO_ACKNOWLEDGE; 1710 } 1711 1712 1717 public boolean isDupsOkAcknowledge() { 1718 return acknowledgementMode==Session.DUPS_OK_ACKNOWLEDGE; 1719 } 1720 1721 1726 public DeliveryListener getDeliveryListener() { 1727 return deliveryListener; 1728 } 1729 1730 1736 public void setDeliveryListener(DeliveryListener deliveryListener) { 1737 this.deliveryListener = deliveryListener; 1738 } 1739 1740 1746 protected SessionInfo getSessionInfo() throws JMSException { 1747 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 1748 return info; 1749 } 1750 1751 1758 public void asyncSendPacket(Command command) throws JMSException { 1759 connection.asyncSendPacket(command); 1760 } 1761 1762 1770 public Response syncSendPacket(Command command) throws JMSException { 1771 return connection.syncSendPacket(command); 1772 } 1773 1774 public long getNextDeliveryId() { 1775 return deliveryIdGenerator.getNextSequenceId(); 1776 } 1777 1778 public void redispatch(MessageDispatchChannel unconsumedMessages) throws JMSException { 1779 1780 List c = unconsumedMessages.removeAll(); 1781 Collections.reverse(c); 1782 1783 for (Iterator iter = c.iterator(); iter.hasNext();) { 1784 MessageDispatch md = (MessageDispatch) iter.next(); 1785 executor.executeFirst(md); 1786 } 1787 1788 } 1789 1790 public boolean isRunning() { 1791 return started.get(); 1792 } 1793 1794 public boolean isAsyncDispatch() { 1795 return asyncDispatch; 1796 } 1797 1798 public void setAsyncDispatch(boolean asyncDispatch) { 1799 this.asyncDispatch = asyncDispatch; 1800 } 1801 1802 1805 public boolean isSessionAsyncDispatch(){ 1806 return sessionAsyncDispatch; 1807 } 1808 1809 1812 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch){ 1813 this.sessionAsyncDispatch=sessionAsyncDispatch; 1814 } 1815 1816 public MessageTransformer getTransformer() { 1817 return transformer; 1818 } 1819 1820 1824 public void setTransformer(MessageTransformer transformer) { 1825 this.transformer = transformer; 1826 } 1827 1828 public BlobTransferPolicy getBlobTransferPolicy() { 1829 return blobTransferPolicy; 1830 } 1831 1832 1836 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1837 this.blobTransferPolicy = blobTransferPolicy; 1838 } 1839 1840 public List getUnconsumedMessages() { 1841 return executor.getUnconsumedMessages(); 1842 } 1843 1844 1845 public String toString() { 1846 return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}"; 1847 } 1848 1849 public void checkMessageListener() throws JMSException { 1850 if (messageListener != null) { 1851 throw new IllegalStateException ("Cannot synchronously receive a message when a MessageListener is set"); 1852 } 1853 for (Iterator i = consumers.iterator(); i.hasNext();) { 1854 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); 1855 if( consumer.getMessageListener()!=null ) { 1856 throw new IllegalStateException ("Cannot synchronously receive a message when a MessageListener is set"); 1857 } 1858 } 1859 } 1860 1861 protected void setOptimizeAcknowledge(boolean value){ 1862 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 1863 ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next(); 1864 c.setOptimizeAcknowledge(value); 1865 } 1866 } 1867 1868 protected void setPrefetchSize(ConsumerId id,int prefetch){ 1869 for(Iterator iter=consumers.iterator();iter.hasNext();){ 1870 ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next(); 1871 if(c.getConsumerId().equals(id)){ 1872 c.setPrefetchSize(prefetch); 1873 break; 1874 } 1875 } 1876 } 1877 1878 protected void close(ConsumerId id){ 1879 for(Iterator iter=consumers.iterator();iter.hasNext();){ 1880 ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next(); 1881 if(c.getConsumerId().equals(id)){ 1882 try{ 1883 c.close(); 1884 }catch(JMSException e){ 1885 log.warn("Exception closing consumer",e); 1886 } 1887 log.warn("Closed consumer on Command"); 1888 break; 1889 } 1890 } 1891 } 1892 1893 public boolean isInUse(ActiveMQTempDestination destination) { 1894 for(Iterator iter=consumers.iterator();iter.hasNext();){ 1895 ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next(); 1896 if( c.isInUse(destination) ) { 1897 return true; 1898 } 1899 } 1900 return false; 1901 } 1902 1903} 1904 | Popular Tags |