KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ejb3 > mdb > inflow > MessageInflowLocalProxy


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.ejb3.mdb.inflow;
23
24 import java.lang.reflect.Method JavaDoc;
25 import java.lang.reflect.InvocationHandler JavaDoc;
26
27 import javax.resource.ResourceException JavaDoc;
28 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
29 import javax.transaction.Status JavaDoc;
30 import javax.transaction.Transaction JavaDoc;
31 import javax.transaction.TransactionManager JavaDoc;
32 import javax.transaction.xa.XAResource JavaDoc;
33
34 import org.jboss.aop.joinpoint.Invocation;
35 import org.jboss.aop.joinpoint.MethodInvocation;
36 import org.jboss.aop.MethodInfo;
37 import org.jboss.ejb3.mdb.MessagingContainer;
38 import org.jboss.ejb3.mdb.MDB;
39 import org.jboss.ejb3.tx.TxUtil;
40 import org.jboss.logging.Logger;
41
42 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
43
44 /**
45  * @version <tt>$Revision: 46009 $</tt>
46  * @author <a HREF="mailto:bdecoste@jboss.com">William DeCoste</a>
47  */

48 public class MessageInflowLocalProxy implements InvocationHandler JavaDoc
49 {
50    private static final Logger log = Logger.getLogger(MessageInflowLocalProxy.class);
51    
52    /** The key for the factory */
53    public static final String JavaDoc MESSAGE_ENDPOINT_FACTORY = "MessageEndpoint.Factory";
54
55    /** The key for the xa resource */
56    public static final String JavaDoc MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource";
57    
58    /** Whether trace is enabled */
59    private boolean trace = log.isTraceEnabled();
60    
61    /** Cached version of our proxy string */
62    private String JavaDoc cachedProxyString = null;
63    
64    /** Whether this proxy has been released */
65    protected SynchronizedBoolean released = new SynchronizedBoolean(false);
66    
67    /** Whether we have delivered a message */
68    protected boolean delivered = false;
69    
70    /** The in use thread */
71    protected Thread JavaDoc inUseThread = null;
72    
73    /** The old classloader of the thread */
74    protected ClassLoader JavaDoc oldClassLoader = null;
75    
76    /** Any transaction we started */
77    protected Transaction JavaDoc transaction = null;
78    
79    /** Any suspended transaction */
80    protected Transaction JavaDoc suspended = null;
81
82    /** The message endpoint factory */
83    private JBossMessageEndpointFactory endpointFactory;
84    
85    private XAResource JavaDoc resource;
86    private MessageEndpointFactory JavaDoc messageEndpointFactory;
87    
88    MessagingContainer container;
89
90    protected MessageInflowLocalProxy(MessagingContainer container)
91    {
92       this.container = container;
93    }
94
95    public void setMessageEndpointFactory(MessageEndpointFactory JavaDoc messageEndpointFactory)
96    {
97       this.messageEndpointFactory = messageEndpointFactory;
98    }
99    
100    public void setXaResource(XAResource JavaDoc resource)
101    {
102       this.resource = resource;
103    }
104
105    public Object JavaDoc invoke(Object JavaDoc proxy, Method JavaDoc method, Object JavaDoc[] args)
106            throws Throwable JavaDoc
107    {
108       // Are we still useable?
109
if (released.get())
110          throw new IllegalStateException JavaDoc("This message endpoint + " + getProxyString(proxy) + " has been released");
111
112       // Concurrent invocation?
113
Thread JavaDoc currentThread = Thread.currentThread();
114       if (inUseThread != null && inUseThread.equals(currentThread) == false)
115          throw new IllegalStateException JavaDoc("This message endpoint + " + getProxyString(proxy) + " is already in use by another thread " + inUseThread);
116       inUseThread = currentThread;
117       
118       if (trace)
119          log.trace("MessageEndpoint " + getProxyString(proxy) + " in use by " + method + " " + inUseThread);
120       
121       // Which operation?
122
if (method.getName().equals("release"))
123       {
124          release(proxy);
125          return null;
126       }
127       else if (method.getName().equals("beforeDelivery"))
128       {
129          before(proxy, container, method, args);
130          return null;
131       }
132       else if (method.getName().equals("afterDelivery"))
133       {
134          after(proxy);
135          return null;
136       }
137       else
138          return delivery(proxy, container, method, args);
139    }
140
141    public String JavaDoc toString()
142    {
143       return container.getEjbName().toString();
144    }
145    
146    // -----------------------------------------------------------
147

148    /**
149     * Release this message endpoint.
150     *
151     * @param mi the invocation
152     * @throws Throwable for any error
153     */

154    protected void release(Object JavaDoc proxy) throws Throwable JavaDoc
155    {
156       // We are now released
157
released.set(true);
158
159       if (trace)
160          log.trace("MessageEndpoint " + getProxyString(proxy) + " released");
161       
162       // Tidyup any outstanding delivery
163
if (oldClassLoader != null)
164       {
165          try
166          {
167             finish("release", proxy, false);
168          }
169          catch (Throwable JavaDoc t)
170          {
171             log.warn("Error in release ", t);
172          }
173       }
174    }
175    
176    /**
177     * Before delivery processing.
178     *
179     * @param mi the invocation
180     * @throws Throwable for any error
181     */

182    protected void before(Object JavaDoc proxy, MessagingContainer container, Method JavaDoc method, Object JavaDoc[] args) throws Throwable JavaDoc
183    {
184       // Called out of sequence
185
if (oldClassLoader != null)
186          throw new IllegalStateException JavaDoc("Missing afterDelivery from the previous beforeDelivery for message endpoint " + getProxyString(proxy));
187
188       if (trace)
189          log.trace("MessageEndpoint " + getProxyString(proxy) + " released");
190
191       // Set the classloader
192
oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
193       SetTCLAction.setContextClassLoader(inUseThread, container.getClassloader());
194       if (trace)
195          log.trace("MessageEndpoint " + getProxyString(proxy) + " set context classloader to " + container.getClassloader());
196
197       // start any transaction
198
try
199       {
200          // Is the delivery transacted?
201
MethodInfo methodInfo = container.getMethodInfo((Method JavaDoc)args[0]);
202          boolean isTransacted = messageEndpointFactory.isDeliveryTransacted(methodInfo.getAdvisedMethod());
203
204          startTransaction("beforeDelivery", proxy, container, method, args, isTransacted);
205       }
206       catch (Throwable JavaDoc t)
207       {
208          resetContextClassLoader(proxy);
209          throw new ResourceException JavaDoc(t);
210       }
211    }
212    
213    /**
214     * After delivery processing.
215     *
216     * @param mi the invocation
217     * @throws Throwable for any error
218     */

219    protected void after(Object JavaDoc proxy) throws Throwable JavaDoc
220    {
221       // Called out of sequence
222
if (oldClassLoader == null)
223          throw new IllegalStateException JavaDoc("afterDelivery without a previous beforeDelivery for message endpoint " + getProxyString(proxy));
224
225       // Finish this delivery committing if we can
226
try
227       {
228          finish("afterDelivery", proxy, true);
229       }
230       catch (Throwable JavaDoc t)
231       {
232          throw new ResourceException JavaDoc(t);
233       }
234    }
235    
236    /**
237     * Delivery.
238     *
239     * @param mi the invocation
240     * @return the result of the delivery
241     * @throws Throwable for any error
242     */

243    protected Object JavaDoc delivery(Object JavaDoc proxy, MessagingContainer container, Method JavaDoc method, Object JavaDoc[] args) throws Throwable JavaDoc
244    {
245       // Have we already delivered a message?
246
if (delivered)
247          throw new IllegalStateException JavaDoc("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(proxy));
248
249       if (trace)
250          log.trace("MessageEndpoint " + getProxyString(proxy) + " delivering");
251       
252       // Mark delivery if beforeDelivery was invoked
253
if (oldClassLoader != null)
254          delivered = true;
255
256       boolean commit = true;
257       // Is the delivery transacted?
258
MethodInfo methodInfo = container.getMethodInfo(method);
259
260       try
261       {
262          // Check for starting a transaction
263
if (oldClassLoader == null)
264          {
265             boolean isTransacted = messageEndpointFactory.isDeliveryTransacted(methodInfo.getAdvisedMethod());
266             startTransaction("delivery", proxy, container, method, args, isTransacted);
267          }
268          return container.localInvoke(methodInfo, args);
269       }
270       catch (Throwable JavaDoc t)
271       {
272          if (trace)
273             log.trace("MessageEndpoint " + getProxyString(proxy) + " delivery error", t);
274          if (t instanceof Error JavaDoc || t instanceof RuntimeException JavaDoc)
275          {
276             if (transaction != null)
277                transaction.setRollbackOnly();
278             commit = false;
279          }
280          throw t;
281       }
282       finally
283       {
284          // No before/after delivery, end any transaction and release the lock
285
if (oldClassLoader == null)
286          {
287             try
288             {
289                // Finish any transaction we started
290
endTransaction(proxy, commit);
291             }
292             finally
293             {
294                releaseThreadLock(proxy);
295             }
296          }
297       }
298    }
299    
300    /**
301     * Finish the current delivery
302     *
303     * @param context the lifecycle method
304     * @param mi the invocation
305     * @param commit whether to commit
306     * @throws Throwable for any error
307     */

308    protected void finish(String JavaDoc context, Object JavaDoc proxy, boolean commit) throws Throwable JavaDoc
309    {
310       try
311       {
312          endTransaction(proxy, commit);
313       }
314       finally
315       {
316          // Reset delivered flag
317
delivered = false;
318          // Change back to the original context classloader
319
resetContextClassLoader(proxy);
320          // We no longer hold the lock
321
releaseThreadLock(proxy);
322       }
323    }
324
325    /**
326     * Start a transaction
327     *
328     * @param context the lifecycle method
329     * @param mi the invocation
330     * @param container the container
331     * @throws Throwable for any error
332     */

333    protected void startTransaction(String JavaDoc context, Object JavaDoc proxy, MessagingContainer container, Method JavaDoc m, Object JavaDoc[] args, boolean isTransacted) throws Throwable JavaDoc
334    {
335       Method JavaDoc method;
336       
337       // Normal delivery
338
if ("delivery".equals(context))
339          method = m;
340       // Before delivery
341
else
342          method = (Method JavaDoc)args[0];
343
344       if (trace)
345          log.trace("MessageEndpoint " + getProxyString(proxy) + " " + context + " method=" + method + " xaResource=" + resource + " transacted=" + isTransacted);
346
347       // Get the transaction status
348
TransactionManager JavaDoc tm = TxUtil.getTransactionManager(); //container.getTransactionManager();
349
suspended = tm.suspend();
350
351       if (trace)
352          log.trace("MessageEndpoint " + getProxyString(proxy) + " " + context + " currentTx=" + suspended);
353
354       // Delivery is transacted
355
if (isTransacted)
356       {
357          // No transaction means we start a new transaction and enlist the resource
358
if (suspended == null)
359          {
360             tm.begin();
361             transaction = tm.getTransaction();
362             if (trace)
363                log.trace("MessageEndpoint " + getProxyString(proxy) + " started transaction=" + transaction);
364       
365             // Enlist the XAResource in the transaction
366
if (resource != null)
367             {
368                transaction.enlistResource(resource);
369                if (trace)
370                   log.trace("MessageEndpoint " + getProxyString(proxy) + " enlisted=" + resource);
371             }
372          }
373          else
374          {
375             // If there is already a transaction we ignore the XAResource (by spec 12.5.9)
376
try
377             {
378                tm.resume(suspended);
379             }
380             finally
381             {
382                suspended = null;
383                if (trace)
384                   log.trace("MessageEndpoint " + getProxyString(proxy) + " transaction=" + suspended + " already active, IGNORED=" + resource);
385             }
386          }
387       }
388    }
389    
390    /**
391     * End the transaction
392     *
393     * @param mi the invocation
394     * @param commit whether to try to commit
395     * @throws Throwable for any error
396     */

397    protected void endTransaction(Object JavaDoc proxy, boolean commit) throws Throwable JavaDoc
398    {
399       TransactionManager JavaDoc tm = null;
400       Transaction JavaDoc currentTx = null;
401       try
402       {
403          // If we started the transaction, commit it
404
if (transaction != null)
405          {
406             tm = TxUtil.getTransactionManager(); //getContainer(mi).getTransactionManager();
407
currentTx = tm.getTransaction();
408             
409             // Suspend any bad transaction - there is bug somewhere, but we will try to tidy things up
410
if (currentTx != null && currentTx.equals(transaction) == false)
411             {
412                log.warn("Current transaction " + currentTx + " is not the expected transaction.");
413                tm.suspend();
414                tm.resume(transaction);
415             }
416             else
417             {
418                // We have the correct transaction
419
currentTx = null;
420             }
421             
422             // Commit or rollback depending on the status
423
if (commit == false || transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK)
424             {
425                if (trace)
426                   log.trace("MessageEndpoint " + getProxyString(proxy) + " rollback");
427                tm.rollback();
428             }
429             else
430             {
431                if (trace)
432                   log.trace("MessageEndpoint " + getProxyString(proxy) + " commit");
433                tm.commit();
434             }
435          }
436
437          // If we suspended the incoming transaction, resume it
438
if (suspended != null)
439          {
440             try
441             {
442                tm = TxUtil.getTransactionManager(); //getContainer(mi).getTransactionManager();
443
tm.resume(suspended);
444             }
445             finally
446             {
447                suspended = null;
448             }
449          }
450       }
451       finally
452       {
453          // Resume any suspended transaction
454
if (currentTx != null)
455          {
456             try
457             {
458                tm.resume(currentTx);
459             }
460             catch (Throwable JavaDoc t)
461             {
462                log.warn("MessageEndpoint " + getProxyString(proxy) + " failed to resume old transaction " + currentTx);
463                
464             }
465          }
466       }
467    }
468    
469    /**
470     * Reset the context classloader
471     *
472     * @param mi the invocation
473     */

474    protected void resetContextClassLoader(Object JavaDoc proxy)
475    {
476       if (trace)
477          log.trace("MessageEndpoint " + getProxyString(proxy) + " reset classloader " + oldClassLoader);
478       SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
479       oldClassLoader = null;
480    }
481
482    /**
483     * Release the thread lock
484     *
485     * @param mi the invocation
486     */

487    protected void releaseThreadLock(Object JavaDoc proxy)
488    {
489       if (trace)
490          log.trace("MessageEndpoint " + getProxyString(proxy) + " no longer in use by " + inUseThread);
491       inUseThread = null;
492    }
493    
494    /**
495     * Get our proxy's string value.
496     *
497     * @param mi the invocation
498     * @return the string
499     */

500    protected String JavaDoc getProxyString(Object JavaDoc proxy)
501    {
502       if (cachedProxyString == null)
503          cachedProxyString = proxy.toString();
504       return cachedProxyString;
505    }
506
507    /**
508     * Get the message endpoint factory
509     *
510     * @return the message endpoint factory
511     */

512    protected JBossMessageEndpointFactory getMessageEndpointFactory(Invocation invocation)
513    {
514       if (endpointFactory == null)
515       {
516          MethodInvocation mi = (MethodInvocation)invocation;
517          endpointFactory = (JBossMessageEndpointFactory) mi.getResponseAttachment(MESSAGE_ENDPOINT_FACTORY);
518       }
519       return endpointFactory;
520    }
521    
522    /**
523     * Get the container
524     *
525     * @return the container
526     */

527    protected MessagingContainer getContainer(Invocation mi)
528    {
529       return getMessageEndpointFactory(mi).getContainer();
530    }
531 }
532
Popular Tags