KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > jms > asf > StdServerSession


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.jms.asf;
23
24 import javax.jms.JMSException JavaDoc;
25 import javax.jms.Message JavaDoc;
26 import javax.jms.MessageListener JavaDoc;
27 import javax.jms.ServerSession JavaDoc;
28 import javax.jms.Session JavaDoc;
29 import javax.jms.XASession JavaDoc;
30 import javax.naming.InitialContext JavaDoc;
31 import javax.transaction.Status JavaDoc;
32 import javax.transaction.Transaction JavaDoc;
33 import javax.transaction.TransactionManager JavaDoc;
34 import javax.transaction.xa.XAResource JavaDoc;
35 import javax.transaction.xa.Xid JavaDoc;
36 import org.jboss.logging.Logger;
37 import org.jboss.tm.TransactionManagerService;
38 import org.jboss.tm.XidFactoryMBean;
39
40 /**
41  * An implementation of ServerSession. <p>
42  *
43  * @author <a HREF="mailto:peter.antman@tim.se">Peter Antman</a> .
44  * @author <a HREF="mailto:jason@planet57.com">Jason Dillon</a>
45  * @author <a HREF="mailto:hiram.chirino@jboss.org">Hiram Chirino</a> .
46  * @author <a HREF="mailto:adrian@jboss.com">Adrian Brock</a>
47  * @version $Revision: 44639 $
48  */

