1 18 package org.apache.activemq.pool; 19 20 import java.io.IOException ; 21 import java.util.HashMap ; 22 import java.util.Iterator ; 23 import java.util.Map ; 24 25 import javax.jms.JMSException ; 26 import javax.jms.Session ; 27 import javax.transaction.RollbackException ; 28 import javax.transaction.Status ; 29 import javax.transaction.SystemException ; 30 import javax.transaction.TransactionManager ; 31 import javax.transaction.xa.XAResource ; 32 33 import org.apache.activemq.ActiveMQConnection; 34 import org.apache.activemq.transport.TransportListener; 35 import org.apache.commons.pool.ObjectPoolFactory; 36 37 import java.util.concurrent.atomic.AtomicBoolean ; 38 39 44 public class ConnectionPool { 45 46 private TransactionManager transactionManager; 47 private ActiveMQConnection connection; 48 private Map cache; 49 private AtomicBoolean started = new AtomicBoolean (false); 50 private int referenceCount; 51 private ObjectPoolFactory poolFactory; 52 private long lastUsed = System.currentTimeMillis(); 53 private boolean hasFailed; 54 private boolean hasExpired; 55 private int idleTimeout = 30*1000; 56 57 public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) { 58 this(connection, new HashMap (), poolFactory, transactionManager); 59 connection.addTransportListener(new TransportListener(){ 62 public void onCommand(Object command) { 63 } 64 public void onException(IOException error) { 65 synchronized(ConnectionPool.this) { 66 hasFailed = true; 67 } 68 } 69 public void transportInterupted() { 70 } 71 public void transportResumed() { 72 } 73 }); 74 } 75 76 public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory, TransactionManager transactionManager) { 77 this.connection = connection; 78 this.cache = cache; 79 this.poolFactory = poolFactory; 80 this.transactionManager = transactionManager; 81 } 82 83 public void start() throws JMSException { 84 if (started.compareAndSet(false, true)) { 85 connection.start(); 86 } 87 } 88 89 synchronized public ActiveMQConnection getConnection() { 90 return connection; 91 } 92 93 public Session createSession(boolean transacted, int ackMode) throws JMSException { 94 try { 95 boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION); 96 if (isXa) { 97 transacted = true; 98 ackMode = Session.SESSION_TRANSACTED; 99 } 100 SessionKey key = new SessionKey(transacted, ackMode); 101 SessionPool pool = (SessionPool) cache.get(key); 102 if (pool == null) { 103 pool = new SessionPool(this, key, poolFactory.createPool()); 104 cache.put(key, pool); 105 } 106 PooledSession session = pool.borrowSession(); 107 if (isXa) { 108 session.setIgnoreClose(true); 109 transactionManager.getTransaction().registerSynchronization(new Synchronization(session)); 110 incrementReferenceCount(); 111 transactionManager.getTransaction().enlistResource(createXaResource(session)); 112 } 113 return session; 114 } catch (RollbackException e) { 115 final JMSException jmsException = new JMSException ("Rollback Exception"); 116 jmsException.initCause(e); 117 throw jmsException; 118 } catch (SystemException e) { 119 final JMSException jmsException = new JMSException ("System Exception"); 120 jmsException.initCause(e); 121 throw jmsException; 122 } 123 } 124 125 synchronized public void close() { 126 if( connection!=null ) { 127 try { 128 Iterator i = cache.values().iterator(); 129 while (i.hasNext()) { 130 SessionPool pool = (SessionPool) i.next(); 131 i.remove(); 132 try { 133 pool.close(); 134 } catch (Exception e) { 135 } 136 } 137 } finally { 138 try { 139 connection.close(); 140 } catch (Exception e) { 141 } finally { 142 connection = null; 143 } 144 } 145 } 146 } 147 148 synchronized public void incrementReferenceCount() { 149 referenceCount++; 150 lastUsed = System.currentTimeMillis(); 151 } 152 153 synchronized public void decrementReferenceCount() { 154 referenceCount--; 155 lastUsed = System.currentTimeMillis(); 156 if( referenceCount == 0 ) { 157 expiredCheck(); 158 } 159 } 160 161 164 synchronized public boolean expiredCheck() { 165 if( connection == null ) { 166 return true; 167 } 168 if( hasExpired ) { 169 if( referenceCount == 0 ) { 170 close(); 171 } 172 return true; 173 } 174 if( hasFailed || ( idleTimeout>0 && System.currentTimeMillis() > lastUsed+idleTimeout) ) { 175 hasExpired=true; 176 if( referenceCount == 0 ) { 177 close(); 178 } 179 return true; 180 } 181 return false; 182 } 183 184 public int getIdleTimeout() { 185 return idleTimeout; 186 } 187 188 public void setIdleTimeout(int idleTimeout) { 189 this.idleTimeout = idleTimeout; 190 } 191 192 protected XAResource createXaResource(PooledSession session) throws JMSException { 193 return session.getSession().getTransactionContext(); 194 } 195 196 protected class Synchronization implements javax.transaction.Synchronization { 197 private final PooledSession session; 198 199 private Synchronization(PooledSession session) { 200 this.session = session; 201 } 202 203 public void beforeCompletion() { 204 } 205 206 public void afterCompletion(int status) { 207 try { 208 session.setIgnoreClose(false); 210 session.close(); 211 decrementReferenceCount(); 212 } catch (JMSException e) { 213 throw new RuntimeException (e); 214 } 215 } 216 } 217 218 } 219 | Popular Tags |