1 46 package org.mr.api.jms; 47 48 import javax.jms.*; 49 import javax.jms.IllegalStateException ; 50 51 import java.io.Serializable ; 52 import java.util.HashMap ; 53 import java.util.Iterator ; 54 55 import org.mr.MantaException; 56 import org.mr.kernel.services.MantaService; 57 import org.mr.kernel.services.ServiceProducer; 58 import org.mr.core.util.SystemTime; 59 60 import org.apache.commons.logging.Log; 61 import org.apache.commons.logging.LogFactory; 62 63 64 99 public class MantaMessageProducer implements Serializable , MessageProducer, TopicPublisher, QueueSender { 100 101 110 public MantaMessageProducer(String clientId, MantaSession sess,Destination destination, ServiceProducer service){ 111 112 theDestination = destination; 113 creatingSession = sess; 114 this.clientId = clientId; 115 theService = service; 116 producers = new HashMap (); 117 isClosed = false; 118 log = LogFactory.getLog("MantaMessageProducer"); 119 } 121 124 public MantaMessageProducer (String clientId, MantaSession sess) { 125 this(clientId, sess,null, null); 127 } 128 129 153 public void setDisableMessageID(boolean value) throws JMSException{ 154 155 156 if (creatingSession==null || creatingSession.isClosed || creatingSession.isClosing) 157 throw new IllegalStateException ("MNJMS00063 : FAILED ON METHOD setDisableMessageID() - PARENT SESSION IS CLOSED."); 158 159 } 162 163 172 public boolean getDisableMessageID() throws JMSException{ 173 174 checkLegalOperation(); 175 return false; 178 } 179 180 204 public void setDisableMessageTimestamp(boolean value) throws JMSException{ 205 206 if (creatingSession==null || creatingSession.isClosed || creatingSession.isClosing) 207 throw new IllegalStateException ("MNJMS00064 : FAILED ON METHOD setDisableMTimeStamp() - PARENT SESSION IS CLOSED."); 208 } 211 212 221 public boolean getDisableMessageTimestamp() throws JMSException{ 222 checkLegalOperation(); 223 return false; 225 } 226 227 247 public void setDeliveryMode(int newDeliveryMode) throws JMSException{ 248 checkLegalOperation(); 249 if (newDeliveryMode != MantaMessage.PERSISTENT && 250 newDeliveryMode != MantaMessage.NON_PERSISTENT) 251 252 throw new JMSException("MNJMS00065 : ILLEGAL DELIVERY MODE SUPPLIED : "+newDeliveryMode+" FAILED ON METHOD setDeliveryMode()"); 253 254 255 else 256 this.deliveryMode = newDeliveryMode; 257 258 } 259 260 271 public int getDeliveryMode() throws JMSException{ 272 checkLegalOperation(); 273 return this.deliveryMode; 274 } 275 276 297 public void setPriority(int defaultPriority) throws JMSException{ 298 if (defaultPriority>MantaMessage.MAX_PRIORITY || 299 defaultPriority<MantaMessage.MIN_PRIORITY) 300 301 throw new JMSException("MNJMS00066 : ILLEGAL PRIORITY SPECIFIED : "+defaultPriority+ " IS OUT OF RANGE. FAILED ON METHOD setPriority()"); 302 303 else{ 304 checkLegalOperation(); 305 this.messagePriority = defaultPriority; 306 } 307 } 308 309 320 public int getPriority() throws JMSException{ 321 checkLegalOperation(); 322 return messagePriority; 323 } 324 325 342 public void setTimeToLive(long timeToLive) throws JMSException{ 343 if (timeToLive<0) 344 throw new JMSException("MNJMS00067 : INVALID TIME TO LIVE SUPPLIED. NEGATIVE VALUES NOT ALLOWED. FAILED ON METHOD setTimeToLive()"); 345 346 else { 347 checkLegalOperation(); 348 this.messageTTL = timeToLive; 349 } 350 351 } 352 353 365 public long getTimeToLive() throws JMSException{ 366 checkLegalOperation(); 367 return this.messageTTL; 368 } 369 370 381 public Destination getDestination() throws JMSException{ 382 checkLegalOperation(); 383 return theDestination; 384 } 385 386 399 public void close() throws JMSException{ 400 401 if (isClosed) 402 return; 403 404 isClosed = true; 405 406 creatingSession.removeProducer(this); 409 Iterator producersToRemove = producers.keySet().iterator(); 410 while (producersToRemove.hasNext()) { 411 ServiceProducer sp = (ServiceProducer) producers.get(producersToRemove.next()); 412 try { 413 if (log.isInfoEnabled()) { 414 log.info("Recalling local producer "+sp); 415 } 416 creatingSession.owningConnection.getChannel().recallService(sp); 417 } catch (MantaException e) { 418 if (log.isErrorEnabled()) 419 log.error("MNJMS00067 : FAILED ON METHOD close(). UNABLE TO REMOVE PRODUCER FROM SESSION.",e); 420 } 421 } 422 423 creatingSession = null; 424 theService = null; 425 clientId = null; 426 428 } 429 430 455 public void send(Message msg) throws JMSException{ 456 457 459 460 send(theDestination,msg,deliveryMode,messagePriority,messageTTL); 461 462 } 463 464 493 public void send(Message msg, int msgDeliveryMode, int priority, long timeToLive) throws JMSException{ 494 495 send(theDestination,msg,msgDeliveryMode,priority,timeToLive); 497 498 } 499 500 532 public void send(Destination destination, Message message) throws JMSException{ 533 534 if (theDestination!=null) 536 throw new UnsupportedOperationException ("MNJMS00068 : FAILED ON METHOD send(). DEFAULT DESTINATION IS NOT NULL."); 537 538 send(destination,message,deliveryMode,messagePriority,messageTTL); 539 540 } 541 542 574 575 public void send(Destination destination, Message message, int delMode, int priority, long timeToLive) throws JMSException{ 576 577 checkLegalOperation(); 578 if (destination==null) 579 throw new UnsupportedOperationException ("MNJMS00069 : FAILED ON METHOD send(). NULL DESTINATION SUPPLIED."); 580 581 if (message==null) 582 throw new JMSException("MNJMS0006A : FAILED ON METHOD send(). NULL MESSAGE SUPPLIED."); 583 if (theDestination!=null && !destination.toString().equals(theDestination.toString())) 584 throw new UnsupportedOperationException ("MNJMS0006B : FAILED ON METHOD send(). PRODUCER HAS A PERMANANT DESTINATION."); 585 586 if (log.isDebugEnabled()) { 587 log.debug("A new message is ready to be published to "+destination.toString()); 588 } 589 590 ((MantaMessage)message).setWriteableState(true); 593 594 message.setJMSDeliveryMode(delMode); 595 message.setJMSPriority(priority); 596 message.setJMSDestination(destination); 597 message.setJMSTimestamp(SystemTime.gmtCurrentTimeMillis()); 599 600 if (message instanceof MantaMessage){ 602 ((MantaMessage)message).setClientId(clientId); 603 String connId = creatingSession.owningConnection.getClientID(); 605 ((MantaMessage)message).setConnId(connId); 606 607 } 608 609 if (timeToLive==0){ 611 message.setJMSExpiration(Long.MAX_VALUE); 612 } 613 else { 614 message.setJMSExpiration(message.getJMSTimestamp()+timeToLive); 617 } 618 619 if (this.theDestination!=destination){ 620 621 ServiceProducer producer = (ServiceProducer) producers.get(destination.toString()); 622 623 if (producer==null){ 625 626 MantaService ms; 627 if(destination instanceof Queue){ 628 ms = creatingSession.owningConnection.getChannel().getService( 629 destination.toString(),MantaService.SERVICE_TYPE_QUEUE); 630 631 }else{ 632 ms = creatingSession.owningConnection.getChannel().getService( 633 destination.toString(),MantaService.SERVICE_TYPE_TOPIC); 634 635 } 636 producer = ServiceProducer.createNew(ms); 637 638 try { 639 if (!(destination instanceof TemporaryQueue) && 640 !(destination instanceof TemporaryTopic)){ 641 if (log.isInfoEnabled()) { 643 log.info("Created local producer "+producer); 644 } 645 creatingSession.owningConnection.getChannel().advertiseService(producer); 646 producers.put(destination.toString(),producer); 647 } 648 else { 649 if (log.isDebugEnabled()) { 650 log.debug("Created producer on temporary queue "+producer); 651 } 652 } 653 654 655 } catch(MantaException me) { 656 throw new JMSException("MNJMS0006C : FAILED ON METHOD send(). ERROR TEXT : "+me.getMessage()); 657 658 } 659 660 } 661 662 creatingSession.sendMessage(producer,message); 663 664 }else{ 665 creatingSession.sendMessage(this.getService(),message); 666 } 667 668 669 } 671 672 673 680 protected byte getAcknowledgeMode(Message message){ 681 byte result = 0; 682 try { 683 result = (byte)message.getJMSDeliveryMode(); 684 } catch(JMSException e){ 686 result = MantaSession.CLIENT_ACKNOWLEDGE; 687 } 689 return result; 690 } 692 693 695 698 public Queue getQueue() throws JMSException 699 { 700 checkLegalOperation(); 701 return (Queue) getDestination(); 702 } 703 704 707 public void send(Queue queue, Message message, int qDeliveryMode, int priority, long timeToLive) throws JMSException 708 { 709 710 send((Destination)queue, message, qDeliveryMode, priority, timeToLive); 711 } 712 713 716 public void send(Queue queue, Message message) throws JMSException 717 { 718 send((Destination)queue, message,deliveryMode, 719 messagePriority, messageTTL); 720 721 } 722 723 726 public Topic getTopic() throws JMSException 727 { 728 return (Topic) getDestination(); 729 } 730 731 734 public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException 735 { 736 send(theDestination, message, deliveryMode, priority, timeToLive); 737 } 738 739 742 public void publish(Message message) throws JMSException 743 { 744 send(theDestination,message,deliveryMode,messagePriority,messageTTL); 745 746 } 747 748 751 public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) 752 throws JMSException 753 { 754 send((MantaDestination)topic, message, deliveryMode, priority, timeToLive); 755 } 756 757 760 public void publish(Topic topic, Message message) throws JMSException 761 { 762 send((Destination)topic, message,deliveryMode,messagePriority, 763 messageTTL); 764 } 765 766 770 protected String getClientId(){ 771 return clientId; 772 } 773 774 protected ServiceProducer getService() { 775 return theService; 776 } 777 778 private void checkLegalOperation() throws JMSException { 779 if (isClosed) 780 throw new IllegalStateException ("MNJMS0006D : OPERATION UNALLOWED. PRODUCER IS CLOSED. FAILED ON METHOD checkLegalOperation()"); 781 } 782 783 protected Destination theDestination = null; 785 786 787 protected int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 789 790 protected int messagePriority = Message.DEFAULT_PRIORITY; 792 793 protected long messageTTL = Message.DEFAULT_TIME_TO_LIVE; 795 796 protected MantaSession creatingSession = null; 798 799 protected String clientId; 801 802 protected ServiceProducer theService; 804 805 protected boolean isClosed; 806 807 protected HashMap producers; 808 809 private Log log; 810 } 811 | Popular Tags |