KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > resource > adapter > jms > inflow > JmsServerSession


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.resource.adapter.jms.inflow;
23
24 import javax.jms.Connection JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Message JavaDoc;
27 import javax.jms.MessageListener JavaDoc;
28 import javax.jms.ServerSession JavaDoc;
29 import javax.jms.Session JavaDoc;
30 import javax.jms.XAConnection JavaDoc;
31 import javax.jms.XASession JavaDoc;
32 import javax.resource.spi.endpoint.MessageEndpoint JavaDoc;
33 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
34 import javax.resource.spi.work.Work JavaDoc;
35 import javax.resource.spi.work.WorkEvent JavaDoc;
36 import javax.resource.spi.work.WorkException JavaDoc;
37 import javax.resource.spi.work.WorkListener JavaDoc;
38 import javax.resource.spi.work.WorkManager JavaDoc;
39 import javax.transaction.Status JavaDoc;
40 import javax.transaction.Transaction JavaDoc;
41 import javax.transaction.TransactionManager JavaDoc;
42 import javax.transaction.xa.XAResource JavaDoc;
43
44 import org.jboss.logging.Logger;
45
46 /**
47  * A generic jms session pool.
48  *
49  * @author <a HREF="adrian@jboss.com">Adrian Brock</a>
50  * @author <a HREF="mailto:weston.price@jboss.com>Weston Price</a>
51  * @version $Revision: 58487 $
52  */

