KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > common > AsyncBaseLifeCycle


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.common;
18
19 import java.lang.reflect.Method JavaDoc;
20 import java.util.Map JavaDoc;
21
22 import javax.jbi.JBIException;
23 import javax.jbi.component.ComponentContext;
24 import javax.jbi.component.ComponentLifeCycle;
25 import javax.jbi.messaging.DeliveryChannel;
26 import javax.jbi.messaging.ExchangeStatus;
27 import javax.jbi.messaging.MessageExchange;
28 import javax.jbi.messaging.MessagingException;
29 import javax.jbi.messaging.MessageExchange.Role;
30 import javax.jbi.servicedesc.ServiceEndpoint;
31 import javax.management.MBeanServer JavaDoc;
32 import javax.management.ObjectName JavaDoc;
33 import javax.resource.spi.work.Work JavaDoc;
34 import javax.resource.spi.work.WorkManager JavaDoc;
35 import javax.transaction.Status JavaDoc;
36 import javax.transaction.Transaction JavaDoc;
37 import javax.transaction.TransactionManager JavaDoc;
38 import javax.xml.namespace.QName JavaDoc;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.servicemix.JbiConstants;
42
43 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
44 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
45
46 /**
47  * Base class for life cycle management of components.
48  * This class may be used as is.
49  *
50  * @author Guillaume Nodet
51  * @version $Revision: 399873 $
52  * @since 3.0
53  */

