KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ra > ActiveMQEndpointWorker


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.ra;
19
20 import java.lang.reflect.Method JavaDoc;
21
22 import javax.jms.Connection JavaDoc;
23 import javax.jms.ConnectionConsumer JavaDoc;
24 import javax.jms.ExceptionListener JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Message JavaDoc;
27 import javax.jms.MessageListener JavaDoc;
28 import javax.jms.Session JavaDoc;
29 import javax.jms.Topic JavaDoc;
30 import javax.resource.ResourceException JavaDoc;
31 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
32 import javax.resource.spi.work.Work JavaDoc;
33 import javax.resource.spi.work.WorkException JavaDoc;
34 import javax.resource.spi.work.WorkManager JavaDoc;
35
36 import org.apache.activemq.ActiveMQConnection;
37 import org.apache.activemq.command.ActiveMQDestination;
38 import org.apache.activemq.command.ActiveMQQueue;
39 import org.apache.activemq.command.ActiveMQTopic;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42
43 /**
44  * @version $Revision$ $Date$
45  */

46 public class ActiveMQEndpointWorker {
47
48     private static final Log log = LogFactory.getLog(ActiveMQEndpointWorker.class);
49
50     /**
51      *
52      */

53     public static final Method JavaDoc ON_MESSAGE_METHOD;
54
55     private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
56
private static final long MAX_RECONNECT_DELAY = 1000*30; // 30 seconds.
57
private static final ThreadLocal JavaDoc threadLocal = new ThreadLocal JavaDoc();
58     
59     static {
60         try {
61             ON_MESSAGE_METHOD = MessageListener JavaDoc.class.getMethod("onMessage", new Class JavaDoc[]{Message JavaDoc.class});
62         }
63         catch (Exception JavaDoc e) {
64             throw new ExceptionInInitializerError JavaDoc(e);
65         }
66     }
67
68     protected MessageResourceAdapter adapter;
69     protected ActiveMQEndpointActivationKey endpointActivationKey;
70     protected MessageEndpointFactory JavaDoc endpointFactory;
71     protected WorkManager JavaDoc workManager;
72     protected boolean transacted;
73     
74     
75     private ConnectionConsumer JavaDoc consumer;
76     private ServerSessionPoolImpl serverSessionPool;
77     private ActiveMQDestination dest;
78     private boolean running;
79     private Work JavaDoc connectWork;
80     protected ActiveMQConnection connection;
81     
82     private long reconnectDelay=INITIAL_RECONNECT_DELAY;
83
84
85     /**
86      * @param s
87      */

88     public static void safeClose(Session JavaDoc s) {
89         try {
90             if (s != null) {
91                 s.close();
92             }
93         }
94         catch (JMSException JavaDoc e) {
95             //
96
}
97     }
98
99     /**
100      * @param c
101      */

102     public static void safeClose(Connection JavaDoc c) {
103         try {
104             if (c != null) {
105                 c.close();
106             }
107         }
108         catch (JMSException JavaDoc e) {
109             //
110
}
111     }
112
113     /**
114      * @param cc
115      */

116     public static void safeClose(ConnectionConsumer JavaDoc cc) {
117         try {
118             if (cc != null) {
119                 cc.close();
120             }
121         }
122         catch (JMSException JavaDoc e) {
123             //
124
}
125     }
126
127     /**
128      *
129      */

130     public ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException JavaDoc {
131         this.endpointActivationKey = key;
132         this.adapter = adapter;
133         this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
134         this.workManager = adapter.getBootstrapContext().getWorkManager();
135         try {
136             this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
137         }
138         catch (NoSuchMethodException JavaDoc e) {
139             throw new ResourceException JavaDoc("Endpoint does not implement the onMessage method.");
140         }
141         
142         connectWork = new Work JavaDoc() {
143
144             public void release() {
145                 //
146
}
147
148             synchronized public void run() {
149                 if( !isRunning() )
150                     return;
151                 if( connection!=null )
152                     return;
153                 
154                 MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
155                 try {
156                     connection = adapter.makeConnection(activationSpec);
157                     connection.start();
158                     connection.setExceptionListener(new ExceptionListener JavaDoc() {
159                         public void onException(JMSException JavaDoc error) {
160                             if (!serverSessionPool.isClosing()) {
161                                 reconnect(error);
162                             }
163                         }
164                     });
165
166                     if (activationSpec.isDurableSubscription()) {
167                         consumer = connection.createDurableConnectionConsumer(
168                                 (Topic JavaDoc) dest,
169                                 activationSpec.getSubscriptionName(),
170                                 emptyToNull(activationSpec.getMessageSelector()),
171                                 serverSessionPool,
172                                 activationSpec.getMaxMessagesPerSessionsIntValue(),
173                                 activationSpec.getNoLocalBooleanValue());
174                     } else {
175                         consumer = connection.createConnectionConsumer(
176                                 dest,
177                                 emptyToNull(activationSpec.getMessageSelector()),
178                                 serverSessionPool,
179                                 activationSpec.getMaxMessagesPerSessionsIntValue(),
180                                 activationSpec.getNoLocalBooleanValue());
181                     }
182
183                 } catch (JMSException JavaDoc error) {
184                     log.debug("Fail to to connect: "+error, error);
185                     reconnect(error);
186                 }
187             }
188         };
189
190         MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
191         if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
192             dest = new ActiveMQQueue(activationSpec.getDestination());
193         } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
194             dest = new ActiveMQTopic(activationSpec.getDestination());
195         } else {
196             throw new ResourceException JavaDoc("Unknown destination type: " + activationSpec.getDestinationType());
197         }
198
199     }
200
201     /**
202      *
203      */

