1 46 package org.mr.api.jms; 47 48 import java.io.IOException ; 49 import java.io.Serializable ; 50 import java.util.ArrayList ; 51 import java.util.Collection ; 52 import java.util.Enumeration ; 53 import java.util.Hashtable ; 54 import java.util.Iterator ; 55 import java.util.LinkedHashSet ; 56 import java.util.List ; 57 import java.util.Set ; 58 59 import javax.jms.BytesMessage ; 60 import javax.jms.Destination ; 61 import javax.jms.IllegalStateException ; 62 import javax.jms.InvalidDestinationException ; 63 import javax.jms.JMSException ; 64 import javax.jms.MapMessage ; 65 import javax.jms.Message ; 66 import javax.jms.MessageConsumer ; 67 import javax.jms.MessageListener ; 68 import javax.jms.MessageProducer ; 69 import javax.jms.ObjectMessage ; 70 import javax.jms.Queue ; 71 import javax.jms.QueueBrowser ; 72 import javax.jms.QueueReceiver ; 73 import javax.jms.QueueSender ; 74 import javax.jms.QueueSession ; 75 import javax.jms.Session ; 76 import javax.jms.StreamMessage ; 77 import javax.jms.TemporaryQueue ; 78 import javax.jms.TemporaryTopic ; 79 import javax.jms.TextMessage ; 80 import javax.jms.Topic ; 81 import javax.jms.TopicPublisher ; 82 import javax.jms.TopicSession ; 83 import javax.jms.TopicSubscriber ; 84 85 import org.apache.commons.logging.Log; 86 import org.apache.commons.logging.LogFactory; 87 import org.mr.IMessageListener; 88 import org.mr.MantaAgent; 89 import org.mr.MantaException; 90 import org.mr.api.jms.selector.syntax.Selector; 91 import org.mr.core.protocol.MantaBusMessage; 92 import org.mr.core.protocol.MantaBusMessageConsts; 93 import org.mr.core.util.Stage; 94 import org.mr.core.util.StageHandler; 95 import org.mr.core.util.StageParams; 96 import org.mr.core.util.SystemTime; 97 import org.mr.core.util.byteable.Byteable; 98 import org.mr.core.util.byteable.ByteableInputStream; 99 import org.mr.core.util.byteable.ByteableOutputStream; 100 import org.mr.core.util.byteable.ByteableRegistry; 101 import org.mr.kernel.security.MantaAuthorization; 102 import org.mr.kernel.security.SecurityActionTypes; 103 import org.mr.kernel.services.MantaService; 104 import org.mr.kernel.services.ServiceActor; 105 import org.mr.kernel.services.ServiceConsumer; 106 import org.mr.kernel.services.ServiceProducer; 107 import org.mr.kernel.services.topics.VirtualTopicManager; 108 109 110 142 public class MantaSession implements Serializable , Session , QueueSession , TopicSession , IMessageListener,StageHandler { 143 144 145 148 private static final long serialVersionUID = -1698529734026002731L; 149 public Log log; 150 153 private MantaAgent manta; 154 protected Counter listenersCount; 155 156 private Object stopEvent = new Object (); 160 161 private DeliveryListener deliveryListener; 163 164 protected TransactionContext transactionContext; 165 166 private IMessageListener busListener = null; 168 169 171 177 MantaSession(String csessId, MantaConnection con, int ackMode, boolean trx) throws JMSException { 178 179 if (con == null) 180 throw new JMSException ("MNJMS00072 : FAILED ON SESSION CREATION. Connection WAS NULL."); 181 owningConnection = con; 182 sessId = csessId; 183 if (trx) 185 sessionAcknowledgementMode=Session.SESSION_TRANSACTED; 186 else if (ackMode<Session.SESSION_TRANSACTED || ackMode>Session.DUPS_OK_ACKNOWLEDGE) 188 throw new IllegalStateException ("MNJMS00073 : FAILED ON SESSION CREATION. INAVLID ACKNOWLEDGE MODE : "+ackMode); 189 else 190 sessionAcknowledgementMode=ackMode; 191 192 unackedMessages = new LinkedHashSet (); 197 heldMessages = new LinkedHashSet (); 198 transactionContext = new TransactionContext(); 201 transactionContext.addSession(this); 202 214 isClosed = false; 215 isClosing = false; 216 isStopped = !con.isStarted; 217 log=LogFactory.getLog("MantaSession"); 218 219 StageParams innerQueueParms = new StageParams(); 221 innerQueueParms.setBlocking(false); 222 innerQueueParms.setHandler(this); 223 innerQueueParms.setMaxNumberOfThreads(1); 224 innerQueueParms.setNumberOfStartThreads(1); 225 innerQueueParms.setPersistent(false); 226 innerQueueParms.setStageName("Session["+sessId+"]@"); 227 innerQueueParms.setStagePriority(0); 228 innerQueue = new Stage(innerQueueParms); 229 lockMonitor = new Object (); 230 manta = owningConnection.getChannel(); 231 consumerMessages = new ArrayList (); 232 listenersCount = new Counter(); 233 } 235 236 243 void start() throws JMSException 244 { 245 checkLegalOperation(); 246 if (!isStopped) 247 return; 248 isStopped = false; 249 250 synchronized (lockMonitor) { 251 lockMonitor.notifyAll(); 252 } 253 } 254 255 262 void stop() throws JMSException 263 { 264 checkLegalOperation(); 265 if (isStopped) 266 return; 267 268 synchronized (lockMonitor) { 273 isStopped = true; 276 277 innerQueue.enqueue(stopEvent); 280 try { 281 lockMonitor.wait(); 282 } catch (InterruptedException e) { 283 if (log.isErrorEnabled()) 284 log.error("Error while stopping the session. ", e); 285 } 286 } 287 } 288 289 295 public BytesMessage createBytesMessage() throws JMSException { 296 checkLegalOperation(); 297 return new MantaBytesMessage(this); 298 } 300 306 public MapMessage createMapMessage() throws JMSException { 307 checkLegalOperation(); 308 return new MantaMapMessage(this); 309 } 311 317 public Message createMessage() throws JMSException { 318 checkLegalOperation(); 319 return new MantaMessage(this); 320 } 322 328 public ObjectMessage createObjectMessage() throws JMSException { 329 checkLegalOperation(); 330 return new MantaObjectMessage(this); 331 } 333 343 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 344 checkLegalOperation(); 345 return new MantaObjectMessage(this,object); 346 } 348 354 public StreamMessage createStreamMessage() throws JMSException { 355 checkLegalOperation(); 356 return new MantaStreamMessage(this); 357 } 359 365 public TextMessage createTextMessage() throws JMSException { 366 checkLegalOperation(); 367 return new MantaTextMessage(this); 368 } 370 379 public TextMessage createTextMessage(String text) throws JMSException { 380 checkLegalOperation(); 381 return new MantaTextMessage(this,text); 382 } 384 391 public final boolean getTransacted() throws JMSException { 392 393 checkLegalOperation(); 394 return sessionAcknowledgementMode == Session.SESSION_TRANSACTED || 396 transactionContext.isInXATransaction(); 397 398 } 400 401 417 public final int getAcknowledgeMode() throws JMSException { 418 419 checkLegalOperation(); 420 return sessionAcknowledgementMode; 421 } 423 436 public void commit() throws JMSException { 437 transactionContext.commit(); 441 } 442 443 446 protected void commitSession() throws JMSException { 447 synchronized (unackedMessages) { 449 Iterator ackIterator = unackedMessages.iterator(); 450 int size = unackedMessages.size(); 451 for (int i = 0;i<size;i++) { 452 MantaBusMessage mbm = (MantaBusMessage)ackIterator.next(); 453 owningConnection.ack(mbm); 454 } 455 unackedMessages.clear(); 456 } 457 sendAllMessages(heldMessages); 459 heldMessages.clear(); 460 } 462 473 public void rollback() throws JMSException { 474 transactionContext.rollback(); 478 } 480 481 484 protected void rollbackSession() throws JMSException { 485 heldMessages.clear(); 486 487 synchronized(listenersCount) { 488 while (listenersCount.val() != 0) { 490 try { 491 listenersCount.wait(); 492 } 493 catch (InterruptedException ie) {} 494 } 495 496 sendUnackedMessages(); 499 } 500 } 502 505 protected void sendUnackedMessages() throws JMSException { 506 MantaBusMessage mbm; 507 String consumer; 508 MantaMessageConsumer destConsumer; 509 synchronized(lockMonitor) { 510 List unackedMessagesCopy = new ArrayList (); 511 synchronized(unackedMessages) { 512 unackedMessagesCopy.addAll(unackedMessages); 513 unackedMessages.clear(); 514 } 515 Iterator unacked = unackedMessagesCopy.iterator(); 516 while (unacked.hasNext()) { 517 mbm = (MantaBusMessage) unacked.next(); 518 consumer = ((ServiceActor)mbm.getRecipient()).getId(); 519 destConsumer = (MantaMessageConsumer)messageConsumers.get(consumer); 520 if (destConsumer != null) { 521 synchronized (destConsumer) { 522 if (!destConsumer.isClosed) { 523 MantaMessage result = (MantaMessage) mbm.getPayload(); 524 result.flags = result.flags | MantaMessage.IS_REDELIVERED; 525 destConsumer.feedMessageListener(mbm); 526 } 527 } 528 } 529 else { 530 if (log.isInfoEnabled()) 531 log.info("A message cannot be sent to a closed consumer. Returning to wait."); 532 unackedMessages.add(mbm); 533 } 534 } 535 } 536 } 538 539 void startLocalTransactionIfNeeded() throws JMSException { 540 if (sessionAcknowledgementMode == Session.SESSION_TRANSACTED && 541 !transactionContext.isInLocalTransaction() && 542 !transactionContext.isInXATransaction()) { 543 transactionContext.begin(); 544 } 545 } 546 547 548 572 public synchronized void close() throws JMSException { 573 574 if (isClosed || isClosing) 575 return; 576 577 stop(); 578 579 if (transactionContext.isInLocalTransaction() && !(this instanceof MantaXASession)) 587 rollback(); 588 589 isClosing = true; 590 591 synchronized(messageProducers) { 593 List l = new ArrayList (messageProducers.size()); 594 l.addAll(messageProducers.values()); 595 Iterator producers = l.iterator(); 596 MantaMessageProducer mmp; 597 while (producers.hasNext()) { 598 mmp = (MantaMessageProducer)producers.next(); 599 if (mmp != null) { 600 mmp.close(); 601 } 602 } 603 if (messageProducers.size() > 0) { 605 reportRemainingToLog(messageProducers, "producers"); 606 } 607 messageProducers.clear(); 608 } 609 610 synchronized(messageConsumers) { 612 List l = new ArrayList (messageConsumers.size()); 613 l.addAll(messageConsumers.values()); 614 Iterator consumers = l.iterator(); 615 MantaMessageConsumer mmc; 616 while (consumers.hasNext()) { 617 mmc = (MantaMessageConsumer)consumers.next(); 618 if (mmc != null) { 619 mmc.close(); 620 } 621 } 622 if (messageConsumers.size() > 0) { 624 reportRemainingToLog(messageConsumers, "consumers"); 625 } 626 messageConsumers.clear(); 627 } 628 629 busListener = null; 630 631 owningConnection.deleteSession(this); 632 owningConnection = null; 633 634 638 if (heldMessages != null) { 639 heldMessages.clear(); 640 } 641 642 isClosed = true; 644 isClosing = false; 645 innerQueue.stop(); 646 647 synchronized (lockMonitor) { 648 lockMonitor.notifyAll(); 649 } 650 } 652 653 void saveMessages(MantaXADescriptor descriptor) { 655 descriptor.addHeldMessages(heldMessages); 656 heldMessages.clear(); 657 synchronized (unackedMessages) { 658 descriptor.addUnackedMessages(unackedMessages); 659 unackedMessages.clear(); 660 } 661 } 662 663 private void reportRemainingToLog(Hashtable table, String listName) { 666 if (log.isInfoEnabled()) { 667 String delimiter = ", "; 668 StringBuffer buf = new StringBuffer (); 669 Iterator i = table.values().iterator(); 670 while (i.hasNext()) { 671 buf.append(i.next()); 672 buf.append(delimiter); 673 } 674 log.info("Some "+listName+" remained after closing the session. These "+listName+" will be removed: "+buf.toString()); 675 } 676 } 677 678 705 public void recover() throws JMSException { 706 707 checkLegalOperation(); 708 if (this.getTransacted()) 710 throw new IllegalStateException ("MNJMS00078 : FAILED ON METHOD recover(). SESSION IS TRANSACTED."); 711 712 if (sessionAcknowledgementMode != Session.CLIENT_ACKNOWLEDGE) 713 return; 714 715 sendUnackedMessages(); 716 } 718 719 void setBusMessageListener(IMessageListener listener) { 722 busListener = listener; 723 } 724 725 726 730 public void run() { 731 synchronized (consumerMessages) { 732 Message message; 733 MantaBusMessage busMessage; 734 while (!consumerMessages.isEmpty()) { 735 busMessage = (MantaBusMessage) consumerMessages.remove(0); 736 try { 737 message = MantaMessageConsumer.convertToJMSMessage(busMessage, this); 738 ackOrHold(busMessage); 739 } catch (JMSException e) { 740 e.printStackTrace(); 741 busMessage = null; 742 message = null; 743 continue; 744 } 745 746 747 if (deliveryListener != null) 750 deliveryListener.beforeDelivery(this, message); 751 752 try { 754 this.startLocalTransactionIfNeeded(); 755 } catch (JMSException e) { 756 e.printStackTrace(); 757 } 758 759 this.sessionListener.onMessage(message); 761 762 if (deliveryListener != null) 765 deliveryListener.afterDelivery(this, message); 766 } 767 } 768 } 769 770 771 772 785 public MessageListener getMessageListener() throws JMSException { 786 787 checkLegalOperation(); 788 return this.sessionListener; 789 } 791 813 public final void setMessageListener(MessageListener listener) throws JMSException { 814 815 checkLegalOperation(); 816 sessionListener = listener; 817 818 } 820 821 822 846 public MessageProducer createProducer(Destination destination) throws JMSException { 847 848 checkLegalOperation(); 849 850 MantaMessageProducer mmp = null; 851 if (destination == null ) mmp = new MantaMessageProducer(manta.getMessageId(),this); 853 854 else if (!destination.toString().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) { 855 MantaService service = null; 856 if(destination instanceof Queue ){ 857 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_QUEUE,destination.toString() ); 858 service = manta.getService(destination.toString(),MantaService.SERVICE_TYPE_QUEUE); 859 }else{ 860 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_TOPIC,destination.toString() ); 861 service = manta.getService(destination.toString(),MantaService.SERVICE_TYPE_TOPIC); 862 }if (service == null){ 864 throw new JMSException ("MNJMS00079 : FAILED ON METHOD createProducer() FOR DESTINATION "+destination); 865 } 866 ServiceProducer sActor = ServiceProducer.createNew(service); 867 868 try { 869 if (log.isInfoEnabled()) { 870 log.info("Created local producer "+sActor); 871 } 872 manta.advertiseService(sActor); 873 mmp = new MantaMessageProducer(sActor.getId(),this,destination,sActor); 874 } 875 catch (MantaException me) { 876 mmp = null; 877 sActor = null; 878 throw new JMSException ("MNJMS0007A : FAILED ON METHOD createProducer(). ERROR TEXT : "+me.getMessage()); 879 } 880 } 881 882 else { ServiceProducer producer =null ; 884 if(destination instanceof Queue ){ 885 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_QUEUE); 886 producer = new ServiceProducer( manta.getAgentName(),destination.toString(), MantaService.SERVICE_TYPE_QUEUE); 887 }else{ 888 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_TOPIC); 889 producer = new ServiceProducer( manta.getAgentName(),destination.toString(), MantaService.SERVICE_TYPE_TOPIC); 890 } 891 mmp = new MantaMessageProducer( 892 manta.getMessageId(), 893 this, destination, 894 producer); 895 896 } 897 messageProducers.put(mmp.getClientId(),mmp); 898 899 int delay = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getIntProperty("jms.producer_discovery_delay",100); 903 try { 904 Thread.sleep(delay); 905 } catch (InterruptedException e) { 906 if (log.isWarnEnabled()) { 907 log.warn("Interrupted during sleep."); 908 } 909 } 910 return mmp; 911 } 913 930 public MessageConsumer createConsumer(Destination destination) throws JMSException { 931 932 return createConsumer(destination,null,false); 933 934 } 936 967 public final MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 968 969 return createConsumer(destination,messageSelector,false); 970 971 } 973 974 1019 public final MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1020 1021 checkLegalOperation(); 1022 if (destination==null) 1023 throw new InvalidDestinationException ("MNJMS0007B : FAILED ON METHOD createConsumer(). NULL DESTINATION WAS SUPPLIED."); 1024 1025 if(messageSelector!= null) 1027 messageSelector = messageSelector.trim(); 1028 Selector s = new Selector(messageSelector); 1029 MantaMessageConsumer mmc; 1030 MantaService service = null; 1032 byte type; 1033 if(destination instanceof Queue ){ 1034 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_CONSUMER_FOR_QUEUE,destination.toString() ); 1035 service = manta.getService(destination.toString(), MantaService.SERVICE_TYPE_QUEUE); 1036 type = MantaService.SERVICE_TYPE_QUEUE; 1037 }else{ 1038 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_CONSUMER_FOR_TOPIC,destination.toString() ); 1039 service = manta.getService(destination.toString(), MantaService.SERVICE_TYPE_TOPIC); 1040 type = MantaService.SERVICE_TYPE_TOPIC; 1041 } 1042 1043 boolean isWildCard = (type == MantaService.SERVICE_TYPE_TOPIC) && 1046 VirtualTopicManager.isWildCardTopic(destination.toString()); 1047 if (service == null && !isWildCard) 1048 throw new JMSException ("MNJMS0007C : FAILED ON METHOD createConsumer(). COULD NOT REGISTER ON DESTINATION "+destination); 1049 1050 ServiceConsumer sActor; 1051 try { 1052 sActor = new ServiceConsumer(manta.getAgentName(), manta.getDomainName(), 1053 destination.toString() , type,(byte)getAcknowledgeMode()); 1054 if (type==MantaService.SERVICE_TYPE_TOPIC) { 1055 1057 registerListener(sActor.getId()); 1058 1059 1060 } 1061 sActor.setSelectorStatment(messageSelector); 1062 1063 mmc = new MantaMessageConsumer(sActor.getId(),this, destination,messageSelector,noLocal,sActor); 1064 messageConsumers.put(mmc.getClientId(),mmc); 1065 if (log.isInfoEnabled()) { 1066 log.info("Created local consumer "+sActor); 1067 } 1068 manta.advertiseService(sActor); 1069 } 1070 catch (MantaException me) { 1071 mmc = null; 1072 sActor = null; 1073 throw new JMSException ("MNJMS0007D : FAILED ON METHOD createConsumer(). ERROR TEXT : "+me.getMessage()); 1074 1075 } 1076 return mmc; 1077 } 1079 1105 public Queue createQueue(String queueName) throws JMSException { 1106 1107 checkLegalOperation(); 1108 return new MantaQueue(queueName); 1109 } 1111 1137 public Topic createTopic(String topicName) throws JMSException { 1138 1139 checkLegalOperation(); 1140 return new MantaTopic(topicName); 1141 } 1143 1144 1188 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1189 1190 return createDurableSubscriber(topic,name,null,false); 1191 } 1193 1194 1246 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1247 1248 checkLegalOperation(); 1249 owningConnection.authorize(SecurityActionTypes.ACTION_SUBSCRIBE_DURABLE_ON_TOPIC,topic.toString() ); 1250 1251 if (topic==null) 1252 throw new InvalidDestinationException ("MNJMS0007E : FAILED ON METHOD createDurableSubscriber(). A NULL TOPIC WAS SPECIFIED."); 1253 1254 if(messageSelector!= null) { 1257 messageSelector = messageSelector.trim(); 1258 } 1259 Selector s = new Selector(messageSelector); 1260 1261 cleanSubscriptionIfNeeded(topic, name, messageSelector, noLocal); 1263 1264 MantaTopicSubscriber newSub = null; 1266 MantaService service = manta.getService(topic.toString(), MantaService.SERVICE_TYPE_TOPIC); 1267 boolean isWildCard = VirtualTopicManager.isWildCardTopic(topic.toString()); 1270 if (service == null && !isWildCard) 1271 throw new InvalidDestinationException ("MNJMS0007F : FAILED ON METHOD createDurableSubscriber(). TOPIC "+topic+" NOT VALID."); 1272 1273 ServiceConsumer sActor; 1275 1276 try { 1277 sActor = new ServiceConsumer(manta.getAgentName(), manta.getDomainName(), 1278 topic.toString() , MantaService.SERVICE_TYPE_TOPIC,(byte)getAcknowledgeMode(),name); 1279 DurableSubscribers.put(name,sActor); 1280 registerListener(sActor.getId()); 1281 newSub = new MantaTopicSubscriber(sActor.getId(),this,topic,noLocal,true,name,messageSelector,sActor); 1282 messageConsumers.put(newSub.getClientId(),newSub); 1284 sActor.setSelectorStatment(messageSelector); 1285 if (log.isInfoEnabled()) { 1287 log.info("Created local durable subscriber "+sActor); 1288 } 1289 manta.advertiseService(sActor); 1290 1291 } 1292 catch (MantaException me) { 1293 newSub = null; 1294 sActor = null; 1295 throw new JMSException ("MNJMS00080 : FAILED ON METHOD CreateDurableSubscriber(). ERROR TEXT : "+me.getMessage()); 1296 } 1297 return newSub; 1298 1299 } 1301 1302 private void cleanSubscriptionIfNeeded(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1303 MantaTopicSubscriber old = (MantaTopicSubscriber)messageConsumers.get(name); 1304 if (old != null) { 1305 if ( !checkEqual(old.getMessageSelector(), messageSelector) || 1311 !((Topic )old.getDestination()).getTopicName().equals(topic.getTopicName())) { 1312 if (log.isDebugEnabled()) { 1313 log.debug("The durable subscriber '"+name+"' was changed. Deleting old subscription and creating new subscription."); 1314 } 1315 old.close(); 1316 unsubscribe(name); 1317 return; 1318 } 1319 else { 1320 throw new JMSException ("A durable subscriber with the name '"+name+"' already exists."); 1321 } 1322 } 1323 ServiceConsumer durable = (ServiceConsumer)DurableSubscribers.get(name); 1326 if(durable != null) { 1327 if ( !checkEqual(durable.getSelectorStatment(), messageSelector) || 1329 !durable.getServiceName().equals(topic.getTopicName())) { 1330 if (log.isDebugEnabled()) { 1331 log.debug("The durable subscriber '"+name+"' was changed. Deleting old subscription and creating new subscription."); 1332 } 1333 unsubscribe(name); 1334 } 1335 } 1336 } 1337 1338 private boolean checkEqual(Object o1, Object o2) { 1339 if (o1 == null) { 1340 return o2 == null; 1341 } 1342 if (o2 != null) { 1343 return o1.equals(o2); 1344 } 1345 return false; 1346 } 1347 1348 1364 public MantaBusMessage receive(ServiceConsumer consumer, long timeout) throws JMSException , MantaException 1365 { 1366 checkLegalOperation(); 1367 MantaBusMessage msg = null; 1368 synchronized (lockMonitor) { 1369 if (isClosing || isClosed) 1370 return null; 1371 1372 if (!owningConnection.isStarted()) { 1373 long startTime = System.currentTimeMillis(); 1374 try { 1375 lockMonitor.wait(timeout); 1376 } catch (InterruptedException e) { 1377 if (log.isErrorEnabled()) { 1378 log.error("Error while waiting for the session to resume. ", e); 1379 } 1380 } 1381 timeout = timeout-(System.currentTimeMillis()-startTime); 1382 if (timeout < 1000) return null; 1384 } 1385 1386 if ( (manta.getService(consumer.getServiceName(), consumer.getType())).getServiceType()==MantaService.SERVICE_TYPE_TOPIC) { 1389 1390 if (timeout==-1) { 1391 return null; } 1393 1394 1395 removeSessionFrom(consumer.getId()); 1400 ReceiveListener listen = new ReceiveListener(); 1402 manta.subscribeMessageListener(listen,consumer.getId()); 1403 msg = listen.waitForInfo(timeout); 1404 } 1405 else { if (timeout == 0) { 1407 msg = manta.receive(consumer); 1408 } 1409 else if (timeout == -1) { 1410 msg = manta.receiveNoWait(consumer); 1411 } 1412 else { msg = manta.receive(consumer,timeout); 1414 } 1415 } 1416 } 1417 1418 if (msg != null) { 1419 this.startLocalTransactionIfNeeded(); 1421 1422 if (sessionAcknowledgementMode == Session.CLIENT_ACKNOWLEDGE || this.getTransacted()) { 1423 synchronized (unackedMessages) { 1424 unackedMessages.add(msg); 1425 } 1426 } 1427 } 1428 1429 return msg; 1430 } 1431 1432 1448 public QueueBrowser createBrowser(Queue queue) throws JMSException 1449 { 1450 return createBrowser(queue, null); 1451 } 1452 1453 1476 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException 1477 { 1478 checkLegalOperation(); 1479 owningConnection.authorize(SecurityActionTypes.ACTION_CREATE_BROSWER_FOR_QUEUE,queue.toString() ); 1480 1481 if (queue == null) 1482 throw new InvalidDestinationException ("MNJMS00081 : FAILED ON METHOD createBrowser(). A NULL QUEUE WAS SPECIFIED."); 1483 1484 MantaQueueBrowser mqb = null; 1485 MantaService service = manta.getService(queue.toString(), MantaService.SERVICE_TYPE_QUEUE); 1486 if (service == null) 1487 throw new InvalidDestinationException ("MNJMS00082 : FAILED ON METHOD createBrowser(). QUEUE "+queue+" NOT VALID."); 1488 1489 ServiceConsumer sActor; 1490 1491 try { 1492 sActor = new ServiceConsumer(manta.getAgentName(), manta.getDomainName(), 1493 service.getServiceName() , service.getServiceType(),(byte)getAcknowledgeMode()); 1494 mqb = new MantaQueueBrowser(sActor.getId(),this, queue,messageSelector,sActor); 1495 sActor.setSelectorStatment(messageSelector); 1496 if (log.isInfoEnabled()) { 1497 log.info("Created local queue browser "+sActor); 1498 } 1499 manta.advertiseService(sActor); 1500 1501 } 1502 catch (MantaException me) { 1503 mqb = null; 1504 sActor = null; 1505 throw new JMSException ("MNJMS00083 : FAILED ON METHOD createBrowser(). ERROR TEXT : "+me.getMessage()); 1506 1507 } 1508 1509 1510 1511 return mqb; 1512 1513 } 1514 1515 1516 1529 public TemporaryQueue createTemporaryQueue() throws JMSException { 1530 1531 checkLegalOperation(); 1532 1533 return owningConnection.addTempQueue(); 1534 1535 } 1537 1538 1551 public TemporaryTopic createTemporaryTopic() throws JMSException { 1552 1553 checkLegalOperation(); 1554 TemporaryTopic t = owningConnection.addTempTopic(); 1555 return new MantaTemporaryTopic(t.getTopicName(),this.owningConnection); 1556 } 1558 1559 1584 public void unsubscribe(String name) throws JMSException { 1585 1586 checkLegalOperation(); 1587 ServiceActor durable = (ServiceActor) DurableSubscribers.remove(name); 1589 if(durable != null){ 1590 if (log.isInfoEnabled()) 1591 log.info("Unsubscribing durable service consumer: "+durable); 1592 try { 1593 manta.recallDurableSubscription(durable); 1594 } catch (MantaException me) { 1595 throw new JMSException ("MNJMS00083 : FAILED ON METHOD unsubscribe(). FROM TOPIC "+name+" ERROR TEXT : "+me.getMessage()); 1596 1597 } 1598 } 1599 1600 } 1602 final void removeSessionFrom(String dest) { 1603 manta.unsubscribeFromTopic(this,dest); 1604 } 1605 1606 1614 synchronized void sendMessage(ServiceProducer sp, Message orig) throws JMSException { 1615 1616 checkLegalOperation(); 1617 1618 MantaMessage msg; 1619 if(orig instanceof MantaMessage){ 1620 msg = ((MantaMessage)orig).makeCopy(); 1621 }else{ 1622 msg = fromForeignMsgToManta(orig); 1623 } 1624 1625 Destination dest = msg.getJMSDestination(); 1626 1627 1629 this.startLocalTransactionIfNeeded(); 1630 1631 if (this.sessionAcknowledgementMode == SESSION_TRANSACTED) { 1632 msg.setJMSMessageID("ID:in-transaction"); 1633 orig.setJMSMessageID("ID:in-transaction"); 1634 heldMessages.add(new HeldMessage(sp,msg)); 1635 if (log.isDebugEnabled()) { 1636 log.debug("Transacted session: Caching message until commit is invoked. Message="+msg); 1637 } 1638 } 1639 else{ 1640 MantaBusMessage mbm = prepareMessageForSending(msg); 1645 orig.setJMSMessageID(msg.JMSMessageId); 1646 msg.setWriteableState(false); 1647 if (sp.getServiceType()==MantaService.SERVICE_TYPE_QUEUE) { 1648 try { 1649 if (log.isDebugEnabled()) { 1650 log.debug("About to send massage to queue. Message ID="+mbm.getMessageId()+", Queue="+sp.getServiceName()); 1651 } 1652 manta.enqueueMessage(mbm, 1653 sp, 1654 (byte)msg.getJMSDeliveryMode(), 1655 (byte)msg.getJMSPriority(), 1656 msg.getJMSExpiration()); 1657 } 1658 catch (MantaException me) { 1659 mbm = null; 1660 throw new JMSException ("MNJMS00084 : FAILED ON METHOD sendMessage(). ERROR TEXT : "+me.getMessage()); 1661 } 1662 } 1663 else { 1665 try { 1666 if (log.isDebugEnabled()) { 1667 log.debug("Sending massage to topic. Message ID="+mbm.getMessageId()+", Topic="+sp.getServiceName()); 1668 } 1669 manta.publish(mbm,sp); 1670 } 1671 catch (MantaException me) { 1672 mbm = null; 1673 throw new JMSException ("MNJMS00085 : COULD NOT PUBLISH TO "+sp.getServiceName()+" FAILED ON METHOD sendMessage(). ERROR TEXT : "+me.getMessage()); 1674 } 1675 } 1676 } 1679 if (orig instanceof MantaMessage) 1680 ((MantaMessage) orig).setWriteableState(true); 1681 } 1682 1683 1684 void ackAllMessages(Collection msgs) throws JMSException { 1685 Iterator ackIterator = msgs.iterator(); 1686 while (ackIterator.hasNext()) { 1687 MantaBusMessage mbm = (MantaBusMessage)ackIterator.next(); 1688 owningConnection.ack(mbm); 1689 } 1690 msgs.clear(); 1691 } 1692 1693 1697 protected void sendAllMessages(Collection msgs) throws JMSException { 1698 1699 Destination dest; 1700 MantaMessage msg; 1701 Iterator msgIterator = msgs.iterator(); 1702 HeldMessage hm; 1703 1704 while (msgIterator.hasNext()) { 1705 hm = (HeldMessage)msgIterator.next(); 1706 msg = hm.msg; 1707 dest = msg.getJMSDestination(); 1708 MantaBusMessage mbm = prepareMessageForSending(msg); 1711 ServiceProducer sp = hm.service; 1712 if (dest instanceof Queue ) { 1713 try { 1714 manta.enqueueMessage(mbm, 1715 sp, 1716 (byte)msg.getJMSDeliveryMode(), 1717 (byte)msg.getJMSPriority(), 1718 msg.getJMSExpiration()); 1719 } 1720 catch (MantaException me) { 1721 mbm = null; 1722 sp = null; 1723 throw new JMSException ("MNJMS00086 : FAILED ON METHOD sendAllMessages(). ERROR TEXT : "+me.getMessage()); 1724 } 1725 } 1726 else { 1728 try { 1729 manta.publish(mbm,sp); 1730 } 1731 catch (Exception me) { mbm = null; 1733 throw new JMSException ("MNJMS00087 : FAILED ON METHOD sendAllMessages(). ERROR TEXT : "+me.getMessage()); 1734 } 1735 } 1736 msgIterator.remove(); 1738 } 1739 } 1740 1741 1742 1750 void checkLegalOperation() throws JMSException 1751 { 1752 if (isClosed || isClosing) 1753 throw new IllegalStateException ("MNJMS00088 : OPERATION FAILED ON METHOD checkLegalOperation(). SESSION IS CLOSED."); 1754 } 1755 1756 1762 public void onMessage(MantaBusMessage msg) { 1763 innerQueue.enqueue(msg); 1764 } 1765 1766 1767 void ackOrHold(MantaBusMessage mbm) throws JMSException { 1768 if (getTransacted() || sessionAcknowledgementMode == Session.CLIENT_ACKNOWLEDGE) { 1769 if (mbm != null) { 1770 synchronized(unackedMessages) { 1771 unackedMessages.add(mbm); 1772 } 1773 } 1774 } 1775 else { 1776 ackMessage(mbm); 1777 } 1778 } 1779 1788 void ackMessage (MantaBusMessage msg) throws JMSException { 1789 checkLegalOperation(); 1790 1791 if (getTransacted()) 1793 return; 1794 1795 if (sessionAcknowledgementMode != Session.CLIENT_ACKNOWLEDGE) { 1796 if (msg != null) 1797 owningConnection.ack(msg); 1798 } 1799 else { 1800 synchronized (unackedMessages) { 1801 Iterator ackIterator = unackedMessages.iterator(); 1802 while (ackIterator.hasNext()) { 1803 MantaBusMessage cbm = (MantaBusMessage)ackIterator.next(); 1804 owningConnection.ack(cbm); 1805 } 1806 unackedMessages.clear(); 1807 } 1808 } 1809 } 1810 1811 1812 1814 1817 1818 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1819 return (TopicPublisher ) createProducer(topic); 1820 } 1822 1825 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1826 return (TopicSubscriber ) createConsumer(topic); 1827 } 1828 1829 1832 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1833 1834 TopicSubscriber theSub = (TopicSubscriber )createConsumer(topic, messageSelector, noLocal); 1836 1837 return theSub; 1838 } 1839 1840 1841 1843 1846 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1847 return (QueueReceiver ) createConsumer(queue, messageSelector); 1848 } 1849 1850 1853 public QueueReceiver createReceiver(Queue queue) throws JMSException 1854 { 1855 return (QueueReceiver ) createConsumer(queue); 1856 } 1857 1858 1861 public QueueSender createSender(Queue queue) throws JMSException 1862 { 1863 return (QueueSender ) createProducer(queue); 1864 } 1865 1866 1867 1873 private MantaBusMessage prepareMessageForSending(Message message) throws JMSException { 1874 1875 1876 MantaBusMessage mantaBusMessage = manta.getMantaBusMessage(); 1877 message.setJMSMessageID("ID:"+mantaBusMessage.getMessageId()); 1878 mantaBusMessage.setPayload((Byteable)message); 1879 mantaBusMessage.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT); 1880 mantaBusMessage.setPriority((byte)(message.getJMSPriority())); 1881 mantaBusMessage.addHeader(MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE,MantaBusMessageConsts.PAYLOAD_TYPE_JMS); 1882 mantaBusMessage.setDeliveryMode((byte) message.getJMSDeliveryMode()); 1883 mantaBusMessage.setValidUntil(message.getJMSExpiration()); 1884 1885 return mantaBusMessage; 1886 1887 } 1889 1892 void removeConsumer(MantaMessageConsumer mc) throws JMSException { 1893 1894 if (isClosed) { 1896 return; 1897 } 1898 1899 messageConsumers.remove(mc.getClientId()); 1900 manta.unsubscribeMessageListener(this, mc.getService().getId()); 1901 1902 if (!mc.getService().getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) { 1903 try { 1905 Thread.sleep(500); 1906 } catch (InterruptedException ie) { 1907 if(log.isInfoEnabled()) { 1908 log.info("removeConsumer() : acks may have been lost for this consumer - "+mc); 1909 } 1910 } 1911 } 1912 1913 try { 1915 if(mc.getService()!= null){ 1916 if (log.isInfoEnabled()) { 1917 if (!mc.getService().isDurable()) { 1918 log.info("Recalling local consumer "+mc.getService()); 1919 } 1920 else { 1921 log.info("Recalling local durable subscriber "+mc.getService()); 1922 } 1923 } 1924 manta.recallService(mc.getService()); 1925 } 1926 } 1927 catch (MantaException ce) { 1928 if (log.isErrorEnabled()) { 1929 log.error("removeConsumer(): could not remove service "+mc.getService().getServiceName(), ce); 1930 } 1931 throw new JMSException ("MNJMS00077 : FAILED ON close(). CONSUMER ON SERVICE "+ 1932 mc.getService().getServiceName()+" WAS NOT RECALLED. ERROR TEXT : "+ce.getMessage()); 1933 } 1934 } 1935 1936 1940 void removeBrowser(MantaQueueBrowser qb) throws JMSException { 1941 1942 if (log.isInfoEnabled()) { 1943 log.info("Recalling local queue browser "+qb.service); 1944 } 1945 try { 1946 manta.recallService(qb.service); 1947 } 1948 catch (MantaException me) { 1949 if (log.isErrorEnabled()) { 1950 log.error("removeBrowser(): could not remove browser "+qb.getService(),me); 1951 } 1952 throw new JMSException (me.getMessage()); 1953 } 1954 } 1955 1956 1961 void removeProducer(MantaMessageProducer mp) throws JMSException { 1962 1963 if (isClosed) 1965 return; 1966 1967 messageProducers.remove(mp.getClientId()); 1968 1969 try { 1971 if(mp.getService()!= null){ 1972 if (log.isInfoEnabled()) { 1973 log.info("Recalling local producer "+mp.getService()); 1974 } 1975 manta.recallService(mp.getService()); 1976 } 1977 } 1978 catch (MantaException ce) { 1979 if (log.isErrorEnabled()) { 1980 log.error("removeProducer(): could not remove service "+mp.getService().getServiceName(), ce); 1981 } 1982 throw new JMSException ("MNJMS00076 : FAILED ON close(). PRODUCER ON SERVICE "+ 1983 mp.getService().getServiceName()+" WAS NOT RECALLED. ERROR TEXT : "+ce.getMessage()); 1984 } 1985 1986 } 1987 1988 1995 Enumeration getMessagesFor(MantaQueueBrowser qb) throws JMSException { 1996 try { 1997 1998 return manta.peekAtQueue(qb.getService()); 1999 } 2000 catch (MantaException me) { 2001 throw new JMSException ("MNJMS0008A : FAILED ON METHOD getMessagesFor(). ERROR TEXT : "+me.getMessage()); 2002 } 2003 2004 } 2005 2006 2009 void addConsumerMessage(MantaBusMessage msg) { 2010 synchronized (consumerMessages) { 2011 this.consumerMessages.add(msg); 2012 } 2013 } 2014 2015 2016 void addConsumerMessages(Collection msgs) { 2019 synchronized (consumerMessages) { 2020 this.consumerMessages.addAll(0,msgs); 2021 } 2022 } 2023 2024 public boolean hasConsumerMessages() { 2025 synchronized (consumerMessages) { 2026 return !consumerMessages.isEmpty(); 2027 } 2028 } 2029 2030 2031 2034 void registerListener(String regString) throws MantaException { 2035 2036 manta.subscribeMessageListener(this,regString); 2037 } 2038 2039 2044 void listenToQueue(MantaMessageConsumer mmc) throws JMSException { 2045 2046 try { 2047 this.manta.subscribeToQueue(mmc.theService, this); 2048 2049 } catch (Exception e) { 2050 throw new InvalidDestinationException ("MNJMS0008B : FAILED ON METHOD listenToQueue(). ERROR TEXT : "+e.getMessage()); 2051 2052 } 2053 2054 } 2055 2056 void deregisterFromQueue(MantaMessageConsumer mmc) { 2057 try { 2058 this.manta.unsubscribeFromQueue(mmc.theService,this); 2059 } catch (Exception e) { 2060 e.printStackTrace(); 2061 } 2062 } 2063 2064 2065 protected int sessionAcknowledgementMode = AUTO_ACKNOWLEDGE; 2066 protected boolean isStopped; 2068 protected boolean isClosed = false; 2069 protected boolean isClosing = false; 2070 protected MantaConnection owningConnection; 2071 protected MessageListener sessionListener = null; 2072 protected Object lockMonitor = new Object (); 2073 protected static long internalId = 0; 2074 protected String sessId; 2075 protected Set heldMessages; 2076 protected ArrayList consumerMessages; 2077 protected LinkedHashSet unackedMessages; 2078 protected Hashtable messageConsumers = new Hashtable (); 2079 protected Hashtable messageProducers = new Hashtable (); 2080 protected Hashtable DurableSubscribers = new Hashtable (); 2081 protected Stage innerQueue; 2082 2083 2089 class HeldMessage implements Byteable{ 2090 ServiceProducer service; 2091 MantaMessage msg; 2092 2093 public HeldMessage(ServiceProducer sp, MantaMessage m) { 2094 msg = m; 2095 service = sp; 2096 2097 } 2098 2099 public String getByteableName() { 2100 2101 return "org.mr.api.jms.MantaSession$HeldMessage"; 2102 } 2103 2104 public void toBytes(ByteableOutputStream out) throws IOException { 2105 2106 out.writeByteable(service); 2107 out.writeByteable(msg); 2108 } 2109 2110 public Byteable createInstance(ByteableInputStream in) throws IOException { 2111 2112 2113 HeldMessage hm = new HeldMessage((ServiceProducer) in.readByteable(),(MantaMessage)in.readByteable()); 2114 return hm; 2115 } 2116 2117 public void registerToByteableRegistry() { 2118 ByteableRegistry.registerByteableFactory(getByteableName() , this); 2119 2120 } 2121 } 2122 2123 2126 public boolean handle(Object event) { 2127 2128 while (isStopped) { 2129 try { 2132 synchronized(lockMonitor) { 2133 lockMonitor.notify(); 2136 2137 lockMonitor.wait(); 2138 } 2139 } 2140 catch(InterruptedException ie) { 2141 if (log.isErrorEnabled()) { 2143 log.error("Error while waiting for the session to resume. ", ie); 2144 } 2145 } 2146 2147 if (this.isClosing || this.isClosed) { 2150 return false; 2151 } 2152 } 2153 2154 if (event == stopEvent) { 2158 return true; 2159 } 2160 2161 MantaBusMessage msg = (MantaBusMessage) event; 2162 2163 if (msg!=null) { 2164 2165 if (busListener != null) { 2168 busListener.onMessage(msg); 2169 return true; 2170 } 2171 2172 String consumer = ((ServiceActor)msg.getRecipient()).getId(); 2173 MantaMessageConsumer destConsumer = 2174 (MantaMessageConsumer)messageConsumers.get(consumer); 2175 2176 if (destConsumer != null) { 2177 synchronized (listenersCount) { 2178 listenersCount.add(); 2179 } 2180 synchronized (destConsumer) { 2181 if (!destConsumer.isClosed) { 2182 try { 2183 destConsumer.feedMessageListener(msg); 2184 } 2185 catch (JMSException jmse) { 2186 log.error("Exception occured in listeners feeding stage", jmse); 2187 } 2188 } 2189 } 2190 synchronized (listenersCount) { 2191 listenersCount.remove(); 2192 if (listenersCount.val() == 0) { 2193 listenersCount.notifyAll(); 2194 } 2195 } 2196 } 2197 else { 2198 if (log.isDebugEnabled()) 2199 log.debug("A message arrived for a recipient that's closed or not registered on this session."); 2200 } 2201 } 2202 return true; 2203 } 2204 2205 2210 private MantaMessage fromForeignMsgToManta(Message foreignMessage) throws JMSException { 2211 2212 MantaMessage mantaResult = null; 2213 if (foreignMessage instanceof TextMessage ) { 2214 TextMessage mantaTextMsg = (TextMessage ) foreignMessage; 2216 MantaTextMessage msg = new MantaTextMessage(); 2217 msg.setText(mantaTextMsg.getText()); 2218 mantaResult = msg; 2219 } 2220 else if (foreignMessage instanceof ObjectMessage ) 2221 { 2222 ObjectMessage mantaObjectMsg = (ObjectMessage ) foreignMessage; 2224 MantaObjectMessage msg = new MantaObjectMessage(); 2225 msg.setObject(mantaObjectMsg.getObject()); 2226 mantaResult = msg; 2227 } 2228 else if (foreignMessage instanceof MapMessage ) 2229 { 2230 MapMessage mantaMapMsg = (MapMessage ) foreignMessage; 2232 MantaMapMessage msg = new MantaMapMessage(); 2233 for (Enumeration iter = mantaMapMsg.getMapNames(); iter.hasMoreElements();) { 2234 String name = iter.nextElement().toString(); 2235 msg.setObject(name, mantaMapMsg.getObject(name)); 2236 } 2237 mantaResult = msg; 2238 } 2239 else if (foreignMessage instanceof BytesMessage ) 2240 { 2241 BytesMessage mantaBytesMsg = (BytesMessage ) foreignMessage; 2243 mantaBytesMsg.reset(); 2244 MantaBytesMessage msg = new MantaBytesMessage(); 2245 try { 2246 while(true) { 2247 msg.writeByte(mantaBytesMsg.readByte()); 2248 } 2249 } 2250 catch (JMSException e) { 2251 } 2253 mantaResult = msg; 2254 } 2255 else if (foreignMessage instanceof StreamMessage ) 2256 { 2257 StreamMessage mantaStreamMessage = (StreamMessage ) foreignMessage; 2258 mantaStreamMessage.reset(); 2260 MantaStreamMessage mantaStreamMsg = new MantaStreamMessage(); 2261 Object obj = null; 2262 try { 2263 while ((obj = mantaStreamMessage.readObject()) != null) { 2264 mantaStreamMsg.writeObject(obj); 2265 } 2266 } 2267 catch (JMSException e) { 2268 } 2270 mantaResult = mantaStreamMsg; 2271 } 2272 2273 mantaResult.setJMSTimestamp(foreignMessage.getJMSTimestamp()); 2274 mantaResult.setJMSReplyTo(fromForeignDesToManta(foreignMessage.getJMSReplyTo())); 2275 mantaResult.setJMSMessageID(foreignMessage.getJMSMessageID()); 2276 mantaResult.setJMSCorrelationID(foreignMessage.getJMSCorrelationID()); 2277 mantaResult.setJMSExpiration(foreignMessage.getJMSExpiration()); 2278 mantaResult.setJMSDestination(fromForeignDesToManta(foreignMessage.getJMSDestination())); 2279 mantaResult.setJMSPriority(foreignMessage.getJMSPriority()); 2280 mantaResult.setJMSDeliveryMode(foreignMessage.getJMSDeliveryMode()); 2281 2282 if (foreignMessage.getJMSRedelivered()) 2283 mantaResult.flags=mantaResult.flags|MantaMessage.IS_REDELIVERED; 2284 2285 mantaResult.setJMSPriority(foreignMessage.getJMSPriority()); 2286 Enumeration propertyKeys = foreignMessage.getPropertyNames(); 2287 2288 while( propertyKeys.hasMoreElements()) 2289 { 2290 String key = propertyKeys.nextElement().toString(); 2291 Object obj = foreignMessage.getObjectProperty(key); 2292 mantaResult.setObjectProperty(key, obj); 2293 } 2294 return mantaResult; 2295 } 2296 2297 public boolean isStopped() { 2298 return isStopped; 2299 } 2300 2305 private MantaDestination fromForeignDesToManta(Destination destination) throws JMSException { 2306 2307 if (destination==null) 2308 return null; 2309 2310 if (destination instanceof MantaDestination) 2311 return (MantaDestination)destination; 2312 2313 MantaDestination result = null; 2314 if (destination instanceof TemporaryQueue ) { 2315 result = new MantaTemporaryQueue(((Queue ) destination).getQueueName(),null); 2316 } 2317 else if (destination instanceof TemporaryTopic ) { 2318 result = new MantaTemporaryTopic(((Topic ) destination).getTopicName(),null); 2319 } 2320 else if (destination instanceof Queue ) { 2321 result = new MantaQueue(((Queue ) destination).getQueueName()); 2322 } 2323 else if (destination instanceof Topic ) { 2324 result = new MantaTopic(((Topic ) destination).getTopicName()); 2325 } 2326 2327 return result; 2328 2329 } 2331 2332 public DeliveryListener getDeliveryListener() { 2333 return deliveryListener; 2334 } 2335 2336 2337 public void setDeliveryListener(DeliveryListener deliveryListener) { 2338 this.deliveryListener = deliveryListener; 2339 } 2340 2341 2346 public void setTransactionContext(TransactionContext newContext) { 2347 if (transactionContext != null) { 2348 transactionContext.removeSession(this); 2349 } 2350 transactionContext = newContext; 2351 transactionContext.addSession(this); 2352 } 2353 2354 2358 public TransactionContext getTransactionContext() { 2359 return transactionContext; 2360 } 2361 2362 2363 protected class Counter { 2364 int count=0; 2365 void add() { 2366 count++; 2367 } 2368 void remove() { 2369 count--; 2370 } 2371 int val() { 2372 return count; 2373 } 2374 2375 } 2376 2377 2383 public static interface DeliveryListener { 2384 public void beforeDelivery(MantaSession session, Message msg); 2385 public void afterDelivery(MantaSession session, Message msg); 2386 } 2387} | Popular Tags |