1 18 package org.apache.activemq.network.jms; 19 20 import java.util.Iterator ; 21 import java.util.List ; 22 import java.util.Map ; 23 24 import javax.jms.Connection ; 25 import javax.jms.Destination ; 26 import javax.jms.JMSException ; 27 import javax.naming.NamingException ; 28 29 import org.apache.activemq.ActiveMQConnectionFactory; 30 import org.apache.activemq.Service; 31 import org.apache.activemq.broker.BrokerService; 32 import org.apache.activemq.util.LRUCache; 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.springframework.jndi.JndiTemplate; 36 37 import java.util.concurrent.CopyOnWriteArrayList ; 38 import java.util.concurrent.atomic.AtomicBoolean ; 39 40 47 public abstract class JmsConnector implements Service { 48 49 private static final Log log = LogFactory.getLog(JmsConnector.class); 50 protected JndiTemplate jndiLocalTemplate; 51 protected JndiTemplate jndiOutboundTemplate; 52 protected JmsMesageConvertor inboundMessageConvertor; 53 protected JmsMesageConvertor outboundMessageConvertor; 54 private List inboundBridges = new CopyOnWriteArrayList (); 55 private List outboundBridges = new CopyOnWriteArrayList (); 56 protected AtomicBoolean initialized = new AtomicBoolean (false); 57 protected AtomicBoolean started = new AtomicBoolean (false); 58 protected ActiveMQConnectionFactory embeddedConnectionFactory; 59 protected int replyToDestinationCacheSize = 10000; 60 protected String outboundUsername; 61 protected String outboundPassword; 62 protected String localUsername; 63 protected String localPassword; 64 private String name; 65 66 protected LRUCache replyToBridges = createLRUCache(); 67 68 static private LRUCache createLRUCache() { 69 return new LRUCache() { 70 private static final long serialVersionUID = -7446792754185879286L; 71 72 protected boolean removeEldestEntry(Map.Entry enty) { 73 if (size() > maxCacheSize) { 74 Iterator iter = entrySet().iterator(); 75 Map.Entry lru = (Map.Entry ) iter.next(); 76 remove(lru.getKey()); 77 DestinationBridge bridge = (DestinationBridge) lru.getValue(); 78 try { 79 bridge.stop(); 80 log.info("Expired bridge: " + bridge); 81 } 82 catch (Exception e) { 83 log.warn("stopping expired bridge" + bridge + " caused an exception", e); 84 } 85 } 86 return false; 87 } 88 }; 89 } 90 91 93 public boolean init() { 94 boolean result = initialized.compareAndSet(false, true); 95 if (result) { 96 if (jndiLocalTemplate == null) { 97 jndiLocalTemplate = new JndiTemplate(); 98 } 99 if (jndiOutboundTemplate == null) { 100 jndiOutboundTemplate = new JndiTemplate(); 101 } 102 if (inboundMessageConvertor == null) { 103 inboundMessageConvertor = new SimpleJmsMessageConvertor(); 104 } 105 if (outboundMessageConvertor == null) { 106 outboundMessageConvertor = new SimpleJmsMessageConvertor(); 107 } 108 replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize()); 109 } 110 return result; 111 } 112 113 public void start() throws Exception { 114 init(); 115 if (started.compareAndSet(false, true)) { 116 for (int i = 0; i < inboundBridges.size(); i++) { 117 DestinationBridge bridge = (DestinationBridge) inboundBridges.get(i); 118 bridge.start(); 119 } 120 for (int i = 0; i < outboundBridges.size(); i++) { 121 DestinationBridge bridge = (DestinationBridge) outboundBridges.get(i); 122 bridge.start(); 123 } 124 log.info("JMS Connector " + getName() + " Started"); 125 } 126 } 127 128 public void stop() throws Exception { 129 if (started.compareAndSet(true, false)) { 130 for (int i = 0; i < inboundBridges.size(); i++) { 131 DestinationBridge bridge = (DestinationBridge) inboundBridges.get(i); 132 bridge.stop(); 133 } 134 for (int i = 0; i < outboundBridges.size(); i++) { 135 DestinationBridge bridge = (DestinationBridge) outboundBridges.get(i); 136 bridge.stop(); 137 } 138 log.info("JMS Connector " + getName() + " Stopped"); 139 } 140 } 141 142 protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection); 143 144 150 public void setBrokerService(BrokerService service) { 151 embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI()); 152 } 153 154 157 public JndiTemplate getJndiLocalTemplate() { 158 return jndiLocalTemplate; 159 } 160 161 165 public void setJndiLocalTemplate(JndiTemplate jndiTemplate) { 166 this.jndiLocalTemplate = jndiTemplate; 167 } 168 169 172 public JndiTemplate getJndiOutboundTemplate() { 173 return jndiOutboundTemplate; 174 } 175 176 180 public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) { 181 this.jndiOutboundTemplate = jndiOutboundTemplate; 182 } 183 184 187 public JmsMesageConvertor getInboundMessageConvertor() { 188 return inboundMessageConvertor; 189 } 190 191 195 public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { 196 this.inboundMessageConvertor = jmsMessageConvertor; 197 } 198 199 202 public JmsMesageConvertor getOutboundMessageConvertor() { 203 return outboundMessageConvertor; 204 } 205 206 210 public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) { 211 this.outboundMessageConvertor = outboundMessageConvertor; 212 } 213 214 217 public int getReplyToDestinationCacheSize() { 218 return replyToDestinationCacheSize; 219 } 220 221 225 public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) { 226 this.replyToDestinationCacheSize = replyToDestinationCacheSize; 227 } 228 229 232 public String getLocalPassword() { 233 return localPassword; 234 } 235 236 240 public void setLocalPassword(String localPassword) { 241 this.localPassword = localPassword; 242 } 243 244 247 public String getLocalUsername() { 248 return localUsername; 249 } 250 251 255 public void setLocalUsername(String localUsername) { 256 this.localUsername = localUsername; 257 } 258 259 262 public String getOutboundPassword() { 263 return outboundPassword; 264 } 265 266 270 public void setOutboundPassword(String outboundPassword) { 271 this.outboundPassword = outboundPassword; 272 } 273 274 277 public String getOutboundUsername() { 278 return outboundUsername; 279 } 280 281 285 public void setOutboundUsername(String outboundUsername) { 286 this.outboundUsername = outboundUsername; 287 } 288 289 protected void addInboundBridge(DestinationBridge bridge) { 290 inboundBridges.add(bridge); 291 } 292 293 protected void addOutboundBridge(DestinationBridge bridge) { 294 outboundBridges.add(bridge); 295 } 296 297 protected void removeInboundBridge(DestinationBridge bridge) { 298 inboundBridges.add(bridge); 299 } 300 301 protected void removeOutboundBridge(DestinationBridge bridge) { 302 outboundBridges.add(bridge); 303 } 304 305 public String getName() { 306 if (name == null) { 307 name = "Connector:" + getNextId(); 308 } 309 return name; 310 } 311 312 static int nextId; 313 314 static private synchronized int getNextId() { 315 return nextId++; 316 } 317 318 public void setName(String name) { 319 this.name = name; 320 } 321 322 public abstract void restartProducerConnection() throws NamingException , JMSException ; 323 } 324 | Popular Tags |