KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > outerj > daisy > jms > impl > JmsClientImpl


1 /*
2  * Copyright 2004 Outerthought bvba and Schaubroeck nv
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package org.outerj.daisy.jms.impl;
17
18 import java.util.ArrayList JavaDoc;
19 import java.util.Collections JavaDoc;
20 import java.util.Iterator JavaDoc;
21 import java.util.List JavaDoc;
22 import java.util.Properties JavaDoc;
23
24 import javax.jms.*;
25 import javax.naming.Context JavaDoc;
26 import javax.naming.InitialContext JavaDoc;
27 import javax.naming.NamingException JavaDoc;
28
29 import org.apache.avalon.framework.activity.Disposable;
30 import org.apache.avalon.framework.activity.Initializable;
31 import org.apache.avalon.framework.configuration.Configurable;
32 import org.apache.avalon.framework.configuration.Configuration;
33 import org.apache.avalon.framework.configuration.ConfigurationException;
34 import org.apache.avalon.framework.logger.AbstractLogEnabled;
35 import org.apache.avalon.framework.logger.Logger;
36 import org.apache.avalon.framework.service.ServiceException;
37 import org.apache.avalon.framework.service.ServiceManager;
38 import org.apache.avalon.framework.service.Serviceable;
39 import org.apache.avalon.framework.thread.ThreadSafe;
40 import org.outerj.daisy.jms.JmsClient;
41 import org.outerj.daisy.jms.Sender;
42 import org.outerj.daisy.configutil.PropertyResolver;
43
44 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
45
46 /**
47  * @avalon.component version="1.0" name="jsmclient" lifestyle="singleton"
48  * @avalon.service type="org.outerj.daisy.jms.JmsClient"
49  */