204     synchronized public void start() throws WorkException JavaDoc, ResourceException JavaDoc {
205         if (running)
206             return;
207         running = true;
208
209         log.debug("Starting");
210         serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
211         connect();
212         log.debug("Started");
213     }
214
215     /**
216      *
217      */

218     synchronized public void stop() throws InterruptedException JavaDoc {
219         if (!running)
220             return;
221         running = false;
222         serverSessionPool.close();
223         disconnect();
224     }
225
226     private boolean isRunning() {
227         return running;
228     }
229
230     synchronized private void connect() {
231         if (!running)
232             return;
233
234         try {
235             workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
236         } catch (WorkException JavaDoc e) {
237             running = false;
238             log.error("Work Manager did not accept work: ",e);
239         }
240     }
241
242     /**
243      *
244      */

245     synchronized private void disconnect() {
246         safeClose(consumer);
247         consumer=null;
248         safeClose(connection);
249         connection=null;
250     }
251
252     private void reconnect(JMSException JavaDoc error){
253         log.debug("Reconnect cause: ",error);
254         long reconnectDelay;
255         synchronized(this) {
256             reconnectDelay = this.reconnectDelay;
257             // Only log errors if the server is really down.. And not a temp failure.
258
if (reconnectDelay == MAX_RECONNECT_DELAY) {
259                 log.error("Endpoint connection to JMS broker failed: " + error.getMessage());
260                 log.error("Endpoint will try to reconnect to the JMS broker in "+(MAX_RECONNECT_DELAY/1000)+" seconds");
261             }
262         }
263         try {
264             disconnect();
265             Thread.sleep(reconnectDelay);
266             
267             synchronized(this) {
268                 // Use exponential rollback.
269
this.reconnectDelay*=2;
270                 if (this.reconnectDelay > MAX_RECONNECT_DELAY)
271                     this.reconnectDelay=MAX_RECONNECT_DELAY;
272             }
273             connect();
274         } catch(InterruptedException JavaDoc e) {
275             //
276
}
277     }
278
279     protected void registerThreadSession(Session JavaDoc session) {
280         threadLocal.set(session);
281     }
282
283     protected void unregisterThreadSession(Session JavaDoc session) {
284         threadLocal.set(null);
285     }
286
287     private String JavaDoc emptyToNull(String JavaDoc value) {
288         if (value == null || value.length() == 0) {
289             return null;
290         }
291         return value;
292     }
293
294 }
295
Popular Tags