KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > ra > inbound > ServerSessionPoolImpl


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Coridan.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46
47 package org.mr.ra.inbound;
48
49 import java.util.ArrayList JavaDoc;
50 import java.util.Iterator JavaDoc;
51 import java.util.LinkedList JavaDoc;
52
53 import javax.jms.JMSException JavaDoc;
54 import javax.jms.ServerSession JavaDoc;
55 import javax.jms.ServerSessionPool JavaDoc;
56 import javax.jms.Session JavaDoc;
57 import javax.resource.spi.UnavailableException JavaDoc;
58 import javax.resource.spi.endpoint.MessageEndpoint JavaDoc;
59
60 import org.apache.commons.logging.Log;
61 import org.apache.commons.logging.LogFactory;
62 import org.mr.api.jms.MantaSession;
63 import org.mr.ra.LocalAndXATransaction;
64
65 /**
66  * @version $Revision: 1.1.1.1 $ $Date: 2005/03/11 21:15:11 $
67  */

68 public class ServerSessionPoolImpl implements ServerSessionPool JavaDoc {
69     
70     private static final Log log = LogFactory.getLog(ServerSessionPoolImpl.class);
71     
72     private final MantaAsfEndpointWorker mantaAsfEndpointWorker;
73     private final int maxSessions;
74     
75     private ArrayList JavaDoc idleSessions = new ArrayList JavaDoc();
76     private LinkedList JavaDoc activeSessions = new LinkedList JavaDoc();
77     private boolean closing = false;
78     
79     /**
80      * Constructor
81      * @param mantaAsfEndpointWorker
82      * @param maxSessions
83      */

84     public ServerSessionPoolImpl(MantaAsfEndpointWorker mantaAsfEndpointWorker, int maxSessions) {
85         this.mantaAsfEndpointWorker = mantaAsfEndpointWorker;
86         this.maxSessions = maxSessions;
87     }
88     
89     private ServerSessionImpl createServerSessionImpl() throws JMSException JavaDoc {
90         ActivationSpecImpl activationSpec = mantaAsfEndpointWorker.endpointActivationKey.getActivationSpec();
91
92         final MantaSession session;
93         if (mantaAsfEndpointWorker.transacted) {
94             session = (MantaSession) mantaAsfEndpointWorker.connection.createXASession();
95         }
96         else {
97             int acknowledge = activationSpec.getAcknowledgeModeForSession();
98             session = (MantaSession) mantaAsfEndpointWorker.connection.createSession(false, acknowledge);
99         }
100         //final MantaSession session = (MantaSession) mantaAsfEndpointWorker.connection.createSession(mantaAsfEndpointWorker.transacted, acknowledge);
101
//final MantaSession session = (MantaSession) mantaAsfEndpointWorker.connection.createSession(mantaAsfEndpointWorker.transacted, acknowledge);
102
//final MantaSession session = (MantaSession) mantaAsfEndpointWorker.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
103
MessageEndpoint JavaDoc endpoint;
104         try {
105             int batchSize = 0;
106             if (activationSpec.getEnableBatchBooleanValue()) {
107                 batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
108             }
109             if (activationSpec.isUseRAManagedTransactionEnabled()) {
110                 // The RA will manage the transaction commit.
111
endpoint = createEndpoint(null);
112                 return new ServerSessionImpl(this,
113                                              (MantaSession)session,
114                                              mantaAsfEndpointWorker.workManager,
115                                              endpoint,
116                                              true,
117                                              batchSize);
118             } else {
119                 // Give the container an object to manage to transaction with.
120
endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
121                 return new ServerSessionImpl(this,
122                                              (MantaSession)session,
123                                              mantaAsfEndpointWorker.workManager,
124                                              endpoint,
125                                              false,
126                                              batchSize);
127             }
128 // endpoint = createEndpoint();
129
// return new ServerSessionImpl(this,
130
// (MantaSession)session,
131
// mantaAsfEndpointWorker.workManager,
132
// endpoint,
133
// //true,
134
// batchSize);
135
} catch (UnavailableException JavaDoc e) {
136             // The container could be limiting us on the number of endpoints
137
// that are being created.
138
log.error("createServerSessionImpl(): "+e);
139             session.close();
140             return null;
141         }
142     }
143     
144     private MessageEndpoint JavaDoc createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException JavaDoc {
145         MessageEndpoint JavaDoc endpoint;
146         endpoint = mantaAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
147         MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint);
148         return endpointProxy;
149     }
150 // private MessageEndpoint createEndpoint() throws UnavailableException {
151
// MessageEndpoint endpoint;
152
// endpoint = mantaAsfEndpointWorker.endpointFactory.createEndpoint(null);
153
// MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint);
154
// return endpointProxy;
155
// }
156

