KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > pool > ConnectionPool


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.pool;
19
20 import java.io.IOException JavaDoc;
21 import java.util.HashMap JavaDoc;
22 import java.util.Iterator JavaDoc;
23 import java.util.Map JavaDoc;
24
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Session JavaDoc;
27 import javax.transaction.RollbackException JavaDoc;
28 import javax.transaction.Status JavaDoc;
29 import javax.transaction.SystemException JavaDoc;
30 import javax.transaction.TransactionManager JavaDoc;
31 import javax.transaction.xa.XAResource JavaDoc;
32
33 import org.apache.activemq.ActiveMQConnection;
34 import org.apache.activemq.transport.TransportListener;
35 import org.apache.commons.pool.ObjectPoolFactory;
36
37 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
38
39 /**
40  * Holds a real JMS connection along with the session pools associated with it.
41  *
42  * @version $Revision: 514668 $
43  */

44 public class ConnectionPool {
45     
46     private TransactionManager JavaDoc transactionManager;
47     private ActiveMQConnection connection;
48     private Map JavaDoc cache;
49     private AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
50     private int referenceCount;
51     private ObjectPoolFactory poolFactory;
52     private long lastUsed = System.currentTimeMillis();
53     private boolean hasFailed;
54     private boolean hasExpired;
55     private int idleTimeout = 30*1000;
56
57     public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager JavaDoc transactionManager) {
58         this(connection, new HashMap JavaDoc(), poolFactory, transactionManager);
59         // Add a transport Listener so that we can notice if this connection should be expired due to
60
// a connection failure.
61
connection.addTransportListener(new TransportListener(){
62             public void onCommand(Object JavaDoc command) {
63             }
64             public void onException(IOException JavaDoc error) {
65                 synchronized(ConnectionPool.this) {
66                     hasFailed = true;
67                 }
68             }
69             public void transportInterupted() {
70             }
71             public void transportResumed() {
72             }
73         });
74     }
75
76     public ConnectionPool(ActiveMQConnection connection, Map JavaDoc cache, ObjectPoolFactory poolFactory, TransactionManager JavaDoc transactionManager) {
77         this.connection = connection;
78         this.cache = cache;
79         this.poolFactory = poolFactory;
80         this.transactionManager = transactionManager;
81     }
82
83     public void start() throws JMSException JavaDoc {
84         if (started.compareAndSet(false, true)) {
85             connection.start();
86         }
87     }
88
89     synchronized public ActiveMQConnection getConnection() {
90         return connection;
91     }
92
93     public Session JavaDoc createSession(boolean transacted, int ackMode) throws JMSException JavaDoc {
94         try {
95             boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
96             if (isXa) {
97                 transacted = true;
98                 ackMode = Session.SESSION_TRANSACTED;
99             }
100             SessionKey key = new SessionKey(transacted, ackMode);
101             SessionPool pool = (SessionPool) cache.get(key);
102             if (pool == null) {
103                 pool = new SessionPool(this, key, poolFactory.createPool());
104                 cache.put(key, pool);
105             }
106             PooledSession session = pool.borrowSession();
107             if (isXa) {
108                 session.setIgnoreClose(true);
109                 transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
110                 incrementReferenceCount();
111                 transactionManager.getTransaction().enlistResource(createXaResource(session));
112             }
113             return session;
114         } catch (RollbackException JavaDoc e) {
115             final JMSException JavaDoc jmsException = new JMSException JavaDoc("Rollback Exception");
116             jmsException.initCause(e);
117             throw jmsException;
118         } catch (SystemException JavaDoc e) {
119             final JMSException JavaDoc jmsException = new JMSException JavaDoc("System Exception");
120             jmsException.initCause(e);
121             throw jmsException;
122         }
123     }
124
125     synchronized public void close() {
126         if( connection!=null ) {
127             try {
128                 Iterator JavaDoc i = cache.values().iterator();
129                 while (i.hasNext()) {
130                     SessionPool pool = (SessionPool) i.next();
131                     i.remove();
132                     try {
133                         pool.close();
134                     } catch (Exception JavaDoc e) {
135                     }
136                 }
137             } finally {
138                 try {
139                     connection.close();
140                 } catch (Exception JavaDoc e) {
141                 } finally {
142                     connection = null;
143                 }
144             }
145         }
146     }
147
148     synchronized public void incrementReferenceCount() {
149         referenceCount++;
150         lastUsed = System.currentTimeMillis();
151     }
152
153     synchronized public void decrementReferenceCount() {
154         referenceCount--;
155         lastUsed = System.currentTimeMillis();
156         if( referenceCount == 0 ) {
157             expiredCheck();
158         }
159     }
160
161     /**
162      * @return true if this connection has expired.
163      */

164     synchronized public boolean expiredCheck() {
165         if( connection == null ) {
166             return true;
167         }
168         if( hasExpired ) {
169             if( referenceCount == 0 ) {
170                 close();
171             }
172             return true;
173         }
174         if( hasFailed || ( idleTimeout>0 && System.currentTimeMillis() > lastUsed+idleTimeout) ) {
175             hasExpired=true;
176             if( referenceCount == 0 ) {
177                 close();
178             }
179             return true;
180         }
181         return false;
182     }
183
184     public int getIdleTimeout() {
185         return idleTimeout;
186     }
187
188     public void setIdleTimeout(int idleTimeout) {
189         this.idleTimeout = idleTimeout;
190     }
191     
192     protected XAResource JavaDoc createXaResource(PooledSession session) throws JMSException JavaDoc {
193         return session.getSession().getTransactionContext();
194     }
195
196     protected class Synchronization implements javax.transaction.Synchronization JavaDoc {
197         private final PooledSession session;
198
199         private Synchronization(PooledSession session) {
200             this.session = session;
201         }
202
203         public void beforeCompletion() {
204         }
205         
206         public void afterCompletion(int status) {
207             try {
208                 // This will return session to the pool.
209
session.setIgnoreClose(false);
210                 session.close();
211                 decrementReferenceCount();
212             } catch (JMSException JavaDoc e) {
213                 throw new RuntimeException JavaDoc(e);
214             }
215         }
216     }
217     
218 }
219
Popular Tags