1 16 package org.outerj.daisy.jms.impl; 17 18 import java.util.ArrayList ; 19 import java.util.Collections ; 20 import java.util.Iterator ; 21 import java.util.List ; 22 import java.util.Properties ; 23 24 import javax.jms.*; 25 import javax.naming.Context ; 26 import javax.naming.InitialContext ; 27 import javax.naming.NamingException ; 28 29 import org.apache.avalon.framework.activity.Disposable; 30 import org.apache.avalon.framework.activity.Initializable; 31 import org.apache.avalon.framework.configuration.Configurable; 32 import org.apache.avalon.framework.configuration.Configuration; 33 import org.apache.avalon.framework.configuration.ConfigurationException; 34 import org.apache.avalon.framework.logger.AbstractLogEnabled; 35 import org.apache.avalon.framework.logger.Logger; 36 import org.apache.avalon.framework.service.ServiceException; 37 import org.apache.avalon.framework.service.ServiceManager; 38 import org.apache.avalon.framework.service.Serviceable; 39 import org.apache.avalon.framework.thread.ThreadSafe; 40 import org.outerj.daisy.jms.JmsClient; 41 import org.outerj.daisy.jms.Sender; 42 import org.outerj.daisy.configutil.PropertyResolver; 43 44 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; 45 46 50 public class JmsClientImpl extends AbstractLogEnabled implements JmsClient, Configurable, Initializable, Disposable, ThreadSafe, Serviceable { 51 private Properties contextProperties; 52 private String jmsUserName; 53 private String jmsPassword; 54 private String connectionFactoryName; 55 private String clientId; 56 private Connection jmsConnection; 57 private static final int CONN_RETRY_INTERVAL = 10000; 58 private boolean stopping = false; 59 private List consumers = new ArrayList (); 60 private List senders = new ArrayList (); 61 private List runningThreads = Collections.synchronizedList(new ArrayList ()); 62 private WriterPreferenceReadWriteLock suspendLock = new WriterPreferenceReadWriteLock(); 63 64 public JmsClientImpl() { 65 } 66 67 71 public JmsClientImpl(Properties contextProperties, String userName, String password, String clientId, 72 String connectionFactoryName, Logger logger) throws Exception { 73 this.contextProperties = contextProperties; 74 this.jmsUserName = userName; 75 this.jmsPassword = password; 76 this.clientId = clientId; 77 this.connectionFactoryName = connectionFactoryName; 78 enableLogging(logger); 79 initialize(); 80 } 81 82 public boolean suspend(long msecs) throws InterruptedException { 83 return this.suspendLock.writeLock().attempt(msecs); 84 } 85 86 public void resume() { 87 this.suspendLock.writeLock().release(); 88 } 89 90 91 94 public void service(ServiceManager serviceManager) throws ServiceException { 95 } 97 98 public void configure(Configuration configuration) throws ConfigurationException { 99 Configuration jmsConf = configuration.getChild("jmsConnection"); 100 Configuration[] propertiesConf = jmsConf.getChild("initialContext").getChildren("property"); 101 contextProperties = new Properties (); 102 for (int i = 0; i < propertiesConf.length; i++) { 103 if (!contextProperties.containsKey(propertiesConf[i].getAttribute("name"))) { 104 String value = PropertyResolver.resolveProperties(propertiesConf[i].getAttribute("value")); 105 if (value.indexOf("brokerConfig=xbean:file:") != -1) { 109 value = value.replaceAll("\\\\", "/"); 110 } 111 contextProperties.put(propertiesConf[i].getAttribute("name"), value); 112 } 113 } 114 115 Configuration jmsCredentials = jmsConf.getChild("credentials", false); 116 if (jmsCredentials != null) { 117 jmsUserName = jmsCredentials.getAttribute("username"); 118 jmsPassword = jmsCredentials.getAttribute("password"); 119 } 120 121 clientId = jmsConf.getChild("clientId").getValue(); 122 123 connectionFactoryName = jmsConf.getChild("connectionFactoryName").getValue(); 124 } 125 126 public void initialize() throws Exception { 127 initializeJmsConnection(true); 131 } 132 133 protected void initializeJmsConnection(boolean failOnError) throws Exception { 134 while (jmsConnection == null) { 135 try { 136 getLogger().debug("Trying to establish JMS connection..."); 137 Context context = getContext(); 138 139 ConnectionFactory jmsFactory = (ConnectionFactory) context.lookup(connectionFactoryName); 140 if (jmsUserName != null) 141 jmsConnection = jmsFactory.createConnection(jmsUserName, jmsPassword); 142 else 143 jmsConnection = jmsFactory.createConnection(); 144 jmsConnection.setClientID(clientId); 145 connectionUp(); 146 147 jmsConnection.setExceptionListener(new MyJmsExceptionListener()); 148 jmsConnection.start(); 149 } catch (Exception e) { 150 if (failOnError) 151 throw e; 152 try { 153 Thread.sleep(CONN_RETRY_INTERVAL); 154 } catch (InterruptedException e1) { 155 if (stopping) 156 throw e1; 157 } 158 } 159 } 160 getLogger().info("JMS connection established."); 161 } 162 163 private class MyJmsExceptionListener implements ExceptionListener { 164 public void onException(JMSException e) { 165 if (stopping) 166 return; 167 getLogger().error("Error with the JMS connection. Will automatically try to re-establish connection every " + CONN_RETRY_INTERVAL + " ms.", e); 168 connectionDown(); 169 jmsConnection = null; 170 Thread thread = new ConnectionEstablisherThread(); 171 runningThreads.add(thread); 172 thread.start(); 173 } 174 } 175 176 private class ConnectionEstablisherThread extends Thread { 177 public ConnectionEstablisherThread() { 178 super("DaisyJmsConnectionEstablisher"); 179 setDaemon(true); 180 } 181 182 public void run() { 183 try { 184 initializeJmsConnection(false); 185 } catch (Exception e2) { 186 getLogger().error("Error trying to establish JMS topic connection, giving up.", e2); 188 } 189 runningThreads.remove(this); 190 } 191 } 192 193 private Context getContext() throws NamingException { 194 ClassLoader current = Thread.currentThread().getContextClassLoader(); 195 Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); 196 try { 197 return new InitialContext (contextProperties); 198 } finally { 199 Thread.currentThread().setContextClassLoader(current); 200 } 201 } 202 203 public void dispose() { 204 stopping = true; 205 206 if (runningThreads.size() > 0 && getLogger().isDebugEnabled()) 207 getLogger().debug("Will interrupt " + runningThreads.size() + " JMS connection-establishing threads."); 208 Iterator runningThreadIt = runningThreads.iterator(); 209 while (runningThreadIt.hasNext()) { 210 Thread thread = (Thread )runningThreadIt.next(); 211 thread.interrupt(); 212 } 213 214 try { 215 if (jmsConnection != null) { 216 jmsConnection.stop(); 217 jmsConnection.close(); 218 } 219 } catch (JMSException e) { 220 getLogger().error("Error closing JMS connection.", e); 221 } 222 } 223 224 private synchronized void connectionUp() { 225 bringUp(consumers); 226 bringUp(senders); 227 } 228 229 private synchronized void connectionDown() { 230 bringDown(consumers); 231 bringDown(senders); 232 } 233 234 private void bringUp(List list) { 235 Iterator it = list.iterator(); 236 while (it.hasNext()) { 237 try { 238 ((Reconnectable)it.next()).connectionUp(); 239 } catch (Throwable e) { 240 getLogger().error("Error 'upping' a JMS session.", e); 241 } 242 } 243 } 244 245 private void bringDown(List list) { 246 Iterator it = list.iterator(); 247 while (it.hasNext()) { 248 try { 249 ((Reconnectable)it.next()).connectionDown(); 250 } catch (Throwable e) { 251 getLogger().error("Error 'downing' a JMS session.", e); 252 } 253 } 254 } 255 256 public synchronized void registerDurableTopicListener(String topicName, String subscriptionName, MessageListener listener) throws Exception { 257 MyJmsMessageListener theListener = new MyJmsMessageListener(topicName, subscriptionName, listener); 258 theListener.connectionUp(); 259 consumers.add(theListener); 260 } 261 262 public synchronized void registerListener(String destinationName, MessageListener listener) throws Exception { 263 MyJmsMessageListener theListener = new MyJmsMessageListener(destinationName, null, listener); 264 theListener.connectionUp(); 265 consumers.add(theListener); 266 } 267 268 public synchronized void unregisterListener(MessageListener listener) { 269 if (listener instanceof MyJmsMessageListener) { 270 consumers.remove(listener); 271 ((MyJmsMessageListener)listener).dispose(); 272 } else { 273 throw new RuntimeException ("Unexpected object: " + listener); 274 } 275 } 276 277 public synchronized Sender getSender(String destinationName) { 278 SenderImpl sender = new SenderImpl(destinationName); 279 try { 280 sender.connectionUp(); 281 } catch (Exception e) { 282 getLogger().warn("Sender could not be initialized after initial retrieval, meaning the JMS connection is probably down.", e); 283 } 284 senders.add(sender); 285 return sender; 286 } 287 288 public synchronized void unregisterSender(Sender sender) { 289 if (sender instanceof SenderImpl) { 290 senders.remove(sender); 291 ((SenderImpl)sender).dispose(); 292 } else { 293 throw new RuntimeException ("Unexpected object: " + sender); 294 } 295 } 296 297 interface Reconnectable { 298 void connectionDown(); 299 300 void connectionUp() throws Exception ; 301 } 302 303 class MyJmsMessageListener implements MessageListener, Reconnectable { 304 private String destinationName; 305 private String subscriptionName; 306 private MessageListener delegate; 307 private Session session; 308 309 public MyJmsMessageListener(String destinationName, String subscriptionName, MessageListener delegate) { 310 this.destinationName = destinationName; 311 this.subscriptionName = subscriptionName; 312 this.delegate = delegate; 313 } 314 315 public void onMessage(Message message) { 316 try { 317 suspendLock.readLock().acquire(); 318 } catch (InterruptedException e) { 319 throw new RuntimeException ("Got InterruptedException while waiting for suspendLock."); 322 } 323 try { 324 delegate.onMessage(message); 325 } finally { 326 suspendLock.readLock().release(); 327 } 328 } 329 330 public void connectionDown() { 331 } 332 333 public void connectionUp() throws Exception { 334 Destination jmsDestination = (Destination) getContext().lookup(destinationName); 335 session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 336 MessageConsumer consumer; 337 if (subscriptionName != null) 338 consumer = session.createDurableSubscriber((Topic)jmsDestination, subscriptionName); 339 else 340 consumer = session.createConsumer(jmsDestination); 341 consumer.setMessageListener(this); 342 } 343 344 public void dispose() { 345 try { 346 if (session != null) 347 session.close(); 348 } catch (Exception e) { 349 getLogger().error("Error closing JMS session.", e); 350 } 351 } 352 } 353 354 class SenderImpl implements Sender, Reconnectable { 355 private boolean connectionUp = false; 356 private Session session; 357 private String destinationName; 358 private MessageProducer messageProducer; 359 360 public SenderImpl(String destinationName) { 361 this.destinationName = destinationName; 362 } 363 364 public void send(final Message message) throws Exception { 365 executeWhenConnectionIsUp(new JMSAction() { 366 public void run() throws JMSException { 367 messageProducer.send(message); 368 } 369 }); 370 } 371 372 public TextMessage createTextMessage(final String text) throws JMSException { 373 final TextMessage[] message = new TextMessage[1]; 374 executeWhenConnectionIsUp(new JMSAction() { 375 public void run() throws Exception { 376 message[0] = session.createTextMessage(text); 377 } 378 }); 379 return message[0]; 380 } 381 382 public MapMessage createMapMessage() throws JMSException { 383 final MapMessage[] message = new MapMessage[1]; 384 executeWhenConnectionIsUp(new JMSAction() { 385 public void run() throws Exception { 386 message[0] = session.createMapMessage(); 387 } 388 }); 389 return message[0]; 390 } 391 392 public void connectionDown() { 393 connectionUp = false; 394 session = null; 395 messageProducer = null; 396 } 397 398 public void connectionUp() throws Exception { 399 Destination jmsDestination = (Destination) getContext().lookup(destinationName); 400 session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 401 messageProducer = session.createProducer(jmsDestination); 402 connectionUp = true; 403 } 404 405 public void dispose() { 406 try { 407 if (session != null) 408 session.close(); 409 } catch (Exception e) { 410 getLogger().error("Error closing JMS session.", e); 411 } 412 } 413 414 protected void executeWhenConnectionIsUp(JMSAction action) { 415 stoppingLoop: while (!stopping) { 416 while (!connectionUp) { 418 getLogger().debug("JMS connection is down..."); 419 try { 420 Thread.sleep(CONN_RETRY_INTERVAL); 421 } catch (InterruptedException e) { 422 getLogger().debug("Got interruptedexception while sleeping to wait for JMS connection to re-appear.", e); 423 break stoppingLoop; 424 } 425 } 426 427 int i = 0; 429 while (!stopping) { 430 try { 431 suspendLock.readLock().acquire(); 432 } catch (InterruptedException e) { 433 getLogger().debug("Got interruptedexception while trying to get suspend lock.", e); 434 break stoppingLoop; 435 } 436 try { 437 action.run(); 438 return; 439 } catch (Exception e) { 440 if (!connectionUp) { 441 break; 443 } else { 444 i++; 448 if (i >= 3) { 449 throw new RuntimeException ("Failed to execute JMS action, giving up.", e); 450 } else { 451 try { 452 Thread.sleep(CONN_RETRY_INTERVAL); 453 } catch (InterruptedException e2) { 454 getLogger().debug("Got interruptedexception while sleeping before retrying JMS action.", e); 455 break stoppingLoop; 456 } 457 } 458 } 459 } finally { 460 suspendLock.readLock().release(); 461 } 462 } 463 } 464 throw new RuntimeException ("Failed to execute JMS action and now server is going down..."); 465 } 466 } 467 468 interface JMSAction { 469 public void run() throws Exception ; 470 } 471 } 472 | Popular Tags |