157     /**
158      * Returns a ServerSession from the pool
159      */

160     synchronized public ServerSession JavaDoc getServerSession() throws JMSException JavaDoc {
161         if (closing) {
162             throw new JMSException JavaDoc("Session Pool Shutting Down.");
163         }
164         
165         if (idleSessions.size() > 0) {
166             return getExistingIdleServerSession();
167         }
168         else {
169             // Are we at the upper limit?
170
if (activeSessions.size() >= maxSessions) {
171                 // then reuse the allready created sessions..
172
// This is going to queue up messages into a session for
173
// processing.
174
return getExistingServerSession();
175             }
176             ServerSessionImpl ss = createServerSessionImpl();
177             // We may not be able to create a session due to the conatiner
178
// restricting us.
179
if (ss == null) {
180                 return getExistingServerSession();
181             }
182             activeSessions.addLast(ss);
183             log.debug("Created a new session: " + ss);
184             return ss;
185         }
186     }
187     
188     /**
189      * @param message
190      * @throws JMSException
191      */

192 // private void dispatchToSession(MantaMessage message) throws JMSException {
193
//
194
// ServerSession serverSession = getServerSession();
195
// Session nestedSession = serverSession.getSession();
196
// MantaSession session = null;
197
// if (!(nestedSession instanceof MantaSession)) {
198
// throw new JMSException("Invalid instance of session obtained from server session." +
199
// "The instance should be one of the following: MantaSession, MantaTopicSession, MantaQueueSession. " +
200
// "Found instance of " + nestedSession.getClass().getName());
201
// }
202
//// if (nestedSession instanceof MantaSession) {
203
//// session = (MantaSession) nestedSession;
204
//// } else if (nestedSession instanceof MantaTopicSession) {
205
//// MantaTopicSession topicSession = (MantaTopicSession) nestedSession;
206
//// session = (MantaSession) topicSession.getNext();
207
//// } else if (nestedSession instanceof MantaQueueSession) {
208
//// MantaQueueSession queueSession = (MantaQueueSession) nestedSession;
209
//// session = (MantaSession) queueSession.getNext();
210
//// } else {
211
//// throw new JMSException("Invalid instance of session obtained from server session." +
212
//// "The instance should be one of the following: MantaSession, MantaTopicSession, MantaQueueSession. " +
213
//// "Found instance of " + nestedSession.getClass().getName());
214
//// }
215
// session.dispatch(message);
216
// serverSession.start();
217
// }
218

219     
220     /**
221      * @return
222      */

223     private ServerSession JavaDoc getExistingIdleServerSession() {
224         ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size() - 1);
225         activeSessions.addLast(ss);
226         log.debug("Using idle session: " + ss);
227         return ss;
228     }
229     
230     /**
231      * @return
232      */

233     private ServerSession JavaDoc getExistingServerSession() {
234         ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst();
235         activeSessions.addLast(ss);
236         log.debug("Reusing an active session: " + ss);
237         return ss;
238     }
239     
240     synchronized public void returnToPool(ServerSessionImpl ss) {
241         log.debug("Session returned to pool: " + ss);
242         activeSessions.remove(ss);
243         idleSessions.add(ss);
244         notify();
245     }
246     
247     synchronized public void removeFromPool(ServerSessionImpl ss) {
248         activeSessions.remove(ss);
249         // TODO: see if you need this code
250
/*try {
251             MantaSession session = (MantaSession) ss.getSession();
252             List l = session.getUnconsumedMessages();
253             for (Iterator i = l.iterator(); i.hasNext();) {
254                 dispatchToSession((MantaMessage) i.next());
255             }
256         } catch (Throwable t) {
257             log.error("Error redispatching unconsumed messages from stale session", t);
258         }*/

259         ss.close();
260         notify();
261     }
262     
263     public void close() {
264         synchronized (this) {
265             closing = true;
266             //closeIdleSessions();
267
while( activeSessions.size() > 0 ) {
268                 try {
269                     wait();
270                 } catch (InterruptedException JavaDoc e) {
271                     Thread.currentThread().interrupt();
272                     return;
273                 }
274                 //closeIdleSessions();
275
}
276             closeIdleSessions();
277         }
278     }
279     
280     private void closeIdleSessions() {
281         for (Iterator JavaDoc iter = idleSessions.iterator(); iter.hasNext();) {
282             ServerSessionImpl ss = (ServerSessionImpl) iter.next();
283             ss.close();
284         }
285     }
286     
287 }
288
Popular Tags