KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > cocoon > components > jms > JMSConnectionManagerImpl


/*
 * Copyright 1999-2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

16 package org.apache.cocoon.components.jms;
17
18 import java.util.Date JavaDoc;
19 import java.util.HashMap JavaDoc;
20 import java.util.HashSet JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.Map JavaDoc;
23 import java.util.Properties JavaDoc;
24 import java.util.Set JavaDoc;
25 import javax.jms.Connection JavaDoc;
26 import javax.jms.ConnectionFactory JavaDoc;
27 import javax.jms.ExceptionListener JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.QueueConnection JavaDoc;
30 import javax.jms.QueueConnectionFactory JavaDoc;
31 import javax.jms.TopicConnection JavaDoc;
32 import javax.jms.TopicConnectionFactory JavaDoc;
33 import javax.naming.InitialContext JavaDoc;
34 import javax.naming.NamingException JavaDoc;
35 import org.apache.avalon.framework.CascadingException;
36 import org.apache.avalon.framework.activity.Disposable;
37 import org.apache.avalon.framework.activity.Initializable;
38 import org.apache.avalon.framework.activity.Startable;
39 import org.apache.avalon.framework.configuration.Configurable;
40 import org.apache.avalon.framework.configuration.Configuration;
41 import org.apache.avalon.framework.configuration.ConfigurationException;
42 import org.apache.avalon.framework.logger.AbstractLogEnabled;
43 import org.apache.avalon.framework.logger.Logger;
44 import org.apache.avalon.framework.parameters.ParameterException;
45 import org.apache.avalon.framework.parameters.Parameters;
46 import org.apache.avalon.framework.service.ServiceException;
47 import org.apache.avalon.framework.service.ServiceManager;
48 import org.apache.avalon.framework.service.Serviceable;
49 import org.apache.avalon.framework.thread.ThreadSafe;
50 import org.apache.cocoon.components.cron.CronJob;
51 import org.apache.cocoon.components.cron.JobScheduler;
52
53 /**
54  * {@link org.apache.cocoon.components.jms.JMSConnectionManager} implementation.
55  */

