KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ra > ServerSessionImpl


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.ra;
19
20 import java.lang.reflect.Method JavaDoc;
21
22 import javax.jms.JMSException JavaDoc;
23 import javax.jms.Message JavaDoc;
24 import javax.jms.MessageListener JavaDoc;
25 import javax.jms.MessageProducer JavaDoc;
26 import javax.jms.ServerSession JavaDoc;
27 import javax.jms.Session JavaDoc;
28 import javax.resource.spi.endpoint.MessageEndpoint JavaDoc;
29 import javax.resource.spi.work.Work JavaDoc;
30 import javax.resource.spi.work.WorkEvent JavaDoc;
31 import javax.resource.spi.work.WorkException JavaDoc;
32 import javax.resource.spi.work.WorkListener JavaDoc;
33 import javax.resource.spi.work.WorkManager JavaDoc;
34
35 import org.apache.activemq.ActiveMQSession;
36 import org.apache.activemq.TransactionContext;
37 import org.apache.activemq.ActiveMQSession.DeliveryListener;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40
41 /**
42  * @version $Revision$
43  */

44 public class ServerSessionImpl implements ServerSession JavaDoc, InboundContext, Work JavaDoc, DeliveryListener {
45
46     public static final Method JavaDoc ON_MESSAGE_METHOD;
47
48     static {
49         try {
50             ON_MESSAGE_METHOD = MessageListener JavaDoc.class.getMethod("onMessage", new Class JavaDoc[]{Message JavaDoc.class});
51         }
52         catch (Exception JavaDoc e) {
53             throw new ExceptionInInitializerError JavaDoc(e);
54         }
55     }
56
57     private static int nextLogId=0;
58     synchronized static private int getNextLogId() {
59         return nextLogId++;
60     }
61
62     private int serverSessionId = getNextLogId();
63     private final Log log = LogFactory.getLog( ServerSessionImpl.class.getName()+":"+serverSessionId );
64     
65     private ActiveMQSession session;
66     private WorkManager JavaDoc workManager;
67     private MessageEndpoint JavaDoc endpoint;
68     private MessageProducer JavaDoc messageProducer;
69     private final ServerSessionPoolImpl pool;
70
71     private Object JavaDoc runControlMutex = new Object JavaDoc();
72     private boolean runningFlag = false;
73     /**
74      * True if an error was detected that cause this session to be stale. When a session
75      * is stale, it should not be used again for proccessing.
76      */

77     private boolean stale;
78     /**
79      * Does the TX commit need to be managed by the RA?
80      */

81     private final boolean useRAManagedTx;
82     /**
83      * The maximum number of messages to batch
84      */

85     private final int batchSize;
86     /**
87      * The current number of messages in the batch
88      */

89     private int currentBatchSize;
90
91     public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager JavaDoc workManager, MessageEndpoint JavaDoc endpoint, boolean useRAManagedTx, int batchSize) throws JMSException JavaDoc {
92         this.pool = pool;
93         this.session = session;
94         this.workManager = workManager;
95         this.endpoint = endpoint;
96         this.useRAManagedTx = useRAManagedTx;
97         this.session.setMessageListener((MessageListener JavaDoc) endpoint);
98         this.session.setDeliveryListener(this);
99         this.batchSize = batchSize;
100     }
101
102     public Session JavaDoc getSession() throws JMSException JavaDoc {
103         return session;
104     }
105
106     public MessageProducer JavaDoc getMessageProducer() throws JMSException JavaDoc {
107         if (messageProducer == null) {
108             messageProducer = getSession().createProducer(null);
109         }
110         return messageProducer;
111     }
112
113     /**
114      * @see javax.jms.ServerSession#start()
115      */

