1 16 package org.apache.cocoon.components.jms; 17 18 import java.util.Date ; 19 import java.util.HashMap ; 20 import java.util.HashSet ; 21 import java.util.Iterator ; 22 import java.util.Map ; 23 import java.util.Properties ; 24 import java.util.Set ; 25 import javax.jms.Connection ; 26 import javax.jms.ConnectionFactory ; 27 import javax.jms.ExceptionListener ; 28 import javax.jms.JMSException ; 29 import javax.jms.QueueConnection ; 30 import javax.jms.QueueConnectionFactory ; 31 import javax.jms.TopicConnection ; 32 import javax.jms.TopicConnectionFactory ; 33 import javax.naming.InitialContext ; 34 import javax.naming.NamingException ; 35 import org.apache.avalon.framework.CascadingException; 36 import org.apache.avalon.framework.activity.Disposable; 37 import org.apache.avalon.framework.activity.Initializable; 38 import org.apache.avalon.framework.activity.Startable; 39 import org.apache.avalon.framework.configuration.Configurable; 40 import org.apache.avalon.framework.configuration.Configuration; 41 import org.apache.avalon.framework.configuration.ConfigurationException; 42 import org.apache.avalon.framework.logger.AbstractLogEnabled; 43 import org.apache.avalon.framework.logger.Logger; 44 import org.apache.avalon.framework.parameters.ParameterException; 45 import org.apache.avalon.framework.parameters.Parameters; 46 import org.apache.avalon.framework.service.ServiceException; 47 import org.apache.avalon.framework.service.ServiceManager; 48 import org.apache.avalon.framework.service.Serviceable; 49 import org.apache.avalon.framework.thread.ThreadSafe; 50 import org.apache.cocoon.components.cron.CronJob; 51 import org.apache.cocoon.components.cron.JobScheduler; 52 53 56 public class JMSConnectionManagerImpl extends AbstractLogEnabled 57 implements JMSConnectionManager, Serviceable, Configurable, Initializable, 58 Startable, Disposable, ThreadSafe, JMSConnectionEventNotifier { 59 60 62 private static final int TOPIC_CONNECTION_TYPE = 1; 63 private static final int QUEUE_CONNECTION_TYPE = 2; 64 private static final int CONNECTION_TYPE = 3; 65 66 private static final String CONNECTION_CONFIG = "connection"; 67 private static final String TOPIC_CONNECTION_CONFIG = "topic-connection"; 68 private static final String QUEUE_CONNECTION_CONFIG = "queue-connection"; 69 private static final String NAME_ATTR = "name"; 70 71 private static final String CONNECTION_FACTORY_PARAM = "connection-factory"; 72 private static final String USERNAME_PARAM = "username"; 73 private static final String PASSWORD_PARAM = "password"; 74 private static final String AUTO_RECONNECT_PARAM = "auto-reconnect"; 75 private static final String AUTO_RECONNECT_DELAY_PARAM = "auto-reconnect-delay"; 76 77 private static final int DEFAULT_AUTO_RECONNECT_DELAY = 1000; 78 79 private static final String JNDI_PROPERTY_PREFIX = "java.naming."; 80 81 83 private ServiceManager m_serviceManager; 84 85 private Map m_configurations; 86 private Map m_connections; 87 private Map m_listeners; 88 89 91 public JMSConnectionManagerImpl() { 92 } 93 94 public void service(ServiceManager manager) { 95 m_serviceManager = manager; 96 } 97 98 public void configure(Configuration configuration) throws ConfigurationException { 99 m_configurations = new HashMap (configuration.getChildren().length); 100 Configuration[] configurations = configuration.getChildren(CONNECTION_CONFIG); 102 configureConnections(configurations, CONNECTION_TYPE); 103 configurations = configuration.getChildren(TOPIC_CONNECTION_CONFIG); 105 configureConnections(configurations, TOPIC_CONNECTION_TYPE); 106 configurations = configuration.getChildren(QUEUE_CONNECTION_CONFIG); 108 configureConnections(configurations, QUEUE_CONNECTION_TYPE); 109 } 110 111 private void configureConnections(Configuration[] connections, int type) throws ConfigurationException { 112 for (int i = 0; i < connections.length; i++) { 113 final String name = connections[i].getAttribute(NAME_ATTR); 114 if (m_configurations.containsKey(name)) { 115 throw new ConfigurationException("Duplicate connection name '" + name + "'." + 116 " Connection names must be unique."); 117 } 118 final Parameters parameters = Parameters.fromConfiguration(connections[i]); 119 ConnectionConfiguration cc = new ConnectionConfiguration(name, parameters, type); 120 m_configurations.put(name, cc); 121 } 122 } 123 124 public void initialize() throws Exception { 125 m_listeners = new HashMap (); 126 m_connections = new HashMap (m_configurations.size()); 127 final Iterator iter = m_configurations.values().iterator(); 128 129 while (iter.hasNext()) { 130 final ConnectionConfiguration cc = (ConnectionConfiguration) iter.next(); 131 try { 132 final Connection connection = createConnection(cc); 133 134 m_connections.put(cc.getName(), connection); 135 } 136 catch (NamingException e) { 137 } 139 } 140 m_configurations = null; 141 } 142 143 public void start() throws Exception { 144 final Iterator iter = m_connections.entrySet().iterator(); 145 while (iter.hasNext()) { 146 final Map.Entry entry = (Map.Entry ) iter.next(); 147 if (getLogger().isDebugEnabled()) { 148 getLogger().debug("Starting JMS connection " + entry.getKey()); 149 } 150 final Connection connection = (Connection ) entry.getValue(); 151 connection.start(); 152 } 153 } 154 155 public void stop() throws Exception { 156 final Iterator iter = m_connections.entrySet().iterator(); 157 while (iter.hasNext()) { 158 final Map.Entry entry = (Map.Entry ) iter.next(); 159 stopConnection((String ) entry.getKey(), (Connection ) entry.getValue()); 160 } 161 } 162 163 void stopConnection(String name, Connection connection) { 164 if (getLogger().isDebugEnabled()) { 165 getLogger().debug("Stopping JMS connection " + name); 166 } 167 try { 168 connection.stop(); 169 } 170 catch (JMSException e) { 171 } 173 } 174 175 public void dispose() { 176 final Iterator iter = m_connections.entrySet().iterator(); 177 while (iter.hasNext()) { 178 final Map.Entry entry = (Map.Entry ) iter.next(); 179 if (getLogger().isDebugEnabled()) { 180 getLogger().debug("Closing JMS connection " + entry.getKey()); 181 } 182 try { 183 final Connection connection = (Connection ) entry.getValue(); 184 connection.close(); 185 } 186 catch (JMSException e) { 187 getLogger().error("Error closing JMS connection " + entry.getKey(), e); 188 } 189 } 190 } 191 192 194 public synchronized Connection getConnection(String name) { 195 return (Connection ) m_connections.get(name); 196 } 197 198 public synchronized TopicConnection getTopicConnection(String name) { 199 return (TopicConnection ) m_connections.get(name); 200 } 201 202 public synchronized QueueConnection getQueueConnection(String name) { 203 return (QueueConnection ) m_connections.get(name); 204 } 205 206 208 public synchronized void addConnectionListener(String name, JMSConnectionEventListener listener) { 209 Set connectionListeners = (Set ) m_listeners.get(name); 210 if (connectionListeners == null) { 211 connectionListeners = new HashSet (); 212 m_listeners.put(name, connectionListeners); 213 } 214 connectionListeners.add(listener); 215 } 216 217 public synchronized void removeConnectionListener(String name, JMSConnectionEventListener listener) { 218 Set connectionListeners = (Set ) m_listeners.get(name); 219 if (connectionListeners != null) { 220 connectionListeners.remove(listener); 221 } 222 } 223 224 226 Connection createConnection(ConnectionConfiguration cc) throws NamingException , JMSException { 227 try { 228 final InitialContext context = createInitialContext(cc.getJNDIProperties()); 229 final ConnectionFactory factory = (ConnectionFactory ) context.lookup(cc.getConnectionFactory()); 230 final Connection connection = createConnection(factory, cc); 231 if (cc.isAutoReconnect()) { 232 connection.setExceptionListener(new ReconnectionListener(this, cc)); 233 } 234 return connection; 235 } 236 catch (NamingException e) { 237 if (getLogger().isWarnEnabled()) { 238 final Throwable rootCause = e.getRootCause(); 239 if (rootCause != null) { 240 String message = e.getRootCause().getMessage(); 241 if (rootCause instanceof ClassNotFoundException ) { 242 String info = "WARN! *** JMS block is installed but jms client library not found. ***\n" + 243 "- For the jms block to work you must install and start a JMS server and " + 244 "place the client jar in WEB-INF/lib."; 245 if (message.indexOf("exolab") > 0 ) { 246 info += "\n- The default server, OpenJMS is configured in cocoon.xconf but is not bundled with Cocoon."; 247 } 248 System.err.println(info); 249 getLogger().warn(info,e); 250 } else { 251 System.out.println(message); 252 getLogger().warn("Cannot get Initial Context. Is the JNDI server reachable?",e); 253 } 254 } 255 else { 256 getLogger().warn("Failed to initialize JMS.",e); 257 } 258 } 259 throw e; 260 } 261 } 262 263 private Connection createConnection(ConnectionFactory factory, ConnectionConfiguration cc) throws JMSException { 264 if (cc.getUserName() != null) { 265 switch (cc.getType()) { 266 case CONNECTION_TYPE: { 267 return factory.createConnection(cc.getUserName(), cc.getPassword()); 268 } 269 case TOPIC_CONNECTION_TYPE: { 270 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) factory; 271 return topicFactory.createTopicConnection(cc.getUserName(), cc.getPassword()); 272 } 273 case QUEUE_CONNECTION_TYPE: { 274 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) factory; 275 return queueFactory.createQueueConnection(cc.getUserName(), cc.getPassword()); 276 } 277 } 278 } 279 switch (cc.getType()) { 280 case CONNECTION_TYPE: { 281 return factory.createConnection(); 282 } 283 case TOPIC_CONNECTION_TYPE: { 284 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) factory; 285 return topicFactory.createTopicConnection(); 286 } 287 case QUEUE_CONNECTION_TYPE: { 288 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) factory; 289 return queueFactory.createQueueConnection(); 290 } 291 } 292 return null; 293 } 294 295 private InitialContext createInitialContext(Properties properties) throws NamingException { 296 if (properties != null) { 297 return new InitialContext (properties); 298 } 299 return new InitialContext (); 300 } 301 302 synchronized void removeConnection(String name) { 303 notifyListenersOfDisconnection(name); 304 final Connection connection = (Connection ) m_connections.remove(name); 305 stopConnection(name, connection); 306 } 307 308 synchronized void addConnection(String name, Connection connection) { 309 m_connections.put(name, connection); 310 notifyListenersOfConnection(name); 311 } 312 313 void scheduleReconnectionJob(ConnectionConfiguration configuration) { 314 if (getLogger().isInfoEnabled()) { 315 getLogger().info("Scheduling JMS reconnection job for: " + configuration.getName()); 316 } 317 JobScheduler scheduler = null; 318 try { 319 scheduler = (JobScheduler) m_serviceManager.lookup(JobScheduler.ROLE); 320 Date executionTime = new Date (System.currentTimeMillis() + configuration.getAutoReconnectDelay()); 321 ReconnectionJob job = new ReconnectionJob(this, configuration); 322 scheduler.fireJobAt(executionTime, "reconnect_" + configuration.getName(), job); 323 } 324 catch (ServiceException e) { 325 if (getLogger().isWarnEnabled()) { 326 getLogger().warn("Cannot obtain scheduler.",e); 327 } 328 } 329 catch (CascadingException e) { 330 if (getLogger().isWarnEnabled()) { 331 getLogger().warn("Unable to schedule reconnection job.",e); 332 } 333 } 334 finally { 335 if (scheduler != null) { 336 m_serviceManager.release(scheduler); 337 } 338 } 339 } 340 341 private void notifyListenersOfConnection(String name) { 342 Set connectionListeners = (Set ) m_listeners.get(name); 343 if (connectionListeners != null) { 344 for (Iterator listenersIterator = connectionListeners.iterator(); listenersIterator.hasNext();) { 345 JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator.next(); 346 listener.onConnection(name); 347 } 348 } 349 } 350 351 private void notifyListenersOfDisconnection(String name) { 352 Set connectionListeners = (Set ) m_listeners.get(name); 353 if (connectionListeners != null) { 354 for (Iterator listenersIterator = connectionListeners.iterator(); listenersIterator.hasNext();) { 355 JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator.next(); 356 listener.onDisconnection(name); 357 } 358 } 359 } 360 361 static final class ConnectionConfiguration { 362 363 365 private final String m_name; 366 private final int m_type; 367 private final String m_connectionFactory; 368 private final String m_username; 369 private final String m_password; 370 private final boolean m_autoReconnect; 371 private final int m_autoReconnectDelay; 372 373 private Properties m_jndiProperties = new Properties (); 374 375 ConnectionConfiguration(String name, Parameters parameters, int type) 376 throws ConfigurationException { 377 m_name = name; 378 try { 379 m_connectionFactory = parameters.getParameter(CONNECTION_FACTORY_PARAM); 380 m_username = parameters.getParameter(USERNAME_PARAM, null); 381 m_password = parameters.getParameter(PASSWORD_PARAM, null); 382 m_autoReconnect = parameters.getParameterAsBoolean(AUTO_RECONNECT_PARAM, false); 383 m_autoReconnectDelay = parameters.getParameterAsInteger(AUTO_RECONNECT_DELAY_PARAM, DEFAULT_AUTO_RECONNECT_DELAY); 384 385 String [] names = parameters.getNames(); 387 for (int i = 0; i < names.length; i++) { 388 if (names[i].startsWith(JNDI_PROPERTY_PREFIX)) { 389 m_jndiProperties.put(names[i], parameters.getParameter(names[i])); 390 } 391 } 392 } 393 catch (ParameterException e) { 394 throw new ConfigurationException(e.getLocalizedMessage()); 395 } 396 m_type = type; 397 } 398 399 String getName() { 400 return m_name; 401 } 402 403 int getType() { 404 return m_type; 405 } 406 407 Properties getJNDIProperties() { 408 return m_jndiProperties; 409 } 410 411 String getConnectionFactory() { 412 return m_connectionFactory; 413 } 414 415 String getUserName() { 416 return m_username; 417 } 418 419 String getPassword() { 420 return m_password; 421 } 422 423 boolean isAutoReconnect() { 424 return m_autoReconnect; 425 } 426 427 int getAutoReconnectDelay() { 428 return m_autoReconnectDelay; 429 } 430 431 public int hashCode() { 432 return m_name.hashCode(); 433 } 434 435 } 436 437 static final class ReconnectionListener implements ExceptionListener { 438 439 private final JMSConnectionManagerImpl m_manager; 440 private final ConnectionConfiguration m_configuration; 441 442 ReconnectionListener(JMSConnectionManagerImpl manager, ConnectionConfiguration configuration) { 443 super(); 444 m_manager = manager; 445 m_configuration = configuration; 446 } 447 448 public void onException(JMSException exception) { 449 m_manager.removeConnection(m_configuration.getName()); 450 m_manager.scheduleReconnectionJob(m_configuration); 451 } 452 453 } 454 455 static final class ReconnectionJob implements CronJob { 456 457 private final JMSConnectionManagerImpl m_manager; 458 private final ConnectionConfiguration m_configuration; 459 460 ReconnectionJob(JMSConnectionManagerImpl manager, ConnectionConfiguration configuration) { 461 super(); 462 m_manager = manager; 463 m_configuration = configuration; 464 } 465 466 public void execute(String jobname) { 467 final Logger logger = m_manager.getLogger(); 468 if (logger.isInfoEnabled()) { 469 logger.info("Reconnecting JMS connection: " + m_configuration.getName()); 470 } 471 try { 472 final Connection connection = m_manager.createConnection(m_configuration); 473 m_manager.addConnection(m_configuration.getName(), connection); 474 if (logger.isInfoEnabled()) { 475 logger.info("Successfully reconnected JMS connection: " + m_configuration.getName()); 476 } 477 } 478 catch (NamingException e) { 479 if (logger.isWarnEnabled()) { 480 logger.warn("Failed to reconnect.",e); 481 } 482 m_manager.scheduleReconnectionJob(m_configuration); 483 } 484 catch (JMSException e) { 485 if (logger.isWarnEnabled()) { 486 logger.warn("Failed to reconnect.",e); 487 } 488 m_manager.scheduleReconnectionJob(m_configuration); 489 } 490 } 491 } 492 493 } 494 | Popular Tags |