1 22 package org.jboss.jms.asf; 23 24 import java.util.ArrayList ; 25 import java.util.Iterator ; 26 import java.util.List ; 27 28 import javax.jms.Connection ; 29 import javax.jms.Destination ; 30 import javax.jms.JMSException ; 31 import javax.jms.MessageListener ; 32 import javax.jms.Queue ; 33 import javax.jms.QueueConnection ; 34 import javax.jms.ServerSession ; 35 import javax.jms.ServerSessionPool ; 36 import javax.jms.Session ; 37 import javax.jms.Topic ; 38 import javax.jms.TopicConnection ; 39 import javax.jms.XAQueueConnection ; 40 import javax.jms.XAQueueSession ; 41 import javax.jms.XASession ; 42 import javax.jms.XATopicConnection ; 43 import javax.jms.XATopicSession ; 44 import javax.transaction.TransactionManager ; 45 46 import org.jboss.logging.Logger; 47 import org.jboss.tm.XidFactoryMBean; 48 49 import EDU.oswego.cs.dl.util.concurrent.Executor; 50 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 51 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 52 53 61 public class StdServerSessionPool implements ServerSessionPool 62 { 63 64 private static ThreadGroup threadGroup = new ThreadGroup ("ASF Session Pool Threads"); 65 66 67 private final Logger log = Logger.getLogger(this.getClass()); 68 69 70 private int minSize; 71 72 73 private int poolSize; 74 75 76 private int ack; 77 78 79 private boolean useLocalTX; 80 81 82 private boolean transacted; 83 84 85 private Destination destination; 86 87 88 private Connection con; 89 90 91 private MessageListener listener; 92 93 94 private List sessionPool; 95 96 97 private PooledExecutor executor; 98 99 100 private boolean closing = false; 101 102 103 private int numServerSessions = 0; 104 105 private XidFactoryMBean xidFactory; 106 107 private TransactionManager tm; 108 109 124 public StdServerSessionPool(final Destination destination, 125 final Connection con, 126 final boolean transacted, 127 final int ack, 128 final boolean useLocalTX, 129 final MessageListener listener, 130 final int minSession, 131 final int maxSession, 132 final long keepAlive, 133 final XidFactoryMBean xidFactory, 134 final TransactionManager tm) 135 throws JMSException 136 { 137 this.destination = destination; 138 this.con = con; 139 this.ack = ack; 140 this.listener = listener; 141 this.transacted = transacted; 142 this.minSize = minSession; 143 this.poolSize = maxSession; 144 this.sessionPool = new ArrayList (maxSession); 145 this.useLocalTX = useLocalTX; 146 this.xidFactory = xidFactory; 147 this.tm = tm; 148 executor = new MyPooledExecutor(poolSize); 150 executor.setMinimumPoolSize(minSize); 151 executor.setKeepAliveTime(keepAlive); 152 executor.waitWhenBlocked(); 153 executor.setThreadFactory(new DefaultThreadFactory()); 154 155 create(); 157 log.debug("Server Session pool set up"); 158 } 159 160 166 public ServerSession getServerSession() throws JMSException 167 { 168 if( log.isTraceEnabled() ) 169 log.trace("getting a server session"); 170 ServerSession session = null; 171 172 try 173 { 174 while (true) 175 { 176 synchronized (sessionPool) 177 { 178 if (closing) 179 { 180 throw new JMSException ("Cannot get session after pool has been closed down."); 181 } 182 else if (sessionPool.size() > 0) 183 { 184 session = (ServerSession )sessionPool.remove(0); 185 break; 186 } 187 else 188 { 189 try 190 { 191 sessionPool.wait(); 192 } 193 catch (InterruptedException ignore) 194 { 195 } 196 } 197 } 198 } 199 } 200 catch (Exception e) 201 { 202 throw new JMSException ("Failed to get a server session: " + e); 203 } 204 205 if( log.isTraceEnabled() ) 206 log.trace("using server session: " + session); 207 return session; 208 } 209 210 214 public void clear() 215 { 216 synchronized (sessionPool) 217 { 218 closing = true; 223 224 log.debug("Clearing " + sessionPool.size() + " from ServerSessionPool"); 225 226 Iterator iter = sessionPool.iterator(); 227 while (iter.hasNext()) 228 { 229 StdServerSession ses = (StdServerSession)iter.next(); 230 ses.close(); 232 numServerSessions--; 233 } 234 235 sessionPool.clear(); 236 sessionPool.notifyAll(); 237 } 238 239 executor.shutdownAfterProcessingCurrentlyQueuedTasks(); 241 242 synchronized (sessionPool) 244 { 245 while (numServerSessions > 0) 246 { 247 try 248 { 249 sessionPool.wait(); 250 } 251 catch (InterruptedException ignore) 252 { 253 } 254 } 255 } 256 } 257 258 263 Executor getExecutor() 264 { 265 return executor; 266 } 267 268 270 275 boolean isTransacted() 276 { 277 return transacted; 278 } 279 280 285 void recycle(StdServerSession session) 286 { 287 synchronized (sessionPool) 288 { 289 if (closing) 290 { 291 session.close(); 292 numServerSessions--; 293 if (numServerSessions == 0) 294 { 295 sessionPool.notifyAll(); 297 } 298 } 299 else 300 { 301 sessionPool.add(session); 302 sessionPool.notifyAll(); 303 if( log.isTraceEnabled() ) 304 log.trace("recycled server session: " + session); 305 } 306 } 307 } 308 309 private void create() throws JMSException 310 { 311 for (int index = 0; index < poolSize; index++) 312 { 313 Session ses = null; 315 XASession xaSes = null; 316 317 log.debug("initializing with connection: " + con); 318 319 if (destination instanceof Topic && con instanceof XATopicConnection ) 320 { 321 xaSes = ((XATopicConnection )con).createXATopicSession(); 322 ses = ((XATopicSession )xaSes).getTopicSession(); 323 } 324 else if (destination instanceof Queue && con instanceof XAQueueConnection ) 325 { 326 xaSes = ((XAQueueConnection )con).createXAQueueSession(); 327 ses = ((XAQueueSession )xaSes).getQueueSession(); 328 } 329 else if (destination instanceof Topic && con instanceof TopicConnection ) 330 { 331 ses = ((TopicConnection )con).createTopicSession(transacted, ack); 332 log.warn("Using a non-XA TopicConnection. " + 333 "It will not be able to participate in a Global UOW"); 334 } 335 else if (destination instanceof Queue && con instanceof QueueConnection ) 336 { 337 ses = ((QueueConnection )con).createQueueSession(transacted, ack); 338 log.warn("Using a non-XA QueueConnection. " + 339 "It will not be able to participate in a Global UOW"); 340 } 341 else 342 { 343 throw new JMSException ("Connection was not reconizable: " + con + " for destination " + destination); 344 } 345 346 StdServerSession serverSession = new StdServerSession(this, ses, xaSes, 349 listener, useLocalTX, xidFactory, tm); 350 351 sessionPool.add(serverSession); 352 numServerSessions++; 353 354 log.debug("added server session to the pool: " + serverSession); 355 } 356 } 357 358 362 private static class MyPooledExecutor extends PooledExecutor 363 { 364 public MyPooledExecutor(int poolSize) 365 { 366 super(poolSize); 367 } 368 369 protected Runnable getTask() throws InterruptedException 370 { 371 Runnable task = null; 372 while ((task = super.getTask()) == null && keepRunning()); 373 return task; 374 } 375 376 382 protected synchronized boolean keepRunning() 383 { 384 if (shutdown_) 385 return false; 386 387 return poolSize_ <= minimumPoolSize_; 388 } 389 } 390 391 private static class DefaultThreadFactory implements ThreadFactory 392 { 393 private static int count = 0; 394 private static synchronized int nextCount() 395 { 396 return count ++; 397 } 398 399 405 public Thread newThread(final Runnable command) 406 { 407 String name = "JMS SessionPool Worker-" + nextCount(); 408 Thread thread = new Thread (threadGroup, command, name); 409 thread.setDaemon(true); 410 return thread; 411 } 412 } 413 } 414 | Popular Tags |