KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ejb > plugins > inflow > MessageEndpointInterceptor


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.ejb.plugins.inflow;
23
24 import java.lang.reflect.Method JavaDoc;
25
26 import javax.resource.ResourceException JavaDoc;
27 import javax.transaction.Status JavaDoc;
28 import javax.transaction.Transaction JavaDoc;
29 import javax.transaction.TransactionManager JavaDoc;
30 import javax.transaction.xa.XAResource JavaDoc;
31
32 import org.jboss.ejb.MessageDrivenContainer;
33 import org.jboss.invocation.Invocation;
34 import org.jboss.logging.Logger;
35 import org.jboss.proxy.Interceptor;
36
37 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
38
39 /**
40  * Implements the application server message endpoint requirements.
41  *
42  * @author <a HREF="mailto:adrian@jboss.com">Adrian Brock</a>
43  * @version $Revision: 37459 $
44  */

45 public class MessageEndpointInterceptor extends Interceptor
46 {
47    // Constants -----------------------------------------------------
48

49    /** The log */
50    private static final Logger log = Logger.getLogger(MessageEndpointInterceptor.class);
51    
52    /** The key for the factory */
53    public static final String JavaDoc MESSAGE_ENDPOINT_FACTORY = "MessageEndpoint.Factory";
54
55    /** The key for the xa resource */
56    public static final String JavaDoc MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource";
57    
58    // Attributes ----------------------------------------------------
59

60    /** Whether trace is enabled */
61    private boolean trace = log.isTraceEnabled();
62    
63    /** Cached version of our proxy string */
64    private String JavaDoc cachedProxyString = null;
65    
66    /** Whether this proxy has been released */
67    protected SynchronizedBoolean released = new SynchronizedBoolean(false);
68    
69    /** Whether we have delivered a message */
70    protected boolean delivered = false;
71    
72    /** The in use thread */
73    protected Thread JavaDoc inUseThread = null;
74    
75    /** The old classloader of the thread */
76    protected ClassLoader JavaDoc oldClassLoader = null;
77    
78    /** Any transaction we started */
79    protected Transaction JavaDoc transaction = null;
80    
81    /** Any suspended transaction */
82    protected Transaction JavaDoc suspended = null;
83
84    /** The message endpoint factory */
85    private JBossMessageEndpointFactory endpointFactory;
86    
87    // Static --------------------------------------------------------
88

89    // Constructors --------------------------------------------------
90

91    public MessageEndpointInterceptor()
92    {
93    }
94    
95    // Public --------------------------------------------------------
96

97    // Interceptor implementation ------------------------------------
98

99    public Object JavaDoc invoke(Invocation mi) throws Throwable JavaDoc
100    {
101       // Are we still useable?
102
if (released.get())
103          throw new IllegalStateException JavaDoc("This message endpoint + " + getProxyString(mi) + " has been released");
104
105       // Concurrent invocation?
106
Thread JavaDoc currentThread = Thread.currentThread();
107       if (inUseThread != null && inUseThread.equals(currentThread) == false)
108          throw new IllegalStateException JavaDoc("This message endpoint + " + getProxyString(mi) + " is already in use by another thread " + inUseThread);
109       inUseThread = currentThread;
110       
111       String JavaDoc method = mi.getMethod().getName();
112       if (trace)
113          log.trace("MessageEndpoint " + getProxyString(mi) + " in use by " + method + " " + inUseThread);
114       
115       // Which operation?
116
if (method.equals("release"))
117       {
118          release(mi);
119          return null;
120       }
121       else if (method.equals("beforeDelivery"))
122       {
123          before(mi);
124          return null;
125       }
126       else if (method.equals("afterDelivery"))
127       {
128          after(mi);
129          return null;
130       }
131       else
132          return delivery(mi);
133    }
134    
135    // Package Protected ---------------------------------------------
136

137    // Protected -----------------------------------------------------
138

139    /**
140     * Release this message endpoint.
141     *
142     * @param mi the invocation
143     * @throws Throwable for any error
144     */

145    protected void release(Invocation mi) throws Throwable JavaDoc
146    {
147       // We are now released
148
released.set(true);
149
150       if (trace)
151          log.trace("MessageEndpoint " + getProxyString(mi) + " released");
152       
153       // Tidyup any outstanding delivery
154
if (oldClassLoader != null)
155       {
156          try
157          {
158             finish("release", mi, false);
159          }
160          catch (Throwable JavaDoc t)
161          {
162             log.warn("Error in release ", t);
163          }
164       }
165    }
166    
167    /**
168     * Before delivery processing.
169     *
170     * @param mi the invocation
171     * @throws Throwable for any error
172     */

173    protected void before(Invocation mi) throws Throwable JavaDoc
174    {
175       // Called out of sequence
176
if (oldClassLoader != null)
177          throw new IllegalStateException JavaDoc("Missing afterDelivery from the previous beforeDelivery for message endpoint " + getProxyString(mi));
178
179       if (trace)
180          log.trace("MessageEndpoint " + getProxyString(mi) + " released");
181
182       // Set the classloader
183
MessageDrivenContainer container = getContainer(mi);
184       oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
185       SetTCLAction.setContextClassLoader(inUseThread, container.getClassLoader());
186       if (trace)
187          log.trace("MessageEndpoint " + getProxyString(mi) + " set context classloader to " + container.getClassLoader());
188
189       // start any transaction
190
try
191       {
192          startTransaction("beforeDelivery", mi, container);
193       }
194       catch (Throwable JavaDoc t)
195       {
196          resetContextClassLoader(mi);
197          throw new ResourceException JavaDoc(t);
198       }
199    }
200    
201    /**
202     * After delivery processing.
203     *
204     * @param mi the invocation
205     * @throws Throwable for any error
206     */

207    protected void after(Invocation mi) throws Throwable JavaDoc
208    {
209       // Called out of sequence
210
if (oldClassLoader == null)
211          throw new IllegalStateException JavaDoc("afterDelivery without a previous beforeDelivery for message endpoint " + getProxyString(mi));
212
213       // Finish this delivery committing if we can
214
try
215       {
216          finish("afterDelivery", mi, true);
217       }
218       catch (Throwable JavaDoc t)
219       {
220          throw new ResourceException JavaDoc(t);
221       }
222    }
223    
224    /**
225     * Delivery.
226     *
227     * @param mi the invocation
228     * @return the result of the delivery
229     * @throws Throwable for any error
230     */

231    protected Object JavaDoc delivery(Invocation mi) throws Throwable JavaDoc
232    {
233       // Have we already delivered a message?
234
if (delivered)
235          throw new IllegalStateException JavaDoc("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(mi));
236
237       if (trace)
238          log.trace("MessageEndpoint " + getProxyString(mi) + " delivering");
239       
240       // Mark delivery if beforeDelivery was invoked
241
if (oldClassLoader != null)
242          delivered = true;
243
244      
245       MessageDrivenContainer container = getContainer(mi);
246       boolean commit = true;
247       try
248       {
249          // Check for starting a transaction
250
if (oldClassLoader == null)
251             startTransaction("delivery", mi, container);
252          return getNext().invoke(mi);
253       }
254       catch (Throwable JavaDoc t)
255       {
256          if (trace)
257             log.trace("MessageEndpoint " + getProxyString(mi) + " delivery error", t);
258          if (t instanceof Error JavaDoc || t instanceof RuntimeException JavaDoc)
259          {
260             if (transaction != null)
261                transaction.setRollbackOnly();
262             commit = false;
263          }
264          throw t;
265       }
266       finally
267       {
268          // No before/after delivery, end any transaction and release the lock
269
if (oldClassLoader == null)
270          {
271             try
272             {
273                // Finish any transaction we started
274
endTransaction(mi, commit);
275             }
276             finally
277             {
278                releaseThreadLock(mi);
279             }
280          }
281       }
282    }
283    
284    /**
285     * Finish the current delivery
286     *
287     * @param context the lifecycle method
288     * @param mi the invocation
289     * @param commit whether to commit
290     * @throws Throwable for any error
291     */

292    protected void finish(String JavaDoc context, Invocation mi, boolean commit) throws Throwable JavaDoc
293    {
294       try
295       {
296          endTransaction(mi, commit);
297       }
298       finally
299       {
300          // Reset delivered flag
301
delivered = false;
302          // Change back to the original context classloader
303
resetContextClassLoader(mi);
304          // We no longer hold the lock
305
releaseThreadLock(mi);
306       }
307    }
308
309    /**
310     * Start a transaction
311     *
312     * @param context the lifecycle method
313     * @param mi the invocation
314     * @param container the container
315     * @throws Throwable for any error
316     */

317    protected void startTransaction(String JavaDoc context, Invocation mi, MessageDrivenContainer container) throws Throwable JavaDoc
318    {
319       // Get any passed resource
320
XAResource JavaDoc resource = (XAResource JavaDoc) mi.getInvocationContext().getValue(MESSAGE_ENDPOINT_XARESOURCE);
321
322       Method JavaDoc method = null;
323
324       // Normal delivery
325
if ("delivery".equals(context))
326          method = mi.getMethod();
327       // Before delivery
328
else
329          method = (Method JavaDoc) mi.getArguments()[0];
330
331       // Is the delivery transacted?
332
boolean isTransacted = getMessageEndpointFactory(mi).isDeliveryTransacted(method);
333
334       if (trace)
335          log.trace("MessageEndpoint " + getProxyString(mi) + " " + context + " method=" + method + " xaResource=" + resource + " transacted=" + isTransacted);
336
337       // Get the transaction status
338
TransactionManager JavaDoc tm = container.getTransactionManager();
339       suspended = tm.suspend();
340
341       if (trace)
342          log.trace("MessageEndpoint " + getProxyString(mi) + " " + context + " currentTx=" + suspended);
343
344       // Delivery is transacted
345
if (isTransacted)
346       {
347          // No transaction means we start a new transaction and enlist the resource
348
if (suspended == null)
349          {
350             tm.begin();
351             transaction = tm.getTransaction();
352             if (trace)
353                log.trace("MessageEndpoint " + getProxyString(mi) + " started transaction=" + transaction);
354       
355             // Enlist the XAResource in the transaction
356
if (resource != null)
357             {
358                transaction.enlistResource(resource);
359                if (trace)
360                   log.trace("MessageEndpoint " + getProxyString(mi) + " enlisted=" + resource);
361             }
362          }
363          else
364          {
365             // If there is already a transaction we ignore the XAResource (by spec 12.5.9)
366
try
367             {
368                tm.resume(suspended);
369             }
370             finally
371             {
372                suspended = null;
373                if (trace)
374                   log.trace("MessageEndpoint " + getProxyString(mi) + " transaction=" + suspended + " already active, IGNORED=" + resource);
375             }
376          }
377       }
378    }
379    
380    /**
381     * End the transaction
382     *
383     * @param mi the invocation
384     * @param commit whether to try to commit
385     * @throws Throwable for any error
386     */

387    protected void endTransaction(Invocation mi, boolean commit) throws Throwable JavaDoc
388    {
389       TransactionManager JavaDoc tm = null;
390       Transaction JavaDoc currentTx = null;
391       try
392       {
393          // If we started the transaction, commit it
394
if (transaction != null)
395          {
396             tm = getContainer(mi).getTransactionManager();
397             currentTx = tm.getTransaction();
398             
399             // Suspend any bad transaction - there is bug somewhere, but we will try to tidy things up
400
if (currentTx != null && currentTx.equals(transaction) == false)
401             {
402                log.warn("Current transaction " + currentTx + " is not the expected transaction.");
403                tm.suspend();
404                tm.resume(transaction);
405             }
406             else
407             {
408                // We have the correct transaction
409
currentTx = null;
410             }
411             
412             // Commit or rollback depending on the status
413
if (commit == false || transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK)
414             {
415                if (trace)
416                   log.trace("MessageEndpoint " + getProxyString(mi) + " rollback");
417                tm.rollback();
418             }
419             else
420             {
421                if (trace)
422                   log.trace("MessageEndpoint " + getProxyString(mi) + " commit");
423                tm.commit();
424             }
425          }
426
427          // If we suspended the incoming transaction, resume it
428
if (suspended != null)
429          {
430             try
431             {
432                tm = getContainer(mi).getTransactionManager();
433                tm.resume(suspended);
434             }
435             finally
436             {
437                suspended = null;
438             }
439          }
440       }
441       finally
442       {
443          // Resume any suspended transaction
444
if (currentTx != null)
445          {
446             try
447             {
448                tm.resume(currentTx);
449             }
450             catch (Throwable JavaDoc t)
451             {
452                log.warn("MessageEndpoint " + getProxyString(mi) + " failed to resume old transaction " + currentTx);
453                
454             }
455          }
456       }
457    }
458    
459    /**
460     * Reset the context classloader
461     *
462     * @param mi the invocation
463     */

464    protected void resetContextClassLoader(Invocation mi)
465    {
466       if (trace)
467          log.trace("MessageEndpoint " + getProxyString(mi) + " reset classloader " + oldClassLoader);
468       SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
469       oldClassLoader = null;
470    }
471
472    /**
473     * Release the thread lock
474     *
475     * @param mi the invocation
476     */

477    protected void releaseThreadLock(Invocation mi)
478    {
479       if (trace)
480          log.trace("MessageEndpoint " + getProxyString(mi) + " no longer in use by " + inUseThread);
481       inUseThread = null;
482    }
483    
484    /**
485     * Get our proxy's string value.
486     *
487     * @param mi the invocation
488     * @return the string
489     */

490    protected String JavaDoc getProxyString(Invocation mi)
491    {
492       if (cachedProxyString == null)
493          cachedProxyString = mi.getInvocationContext().getCacheId().toString();
494       return cachedProxyString;
495    }
496
497    /**
498     * Get the message endpoint factory
499     *
500     * @return the message endpoint factory
501     */

502    protected JBossMessageEndpointFactory getMessageEndpointFactory(Invocation mi)
503    {
504       if (endpointFactory == null)
505          endpointFactory = (JBossMessageEndpointFactory) mi.getInvocationContext().getValue(MESSAGE_ENDPOINT_FACTORY);
506       return endpointFactory;
507    }
508    
509    /**
510     * Get the container
511     *
512     * @return the container
513     */

514    protected MessageDrivenContainer getContainer(Invocation mi)
515    {
516       return getMessageEndpointFactory(mi).getContainer();
517    }
518
519    // Private -------------------------------------------------------
520

521    // Inner classes -------------------------------------------------
522
}
523
Popular Tags