1 46 47 package org.mr.ra.inbound; 48 49 import java.util.ArrayList ; 50 import java.util.Iterator ; 51 import java.util.LinkedList ; 52 53 import javax.jms.JMSException ; 54 import javax.jms.ServerSession ; 55 import javax.jms.ServerSessionPool ; 56 import javax.jms.Session ; 57 import javax.resource.spi.UnavailableException ; 58 import javax.resource.spi.endpoint.MessageEndpoint ; 59 60 import org.apache.commons.logging.Log; 61 import org.apache.commons.logging.LogFactory; 62 import org.mr.api.jms.MantaSession; 63 import org.mr.ra.LocalAndXATransaction; 64 65 68 public class ServerSessionPoolImpl implements ServerSessionPool { 69 70 private static final Log log = LogFactory.getLog(ServerSessionPoolImpl.class); 71 72 private final MantaAsfEndpointWorker mantaAsfEndpointWorker; 73 private final int maxSessions; 74 75 private ArrayList idleSessions = new ArrayList (); 76 private LinkedList activeSessions = new LinkedList (); 77 private boolean closing = false; 78 79 84 public ServerSessionPoolImpl(MantaAsfEndpointWorker mantaAsfEndpointWorker, int maxSessions) { 85 this.mantaAsfEndpointWorker = mantaAsfEndpointWorker; 86 this.maxSessions = maxSessions; 87 } 88 89 private ServerSessionImpl createServerSessionImpl() throws JMSException { 90 ActivationSpecImpl activationSpec = mantaAsfEndpointWorker.endpointActivationKey.getActivationSpec(); 91 92 final MantaSession session; 93 if (mantaAsfEndpointWorker.transacted) { 94 session = (MantaSession) mantaAsfEndpointWorker.connection.createXASession(); 95 } 96 else { 97 int acknowledge = activationSpec.getAcknowledgeModeForSession(); 98 session = (MantaSession) mantaAsfEndpointWorker.connection.createSession(false, acknowledge); 99 } 100 MessageEndpoint endpoint; 104 try { 105 int batchSize = 0; 106 if (activationSpec.getEnableBatchBooleanValue()) { 107 batchSize = activationSpec.getMaxMessagesPerBatchIntValue(); 108 } 109 if (activationSpec.isUseRAManagedTransactionEnabled()) { 110 endpoint = createEndpoint(null); 112 return new ServerSessionImpl(this, 113 (MantaSession)session, 114 mantaAsfEndpointWorker.workManager, 115 endpoint, 116 true, 117 batchSize); 118 } else { 119 endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext())); 121 return new ServerSessionImpl(this, 122 (MantaSession)session, 123 mantaAsfEndpointWorker.workManager, 124 endpoint, 125 false, 126 batchSize); 127 } 128 } catch (UnavailableException e) { 136 log.error("createServerSessionImpl(): "+e); 139 session.close(); 140 return null; 141 } 142 } 143 144 private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException { 145 MessageEndpoint endpoint; 146 endpoint = mantaAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy); 147 MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint); 148 return endpointProxy; 149 } 150 157 160 synchronized public ServerSession getServerSession() throws JMSException { 161 if (closing) { 162 throw new JMSException ("Session Pool Shutting Down."); 163 } 164 165 if (idleSessions.size() > 0) { 166 return getExistingIdleServerSession(); 167 } 168 else { 169 if (activeSessions.size() >= maxSessions) { 171 return getExistingServerSession(); 175 } 176 ServerSessionImpl ss = createServerSessionImpl(); 177 if (ss == null) { 180 return getExistingServerSession(); 181 } 182 activeSessions.addLast(ss); 183 log.debug("Created a new session: " + ss); 184 return ss; 185 } 186 } 187 188 192 219 220 223 private ServerSession getExistingIdleServerSession() { 224 ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size() - 1); 225 activeSessions.addLast(ss); 226 log.debug("Using idle session: " + ss); 227 return ss; 228 } 229 230 233 private ServerSession getExistingServerSession() { 234 ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst(); 235 activeSessions.addLast(ss); 236 log.debug("Reusing an active session: " + ss); 237 return ss; 238 } 239 240 synchronized public void returnToPool(ServerSessionImpl ss) { 241 log.debug("Session returned to pool: " + ss); 242 activeSessions.remove(ss); 243 idleSessions.add(ss); 244 notify(); 245 } 246 247 synchronized public void removeFromPool(ServerSessionImpl ss) { 248 activeSessions.remove(ss); 249 259 ss.close(); 260 notify(); 261 } 262 263 public void close() { 264 synchronized (this) { 265 closing = true; 266 while( activeSessions.size() > 0 ) { 268 try { 269 wait(); 270 } catch (InterruptedException e) { 271 Thread.currentThread().interrupt(); 272 return; 273 } 274 } 276 closeIdleSessions(); 277 } 278 } 279 280 private void closeIdleSessions() { 281 for (Iterator iter = idleSessions.iterator(); iter.hasNext();) { 282 ServerSessionImpl ss = (ServerSessionImpl) iter.next(); 283 ss.close(); 284 } 285 } 286 287 } 288 | Popular Tags |