KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > ra > inbound > ServerSessionImpl


/*
 * Copyright 2002 by
 * <a HREF="http://www.coridan.com">Coridan</a>
 * <a HREF="mailto:support@coridan.com">support@coridan.com</a>
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is "MantaRay" (TM).
 *
 * The Initial Developer of the Original Code is Coridan.
 * Portions created by the Initial Developer are Copyright (C) 2006
 * Coridan Inc. All Rights Reserved.
 *
 * Contributor(s): all the names of the contributors are added in the source
 * code where applicable.
 *
 * Alternatively, the contents of this file may be used under the terms of the
 * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
 * provisions of LGPL are applicable instead of those above. If you wish to
 * allow use of your version of this file only under the terms of the LGPL
 * License and not to allow others to use your version of this file under
 * the MPL, indicate your decision by deleting the provisions above and
 * replace them with the notice and other provisions required by the LGPL.
 * If you do not delete the provisions above, a recipient may use your version
 * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
 *
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the MPL as stated above or under the terms of the GNU
 * Lesser General Public License as published by the Free Software Foundation;
 * either version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
 * License for more details.
 */


package org.mr.ra.inbound;

import java.lang.reflect.Method;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mr.api.jms.MantaSession;
import org.mr.api.jms.MantaSession.DeliveryListener;

67 /**
68  * @version $Revision: 1.1.1.1 $
69  */