116     public void start() throws JMSException JavaDoc {
117
118         synchronized (runControlMutex) {
119             if (runningFlag) {
120                 log.debug("Start request ignored, already running.");
121                 return;
122             }
123             runningFlag = true;
124         }
125
126         // We get here because we need to start a async worker.
127
log.debug("Starting run.");
128         try {
129             workManager.scheduleWork(this, WorkManager.INDEFINITE, null,
130                     new WorkListener JavaDoc() {
131                         //The work listener is useful only for debugging...
132
public void workAccepted(WorkEvent JavaDoc event) {
133                             log.debug("Work accepted: " + event);
134                         }
135
136                         public void workRejected(WorkEvent JavaDoc event) {
137                             log.debug("Work rejected: " + event);
138                         }
139
140                         public void workStarted(WorkEvent JavaDoc event) {
141                             log.debug("Work started: " + event);
142                         }
143
144                         public void workCompleted(WorkEvent JavaDoc event) {
145                             log.debug("Work completed: " + event);
146                         }
147
148                     });
149         }
150         catch (WorkException JavaDoc e) {
151             throw (JMSException JavaDoc) new JMSException JavaDoc("Start failed: " + e).initCause(e);
152         }
153     }
154
155     /**
156      * @see java.lang.Runnable#run()
157      */

158     public void run() {
159         log.debug("Running");
160         while (true) {
161             log.debug("run loop start");
162             try {
163                 InboundContextSupport.register(this);
164                 currentBatchSize = 0;
165                 session.run();
166             }
167             catch (Throwable JavaDoc e) {
168                 stale=true;
169                 log.debug("Endpoint failed to process message.", e);
170                 log.info("Endpoint failed to process message. Reason: " + e);
171             }
172             finally {
173                 InboundContextSupport.unregister(this);
174                 log.debug("run loop end");
175                 synchronized (runControlMutex) {
176                     // This endpoint may have gone stale due to error
177
if( stale) {
178                         runningFlag = false;
179                         pool.removeFromPool(this);
180                         break;
181                     }
182                     if( !session.hasUncomsumedMessages() ) {
183                         runningFlag = false;
184                         pool.returnToPool(this);
185                         break;
186                     }
187                 }
188             }
189         }
190         log.debug("Run finished");
191     }
192
193
194     /**
195      * The ActiveMQSession's run method will call back to this method before
196      * dispactching a message to the MessageListener.
197      */

198     public void beforeDelivery(ActiveMQSession session, Message JavaDoc msg) {
199         if (currentBatchSize == 0) {
200             try {
201                 endpoint.beforeDelivery(ON_MESSAGE_METHOD);
202             } catch (Throwable JavaDoc e) {
203                 throw new RuntimeException JavaDoc("Endpoint before delivery notification failure", e);
204             }
205         }
206     }
207
208     /**
209      * The ActiveMQSession's run method will call back to this method after
210      * dispactching a message to the MessageListener.
211      */

212     public void afterDelivery(ActiveMQSession session, Message JavaDoc msg) {
213         if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) {
214             currentBatchSize = 0;
215             try {
216                 endpoint.afterDelivery();
217             } catch (Throwable JavaDoc e) {
218                 throw new RuntimeException JavaDoc("Endpoint after delivery notification failure", e);
219             } finally {
220                 TransactionContext transactionContext = session.getTransactionContext();
221                 if( transactionContext != null && transactionContext.isInLocalTransaction() ) {
222                     if( !useRAManagedTx ) {
223                         // Sanitiy Check: If the local transaction has not been commited..
224
// Commit it now.
225
log.warn("Local transaction had not been commited. Commiting now.");
226                     }
227                     try {
228                         session.commit();
229                     } catch (JMSException JavaDoc e) {
230                         log.info("Commit failed:", e);
231                     }
232                 }
233             }
234         }
235     }
236
237     /**
238      * @see javax.resource.spi.work.Work#release()
239      */

240     public void release() {
241         log.debug("release called");
242     }
243
244     /**
245      * @see java.lang.Object#toString()
246      */

247     public String JavaDoc toString() {
248         return "ServerSessionImpl:"+serverSessionId;
249     }
250
251     public void close() {
252         try {
253             endpoint.release();
254         } catch (Throwable JavaDoc e) {
255             log.debug("Endpoint did not release properly: "+e,e);
256         }
257         try {
258             session.close();
259         } catch (Throwable JavaDoc e) {
260             log.debug("Session did not close properly: "+e,e);
261         }
262     }
263
264 }
265
Popular Tags