1 18 package org.apache.activemq.ra; 19 20 import java.util.Iterator ; 21 import java.util.List ; 22 23 import javax.jms.JMSException ; 24 import javax.jms.ServerSession ; 25 import javax.jms.ServerSessionPool ; 26 import javax.jms.Session ; 27 import javax.resource.spi.UnavailableException ; 28 import javax.resource.spi.endpoint.MessageEndpoint ; 29 30 import org.apache.activemq.ActiveMQQueueSession; 31 import org.apache.activemq.ActiveMQSession; 32 import org.apache.activemq.ActiveMQTopicSession; 33 import org.apache.activemq.command.MessageDispatch; 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 import java.util.concurrent.CopyOnWriteArrayList ; 37 import java.util.concurrent.atomic.AtomicBoolean ; 38 39 42 public class ServerSessionPoolImpl implements ServerSessionPool { 43 44 private static final Log log = LogFactory.getLog(ServerSessionPoolImpl.class); 45 46 private final ActiveMQEndpointWorker activeMQAsfEndpointWorker; 47 private final int maxSessions; 48 49 private List idleSessions = new CopyOnWriteArrayList (); 50 private List activeSessions = new CopyOnWriteArrayList (); 51 private AtomicBoolean closing = new AtomicBoolean (false); 52 53 public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) { 54 this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker; 55 this.maxSessions=maxSessions; 56 } 57 58 private ServerSessionImpl createServerSessionImpl() throws JMSException { 59 MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); 60 int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); 61 final ActiveMQSession session = (ActiveMQSession) activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted,acknowledge); 62 MessageEndpoint endpoint; 63 try { 64 int batchSize = 0; 65 if (activationSpec.getEnableBatchBooleanValue()) { 66 batchSize = activationSpec.getMaxMessagesPerBatchIntValue(); 67 } 68 if( activationSpec.isUseRAManagedTransactionEnabled() ) { 69 endpoint = createEndpoint(null); 71 return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize); 72 } else { 73 endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext())); 75 return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize); 76 } 77 } catch (UnavailableException e) { 78 log.debug("Could not create an endpoint.", e); 81 session.close(); 82 return null; 83 } 84 } 85 86 private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException { 87 MessageEndpoint endpoint; 88 endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy); 89 MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint); 90 return endpointProxy; 91 } 92 93 95 public ServerSession getServerSession() throws JMSException { 96 log.debug("ServerSession requested."); 97 if (closing.get()) { 98 throw new JMSException ("Session Pool Shutting Down."); 99 } 100 101 if (idleSessions.size() > 0) { 102 ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size() - 1); 103 activeSessions.add(ss); 104 log.debug("Using idle session: " + ss); 105 return ss; 106 } else { 107 if (activeSessions.size() >= maxSessions) { 109 return getExistingServerSession(); 113 } 114 ServerSessionImpl ss = createServerSessionImpl(); 115 if (ss == null) { 118 if (idleSessions.size() == 0) { 119 throw new JMSException ("Endpoint factory did not allows to any endpoints."); 120 } 121 122 return getExistingServerSession(); 123 } 124 activeSessions.add(ss); 125 log.debug("Created a new session: " + ss); 126 return ss; 127 } 128 } 129 130 134 private void dispatchToSession(MessageDispatch messageDispatch) throws JMSException { 135 136 ServerSession serverSession = getServerSession(); 137 Session s = serverSession.getSession(); 138 ActiveMQSession session = null; 139 if( s instanceof ActiveMQSession ) { 140 session = (ActiveMQSession) s; 141 } else if(s instanceof ActiveMQQueueSession) { 142 session = (ActiveMQSession) s; 143 } else if(s instanceof ActiveMQTopicSession) { 144 session = (ActiveMQSession) s; 145 } else { 146 activeMQAsfEndpointWorker.connection.onAsyncException(new JMSException ("Session pool provided an invalid session type: "+s.getClass())); 147 } 148 session.dispatch(messageDispatch); 149 serverSession.start(); 150 } 151 152 153 156 private ServerSession getExistingServerSession() { 157 ServerSessionImpl ss = (ServerSessionImpl) activeSessions.remove(0); 158 activeSessions.add(ss); 159 log.debug("Reusing an active session: " + ss); 160 return ss; 161 } 162 163 public void returnToPool(ServerSessionImpl ss) { 164 log.debug("Session returned to pool: " + ss); 165 activeSessions.remove(ss); 166 idleSessions.add(ss); 167 synchronized(closing){ 168 closing.notify(); 169 } 170 } 171 172 public void removeFromPool(ServerSessionImpl ss) { 173 activeSessions.remove(ss); 174 try { 175 ActiveMQSession session = (ActiveMQSession) ss.getSession(); 176 List l = session.getUnconsumedMessages(); 177 for (Iterator i = l.iterator(); i.hasNext();) { 178 dispatchToSession((MessageDispatch) i.next()); 179 } 180 } catch (Throwable t) { 181 log.error("Error redispatching unconsumed messages from stale session", t); 182 } 183 ss.close(); 184 synchronized(closing){ 185 closing.notify(); 186 } 187 } 188 189 public void close() { 190 synchronized (closing) { 191 closing.set(true); 192 closeIdleSessions(); 193 while( activeSessions.size() > 0 ) { 194 System.out.println("ACtive Sessions = " + activeSessions.size()); 195 try { 196 closing.wait(1000); 197 } catch (InterruptedException e) { 198 Thread.currentThread().interrupt(); 199 return; 200 } 201 closeIdleSessions(); 202 } 203 } 204 } 205 206 private void closeIdleSessions() { 207 for (Iterator iter = idleSessions.iterator(); iter.hasNext();) { 208 ServerSessionImpl ss = (ServerSessionImpl) iter.next(); 209 ss.close(); 210 } 211 } 212 213 216 public boolean isClosing(){ 217 return closing.get(); 218 } 219 220 223 public void setClosing(boolean closing){ 224 this.closing.set(closing); 225 } 226 227 } 228 | Popular Tags |