1 46 47 package org.mr.ra.inbound; 48 49 50 import javax.jms.ConnectionConsumer ; 51 import javax.jms.ExceptionListener ; 52 import javax.jms.JMSException ; 53 import javax.jms.Session ; 54 import javax.jms.Topic ; 55 import javax.resource.ResourceException ; 56 import javax.resource.spi.work.Work ; 57 import javax.resource.spi.work.WorkEvent ; 58 import javax.resource.spi.work.WorkException ; 59 import javax.resource.spi.work.WorkListener ; 60 import javax.resource.spi.work.WorkManager ; 61 62 import org.apache.commons.logging.Log; 63 import org.apache.commons.logging.LogFactory; 64 import org.mr.api.jms.MantaDestination; 65 import org.mr.api.jms.MantaQueue; 66 import org.mr.api.jms.MantaTopic; 67 import org.mr.api.jms.MantaXAConnection; 68 import org.mr.ra.ResourceAdapterImpl; 69 70 73 public class MantaAsfEndpointWorker extends MantaBaseEndpointWorker { 74 75 private static final Log log = LogFactory.getLog(MantaAsfEndpointWorker.class); 76 77 private static final long INITIAL_RECONNECT_DELAY = 1000; private static final long MAX_RECONNECT_DELAY = 1000*30; private static final ThreadLocal threadLocal = new ThreadLocal (); 80 81 private ConnectionConsumer consumer; 82 private ServerSessionPoolImpl serverSessionPool; 83 private MantaDestination dest; 84 private boolean running; 85 private Work connectWork; 86 protected MantaXAConnection connection; 87 88 private long reconnectDelay=INITIAL_RECONNECT_DELAY; 89 90 95 public MantaAsfEndpointWorker(final ResourceAdapterImpl adapter, 96 EndpointKey key) throws ResourceException { 97 super(adapter, key); 98 connectWork = new Work () { 99 100 public void release() { 101 } 102 103 synchronized public void run() { 104 if(!isRunning()) 105 return; 106 107 if(connection != null) 108 return; 109 110 ActivationSpecImpl activationSpec = endpointActivationKey.getActivationSpec(); 111 try { 112 connection = adapter.makeConnection(activationSpec); 113 connection.start(); 114 connection.setExceptionListener(new ExceptionListener () { 115 public void onException(JMSException error) { 116 reconnect(error); 117 } 118 }); 119 120 if (activationSpec.isDurableSubscription()) { 121 consumer = connection.createDurableConnectionConsumer( 122 (Topic ) dest, 123 activationSpec.getSubscriptionName(), 124 emptyToNull(activationSpec.getMessageSelector()), 125 serverSessionPool, 126 activationSpec.getMaxMessagesPerSessionsIntValue()); 127 } else { 130 consumer = connection.createConnectionConsumer( 131 dest, 132 emptyToNull(activationSpec.getMessageSelector()), 133 serverSessionPool, 134 activationSpec.getMaxMessagesPerSessionsIntValue()); 135 } 138 } catch (JMSException error) { 145 reconnect(error); 146 } 147 } 148 }; 149 150 ActivationSpecImpl activationSpec = endpointActivationKey.getActivationSpec(); 151 if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) { 152 dest = new MantaQueue(activationSpec.getDestination()); 153 } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) { 154 try { 155 dest = new MantaTopic(activationSpec.getDestination()); 156 } catch (JMSException e) { 157 log.error(e.getMessage()); 158 throw new ResourceException ("Unable to create resource: ",e); 159 } 160 } else { 161 throw new ResourceException ("Unknown destination type: " + activationSpec.getDestinationType()); 162 } 163 } 164 165 synchronized public void start() throws WorkException , ResourceException { 166 if (running) 167 return; 168 running = true; 169 log.debug("Starting"); 170 serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue()); 171 connect(); 172 log.debug("Started"); 173 } 174 175 178 synchronized public void stop() throws InterruptedException { 179 if (!running) 180 return; 181 running = false; 182 log.debug("Disconnecting"); 183 serverSessionPool.close(); 184 disconnect(); 185 log.debug("Disconnected"); 186 } 187 188 private boolean isRunning() { 189 return running; 190 } 191 192 synchronized private void connect() { 193 if (!running) 194 return; 195 196 try { 197 workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null); 218 } catch (WorkException e) { 219 running = false; 220 log.error("Work Manager did not accept work: ",e); 221 } 222 } 223 224 227 synchronized private void disconnect() { 228 safeClose(consumer); 229 consumer=null; 230 safeClose(connection); 231 connection=null; 232 } 233 234 synchronized private void reconnect(JMSException error) { 235 log.debug("Reconnect cause: ", error); 236 if(reconnectDelay == MAX_RECONNECT_DELAY) { 238 log.info("Endpoint connection failed: "+error.getMessage()); 239 log.info("Endpoint will try to reconnect in "+(MAX_RECONNECT_DELAY/1000)+" seconds"); 240 } 241 try { 242 disconnect(); 243 Thread.sleep(reconnectDelay); 244 245 reconnectDelay*=2; 247 if(reconnectDelay > MAX_RECONNECT_DELAY) 248 reconnectDelay = MAX_RECONNECT_DELAY; 249 250 connect(); 251 } catch (InterruptedException e) { 252 e.printStackTrace(); 253 } 254 } 255 256 protected void registerThreadSession(Session session) { 257 threadLocal.set(session); 258 } 259 260 protected void unregisterThreadSession(Session session) { 261 threadLocal.set(null); 262 } 263 264 private String emptyToNull(String value) { 265 if (value == null || value.length() == 0) { 266 return null; 267 } 268 return value; 269 } 270 271 } 272 | Popular Tags |