package org.apache.activemq.ra;

import java.lang.reflect.Method;

import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Owns the JMS connection and connection consumer for a single endpoint
 * activation. The connection is established on a {@link WorkManager} thread
 * and is re-established, with an exponentially growing delay, whenever
 * connecting fails or the connection reports an exception.
 */
public class ActiveMQEndpointWorker {

    private static final Log log = LogFactory.getLog(ActiveMQEndpointWorker.class);

    /**
     * Reflective handle on {@link MessageListener#onMessage(Message)}, used to
     * ask the endpoint factory whether delivery is transacted.
     */
    public static final Method ON_MESSAGE_METHOD;

    /** Delay, in milliseconds, before the first reconnect attempt. */
    private static final long INITIAL_RECONNECT_DELAY = 1000;

    /** Upper bound, in milliseconds, for the reconnect delay. */
    private static final long MAX_RECONNECT_DELAY = 1000 * 30;

    /** Holds the session registered for the current thread, if any. */
    private static final ThreadLocal threadLocal = new ThreadLocal();

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {Message.class});
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    protected MessageResourceAdapter adapter;
    protected ActiveMQEndpointActivationKey endpointActivationKey;
    protected MessageEndpointFactory endpointFactory;
    protected WorkManager workManager;
    protected boolean transacted;

    private ConnectionConsumer consumer;
    private ServerSessionPoolImpl serverSessionPool;
    private ActiveMQDestination dest;
    private boolean running;
    private Work connectWork;
    protected ActiveMQConnection connection;

    /** Delay used by the next reconnect attempt; guarded by this worker's monitor. */
    private long reconnectDelay = INITIAL_RECONNECT_DELAY;

    /**
     * Closes the session if it is not null. Closing is best effort: a
     * {@link JMSException} is logged at debug level and otherwise ignored.
     *
     * @param s the session to close, may be null
     */
    public static void safeClose(Session s) {
        try {
            if (s != null) {
                s.close();
            }
        } catch (JMSException e) {
            log.debug("Ignoring failure while closing session: " + e, e);
        }
    }

    /**
     * Closes the connection if it is not null. Closing is best effort: a
     * {@link JMSException} is logged at debug level and otherwise ignored.
     *
     * @param c the connection to close, may be null
     */
    public static void safeClose(Connection c) {
        try {
            if (c != null) {
                c.close();
            }
        } catch (JMSException e) {
            log.debug("Ignoring failure while closing connection: " + e, e);
        }
    }

    /**
     * Closes the connection consumer if it is not null. Closing is best
     * effort: a {@link JMSException} is logged at debug level and otherwise
     * ignored.
     *
     * @param cc the connection consumer to close, may be null
     */
    public static void safeClose(ConnectionConsumer cc) {
        try {
            if (cc != null) {
                cc.close();
            }
        } catch (JMSException e) {
            log.debug("Ignoring failure while closing connection consumer: " + e, e);
        }
    }

    /**
     * Creates a worker for the given activation. Nothing is connected until
     * {@link #start()} is called.
     *
     * @param adapter the resource adapter that supplies the work manager and
     *                creates connections
     * @param key     the activation key that supplies the endpoint factory and
     *                the activation spec
     * @throws ResourceException if the endpoint factory does not know the
     *                           {@code onMessage} method, or the activation
     *                           spec's destination type is neither
     *                           {@code javax.jms.Queue} nor
     *                           {@code javax.jms.Topic}
     */
    public ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
        this.endpointActivationKey = key;
        this.adapter = adapter;
        this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
        this.workManager = adapter.getBootstrapContext().getWorkManager();
        try {
            this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
        } catch (NoSuchMethodException e) {
            // Keep the original exception as the cause so the failure can be diagnosed.
            throw new ResourceException("Endpoint does not implement the onMessage method.", e);
        }

        connectWork = new Work() {

            public void release() {
                // Nothing to release: run() performs a single connect attempt and returns.
            }

            synchronized public void run() {
                if (!isRunning()) {
                    return;
                }
                if (connection != null) {
                    return;
                }

                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
                try {
                    connection = adapter.makeConnection(activationSpec);
                    connection.start();
                    connection.setExceptionListener(new ExceptionListener() {
                        public void onException(JMSException error) {
                            // Errors raised while the pool is closing are part of shutdown.
                            if (!serverSessionPool.isClosing()) {
                                reconnect(error);
                            }
                        }
                    });

                    if (activationSpec.isDurableSubscription()) {
                        consumer = connection.createDurableConnectionConsumer(
                                (Topic) dest,
                                activationSpec.getSubscriptionName(),
                                emptyToNull(activationSpec.getMessageSelector()),
                                serverSessionPool,
                                activationSpec.getMaxMessagesPerSessionsIntValue(),
                                activationSpec.getNoLocalBooleanValue());
                    } else {
                        consumer = connection.createConnectionConsumer(
                                dest,
                                emptyToNull(activationSpec.getMessageSelector()),
                                serverSessionPool,
                                activationSpec.getMaxMessagesPerSessionsIntValue(),
                                activationSpec.getNoLocalBooleanValue());
                    }

                    // Connected and consuming: the next outage starts over with the
                    // short delay instead of inheriting the delay of the previous one.
                    resetReconnectDelay();

                } catch (JMSException error) {
                    log.debug("Fail to to connect: " + error, error);
                    reconnect(error);
                }
            }
        };

        MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
        if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
            dest = new ActiveMQQueue(activationSpec.getDestination());
        } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
            dest = new ActiveMQTopic(activationSpec.getDestination());
        } else {
            throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
        }
    }

    /**
     * Marks the worker as running, creates the server session pool and
     * schedules the connect work. Does nothing if the worker is already
     * running.
     *
     * @throws WorkException     declared for callers; not thrown by the code
     *                           visible in this method
     * @throws ResourceException declared for callers; not thrown by the code
     *                           visible in this method
     */
    synchronized public void start() throws WorkException, ResourceException {
        if (running) {
            return;
        }
        running = true;

        log.debug("Starting");
        serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
        connect();
        log.debug("Started");
    }

    /**
     * Marks the worker as stopped, closes the server session pool and then
     * closes the consumer and the connection. Does nothing if the worker is
     * not running.
     *
     * @throws InterruptedException declared for callers
     */
    synchronized public void stop() throws InterruptedException {
        if (!running) {
            return;
        }
        running = false;
        serverSessionPool.close();
        disconnect();
    }

    private boolean isRunning() {
        return running;
    }

    /**
     * Hands the connect work to the work manager. If the work manager rejects
     * it, the worker is marked as not running and the failure is logged.
     */
    synchronized private void connect() {
        if (!running) {
            return;
        }

        try {
            workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
        } catch (WorkException e) {
            running = false;
            log.error("Work Manager did not accept work: ", e);
        }
    }

    /**
     * Closes the consumer and the connection, ignoring close failures, and
     * clears both references.
     */
    synchronized private void disconnect() {
        safeClose(consumer);
        consumer = null;
        safeClose(connection);
        connection = null;
    }

    /** Restores the reconnect delay to its initial value. */
    synchronized private void resetReconnectDelay() {
        reconnectDelay = INITIAL_RECONNECT_DELAY;
    }

    /**
     * Drops the current connection, waits for the current reconnect delay,
     * doubles the delay (capped at {@link #MAX_RECONNECT_DELAY}) and schedules
     * a new connect attempt. Once the delay has reached the cap the failure is
     * also reported at error level.
     *
     * @param error the failure that triggered the reconnect
     */
    private void reconnect(JMSException error) {
        log.debug("Reconnect cause: ", error);
        long delay;
        synchronized (this) {
            delay = this.reconnectDelay;
            if (delay == MAX_RECONNECT_DELAY) {
                log.error("Endpoint connection to JMS broker failed: " + error.getMessage());
                log.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
            }
        }
        try {
            disconnect();
            // Sleep outside the monitor so start()/stop() are not blocked meanwhile.
            Thread.sleep(delay);

            synchronized (this) {
                this.reconnectDelay *= 2;
                if (this.reconnectDelay > MAX_RECONNECT_DELAY) {
                    this.reconnectDelay = MAX_RECONNECT_DELAY;
                }
            }
            connect();
        } catch (InterruptedException e) {
            // Give up on this attempt, but keep the interrupt visible to the
            // thread's owner instead of silently discarding it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stores the session in the thread-local slot of the calling thread.
     *
     * @param session the session to associate with the calling thread
     */
    protected void registerThreadSession(Session session) {
        threadLocal.set(session);
    }

    /**
     * Clears the thread-local slot of the calling thread.
     *
     * @param session not consulted; the slot is cleared unconditionally
     */
    protected void unregisterThreadSession(Session session) {
        threadLocal.set(null);
    }

    /**
     * Maps null and the empty string to null; returns any other value as is.
     *
     * @param value the value to normalise, may be null
     * @return null if {@code value} is null or empty, otherwise {@code value}
     */
    private String emptyToNull(String value) {
        if (value == null || value.length() == 0) {
            return null;
        }
        return value;
    }

}