70 public class ServerSessionImpl implements ServerSession JavaDoc,
71                                           SessionAndProducer,
72                                           Work JavaDoc,
73                                           DeliveryListener {
74
75     public static final Method JavaDoc ON_MESSAGE_METHOD;
76
77     static {
78         try {
79             ON_MESSAGE_METHOD = MessageListener JavaDoc.class.getMethod("onMessage", new Class JavaDoc[]{Message JavaDoc.class});
80         }
81         catch (Exception JavaDoc e) {
82             throw new ExceptionInInitializerError JavaDoc(e);
83         }
84     }
85
86     private static int nextLogId = 0;
87     
88     synchronized static private int getNextLogId() {
89         return nextLogId++;
90     }
91
92     private int serverSessionId = getNextLogId();
93     
94     private final Log log = LogFactory.getLog(ServerSessionImpl.class.getName()+":"+serverSessionId);
95     
96     private MantaSession session;
97     private WorkManager JavaDoc workManager;
98     private MessageEndpoint JavaDoc endpoint;
99     private MessageProducer JavaDoc messageProducer;
100     private final ServerSessionPoolImpl pool;
101
102     private Object JavaDoc runControlMutex = new Object JavaDoc();
103     private boolean runningFlag = false;
104     /**
105      * True if an error was detected that cause this session to be stale. When a session
106      * is stale, it should not be used again for proccessing.
107      */

108     private boolean stale;
109     /**
110      * Does the TX commit need to be managed by the RA?
111      */

112     private final boolean useRAManagedTx;
113     /**
114      * The maximum number of messages to batch
115      */

116     private final int batchSize;
117     /**
118      * The current number of messages in the batch
119      */

120     private int currentBatchSize;
121
122     public ServerSessionImpl(ServerSessionPoolImpl pool,
123                              MantaSession session,
124                              WorkManager JavaDoc workManager,
125                              MessageEndpoint JavaDoc endpoint,
126                              boolean useRAManagedTx,
127                              int batchSize) throws JMSException JavaDoc {
128         this.pool = pool;
129         this.session = session;
130         this.workManager = workManager;
131         this.endpoint = endpoint;
132         this.useRAManagedTx = useRAManagedTx;
133         this.session.setMessageListener((MessageListener JavaDoc) endpoint);
134         this.session.setDeliveryListener(this);
135         this.batchSize = batchSize;
136     }
137
138     public Session JavaDoc getSession() throws JMSException JavaDoc {
139         return session;
140     }
141
142     public MessageProducer JavaDoc getMessageProducer() throws JMSException JavaDoc {
143         if (messageProducer == null) {
144             messageProducer = getSession().createProducer(null);
145         }
146         return messageProducer;
147     }
148
149     /**
150      * @see javax.jms.ServerSession#start()
151      */

152     public void start() throws JMSException JavaDoc {
153         synchronized (runControlMutex) {
154             if (runningFlag) {
155                 log.debug("Start request ignored, allready running.");
156                 return;
157             }
158             runningFlag = true;
159         }
160
161         // We get here because we need to start a async worker.
162
log.debug("Starting run.");
163         try {
164             workManager.scheduleWork(this, WorkManager.INDEFINITE, null, null);
165         }
166         catch (WorkException JavaDoc e) {
167             throw (JMSException JavaDoc) new JMSException JavaDoc("Start failed: " + e).initCause(e);
168         }
169     }
170
171     /**
172      * @see java.lang.Runnable#run()
173      */

174     synchronized public void run() {
175         while (true) {
176             try {
177                 SessionAndProducerHelper.register(this);
178                 currentBatchSize = 0;
179                 session.run();
180             }
181             catch (Throwable JavaDoc e) {
182                 stale = true;
183             }
184             finally {
185                 SessionAndProducerHelper.unregister(this);
186                 synchronized (runControlMutex) {
187                     // This endpoint may have gone stale due to error
188
if (stale) {
189                         log.debug("Session stale - removing from pool");
190                         runningFlag = false;
191                         pool.removeFromPool(this);
192                         break;
193                     }
194                     if (!session.hasConsumerMessages()) {
195                         runningFlag = false;
196                         pool.returnToPool(this);
197                         break;
198                     }
199                 }
200             }
201         }
202     }
203
204
205     /**
206      * The MantaSession's run method will call back to this method before
207      * dispactching a message to the MessageListener.
208      */

209     public void beforeDelivery(MantaSession session, Message JavaDoc msg) {
210         if (currentBatchSize == 0) {
211             try {
212                 endpoint.beforeDelivery(ON_MESSAGE_METHOD);
213             } catch (Throwable JavaDoc e) {
214                 throw new RuntimeException JavaDoc("Endpoint before delivery notification failure", e);
215             }
216         }
217     }
218
219     /**
220      * The MantaSession's run method will call back to this method after
221      * dispactching a message to the MessageListener.
222      */

223     public void afterDelivery(MantaSession session, Message JavaDoc msg) {
224         if (++currentBatchSize >= batchSize || !session.hasConsumerMessages()) {
225             currentBatchSize = 0;
226             try {
227                 endpoint.afterDelivery();
228             } catch (Throwable JavaDoc e) {
229                 throw new RuntimeException JavaDoc("Endpoint after delivery notification failure", e);
230             }
231             // Transactions
232
finally {
233                 if (session.getTransactionContext().isInLocalTransaction()) {
234                     if (!useRAManagedTx) {
235                         // Sanitiy Check: If the local transaction has not been commited..
236
// Commit it now.
237
log.warn("Local transaction had not been commited. Commiting now.");
238                     }
239                     try {
240                         session.commit();
241                     } catch (JMSException JavaDoc e) {
242                         log.info("Commit failed:", e);
243                     }
244                 }
245             }
246         }
247     }
248
249     /**
250      * @see javax.resource.spi.work.Work#release()
251      */

252     public void release() {
253         log.debug("release called");
254     }
255
256     /**
257      * @see java.lang.Object#toString()
258      */

259     public String JavaDoc toString() {
260         return "ServerSessionImpl:"+serverSessionId;
261     }
262
263     public void close() {
264         try {
265             endpoint.release();
266         } catch (Throwable JavaDoc e) {
267             log.debug("Endpoint did not release properly: "+e,e);
268         }
269         try {
270             session.close();
271         } catch (Throwable JavaDoc e) {
272             log.debug("Session did not close properly: "+e,e);
273         }
274     }
275
276 }
277
Popular Tags