49 public class StdServerSession implements Runnable JavaDoc, ServerSession JavaDoc, MessageListener JavaDoc
50 {
51    /** Instance logger. */
52    static Logger log = Logger.getLogger(StdServerSession.class);
53
54    /** The server session pool which we belong to. */
55    private StdServerSessionPool serverSessionPool;
56
57    /** Our session resource. */
58    private Session JavaDoc session;
59
60    /** Our XA session resource. */
61    private XASession JavaDoc xaSession;
62
63    /** The transaction manager that we will use for transactions. */
64    private TransactionManager JavaDoc tm;
65
66    /**
67     * Use the session's XAResource directly if we have an JBossMQ XASession.
68     * this allows us to get around the TX timeout problem when you have
69     * extensive message processing.
70     */

71    private boolean useLocalTX;
72
73    /** The listener to delegate calls, to. In our case the container invoker. */
74    private MessageListener JavaDoc delegateListener;
75
76    private XidFactoryMBean xidFactory;
77
78    /**
79     * @deprecated
80     * @todo these appeared in jboss-head where are they used?
81     */

82    public TransactionManager JavaDoc getTransactionManager()
83    {
84       return tm;
85    }
86
87    /**
88     * @deprecated
89     * @todo these appeared in jboss-head where are they used?
90     */

91    public void setTransactionManager(TransactionManager JavaDoc transactionManager)
92    {
93       this.tm = transactionManager;
94    }
95
/**
 * Create a <tt>StdServerSession</tt>.
 *
 * @param pool The server session pool which we belong to.
 * @param session Our session resource.
 * @param xaSession Our XA session resource, may be <tt>null</tt>.
 * @param delegateListener Listener to call when messages arrive.
 * @param useLocalTX Drive the XA session's XAResource directly with a
 *        one phase commit instead of going through the transaction
 *        manager. Ignored (treated as <tt>false</tt>) when
 *        <tt>xaSession</tt> is <tt>null</tt>.
 * @param xidFactory Factory for the Xids used when <tt>useLocalTX</tt> is in effect.
 * @param tm The transaction manager to use; when <tt>null</tt> it is
 *        looked up in JNDI under <tt>TransactionManagerService.JNDI_NAME</tt>.
 * @throws JMSException Transaction manager was not found, or an error
 *         raised while registering this object as message listener.
 */
StdServerSession(final StdServerSessionPool pool,
                 final Session session,
                 final XASession xaSession,
                 final MessageListener delegateListener,
                 boolean useLocalTX,
                 final XidFactoryMBean xidFactory,
                 final TransactionManager tm)
        throws JMSException
{
   this.serverSessionPool = pool;
   this.session = session;
   this.xaSession = xaSession;
   this.delegateListener = delegateListener;
   // The one phase optimisation works on the XA session's XAResource,
   // so it cannot be used without an XA session
   if (xaSession == null)
      useLocalTX = false;
   this.useLocalTX = useLocalTX;
   this.xidFactory = xidFactory;
   this.tm = tm;

   if (log.isTraceEnabled())
   {
      log.trace(this + " initializing (pool, session, xaSession, useLocalTX): " +
                pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
   }

   if (tm == null)
      this.tm = lookupTransactionManager();

   // Set ourself as message listener. This is done last so that the JMS
   // session never holds a reference to an object whose construction failed.
   if (StdServerSessionPoolFactory.USE_OLD && xaSession != null)
      xaSession.setMessageListener(this);
   else
      session.setMessageListener(this);
}

/**
 * Looks up the transaction manager in JNDI.
 *
 * @return whatever is bound under <tt>TransactionManagerService.JNDI_NAME</tt>
 * @throws JMSException the lookup (or the cast of its result) failed; the
 *         original failure is available as the linked exception
 */
private static TransactionManager lookupTransactionManager() throws JMSException
{
   InitialContext ctx = null;
   try
   {
      ctx = new InitialContext();
      return (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
   }
   catch (Exception e)
   {
      JMSException failure = new JMSException("Transation manager was not found");
      // Keep the real reason instead of discarding it
      failure.setLinkedException(e);
      throw failure;
   }
   finally
   {
      if (ctx != null)
      {
         try
         {
            ctx.close();
         }
         catch (Exception ignored)
         {
            // Best effort: the lookup outcome is already decided
            log.trace("ignored error closing the naming context", ignored);
         }
      }
   }
}
161
162    /**
163     * Returns the session. <p>
164     * <p/>
165     * This simply returns what it has fetched from the connection. It is up to
166     * the jms provider to typecast it and have a private API to stuff messages
167     * into it.
168     *
169     * @return The session.
170     * @throws JMSException Description of Exception
171     */

172    public Session JavaDoc getSession() throws JMSException JavaDoc
173    {
174       if (StdServerSessionPoolFactory.USE_OLD && xaSession != null)
175          return xaSession;
176       else
177          return session;
178    }
179
180    /**
181     * Runs in an own thread, basically calls the session.run(), it is up to the
182     * session to have been filled with messages and it will run against the
183     * listener set in StdServerSessionPool. When it has send all its messages it
184     * returns.
185     */

186    public void run()
187    {
188       boolean trace = log.isTraceEnabled();
189
190       TransactionDemarcation td = null;
191       if (StdServerSessionPoolFactory.USE_OLD == false)
192       {
193          td = createTransactionDemarcation();
194          if (td == null)
195             return;
196       }
197       try
198       {
199          if (trace)
200             log.trace(this + " running...");
201
202          if (StdServerSessionPoolFactory.USE_OLD && xaSession != null)
203             xaSession.run();
204          else
205             session.run();
206
207          if (trace)
208             log.trace(this + " run.");
209       }
210       catch (Throwable JavaDoc t)
211       {
212          log.error(this + " onMessage failed to run; setting rollback only", t);
213          if (td != null)
214             td.error();
215       }
216       finally
217       {
218          if (td != null)
219             td.end();
220
221          recycle();
222       }
223    }
224
225    /**
226     * Will get called from session for each message stuffed into it.
227     * <p/>
228     * Starts a transaction with the TransactionManager
229     * and enlists the XAResource of the JMS XASession if a XASession was
230     * available. A good JMS implementation should provide the XASession for use
231     * in the ASF. So we optimize for the case where we have an XASession. So,
232     * for the case where we do not have an XASession and the bean is not
233     * transacted, we have the unneeded overhead of creating a Transaction. I'm
234     * leaving it this way since it keeps the code simpler and that case should
235     * not be too common (JBossMQ provides XASessions).
236     */

237    public void onMessage(Message JavaDoc msg)
238    {
239       boolean trace = log.isTraceEnabled();
240
241       TransactionDemarcation td = null;
242       if (StdServerSessionPoolFactory.USE_OLD)
243       {
244          td = createTransactionDemarcation();
245          if (td == null)
246             return;
247       }
248       try
249       {
250          if (trace)
251             log.trace(this + " onMessage running (pool, session, xaSession, useLocalTX): " +
252                       ", " + session + ", " + xaSession + ", " + useLocalTX);
253
254          // Call delegate listener
255
delegateListener.onMessage(msg);
256
257          if (trace)
258             log.trace(this + " onMessage finished");
259       }
260       catch (Throwable JavaDoc t)
261       {
262          log.error(this + " onMessage failed to run; setting rollback only", t);
263          if (td != null)
264             td.error();
265       }
266       finally
267       {
268          if (td != null)
269             td.end();
270       }
271       if (trace)
272          log.trace(this + " onMessage done");
273    }
274
275    /**
276     * Start the session and begin consuming messages.
277     *
278     * @throws JMSException No listener has been specified.
279     */

280    public void start() throws JMSException JavaDoc
281    {
282       log.trace(this + " starting invokes on server session");
283
284       if (session != null)
285       {
286          try
287          {
288             serverSessionPool.getExecutor().execute(this);
289          }
290          catch (InterruptedException JavaDoc ignore)
291          {
292          }
293       }
294       else
295       {
296          throw new JMSException JavaDoc(this + " no listener has been specified");
297       }
298    }
299
300    /**
301     * Called by the ServerSessionPool when the sessions should be closed.
302     */

303    void close()
304    {
305       log.trace(this + " closing.");
306       
307       if (session != null)
308       {
309          try
310          {
311             session.close();
312          }
313          catch (Exception JavaDoc ignore)
314          {
315          }
316
317          session = null;
318       }
319
320       if (xaSession != null)
321       {
322          try
323          {
324             xaSession.close();
325          }
326          catch (Exception JavaDoc ignore)
327          {
328          }
329          xaSession = null;
330       }
331
332       log.debug("closed");
333    }
334
335    /**
336     * This method is called by the ServerSessionPool when it is ready to be
337     * recycled intot the pool
338     */

339    void recycle()
340    {
341       boolean trace = log.isTraceEnabled();
342       if (trace)
343          log.trace(this + " recycling");
344       serverSessionPool.recycle(this);
345       if (trace)
346          log.trace(this + " recycled");
347    }
348    
349    TransactionDemarcation createTransactionDemarcation()
350    {
351       try
352       {
353          return new TransactionDemarcation();
354       }
355       catch (Throwable JavaDoc t)
356       {
357          log.error(this + " error creating transaction demarcation ", t);
358          return null;
359       }
360    }
361    
362    private class TransactionDemarcation
363    {
364       boolean trace = log.isTraceEnabled();
365       
366       // Used if run with useLocalTX if true
367
Xid JavaDoc localXid = null;
368       boolean localRollbackFlag = false;
369       // Used if run with useLocalTX if false
370
Transaction JavaDoc trans = null;
371
372       public TransactionDemarcation() throws Throwable JavaDoc
373       {
374          if (useLocalTX)
375          {
376             // Use JBossMQ One Phase Commit to commit the TX
377
localXid = xidFactory.newXid();//new XidImpl();
378
XAResource JavaDoc res = xaSession.getXAResource();
379             res.start(localXid, XAResource.TMNOFLAGS);
380
381             if (trace)
382                log.trace(StdServerSession.this + " using optimized 1p commit to control TX. xid=" + localXid);
383          }
384          else
385          {
386
387             // Use the TM to control the TX
388
tm.begin();
389             try
390             {
391                trans = tm.getTransaction();
392
393                if (trace)
394                   log.trace(StdServerSession.this + " using tx=" + trans);
395
396                if (xaSession != null)
397                {
398                   XAResource JavaDoc res = xaSession.getXAResource();
399                   if (!trans.enlistResource(res))
400                   {
401                      throw new JMSException JavaDoc("could not enlist resource");
402                   }
403                   if (trace)
404                      log.trace(StdServerSession.this + " XAResource '" + res + "' enlisted.");
405                }
406             }
407             catch (Throwable JavaDoc t)
408             {
409                try
410                {
411                   tm.rollback();
412                }
413                catch (Throwable JavaDoc ignored)
414                {
415                   log.trace(StdServerSession.this + " ignored error rolling back after failed enlist", ignored);
416                }
417                throw t;
418             }
419          }
420       }
421       
422       public void error()
423       {
424          if (useLocalTX)
425          {
426             // Use JBossMQ One Phase Commit to commit the TX
427
localRollbackFlag = true;
428          }
429          else
430          {
431             // Mark for tollback TX via TM
432
try
433             {
434                // The transaction will be rolledback in the finally
435
if (trace)
436                   log.trace(StdServerSession.this + " using TM to mark TX for rollback tx=" + trans);
437                trans.setRollbackOnly();
438             }
439             catch (Throwable JavaDoc t)
440             {
441                log.error(StdServerSession.this + " failed to set rollback only", t);
442             }
443          }
444       }
445       
446       public void end()
447       {
448          try
449          {
450             if (useLocalTX)
451             {
452                if (localRollbackFlag == true)
453                {
454                   if (trace)
455                      log.trace(StdServerSession.this + " using optimized 1p commit to rollback TX xid=" + localXid);
456
457                   XAResource JavaDoc res = xaSession.getXAResource();
458                   res.end(localXid, XAResource.TMSUCCESS);
459                   res.rollback(localXid);
460
461                }
462                else
463                {
464                   if (trace)
465                      log.trace(StdServerSession.this + " using optimized 1p commit to commit TX xid=" + localXid);
466
467                   XAResource JavaDoc res = xaSession.getXAResource();
468                   res.end(localXid, XAResource.TMSUCCESS);
469                   res.commit(localXid, true);
470                }
471             }
472             else
473             {
474                // Use the TM to commit the Tx (assert the correct association)
475
Transaction JavaDoc currentTx = tm.getTransaction();
476                if (trans.equals(currentTx) == false)
477                   throw new IllegalStateException JavaDoc("Wrong tx association: expected " + trans + " was " + currentTx);
478
479                // Marked rollback
480
if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
481                {
482                   if (trace)
483                      log.trace(StdServerSession.this + " rolling back JMS transaction tx=" + trans);
484                   // actually roll it back
485
tm.rollback();
486
487                   // NO XASession? then manually rollback.
488
// This is not so good but
489
// it's the best we can do if we have no XASession.
490
if (xaSession == null && serverSessionPool.isTransacted())
491                   {
492                      session.rollback();
493                   }
494                }
495                else if (trans.getStatus() == Status.STATUS_ACTIVE)
496                {
497                   // Commit tx
498
// This will happen if
499
// a) everything goes well
500
// b) app. exception was thrown
501
if (trace)
502                      log.trace(StdServerSession.this + " commiting the JMS transaction tx=" + trans);
503                   tm.commit();
504
505                   // NO XASession? then manually commit. This is not so good but
506
// it's the best we can do if we have no XASession.
507
if (xaSession == null && serverSessionPool.isTransacted())
508                   {
509                      session.commit();
510                   }
511                }
512             }
513          }
514          catch (Throwable JavaDoc t)
515          {
516             log.error(StdServerSession.this + " failed to commit/rollback", t);
517          }
518       }
519    }
520 }
521
Popular Tags