1 18 package org.apache.activemq.network.jms; 19 20 import javax.jms.Connection ; 21 import javax.jms.Destination ; 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Queue ; 27 import javax.jms.QueueConnection ; 28 import javax.jms.QueueSender ; 29 import javax.jms.QueueSession ; 30 import javax.jms.Session ; 31 import javax.jms.Topic ; 32 37 class QueueBridge extends DestinationBridge{ 38 protected Queue consumerQueue; 39 protected Queue producerQueue; 40 protected QueueSession consumerSession; 41 protected QueueSession producerSession; 42 43 protected String selector; 44 protected QueueSender producer; 45 protected QueueConnection consumerConnection; 46 protected QueueConnection producerConnection; 47 48 49 public void stop() throws Exception { 50 super.stop(); 51 if(consumerSession!=null){ 52 consumerSession.close(); 53 } 54 if(producerSession!=null){ 55 producerSession.close(); 56 } 57 } 58 59 60 protected MessageConsumer createConsumer() throws JMSException { 61 consumerSession=consumerConnection.createQueueSession(false,Session.CLIENT_ACKNOWLEDGE); 63 MessageConsumer consumer=null; 64 65 if(selector!=null&&selector.length()>0){ 66 consumer=consumerSession.createReceiver(consumerQueue,selector); 67 }else{ 68 consumer=consumerSession.createReceiver(consumerQueue); 69 } 70 71 return consumer; 72 } 73 74 protected synchronized MessageProducer createProducer() throws JMSException { 75 producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 76 producer = producerSession.createSender(null); 77 return producer; 78 } 79 80 81 82 83 protected synchronized void sendMessage(Message message) throws JMSException { 84 if (producer == null) { 85 createProducer(); 86 } 87 producer.send(producerQueue,message); 88 } 89 90 93 public QueueConnection getConsumerConnection(){ 94 return consumerConnection; 95 } 96 97 100 public void setConsumerConnection(QueueConnection consumerConnection){ 101 this.consumerConnection=consumerConnection; 102 } 103 104 107 public Queue getConsumerQueue(){ 108 return consumerQueue; 109 } 110 111 114 public void setConsumerQueue(Queue consumerQueue){ 115 this.consumerQueue=consumerQueue; 116 } 117 118 121 public QueueConnection getProducerConnection(){ 122 return producerConnection; 123 } 124 125 128 public void setProducerConnection(QueueConnection producerConnection){ 129 this.producerConnection=producerConnection; 130 } 131 132 135 public Queue getProducerQueue(){ 136 return producerQueue; 137 } 138 139 142 public void setProducerQueue(Queue producerQueue){ 143 this.producerQueue=producerQueue; 144 } 145 146 149 public String getSelector(){ 150 return selector; 151 } 152 153 156 public void setSelector(String selector){ 157 this.selector=selector; 158 } 159 160 protected Connection getConnnectionForConsumer(){ 161 return getConsumerConnection(); 162 } 163 164 protected Connection getConnectionForProducer(){ 165 return getProducerConnection(); 166 } 167 168 169 } 170 | Popular Tags |