KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ra > ServerSessionPoolImpl


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.ra;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;

import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession;
import org.apache.activemq.command.MessageDispatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
38
39 /**
40  * @version $Revision$ $Date$
41  */

42 public class ServerSessionPoolImpl implements ServerSessionPool JavaDoc {
43     
44     private static final Log log = LogFactory.getLog(ServerSessionPoolImpl.class);
45
46     private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
47     private final int maxSessions;
48
49     private List JavaDoc idleSessions = new CopyOnWriteArrayList JavaDoc();
50     private List JavaDoc activeSessions = new CopyOnWriteArrayList JavaDoc();
51     private AtomicBoolean JavaDoc closing = new AtomicBoolean JavaDoc(false);
52
53     public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
54         this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
55         this.maxSessions=maxSessions;
56     }
57
58     private ServerSessionImpl createServerSessionImpl() throws JMSException JavaDoc {
59         MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
60         int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
61         final ActiveMQSession session = (ActiveMQSession) activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted,acknowledge);
62         MessageEndpoint JavaDoc endpoint;
63         try {
64             int batchSize = 0;
65             if (activationSpec.getEnableBatchBooleanValue()) {
66                 batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
67             }
68             if( activationSpec.isUseRAManagedTransactionEnabled() ) {
69                 // The RA will manage the transaction commit.
70
endpoint = createEndpoint(null);
71                 return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
72             } else {
73                 // Give the container an object to manage to transaction with.
74
endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
75                 return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
76             }
77         } catch (UnavailableException JavaDoc e) {
78             // The container could be limiting us on the number of endpoints
79
// that are being created.
80
log.debug("Could not create an endpoint.", e);
81             session.close();
82             return null;
83         }
84     }
85
86     private MessageEndpoint JavaDoc createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException JavaDoc {
87         MessageEndpoint JavaDoc endpoint;
88         endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
89         MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint);
90         return endpointProxy;
91     }
92
93     /**
94      */

95     public ServerSession JavaDoc getServerSession() throws JMSException JavaDoc {
96         log.debug("ServerSession requested.");
97         if (closing.get()) {
98             throw new JMSException JavaDoc("Session Pool Shutting Down.");
99         }
100
101         if (idleSessions.size() > 0) {
102             ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size() - 1);
103             activeSessions.add(ss);
104             log.debug("Using idle session: " + ss);
105             return ss;
106         } else {
107             // Are we at the upper limit?
108
if (activeSessions.size() >= maxSessions) {
109                 // then reuse the already created sessions..
110
// This is going to queue up messages into a session for
111
// processing.
112
return getExistingServerSession();
113             }
114             ServerSessionImpl ss = createServerSessionImpl();
115             // We may not be able to create a session due to the container
116
// restricting us.
117
if (ss == null) {
118                 if (idleSessions.size() == 0) {
119                     throw new JMSException JavaDoc("Endpoint factory did not allows to any endpoints.");
120                 }
121
122                 return getExistingServerSession();
123             }
124             activeSessions.add(ss);
125             log.debug("Created a new session: " + ss);
126             return ss;
127         }
128     }
129
130     /**
131      * @param messageDispatch the message to dispatch
132      * @throws JMSException
133      */

134     private void dispatchToSession(MessageDispatch messageDispatch) throws JMSException JavaDoc {
135
136         ServerSession JavaDoc serverSession = getServerSession();
137         Session JavaDoc s = serverSession.getSession();
138         ActiveMQSession session = null;
139         if( s instanceof ActiveMQSession ) {
140             session = (ActiveMQSession) s;
141         } else if(s instanceof ActiveMQQueueSession) {
142             session = (ActiveMQSession) s;
143         } else if(s instanceof ActiveMQTopicSession) {
144             session = (ActiveMQSession) s;
145         } else {
146             activeMQAsfEndpointWorker.connection.onAsyncException(new JMSException JavaDoc("Session pool provided an invalid session type: "+s.getClass()));
147         }
148         session.dispatch(messageDispatch);
149         serverSession.start();
150     }
151
152     
153     /**
154      * @return
155      */

156     private ServerSession JavaDoc getExistingServerSession() {
157         ServerSessionImpl ss = (ServerSessionImpl) activeSessions.remove(0);
158         activeSessions.add(ss);
159         log.debug("Reusing an active session: " + ss);
160         return ss;
161     }
162
163     public void returnToPool(ServerSessionImpl ss) {
164         log.debug("Session returned to pool: " + ss);
165         activeSessions.remove(ss);
166         idleSessions.add(ss);
167         synchronized(closing){
168             closing.notify();
169         }
170     }
171
172      public void removeFromPool(ServerSessionImpl ss) {
173         activeSessions.remove(ss);
174         try {
175             ActiveMQSession session = (ActiveMQSession) ss.getSession();
176             List JavaDoc l = session.getUnconsumedMessages();
177             for (Iterator JavaDoc i = l.iterator(); i.hasNext();) {
178                 dispatchToSession((MessageDispatch) i.next());
179             }
180         } catch (Throwable JavaDoc t) {
181             log.error("Error redispatching unconsumed messages from stale session", t);
182         }
183         ss.close();
184         synchronized(closing){
185             closing.notify();
186         }
187     }
188
189     public void close() {
190         synchronized (closing) {
191             closing.set(true);
192             closeIdleSessions();
193             while( activeSessions.size() > 0 ) {
194                 System.out.println("ACtive Sessions = " + activeSessions.size());
195                 try {
196                     closing.wait(1000);
197                 } catch (InterruptedException JavaDoc e) {
198                     Thread.currentThread().interrupt();
199                     return;
200                 }
201                 closeIdleSessions();
202             }
203         }
204     }
205
206     private void closeIdleSessions() {
207         for (Iterator JavaDoc iter = idleSessions.iterator(); iter.hasNext();) {
208             ServerSessionImpl ss = (ServerSessionImpl) iter.next();
209             ss.close();
210         }
211     }
212
213     /**
214      * @return Returns the closing.
215      */

216     public boolean isClosing(){
217         return closing.get();
218     }
219
220     /**
221      * @param closing The closing to set.
222      */

223     public void setClosing(boolean closing){
224         this.closing.set(closing);
225     }
226
227 }
228
Popular Tags