56 public class JMSConnectionManagerImpl extends AbstractLogEnabled
57 implements JMSConnectionManager, Serviceable, Configurable, Initializable,
58            Startable, Disposable, ThreadSafe, JMSConnectionEventNotifier {
59
60     // ---------------------------------------------------- Constants
61

62     private static final int TOPIC_CONNECTION_TYPE = 1;
63     private static final int QUEUE_CONNECTION_TYPE = 2;
64     private static final int CONNECTION_TYPE = 3;
65
66     private static final String JavaDoc CONNECTION_CONFIG = "connection";
67     private static final String JavaDoc TOPIC_CONNECTION_CONFIG = "topic-connection";
68     private static final String JavaDoc QUEUE_CONNECTION_CONFIG = "queue-connection";
69     private static final String JavaDoc NAME_ATTR = "name";
70     
71     private static final String JavaDoc CONNECTION_FACTORY_PARAM = "connection-factory";
72     private static final String JavaDoc USERNAME_PARAM = "username";
73     private static final String JavaDoc PASSWORD_PARAM = "password";
74     private static final String JavaDoc AUTO_RECONNECT_PARAM = "auto-reconnect";
75     private static final String JavaDoc AUTO_RECONNECT_DELAY_PARAM = "auto-reconnect-delay";
76     
77     private static final int DEFAULT_AUTO_RECONNECT_DELAY = 1000;
78     
79     private static final String JavaDoc JNDI_PROPERTY_PREFIX = "java.naming.";
80
81     // ---------------------------------------------------- Instance variables
82

83     private ServiceManager m_serviceManager;
84     
85     private Map JavaDoc m_configurations;
86     private Map JavaDoc m_connections;
87     private Map JavaDoc m_listeners;
88
89     // ---------------------------------------------------- Lifecycle
90

91     public JMSConnectionManagerImpl() {
92     }
93     
94     public void service(ServiceManager manager) {
95         m_serviceManager = manager;
96     }
97
98     public void configure(Configuration configuration) throws ConfigurationException {
99         m_configurations = new HashMap JavaDoc(configuration.getChildren().length);
100         // <connection>s
101
Configuration[] configurations = configuration.getChildren(CONNECTION_CONFIG);
102         configureConnections(configurations, CONNECTION_TYPE);
103         // <topic-connection>s
104
configurations = configuration.getChildren(TOPIC_CONNECTION_CONFIG);
105         configureConnections(configurations, TOPIC_CONNECTION_TYPE);
106         // <queue-connection>s
107
configurations = configuration.getChildren(QUEUE_CONNECTION_CONFIG);
108         configureConnections(configurations, QUEUE_CONNECTION_TYPE);
109     }
110     
111     private void configureConnections(Configuration[] connections, int type) throws ConfigurationException {
112         for (int i = 0; i < connections.length; i++) {
113             final String JavaDoc name = connections[i].getAttribute(NAME_ATTR);
114             if (m_configurations.containsKey(name)) {
115                 throw new ConfigurationException("Duplicate connection name '" + name + "'." +
116                         " Connection names must be unique.");
117             }
118             final Parameters parameters = Parameters.fromConfiguration(connections[i]);
119             ConnectionConfiguration cc = new ConnectionConfiguration(name, parameters, type);
120             m_configurations.put(name, cc);
121         }
122     }
123
124     public void initialize() throws Exception JavaDoc {
125         m_listeners = new HashMap JavaDoc();
126         m_connections = new HashMap JavaDoc(m_configurations.size());
127         final Iterator JavaDoc iter = m_configurations.values().iterator();
128
129         while (iter.hasNext()) {
130             final ConnectionConfiguration cc = (ConnectionConfiguration) iter.next();
131             try {
132                 final Connection JavaDoc connection = createConnection(cc);
133                 
134                 m_connections.put(cc.getName(), connection);
135             }
136             catch (NamingException JavaDoc e) {
137                 // ignore, warnings for NamingExceptions are logged by createConnection method
138
}
139         }
140         m_configurations = null;
141     }
142
143     public void start() throws Exception JavaDoc {
144         final Iterator JavaDoc iter = m_connections.entrySet().iterator();
145         while (iter.hasNext()) {
146             final Map.Entry JavaDoc entry = (Map.Entry JavaDoc) iter.next();
147             if (getLogger().isDebugEnabled()) {
148                 getLogger().debug("Starting JMS connection " + entry.getKey());
149             }
150             final Connection JavaDoc connection = (Connection JavaDoc) entry.getValue();
151             connection.start();
152         }
153     }
154
155     public void stop() throws Exception JavaDoc {
156         final Iterator JavaDoc iter = m_connections.entrySet().iterator();
157         while (iter.hasNext()) {
158             final Map.Entry JavaDoc entry = (Map.Entry JavaDoc) iter.next();
159             stopConnection((String JavaDoc) entry.getKey(), (Connection JavaDoc) entry.getValue());
160         }
161     }
162
163     void stopConnection(String JavaDoc name, Connection JavaDoc connection) {
164         if (getLogger().isDebugEnabled()) {
165             getLogger().debug("Stopping JMS connection " + name);
166         }
167         try {
168             connection.stop();
169         }
170         catch (JMSException JavaDoc e) {
171             // ignore
172
}
173     }
174
175     public void dispose() {
176         final Iterator JavaDoc iter = m_connections.entrySet().iterator();
177         while (iter.hasNext()) {
178             final Map.Entry JavaDoc entry = (Map.Entry JavaDoc) iter.next();
179             if (getLogger().isDebugEnabled()) {
180                 getLogger().debug("Closing JMS connection " + entry.getKey());
181             }
182             try {
183                 final Connection JavaDoc connection = (Connection JavaDoc) entry.getValue();
184                 connection.close();
185             }
186             catch (JMSException JavaDoc e) {
187                 getLogger().error("Error closing JMS connection " + entry.getKey(), e);
188             }
189         }
190     }
191
192     // ---------------------------------------------------- ConnectionManager
193

194     public synchronized Connection JavaDoc getConnection(String JavaDoc name) {
195         return (Connection JavaDoc) m_connections.get(name);
196     }
197
198     public synchronized TopicConnection JavaDoc getTopicConnection(String JavaDoc name) {
199         return (TopicConnection JavaDoc) m_connections.get(name);
200     }
201
202     public synchronized QueueConnection JavaDoc getQueueConnection(String JavaDoc name) {
203         return (QueueConnection JavaDoc) m_connections.get(name);
204     }
205
206     // ---------------------------------------------------- JMSConnectionEventNotifier
207

208     public synchronized void addConnectionListener(String JavaDoc name, JMSConnectionEventListener listener) {
209        Set JavaDoc connectionListeners = (Set JavaDoc) m_listeners.get(name);
210        if (connectionListeners == null) {
211            connectionListeners = new HashSet JavaDoc();
212            m_listeners.put(name, connectionListeners);
213        }
214        connectionListeners.add(listener);
215     }
216
217     public synchronized void removeConnectionListener(String JavaDoc name, JMSConnectionEventListener listener) {
218         Set JavaDoc connectionListeners = (Set JavaDoc) m_listeners.get(name);
219         if (connectionListeners != null) {
220             connectionListeners.remove(listener);
221         }
222      }
223
224     // ---------------------------------------------------- Implementation
225

226     Connection JavaDoc createConnection(ConnectionConfiguration cc) throws NamingException JavaDoc, JMSException JavaDoc {
227         try {
228             final InitialContext JavaDoc context = createInitialContext(cc.getJNDIProperties());
229             final ConnectionFactory JavaDoc factory = (ConnectionFactory JavaDoc) context.lookup(cc.getConnectionFactory());
230             final Connection JavaDoc connection = createConnection(factory, cc);
231             if (cc.isAutoReconnect()) {
232                 connection.setExceptionListener(new ReconnectionListener(this, cc));
233             }
234             return connection;
235         }
236         catch (NamingException JavaDoc e) {
237             if (getLogger().isWarnEnabled()) {
238                 final Throwable JavaDoc rootCause = e.getRootCause();
239                 if (rootCause != null) {
240                     String JavaDoc message = e.getRootCause().getMessage();
241                     if (rootCause instanceof ClassNotFoundException JavaDoc) {
242                         String JavaDoc info = "WARN! *** JMS block is installed but jms client library not found. ***\n" +
243                             "- For the jms block to work you must install and start a JMS server and " +
244                             "place the client jar in WEB-INF/lib.";
245                             if (message.indexOf("exolab") > 0 ) {
246                                 info += "\n- The default server, OpenJMS is configured in cocoon.xconf but is not bundled with Cocoon.";
247                             }
248                         System.err.println(info);
249                         getLogger().warn(info,e);
250                     } else {
251                         System.out.println(message);
252                         getLogger().warn("Cannot get Initial Context. Is the JNDI server reachable?",e);
253                     }
254                 }
255                 else {
256                     getLogger().warn("Failed to initialize JMS.",e);
257                 }
258             }
259             throw e;
260         }
261     }
262
263     private Connection JavaDoc createConnection(ConnectionFactory JavaDoc factory, ConnectionConfiguration cc) throws JMSException JavaDoc {
264         if (cc.getUserName() != null) {
265             switch (cc.getType()) {
266                 case CONNECTION_TYPE: {
267                     return factory.createConnection(cc.getUserName(), cc.getPassword());
268                 }
269                 case TOPIC_CONNECTION_TYPE: {
270                     TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) factory;
271                     return topicFactory.createTopicConnection(cc.getUserName(), cc.getPassword());
272                 }
273                 case QUEUE_CONNECTION_TYPE: {
274                     QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) factory;
275                     return queueFactory.createQueueConnection(cc.getUserName(), cc.getPassword());
276                 }
277             }
278         }
279         switch (cc.getType()) {
280             case CONNECTION_TYPE: {
281                 return factory.createConnection();
282             }
283             case TOPIC_CONNECTION_TYPE: {
284                 TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) factory;
285                 return topicFactory.createTopicConnection();
286             }
287             case QUEUE_CONNECTION_TYPE: {
288                 QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) factory;
289                 return queueFactory.createQueueConnection();
290             }
291         }
292         return null;
293     }
294
295     private InitialContext JavaDoc createInitialContext(Properties JavaDoc properties) throws NamingException JavaDoc {
296         if (properties != null) {
297             return new InitialContext JavaDoc(properties);
298         }
299         return new InitialContext JavaDoc();
300     }
301     
302     synchronized void removeConnection(String JavaDoc name) {
303         notifyListenersOfDisconnection(name);
304         final Connection JavaDoc connection = (Connection JavaDoc) m_connections.remove(name);
305         stopConnection(name, connection);
306     }
307     
308     synchronized void addConnection(String JavaDoc name, Connection JavaDoc connection) {
309         m_connections.put(name, connection);
310         notifyListenersOfConnection(name);
311     }
312     
313     void scheduleReconnectionJob(ConnectionConfiguration configuration) {
314         if (getLogger().isInfoEnabled()) {
315             getLogger().info("Scheduling JMS reconnection job for: " + configuration.getName());
316         }
317         JobScheduler scheduler = null;
318         try {
319             scheduler = (JobScheduler) m_serviceManager.lookup(JobScheduler.ROLE);
320             Date JavaDoc executionTime = new Date JavaDoc(System.currentTimeMillis() + configuration.getAutoReconnectDelay());
321             ReconnectionJob job = new ReconnectionJob(this, configuration);
322             scheduler.fireJobAt(executionTime, "reconnect_" + configuration.getName(), job);
323         }
324         catch (ServiceException e) {
325             if (getLogger().isWarnEnabled()) {
326                 getLogger().warn("Cannot obtain scheduler.",e);
327             }
328         }
329         catch (CascadingException e) {
330             if (getLogger().isWarnEnabled()) {
331                 getLogger().warn("Unable to schedule reconnection job.",e);
332             }
333         }
334         finally {
335             if (scheduler != null) {
336                 m_serviceManager.release(scheduler);
337             }
338         }
339     }
340     
341     private void notifyListenersOfConnection(String JavaDoc name) {
342         Set JavaDoc connectionListeners = (Set JavaDoc) m_listeners.get(name);
343         if (connectionListeners != null) {
344             for (Iterator JavaDoc listenersIterator = connectionListeners.iterator(); listenersIterator.hasNext();) {
345                 JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator.next();
346                 listener.onConnection(name);
347             }
348         }
349     }
350     
351     private void notifyListenersOfDisconnection(String JavaDoc name) {
352         Set JavaDoc connectionListeners = (Set JavaDoc) m_listeners.get(name);
353         if (connectionListeners != null) {
354             for (Iterator JavaDoc listenersIterator = connectionListeners.iterator(); listenersIterator.hasNext();) {
355                 JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator.next();
356                 listener.onDisconnection(name);
357             }
358         }
359     }
360
361     static final class ConnectionConfiguration {
362         
363         // ------------------------------------------------ Instance variables
364

365         private final String JavaDoc m_name;
366         private final int m_type;
367         private final String JavaDoc m_connectionFactory;
368         private final String JavaDoc m_username;
369         private final String JavaDoc m_password;
370         private final boolean m_autoReconnect;
371         private final int m_autoReconnectDelay;
372         
373         private Properties JavaDoc m_jndiProperties = new Properties JavaDoc();
374
375         ConnectionConfiguration(String JavaDoc name, Parameters parameters, int type)
376         throws ConfigurationException {
377             m_name = name;
378             try {
379                 m_connectionFactory = parameters.getParameter(CONNECTION_FACTORY_PARAM);
380                 m_username = parameters.getParameter(USERNAME_PARAM, null);
381                 m_password = parameters.getParameter(PASSWORD_PARAM, null);
382                 m_autoReconnect = parameters.getParameterAsBoolean(AUTO_RECONNECT_PARAM, false);
383                 m_autoReconnectDelay = parameters.getParameterAsInteger(AUTO_RECONNECT_DELAY_PARAM, DEFAULT_AUTO_RECONNECT_DELAY);
384                 
385                 // parse the jndi property parameters
386
String JavaDoc[] names = parameters.getNames();
387                 for (int i = 0; i < names.length; i++) {
388                     if (names[i].startsWith(JNDI_PROPERTY_PREFIX)) {
389                         m_jndiProperties.put(names[i], parameters.getParameter(names[i]));
390                     }
391                 }
392             }
393             catch (ParameterException e) {
394                 throw new ConfigurationException(e.getLocalizedMessage());
395             }
396             m_type = type;
397         }
398
399         String JavaDoc getName() {
400             return m_name;
401         }
402
403         int getType() {
404             return m_type;
405         }
406
407         Properties JavaDoc getJNDIProperties() {
408             return m_jndiProperties;
409         }
410
411         String JavaDoc getConnectionFactory() {
412             return m_connectionFactory;
413         }
414
415         String JavaDoc getUserName() {
416             return m_username;
417         }
418
419         String JavaDoc getPassword() {
420             return m_password;
421         }
422         
423         boolean isAutoReconnect() {
424             return m_autoReconnect;
425         }
426         
427         int getAutoReconnectDelay() {
428             return m_autoReconnectDelay;
429         }
430
431         public int hashCode() {
432             return m_name.hashCode();
433         }
434
435     }
436
437     static final class ReconnectionListener implements ExceptionListener JavaDoc {
438
439         private final JMSConnectionManagerImpl m_manager;
440         private final ConnectionConfiguration m_configuration;
441         
442         ReconnectionListener(JMSConnectionManagerImpl manager, ConnectionConfiguration configuration) {
443             super();
444             m_manager = manager;
445             m_configuration = configuration;
446         }
447
448         public void onException(JMSException JavaDoc exception) {
449             m_manager.removeConnection(m_configuration.getName());
450             m_manager.scheduleReconnectionJob(m_configuration);
451         }
452
453     }
454
455     static final class ReconnectionJob implements CronJob {
456
457         private final JMSConnectionManagerImpl m_manager;
458         private final ConnectionConfiguration m_configuration;
459
460         ReconnectionJob(JMSConnectionManagerImpl manager, ConnectionConfiguration configuration) {
461             super();
462             m_manager = manager;
463             m_configuration = configuration;
464         }
465         
466         public void execute(String JavaDoc jobname) {
467             final Logger logger = m_manager.getLogger();
468             if (logger.isInfoEnabled()) {
469                 logger.info("Reconnecting JMS connection: " + m_configuration.getName());
470             }
471             try {
472                 final Connection JavaDoc connection = m_manager.createConnection(m_configuration);
473                 m_manager.addConnection(m_configuration.getName(), connection);
474                 if (logger.isInfoEnabled()) {
475                     logger.info("Successfully reconnected JMS connection: " + m_configuration.getName());
476                 }
477             }
478             catch (NamingException JavaDoc e) {
479                 if (logger.isWarnEnabled()) {
480                     logger.warn("Failed to reconnect.",e);
481                 }
482                 m_manager.scheduleReconnectionJob(m_configuration);
483             }
484             catch (JMSException JavaDoc e) {
485                 if (logger.isWarnEnabled()) {
486                     logger.warn("Failed to reconnect.",e);
487                 }
488                 m_manager.scheduleReconnectionJob(m_configuration);
489             }
490         }
491     }
492
493 }
494
Popular Tags