1 23 package org.objectweb.joram.mom.util; 24 25 import javax.jms.*; 26 import javax.jms.IllegalStateException ; 27 28 import java.util.Vector ; 29 30 31 35 public class BridgePubSubModule extends BridgeUnifiedModule 36 { 37 38 private transient TopicPublisher publisher; 39 40 41 42 public BridgePubSubModule() 43 { 44 super(); 45 } 46 47 48 56 public void send(org.objectweb.joram.shared.messages.Message message) 57 throws JMSException 58 { 59 if (! usable) 60 throw new IllegalStateException (notUsableMessage); 61 62 try { 63 publisher.publish(MessageConverterModule.convert(producerSession, 64 message)); 65 acknowledge(message); 66 } 67 catch (javax.jms.MessageFormatException exc) { 68 throw exc; 69 } 70 catch (javax.jms.JMSException exc) { 72 qout.add(message); 73 } 74 } 75 76 82 protected void doConnect() throws JMSException 83 { 84 TopicConnectionFactory topicCnxFact = (TopicConnectionFactory) cnxFact; 85 Topic topic = (Topic) dest; 86 87 if (userName != null && password != null) 88 cnx = topicCnxFact.createTopicConnection(userName, password); 89 else 90 cnx = topicCnxFact.createTopicConnection(); 91 cnx.setExceptionListener(this); 92 93 if (clientID != null) 94 cnx.setClientID(clientID); 95 96 producerSession = 97 ((TopicConnection) cnx).createTopicSession(false, 98 Session.AUTO_ACKNOWLEDGE); 99 publisher = ((TopicSession) producerSession).createPublisher(topic); 100 101 consumerSession = ((TopicConnection) cnx).createTopicSession(true, 0); 102 consumer = consumerSession.createDurableSubscriber((Topic) dest, 103 agentId.toString(), 104 selector, 105 false); 106 } 107 } 108 | Popular Tags |