50 public class JmsClientImpl extends AbstractLogEnabled implements JmsClient, Configurable, Initializable, Disposable, ThreadSafe, Serviceable {
51     private Properties JavaDoc contextProperties;
52     private String JavaDoc jmsUserName;
53     private String JavaDoc jmsPassword;
54     private String JavaDoc connectionFactoryName;
55     private String JavaDoc clientId;
56     private Connection jmsConnection;
57     private static final int CONN_RETRY_INTERVAL = 10000;
58     private boolean stopping = false;
59     private List JavaDoc consumers = new ArrayList JavaDoc();
60     private List JavaDoc senders = new ArrayList JavaDoc();
61     private List JavaDoc runningThreads = Collections.synchronizedList(new ArrayList JavaDoc());
62     private WriterPreferenceReadWriteLock suspendLock = new WriterPreferenceReadWriteLock();
63
64     public JmsClientImpl() {
65     }
66
67     /**
68      *
69      * @param clientId the JMS client ID, should be unique for each client
70      */

71     public JmsClientImpl(Properties JavaDoc contextProperties, String JavaDoc userName, String JavaDoc password, String JavaDoc clientId,
72                          String JavaDoc connectionFactoryName, Logger logger) throws Exception JavaDoc {
73         this.contextProperties = contextProperties;
74         this.jmsUserName = userName;
75         this.jmsPassword = password;
76         this.clientId = clientId;
77         this.connectionFactoryName = connectionFactoryName;
78         enableLogging(logger);
79         initialize();
80     }
81
82     public boolean suspend(long msecs) throws InterruptedException JavaDoc {
83         return this.suspendLock.writeLock().attempt(msecs);
84     }
85
86     public void resume() {
87         this.suspendLock.writeLock().release();
88     }
89     
90     
91     /**
92      * @avalon.dependency key="driverregistrar" type="org.outerj.daisy.datasource.DriverRegistrar"
93      */

94     public void service(ServiceManager serviceManager) throws ServiceException {
95         // used to set a dependency
96
}
97
98     public void configure(Configuration configuration) throws ConfigurationException {
99         Configuration jmsConf = configuration.getChild("jmsConnection");
100         Configuration[] propertiesConf = jmsConf.getChild("initialContext").getChildren("property");
101         contextProperties = new Properties JavaDoc();
102         for (int i = 0; i < propertiesConf.length; i++) {
103             if (!contextProperties.containsKey(propertiesConf[i].getAttribute("name"))) {
104                 String JavaDoc value = PropertyResolver.resolveProperties(propertiesConf[i].getAttribute("value"));
105                 // special hack for ActiveMQ on Windows: broker config should be an URL,
106
// but substitution of ${daisy.datadir} might contain backslashes, so
107
// convert backslashes to slashes
108
if (value.indexOf("brokerConfig=xbean:file:") != -1) {
109                     value = value.replaceAll("\\\\", "/");
110                 }
111                 contextProperties.put(propertiesConf[i].getAttribute("name"), value);
112             }
113         }
114
115         Configuration jmsCredentials = jmsConf.getChild("credentials", false);
116         if (jmsCredentials != null) {
117             jmsUserName = jmsCredentials.getAttribute("username");
118             jmsPassword = jmsCredentials.getAttribute("password");
119         }
120         
121         clientId = jmsConf.getChild("clientId").getValue();
122
123         connectionFactoryName = jmsConf.getChild("connectionFactoryName").getValue();
124     }
125
126     public void initialize() throws Exception JavaDoc {
127         // Note: we don't let this component initialize (and thus don't let daisy start) if establishing
128
// the JMS connections fails. Reason is to remind people that they need to start their JMS server,
129
// or configure the connection properly.
130
initializeJmsConnection(true);
131     }
132
133     protected void initializeJmsConnection(boolean failOnError) throws Exception JavaDoc {
134         while (jmsConnection == null) {
135             try {
136                 getLogger().debug("Trying to establish JMS connection...");
137                 Context JavaDoc context = getContext();
138
139                 ConnectionFactory jmsFactory = (ConnectionFactory) context.lookup(connectionFactoryName);
140                 if (jmsUserName != null)
141                     jmsConnection = jmsFactory.createConnection(jmsUserName, jmsPassword);
142                 else
143                     jmsConnection = jmsFactory.createConnection();
144                 jmsConnection.setClientID(clientId);
145                 connectionUp();
146
147                 jmsConnection.setExceptionListener(new MyJmsExceptionListener());
148                 jmsConnection.start();
149             } catch (Exception JavaDoc e) {
150                 if (failOnError)
151                     throw e;
152                 try {
153                     Thread.sleep(CONN_RETRY_INTERVAL);
154                 } catch (InterruptedException JavaDoc e1) {
155                     if (stopping)
156                         throw e1;
157                 }
158             }
159         }
160         getLogger().info("JMS connection established.");
161     }
162
163     private class MyJmsExceptionListener implements ExceptionListener {
164         public void onException(JMSException e) {
165             if (stopping)
166                 return;
167             getLogger().error("Error with the JMS connection. Will automatically try to re-establish connection every " + CONN_RETRY_INTERVAL + " ms.", e);
168             connectionDown();
169             jmsConnection = null;
170             Thread JavaDoc thread = new ConnectionEstablisherThread();
171             runningThreads.add(thread);
172             thread.start();
173         }
174     }
175
176     private class ConnectionEstablisherThread extends Thread JavaDoc {
177         public ConnectionEstablisherThread() {
178             super("DaisyJmsConnectionEstablisher");
179             setDaemon(true);
180         }
181
182         public void run() {
183             try {
184                 initializeJmsConnection(false);
185             } catch (Exception JavaDoc e2) {
186                 // can probably never occur since the initializeJmsConnection method catches all exceptions
187
getLogger().error("Error trying to establish JMS topic connection, giving up.", e2);
188             }
189             runningThreads.remove(this);
190         }
191     }
192
193     private Context JavaDoc getContext() throws NamingException JavaDoc {
194         ClassLoader JavaDoc current = Thread.currentThread().getContextClassLoader();
195         Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
196         try {
197             return new InitialContext JavaDoc(contextProperties);
198         } finally {
199             Thread.currentThread().setContextClassLoader(current);
200         }
201     }
202
203     public void dispose() {
204         stopping = true;
205
206         if (runningThreads.size() > 0 && getLogger().isDebugEnabled())
207             getLogger().debug("Will interrupt " + runningThreads.size() + " JMS connection-establishing threads.");
208         Iterator JavaDoc runningThreadIt = runningThreads.iterator();
209         while (runningThreadIt.hasNext()) {
210             Thread JavaDoc thread = (Thread JavaDoc)runningThreadIt.next();
211             thread.interrupt();
212         }
213
214         try {
215             if (jmsConnection != null) {
216                 jmsConnection.stop();
217                 jmsConnection.close();
218             }
219         } catch (JMSException e) {
220             getLogger().error("Error closing JMS connection.", e);
221         }
222     }
223
224     private synchronized void connectionUp() {
225         bringUp(consumers);
226         bringUp(senders);
227     }
228
229     private synchronized void connectionDown() {
230         bringDown(consumers);
231         bringDown(senders);
232     }
233
234     private void bringUp(List JavaDoc list) {
235         Iterator JavaDoc it = list.iterator();
236         while (it.hasNext()) {
237             try {
238                 ((Reconnectable)it.next()).connectionUp();
239             } catch (Throwable JavaDoc e) {
240                 getLogger().error("Error 'upping' a JMS session.", e);
241             }
242         }
243     }
244
245     private void bringDown(List JavaDoc list) {
246         Iterator JavaDoc it = list.iterator();
247         while (it.hasNext()) {
248             try {
249                 ((Reconnectable)it.next()).connectionDown();
250             } catch (Throwable JavaDoc e) {
251                 getLogger().error("Error 'downing' a JMS session.", e);
252             }
253         }
254     }
255
256     public synchronized void registerDurableTopicListener(String JavaDoc topicName, String JavaDoc subscriptionName, MessageListener listener) throws Exception JavaDoc {
257         MyJmsMessageListener theListener = new MyJmsMessageListener(topicName, subscriptionName, listener);
258         theListener.connectionUp();
259         consumers.add(theListener);
260     }
261
262     public synchronized void registerListener(String JavaDoc destinationName, MessageListener listener) throws Exception JavaDoc {
263         MyJmsMessageListener theListener = new MyJmsMessageListener(destinationName, null, listener);
264         theListener.connectionUp();
265         consumers.add(theListener);
266     }
267
268     public synchronized void unregisterListener(MessageListener listener) {
269         if (listener instanceof MyJmsMessageListener) {
270             consumers.remove(listener);
271             ((MyJmsMessageListener)listener).dispose();
272         } else {
273             throw new RuntimeException JavaDoc("Unexpected object: " + listener);
274         }
275     }
276
277     public synchronized Sender getSender(String JavaDoc destinationName) {
278         SenderImpl sender = new SenderImpl(destinationName);
279         try {
280             sender.connectionUp();
281         } catch (Exception JavaDoc e) {
282             getLogger().warn("Sender could not be initialized after initial retrieval, meaning the JMS connection is probably down.", e);
283         }
284         senders.add(sender);
285         return sender;
286     }
287
288     public synchronized void unregisterSender(Sender sender) {
289         if (sender instanceof SenderImpl) {
290             senders.remove(sender);
291             ((SenderImpl)sender).dispose();
292         } else {
293             throw new RuntimeException JavaDoc("Unexpected object: " + sender);
294         }
295     }
296
297     interface Reconnectable {
298         void connectionDown();
299
300         void connectionUp() throws Exception JavaDoc;
301     }
302
303     class MyJmsMessageListener implements MessageListener, Reconnectable {
304         private String JavaDoc destinationName;
305         private String JavaDoc subscriptionName;
306         private MessageListener delegate;
307         private Session session;
308
309         public MyJmsMessageListener(String JavaDoc destinationName, String JavaDoc subscriptionName, MessageListener delegate) {
310             this.destinationName = destinationName;
311             this.subscriptionName = subscriptionName;
312             this.delegate = delegate;
313         }
314
315         public void onMessage(Message message) {
316             try {
317                 suspendLock.readLock().acquire();
318             } catch (InterruptedException JavaDoc e) {
319                 // Note: when using AUTO_ACKNOWLEDGE, throwing a runtime exception will result
320
// in immediate retry of delivery.
321
throw new RuntimeException JavaDoc("Got InterruptedException while waiting for suspendLock.");
322             }
323             try {
324                 delegate.onMessage(message);
325             } finally {
326                 suspendLock.readLock().release();
327             }
328         }
329
330         public void connectionDown() {
331         }
332
333         public void connectionUp() throws Exception JavaDoc {
334             Destination jmsDestination = (Destination) getContext().lookup(destinationName);
335             session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
336             MessageConsumer consumer;
337             if (subscriptionName != null)
338                 consumer = session.createDurableSubscriber((Topic)jmsDestination, subscriptionName);
339             else
340                 consumer = session.createConsumer(jmsDestination);
341             consumer.setMessageListener(this);
342         }
343
344         public void dispose() {
345             try {
346                 if (session != null)
347                     session.close();
348             } catch (Exception JavaDoc e) {
349                 getLogger().error("Error closing JMS session.", e);
350             }
351         }
352     }
353
354     class SenderImpl implements Sender, Reconnectable {
355         private boolean connectionUp = false;
356         private Session session;
357         private String JavaDoc destinationName;
358         private MessageProducer messageProducer;
359
360         public SenderImpl(String JavaDoc destinationName) {
361             this.destinationName = destinationName;
362         }
363
364         public void send(final Message message) throws Exception JavaDoc {
365             executeWhenConnectionIsUp(new JMSAction() {
366                 public void run() throws JMSException {
367                     messageProducer.send(message);
368                 }
369             });
370         }
371
372         public TextMessage createTextMessage(final String JavaDoc text) throws JMSException {
373             final TextMessage[] message = new TextMessage[1];
374             executeWhenConnectionIsUp(new JMSAction() {
375                 public void run() throws Exception JavaDoc {
376                     message[0] = session.createTextMessage(text);
377                 }
378             });
379             return message[0];
380         }
381
382         public MapMessage createMapMessage() throws JMSException {
383             final MapMessage[] message = new MapMessage[1];
384             executeWhenConnectionIsUp(new JMSAction() {
385                 public void run() throws Exception JavaDoc {
386                     message[0] = session.createMapMessage();
387                 }
388             });
389             return message[0];
390         }
391
392         public void connectionDown() {
393             connectionUp = false;
394             session = null;
395             messageProducer = null;
396         }
397
398         public void connectionUp() throws Exception JavaDoc {
399             Destination jmsDestination = (Destination) getContext().lookup(destinationName);
400             session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
401             messageProducer = session.createProducer(jmsDestination);
402             connectionUp = true;
403         }
404
405         public void dispose() {
406             try {
407                 if (session != null)
408                     session.close();
409             } catch (Exception JavaDoc e) {
410                 getLogger().error("Error closing JMS session.", e);
411             }
412         }
413
414         protected void executeWhenConnectionIsUp(JMSAction action) {
415             stoppingLoop: while (!stopping) {
416                 // wait till connection is back up
417
while (!connectionUp) {
418                     getLogger().debug("JMS connection is down...");
419                     try {
420                         Thread.sleep(CONN_RETRY_INTERVAL);
421                     } catch (InterruptedException JavaDoc e) {
422                         getLogger().debug("Got interruptedexception while sleeping to wait for JMS connection to re-appear.", e);
423                         break stoppingLoop;
424                     }
425                 }
426
427                 // connection is back, try to send message
428
int i = 0;
429                 while (!stopping) {
430                     try {
431                         suspendLock.readLock().acquire();
432                     } catch (InterruptedException JavaDoc e) {
433                         getLogger().debug("Got interruptedexception while trying to get suspend lock.", e);
434                         break stoppingLoop;
435                     }
436                     try {
437                         action.run();
438                         return;
439                     } catch (Exception JavaDoc e) {
440                         if (!connectionUp) {
441                             // connection was just lost (again), go waiting till it is back up
442
break;
443                         } else {
444                             // action failed for another reason (maybe because connection is down but
445
// connectionUp flag is not yet changed), wait a little bit and try again, unless retry
446
// counter reach maximum
447
i++;
448                             if (i >= 3) {
449                                 throw new RuntimeException JavaDoc("Failed to execute JMS action, giving up.", e);
450                             } else {
451                                 try {
452                                     Thread.sleep(CONN_RETRY_INTERVAL);
453                                 } catch (InterruptedException JavaDoc e2) {
454                                     getLogger().debug("Got interruptedexception while sleeping before retrying JMS action.", e);
455                                     break stoppingLoop;
456                                 }
457                             }
458                         }
459                     } finally {
460                         suspendLock.readLock().release();
461                     }
462                 }
463             }
464             throw new RuntimeException JavaDoc("Failed to execute JMS action and now server is going down...");
465         }
466     }
467
468     interface JMSAction {
469         public void run() throws Exception JavaDoc;
470     }
471 }
472
Popular Tags