54 public class AsyncBaseLifeCycle implements ComponentLifeCycle {
55
56     protected final transient Log logger;
57     
58     protected BaseComponent component;
59     protected ComponentContext context;
60     protected ObjectName JavaDoc mbeanName;
61     protected WorkManager JavaDoc workManager;
62     protected AtomicBoolean running;
63     protected DeliveryChannel channel;
64     protected Thread JavaDoc poller;
65     protected AtomicBoolean polling;
66     protected TransactionManager JavaDoc transactionManager;
67     protected boolean workManagerCreated;
68     protected Map JavaDoc processors = new ConcurrentHashMap();
69     
70     
71     public AsyncBaseLifeCycle(BaseComponent component) {
72         this.component = component;
73         this.logger = component.logger;
74         this.running = new AtomicBoolean(false);
75         this.polling = new AtomicBoolean(false);
76         this.processors = new ConcurrentHashMap();
77     }
78     
79     /* (non-Javadoc)
80      * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
81      */

82     public ObjectName JavaDoc getExtensionMBeanName() {
83         return mbeanName;
84     }
85
86     protected Object JavaDoc getExtensionMBean() throws Exception JavaDoc {
87         return null;
88     }
89     
90     protected ObjectName JavaDoc createExtensionMBeanName() throws Exception JavaDoc {
91         return this.context.getMBeanNames().createCustomComponentMBeanName("Configuration");
92     }
93     
94     protected QName JavaDoc getEPRServiceName() {
95         return null;
96     }
97
98     /* (non-Javadoc)
99      * @see javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
100      */

101     public void init(ComponentContext context) throws JBIException {
102         try {
103             if (logger.isDebugEnabled()) {
104                 logger.debug("Initializing component");
105             }
106             this.context = context;
107             this.channel = context.getDeliveryChannel();
108             try {
109                 this.transactionManager = (TransactionManager JavaDoc) context.getTransactionManager();
110             } catch (Throwable JavaDoc e) {
111               // Ignore, this is just a safeguard against non compliant
112
// JBI implementation which throws an exception instead of
113
// return null
114
}
115             doInit();
116             if (logger.isDebugEnabled()) {
117                 logger.debug("Component initialized");
118             }
119         } catch (JBIException e) {
120             throw e;
121         } catch (Exception JavaDoc e) {
122             throw new JBIException("Error calling init", e);
123         }
124     }
125
126     protected void doInit() throws Exception JavaDoc {
127         // Register extension mbean
128
Object JavaDoc mbean = getExtensionMBean();
129         if (mbean != null) {
130             MBeanServer JavaDoc server = this.context.getMBeanServer();
131             if (server == null) {
132                 // TODO: log a warning ?
133
//throw new JBIException("null mBeanServer");
134
} else {
135                 this.mbeanName = createExtensionMBeanName();
136                 if (server.isRegistered(this.mbeanName)) {
137                     server.unregisterMBean(this.mbeanName);
138                 }
139                 server.registerMBean(mbean, this.mbeanName);
140             }
141         }
142         // Obtain or create the work manager
143
// When using the WorkManager from ServiceMix,
144
// some class loader problems can appear when
145
// trying to uninstall the components.
146
// Some threads owned by the work manager have a
147
// security context referencing the component class loader
148
// so that every loaded classes are locked
149
//this.workManager = findWorkManager();
150
if (this.workManager == null) {
151             this.workManagerCreated = true;
152             this.workManager = createWorkManager();
153         }
154     }
155
156     /* (non-Javadoc)
157      * @see javax.jbi.component.ComponentLifeCycle#shutDown()
158      */

159     public void shutDown() throws JBIException {
160         try {
161             if (logger.isDebugEnabled()) {
162                 logger.debug("Shutting down component");
163             }
164             doShutDown();
165             this.context = null;
166             if (logger.isDebugEnabled()) {
167                 logger.debug("Component shut down");
168             }
169         } catch (JBIException e) {
170             throw e;
171         } catch (Exception JavaDoc e) {
172             throw new JBIException("Error calling shutdown", e);
173         }
174     }
175
176     protected void doShutDown() throws Exception JavaDoc {
177         // Unregister mbean
178
if (this.mbeanName != null) {
179             MBeanServer JavaDoc server = this.context.getMBeanServer();
180             if (server == null) {
181                 throw new JBIException("null mBeanServer");
182             }
183             if (server.isRegistered(this.mbeanName)) {
184                 server.unregisterMBean(this.mbeanName);
185             }
186         }
187         // Destroy work manager, if created
188
if (this.workManagerCreated) {
189             if (this.workManager instanceof BasicWorkManager) {
190                 ((BasicWorkManager) this.workManager).shutDown();
191             }
192             this.workManager = null;
193         }
194     }
195
196     /* (non-Javadoc)
197      * @see javax.jbi.component.ComponentLifeCycle#start()
198      */

199     public void start() throws JBIException {
200         try {
201             if (logger.isDebugEnabled()) {
202                 logger.debug("Starting component");
203             }
204             if (this.running.compareAndSet(false, true)) {
205                 doStart();
206             }
207             if (logger.isDebugEnabled()) {
208                 logger.debug("Component started");
209             }
210         } catch (JBIException e) {
211             throw e;
212         } catch (Exception JavaDoc e) {
213             throw new JBIException("Error calling start", e);
214         }
215     }
216
217     protected void doStart() throws Exception JavaDoc {
218         synchronized (this.polling) {
219             workManager.startWork(new Work JavaDoc() {
220                 public void release() { }
221                 public void run() {
222                     poller = Thread.currentThread();
223                     pollDeliveryChannel();
224                 }
225             });
226             polling.wait();
227         }
228     }
229     
230     protected void pollDeliveryChannel() {
231         synchronized (polling) {
232             polling.set(true);
233             polling.notify();
234         }
235         while (running.get()) {
236             try {
237                 final MessageExchange exchange = channel.accept(1000L);
238                 if (exchange != null) {
239                     final Transaction JavaDoc tx = (Transaction JavaDoc) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
240                     if (tx != null) {
241                         if (transactionManager == null) {
242                             throw new IllegalStateException JavaDoc("Exchange is enlisted in a transaction, but no transaction manager is available");
243                         }
244                         transactionManager.suspend();
245                     }
246                     workManager.scheduleWork(new Work JavaDoc() {
247                         public void release() {
248                         }
249                         public void run() {
250                             processExchangeInTx(exchange, tx);
251                         }
252                     });
253                 }
254             } catch (Throwable JavaDoc t) {
255                 if (running.get() == false) {
256                     // Should have been interrupted, discard the throwable
257
if (logger.isDebugEnabled()) {
258                         logger.debug("Polling thread will stop");
259                     }
260                 } else {
261                     logger.error("Error polling delivery channel", t);
262                 }
263             }
264         }
265         synchronized (polling) {
266             polling.set(false);
267             polling.notify();
268         }
269     }
270
271     /* (non-Javadoc)
272      * @see javax.jbi.component.ComponentLifeCycle#stop()
273      */

274     public void stop() throws JBIException {
275         try {
276             if (logger.isDebugEnabled()) {
277                 logger.debug("Stopping component");
278             }
279             if (this.running.compareAndSet(true, false)) {
280                 doStop();
281             }
282             if (logger.isDebugEnabled()) {
283                 logger.debug("Component stopped");
284             }
285         } catch (JBIException e) {
286             throw e;
287         } catch (Exception JavaDoc e) {
288             throw new JBIException("Error calling stop", e);
289         }
290     }
291
292     protected void doStop() throws Exception JavaDoc {
293         // Interrupt the polling thread and await termination
294
try {
295             synchronized (polling) {
296                 if (polling.get()) {
297                     poller.interrupt();
298                     polling.wait();
299                 }
300             }
301         } finally {
302             poller = null;
303         }
304     }
305
306     /**
307      * @return Returns the context.
308      */

309     public ComponentContext getContext() {
310         return context;
311     }
312
313     public WorkManager JavaDoc getWorkManager() {
314         return workManager;
315     }
316
317     protected WorkManager JavaDoc createWorkManager() {
318         // Create a very simple one
319
return new BasicWorkManager();
320     }
321
322     protected WorkManager JavaDoc findWorkManager() {
323         // If inside ServiceMix, retrieve its work manager
324
try {
325             Method JavaDoc getContainerMth = context.getClass().getMethod("getContainer", new Class JavaDoc[0]);
326             Object JavaDoc container = getContainerMth.invoke(context, new Object JavaDoc[0]);
327             Method JavaDoc getWorkManagerMth = container.getClass().getMethod("getWorkManager", new Class JavaDoc[0]);
328             return (WorkManager JavaDoc) getWorkManagerMth.invoke(container, new Object JavaDoc[0]);
329         } catch (Throwable JavaDoc t) {
330             if (logger.isDebugEnabled()) {
331                 logger.debug("JBI container is not ServiceMix. Will create our own WorkManager", t);
332             }
333         }
334         // TODO: should look in jndi for an existing work manager
335
return null;
336     }
337     
338     protected void processExchangeInTx(MessageExchange exchange, Transaction JavaDoc tx) {
339         try {
340             if (tx != null) {
341                 transactionManager.resume(tx);
342             }
343             processExchange(exchange);
344         } catch (Exception JavaDoc e) {
345             logger.error("Error processing exchange " + exchange, e);
346             try {
347                 // If we are transacted, check if this exception should
348
// rollback the transaction
349
if (transactionManager != null &&
350                     transactionManager.getStatus() == Status.STATUS_ACTIVE &&
351                     exceptionShouldRollbackTx(e)) {
352                     transactionManager.setRollbackOnly();
353                 }
354                 exchange.setError(e);
355                 channel.send(exchange);
356             } catch (Exception JavaDoc inner) {
357                 logger.error("Error setting exchange status to ERROR", inner);
358             }
359         } finally {
360             try {
361                 // Check transaction status
362
if (tx != null) {
363                     int status = transactionManager.getStatus();
364                     // We use pull delivery, so the transaction should already
365
// have been transfered to another thread because the component
366
// must have answered.
367
if (status != Status.STATUS_NO_TRANSACTION) {
368                         logger.error("Transaction is still active after exchange processing. Trying to rollback transaction.");
369                         try {
370                             transactionManager.rollback();
371                         } catch (Throwable JavaDoc t) {
372                             logger.error("Error trying to rollback transaction.", t);
373                         }
374                     }
375                 }
376             } catch (Throwable JavaDoc t) {
377                 logger.error("Error checking transaction status.", t);
378             }
379         }
380     }
381     
382     protected boolean exceptionShouldRollbackTx(Exception JavaDoc e) {
383         return false;
384     }
385     
386     public void processExchange(MessageExchange exchange) throws Exception JavaDoc {
387         if (logger.isDebugEnabled()) {
388             logger.debug("Received exchange: status: " + exchange.getStatus() + ", role: " +
389                                 (exchange.getRole() == Role.CONSUMER ? "consumer" : "provider"));
390         }
391         if (exchange.getRole() == Role.PROVIDER) {
392             boolean dynamic = false;
393             ServiceEndpoint endpoint = exchange.getEndpoint();
394             String JavaDoc key = EndpointSupport.getKey(exchange.getEndpoint());
395             Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key);
396             if (ep == null) {
397                 if (endpoint.getServiceName().equals(getEPRServiceName())) {
398                     ep = getResolvedEPR(exchange.getEndpoint());
399                     dynamic = true;
400                 }
401                 if (ep == null) {
402                     throw new IllegalStateException JavaDoc("Endpoint not found: " + key);
403                 }
404             }
405             ExchangeProcessor processor = ep.getProcessor();
406             if (processor == null) {
407                 throw new IllegalStateException JavaDoc("No processor found for endpoint: " + key);
408             }
409             try {
410                 processor.process(exchange);
411             } finally {
412                 // If the endpoint is dynamic, deactivate it
413
if (dynamic) {
414                     ep.deactivate();
415                 }
416             }
417         } else {
418             ExchangeProcessor processor = null;
419             if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
420                 String JavaDoc key = exchange.getProperty(JbiConstants.SENDER_ENDPOINT).toString();
421                 Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key);
422                 if (ep != null) {
423                     processor = ep.getProcessor();
424                 }
425             } else {
426                 processor = (ExchangeProcessor) processors.remove(exchange.getExchangeId());
427             }
428             if (processor == null) {
429                 throw new IllegalStateException JavaDoc("No processor found for: " + exchange.getExchangeId());
430             }
431             processor.process(exchange);
432         }
433     }
434
435     /**
436      *
437      * @param exchange
438      * @param processor
439      * @throws MessagingException
440      * @deprecated use sendConsumerExchange(MessageExchange, Endpoint) instead
441      */

442     public void sendConsumerExchange(MessageExchange exchange, ExchangeProcessor processor) throws MessagingException {
443         // If the exchange is not ACTIVE, no answer is expected
444
if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
445             processors.put(exchange.getExchangeId(), processor);
446         }
447         channel.send(exchange);
448     }
449     
450     /**
451      * This method allows the component to keep no state in memory so that
452      * components can be clustered and provide fail-over and load-balancing.
453      * @param exchange
454      * @param endpoint
455      * @throws MessagingException
456      */

457     public void sendConsumerExchange(MessageExchange exchange, Endpoint endpoint) throws MessagingException {
458         String JavaDoc key = EndpointSupport.getKey(endpoint);
459         exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key);
460         channel.send(exchange);
461     }
462     
463     /**
464      * Handle an exchange sent to an EPR resolved by this component
465      * @param exchange
466      * @return an endpoint to use for handling the exchange
467      * @throws Exception
468      */

469     protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception JavaDoc {
470         throw new UnsupportedOperationException JavaDoc("Component does not handle EPR exchanges");
471     }
472
473 }
474
Popular Tags