53 public class JmsServerSession implements ServerSession JavaDoc, MessageListener JavaDoc, Work JavaDoc, WorkListener JavaDoc
54 {
55    /** The log */
56    private static final Logger log = Logger.getLogger(JmsServerSession.class);
57    
58    /** The session pool */
59    JmsServerSessionPool pool;
60    
61    /** The transacted flag */
62    boolean transacted;
63    
64    /** The acknowledge mode */
65    int acknowledge;
66    
67    /** The session */
68    Session JavaDoc session;
69    
70    /** Any XA session */
71    XASession JavaDoc xaSession;
72    
73    /** The endpoint */
74    MessageEndpoint JavaDoc endpoint;
75    
76    /** Any DLQ handler */
77    DLQHandler dlqHandler;
78    
79    /** The runtimeHandler */
80    RuntimeErrorHandler runtimeHandler = new DefaultRuntimeErrorHandler();
81    
82    TransactionDemarcationStrategy txnStrategy;
83    
84    
85    /**
86     * Create a new JmsServerSession
87     *
88     * @param pool the server session pool
89     */

90    public JmsServerSession(JmsServerSessionPool pool)
91    {
92       this.pool = pool;
93       
94    }
95    
96    /**
97     * Setup the session
98     */

99    public void setup() throws Exception JavaDoc
100    {
101       JmsActivation activation = pool.getActivation();
102       JmsActivationSpec spec = activation.getActivationSpec();
103
104       dlqHandler = activation.getDLQHandler();
105       
106       Connection JavaDoc connection = activation.getConnection();
107
108       // Create the session
109
if (connection instanceof XAConnection JavaDoc && activation.isDeliveryTransacted())
110       {
111          xaSession = ((XAConnection JavaDoc) connection).createXASession();
112          session = xaSession.getSession();
113       }
114       else
115       {
116          transacted = spec.isSessionTransacted();
117          acknowledge = spec.getAcknowledgeModeInt();
118          session = connection.createSession(transacted, acknowledge);
119       }
120       
121       // Get the endpoint
122
MessageEndpointFactory JavaDoc endpointFactory = activation.getMessageEndpointFactory();
123       XAResource JavaDoc xaResource = null;
124
125       if (activation.isDeliveryTransacted() && xaSession != null)
126          xaResource = xaSession.getXAResource();
127       
128       endpoint = endpointFactory.createEndpoint(xaResource);
129       
130       // Set the message listener
131
session.setMessageListener(this);
132    }
133    
134    /**
135     * Stop the session
136     */

137    public void teardown()
138    {
139       try
140       {
141          if (endpoint != null)
142             endpoint.release();
143       }
144       catch (Throwable JavaDoc t)
145       {
146          log.debug("Error releasing endpoint " + endpoint, t);
147       }
148
149       try
150       {
151          if (xaSession != null)
152             xaSession.close();
153       }
154       catch (Throwable JavaDoc t)
155       {
156          log.debug("Error releasing xaSession " + xaSession, t);
157       }
158
159       try
160       {
161          if (session != null)
162             session.close();
163       }
164       catch (Throwable JavaDoc t)
165       {
166          log.debug("Error releasing session " + session, t);
167       }
168    }
169    
170    public void onMessage(Message JavaDoc message)
171    {
172       try
173       {
174          endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
175     
176          try
177          {
178             if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
179             {
180                MessageListener JavaDoc listener = (MessageListener JavaDoc)endpoint;
181                listener.onMessage(message);
182             }
183          }
184          finally
185          {
186             endpoint.afterDelivery();
187             
188             if (dlqHandler != null)
189                dlqHandler.messageDelivered(message);
190          }
191       }
192
193       catch (Throwable JavaDoc t)
194       {
195          log.error("Unexpected error delivering message " + message, t);
196          
197          if(txnStrategy != null)
198             txnStrategy.error();
199          
200       }
201       
202    
203    }
204
205    public Session JavaDoc getSession() throws JMSException JavaDoc
206    {
207       return session;
208    }
209
210    public void start() throws JMSException JavaDoc
211    {
212       JmsActivation activation = pool.getActivation();
213       WorkManager JavaDoc workManager = activation.getWorkManager();
214       try
215       {
216          workManager.scheduleWork(this, 0, null, this);
217       }
218       catch (WorkException JavaDoc e)
219       {
220          log.error("Unable to schedule work", e);
221          throw new JMSException JavaDoc("Unable to schedule work: " + e.toString());
222       }
223    }
224
225    public void run()
226    {
227       
228       try
229       {
230          txnStrategy = createTransactionDemarcation();
231          
232       }catch(Throwable JavaDoc t)
233       {
234          log.error("Error creating transaction demarcation. Cannot continue.");
235          return;
236       }
237       
238       
239       try
240       {
241          session.run();
242       }
243       catch(Throwable JavaDoc t)
244       {
245          if (txnStrategy != null)
246             txnStrategy.error();
247          
248       }finally
249       {
250          if(txnStrategy != null)
251             txnStrategy.end();
252
253          txnStrategy = null;
254       }
255       
256    }
257    
258    private TransactionDemarcationStrategy createTransactionDemarcation()
259    {
260       return new DemarcationStrategyFactory().getStrategy();
261       
262    }
263    public void release()
264    {
265    }
266    
267    public void workAccepted(WorkEvent JavaDoc e)
268    {
269    }
270
271    public void workCompleted(WorkEvent JavaDoc e)
272    {
273       pool.returnServerSession(this);
274    }
275
276    public void workRejected(WorkEvent JavaDoc e)
277    {
278       pool.returnServerSession(this);
279    }
280
281
282    public void workStarted(WorkEvent JavaDoc e)
283    {
284    }
285    
286    private class DemarcationStrategyFactory
287    {
288       
289       TransactionDemarcationStrategy getStrategy()
290       {
291          TransactionDemarcationStrategy current = null;
292          final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
293          final JmsActivation activation = pool.getActivation();
294          
295          if(activation.isDeliveryTransacted() && xaSession != null)
296          {
297             try
298             {
299                current = new XATransactionDemarcationStrategy();
300             }
301             catch (Throwable JavaDoc t)
302             {
303                log.error(this + " error creating transaction demarcation ", t);
304             }
305           
306          }else
307          {
308                         
309                return new LocalDemarcationStrategy();
310             
311          }
312          
313          return current;
314       }
315    
316    }
317    private interface TransactionDemarcationStrategy
318    {
319       void error();
320       void end();
321       
322    }
323    
324    private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
325    {
326       public void end()
327       {
328          final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
329          
330          if(spec.isSessionTransacted())
331          {
332             if(session != null)
333             {
334                try
335                {
336                   session.commit();
337                }
338                catch (JMSException JavaDoc e)
339                {
340                   log.error("Failed to commit session transaction", e);
341                }
342             }
343          }
344       }
345       
346       public void error()
347       {
348          final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
349          
350          if(spec.isSessionTransacted())
351          {
352             if(session != null)
353                
354                try
355                {
356                   /*
357                    * Looks strange, but this basically means
358                    *
359                    * If the underlying connection was non-XA and the transaction attribute is REQUIRED
360                    * we rollback. Also, if the underlying connection was non-XA and the transaction
361                    * attribute is NOT_SUPPORT and the non standard redelivery behavior is enabled
362                    * we rollback to force redelivery.
363                    *
364                    */

365                   if(pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
366                   {
367                      session.rollback();
368                   }
369                
370                }
371                catch (JMSException JavaDoc e)
372                {
373                   log.error("Failed to rollback session transaction", e);
374                }
375             
376          }
377       }
378    
379    }
380
381    private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
382    {
383      
384       boolean trace = log.isTraceEnabled();
385       
386       Transaction JavaDoc trans = null;
387       TransactionManager JavaDoc tm = pool.getActivation().getTransactionManager();;
388       
389       public XATransactionDemarcationStrategy() throws Throwable JavaDoc
390       {
391             
392             tm.begin();
393
394             try
395             {
396                trans = tm.getTransaction();
397
398                if (trace)
399                   log.trace(JmsServerSession.this + " using tx=" + trans);
400
401                if (xaSession != null)
402                {
403                   XAResource JavaDoc res = xaSession.getXAResource();
404
405                   if (!trans.enlistResource(res))
406                   {
407                      throw new JMSException JavaDoc("could not enlist resource");
408                   }
409                   if (trace)
410                      log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
411                }
412             }
413             catch (Throwable JavaDoc t)
414             {
415                try
416                {
417                   tm.rollback();
418                }
419                catch (Throwable JavaDoc ignored)
420                {
421                   log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
422                }
423                throw t;
424             }
425
426        }
427          
428       
429       public void error()
430       {
431          // Mark for tollback TX via TM
432
try
433          {
434
435             if (trace)
436                log.trace(JmsServerSession.this + " using TM to mark TX for rollback tx=" + trans);
437             trans.setRollbackOnly();
438          }
439          catch (Throwable JavaDoc t)
440          {
441             log.error(JmsServerSession.this + " failed to set rollback only", t);
442          }
443
444       }
445       
446       public void end()
447       {
448          try
449          {
450
451             // Use the TM to commit the Tx (assert the correct association)
452
Transaction JavaDoc currentTx = tm.getTransaction();
453             if (trans.equals(currentTx) == false)
454                throw new IllegalStateException JavaDoc("Wrong tx association: expected " + trans + " was " + currentTx);
455
456             // Marked rollback
457
if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
458             {
459                if (trace)
460                   log.trace(JmsServerSession.this + " rolling back JMS transaction tx=" + trans);
461                // actually roll it back
462
tm.rollback();
463
464                // NO XASession? then manually rollback.
465
// This is not so good but
466
// it's the best we can do if we have no XASession.
467
if (xaSession == null && pool.getActivation().isDeliveryTransacted())
468                {
469                   session.rollback();
470                }
471             }
472
473             else if (trans.getStatus() == Status.STATUS_ACTIVE)
474             {
475                // Commit tx
476
// This will happen if
477
// a) everything goes well
478
// b) app. exception was thrown
479
if (trace)
480                   log.trace(JmsServerSession.this + " commiting the JMS transaction tx=" + trans);
481                tm.commit();
482
483                // NO XASession? then manually commit. This is not so good but
484
// it's the best we can do if we have no XASession.
485
if (xaSession == null && pool.getActivation().isDeliveryTransacted())
486                {
487                   session.commit();
488                }
489             
490             }else
491             {
492                tm.suspend();
493                
494                if (xaSession == null && pool.getActivation().isDeliveryTransacted())
495                {
496                   session.rollback();
497                }
498                
499                
500             }
501
502          }
503          catch (Throwable JavaDoc t)
504          {
505             log.error(JmsServerSession.this + " failed to commit/rollback", t);
506          }
507
508       }
509
510    }
511 }
Popular Tags