1 18 package org.apache.activemq.network.jms; 19 20 import javax.jms.Connection ; 21 import javax.jms.Destination ; 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Session ; 27 import javax.jms.Topic ; 28 import javax.jms.TopicConnection ; 29 import javax.jms.TopicPublisher ; 30 import javax.jms.TopicSession ; 31 36 class TopicBridge extends DestinationBridge{ 37 protected Topic consumerTopic; 38 protected Topic producerTopic; 39 protected TopicSession consumerSession; 40 protected TopicSession producerSession; 41 protected String consumerName; 42 protected String selector; 43 protected TopicPublisher producer; 44 protected TopicConnection consumerConnection; 45 protected TopicConnection producerConnection; 46 47 48 public void stop() throws Exception { 49 super.stop(); 50 if(consumerSession!=null){ 51 consumerSession.close(); 52 } 53 if(producerSession!=null){ 54 producerSession.close(); 55 } 56 } 57 58 59 60 protected MessageConsumer createConsumer() throws JMSException { 61 consumerSession=consumerConnection.createTopicSession(false,Session.CLIENT_ACKNOWLEDGE); 63 MessageConsumer consumer=null; 64 if(consumerName!=null&&consumerName.length()>0){ 65 if(selector!=null&&selector.length()>0){ 66 consumer=consumerSession.createDurableSubscriber(consumerTopic,consumerName,selector,false); 67 }else{ 68 consumer=consumerSession.createDurableSubscriber(consumerTopic,consumerName); 69 } 70 }else{ 71 if(selector!=null&&selector.length()>0){ 72 consumer=consumerSession.createSubscriber(consumerTopic,selector,false); 73 }else{ 74 consumer=consumerSession.createSubscriber(consumerTopic); 75 } 76 } 77 return consumer; 78 } 79 80 81 82 protected synchronized MessageProducer createProducer() throws JMSException { 83 producerSession=producerConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 84 producer = producerSession.createPublisher(null); 85 return producer; 86 } 87 88 protected synchronized void sendMessage(Message message) throws JMSException { 89 if (producer == null) { 90 createProducer(); 91 } 92 producer.publish(producerTopic,message); 93 } 94 95 98 public TopicConnection getConsumerConnection(){ 99 return consumerConnection; 100 } 101 102 106 public void setConsumerConnection(TopicConnection consumerConnection){ 107 this.consumerConnection=consumerConnection; 108 } 109 110 113 public String getConsumerName(){ 114 return consumerName; 115 } 116 117 121 public void setConsumerName(String consumerName){ 122 this.consumerName=consumerName; 123 } 124 125 128 public Topic getConsumerTopic(){ 129 return consumerTopic; 130 } 131 132 136 public void setConsumerTopic(Topic consumerTopic){ 137 this.consumerTopic=consumerTopic; 138 } 139 140 143 public TopicConnection getProducerConnection(){ 144 return producerConnection; 145 } 146 147 151 public void setProducerConnection(TopicConnection producerConnection){ 152 this.producerConnection=producerConnection; 153 } 154 155 158 public Topic getProducerTopic(){ 159 return producerTopic; 160 } 161 162 166 public void setProducerTopic(Topic producerTopic){ 167 this.producerTopic=producerTopic; 168 } 169 170 173 public String getSelector(){ 174 return selector; 175 } 176 177 181 public void setSelector(String selector){ 182 this.selector=selector; 183 } 184 185 protected Connection getConnnectionForConsumer(){ 186 return getConsumerConnection(); 187 } 188 189 protected Connection getConnectionForProducer(){ 190 return getProducerConnection(); 191 } 192 } 193 | Popular Tags |