1 18 package org.apache.activemq.network.jms; 19 20 import java.util.concurrent.atomic.AtomicBoolean ; 21 22 import org.apache.activemq.Service; 23 import org.apache.commons.logging.Log; 24 import org.apache.commons.logging.LogFactory; 25 26 import javax.jms.Connection ; 27 import javax.jms.Destination ; 28 import javax.jms.JMSException ; 29 import javax.jms.Message ; 30 import javax.jms.MessageConsumer ; 31 import javax.jms.MessageListener ; 32 import javax.jms.MessageProducer ; 33 import javax.naming.NamingException ; 34 35 40 public abstract class DestinationBridge implements Service, MessageListener { 41 private static final Log log = LogFactory.getLog(DestinationBridge.class); 42 protected MessageConsumer consumer; 43 protected AtomicBoolean started = new AtomicBoolean (false); 44 protected JmsMesageConvertor jmsMessageConvertor; 45 protected boolean doHandleReplyTo = true; 46 protected JmsConnector jmsConnector; 47 private int maximumRetries = 10; 48 49 52 public MessageConsumer getConsumer() { 53 return consumer; 54 } 55 56 60 public void setConsumer(MessageConsumer consumer) { 61 this.consumer = consumer; 62 } 63 64 67 public void setJmsConnector(JmsConnector connector) { 68 this.jmsConnector = connector; 69 } 70 71 74 public JmsMesageConvertor getJmsMessageConvertor() { 75 return jmsMessageConvertor; 76 } 77 78 81 public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { 82 this.jmsMessageConvertor = jmsMessageConvertor; 83 } 84 85 public int getMaximumRetries() { 86 return maximumRetries; 87 } 88 89 93 public void setMaximumRetries(int maximumRetries) { 94 this.maximumRetries = maximumRetries; 95 } 96 97 protected Destination processReplyToDestination(Destination destination) { 98 return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer()); 99 } 100 101 public void start() throws Exception { 102 if (started.compareAndSet(false, true)) { 103 MessageConsumer consumer = createConsumer(); 104 consumer.setMessageListener(this); 105 createProducer(); 106 } 107 } 108 109 public void stop() throws Exception { 110 started.set(false); 111 } 112 113 public void onMessage(Message message) { 114 if (started.get() && message != null) { 115 int attempt = 0; 116 try { 117 if (attempt > 0) { 118 restartProducer(); 119 } 120 Message converted; 121 if (doHandleReplyTo) { 122 Destination replyTo = message.getJMSReplyTo(); 123 if (replyTo != null) { 124 converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); 125 } 126 else { 127 converted = jmsMessageConvertor.convert(message); 128 } 129 } 130 else { 131 message.setJMSReplyTo(null); 132 converted = jmsMessageConvertor.convert(message); 133 } 134 sendMessage(converted); 135 message.acknowledge(); 136 } 137 catch (Exception e) { 138 log.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e); 139 if (maximumRetries > 0 && attempt >= maximumRetries) { 140 try { 141 stop(); 142 } 143 catch (Exception e1) { 144 log.warn("Failed to stop cleanly", e1); 145 } 146 } 147 } 148 } 149 } 150 151 154 protected boolean isDoHandleReplyTo() { 155 return doHandleReplyTo; 156 } 157 158 162 protected void setDoHandleReplyTo(boolean doHandleReplyTo) { 163 this.doHandleReplyTo = doHandleReplyTo; 164 } 165 166 protected abstract MessageConsumer createConsumer() throws JMSException ; 167 168 protected abstract MessageProducer createProducer() throws JMSException ; 169 170 protected abstract void sendMessage(Message message) throws JMSException ; 171 172 protected abstract Connection getConnnectionForConsumer(); 173 174 protected abstract Connection getConnectionForProducer(); 175 176 protected void restartProducer() throws JMSException , NamingException { 177 try { 178 getConnectionForProducer().close(); 179 } 180 catch (Exception e) { 181 log.debug("Ignoring failure to close producer connection: " + e, e); 182 } 183 jmsConnector.restartProducerConnection(); 184 createProducer(); 185 } 186 } 187 | Popular Tags |