1 package org.objectweb.celtix.bus.transports.jms; 2 3 import java.net.InetAddress ; 4 import java.net.UnknownHostException ; 5 import java.util.Calendar ; 6 import java.util.logging.Level ; 7 import java.util.logging.Logger ; 8 9 import javax.jms.Connection ; 10 import javax.jms.Destination ; 11 import javax.jms.JMSException ; 12 import javax.jms.MessageConsumer ; 13 import javax.jms.Queue ; 14 import javax.jms.QueueConnection ; 15 import javax.jms.QueueSession ; 16 import javax.jms.Session ; 17 import javax.jms.Topic ; 18 import javax.jms.TopicConnection ; 19 import javax.jms.TopicSession ; 20 import javax.jms.TopicSubscriber ; 21 import javax.naming.Context ; 22 import javax.naming.NamingException ; 23 24 import org.objectweb.celtix.common.logging.LogUtils; 25 import org.objectweb.celtix.common.util.AbstractTwoStageCache; 26 import org.objectweb.celtix.transports.jms.JMSAddressPolicyType; 27 import org.objectweb.celtix.transports.jms.JMSServerBehaviorPolicyType; 28 29 30 86 public class JMSSessionFactory { 87 88 private static final int CACHE_HIGH_WATER_MARK = 500; 89 private static final Logger LOG = LogUtils.getL7dLogger(JMSSessionFactory.class); 90 private static final int PRIMARY_CACHE_MAX = 20; 91 92 private final Context initialContext; 93 private final Connection theConnection; 94 private AbstractTwoStageCache<PooledSession> replyCapableSessionCache; 95 private AbstractTwoStageCache<PooledSession> sendOnlySessionCache; 96 private final Destination theReplyDestination; 97 private final boolean isQueueConnection; 98 99 private final JMSAddressPolicyType addressExtensor; 100 private final JMSServerBehaviorPolicyType jmsServerPolicy; 101 102 107 public JMSSessionFactory(Connection connection, 108 Destination replyDestination, 109 JMSAddressPolicyType addrExt, 110 JMSServerBehaviorPolicyType serverPolicy, 111 Context context) { 112 theConnection = connection; 113 theReplyDestination = replyDestination; 114 addressExtensor = addrExt; 115 isQueueConnection = addressExtensor.getDestinationStyle().value().equals(JMSConstants.JMS_QUEUE); 116 jmsServerPolicy = serverPolicy; 117 initialContext = context; 118 119 if (isQueueConnection) { 122 replyCapableSessionCache = 126 new AbstractTwoStageCache<PooledSession>( 127 PRIMARY_CACHE_MAX, 128 CACHE_HIGH_WATER_MARK, 129 0, 130 this) { 131 public final PooledSession create() throws JMSException { 132 return createPointToPointReplyCapableSession(); 133 } 134 }; 135 136 try { 137 replyCapableSessionCache.populateCache(); 138 } catch (Throwable t) { 139 LOG.log(Level.FINE, "JMS Session cache populate failed: " + t); 140 } 141 142 sendOnlySessionCache = 145 new AbstractTwoStageCache<PooledSession>( 146 PRIMARY_CACHE_MAX, 147 CACHE_HIGH_WATER_MARK, 148 0, 149 this) { 150 public final PooledSession create() throws JMSException { 151 return createPointToPointSendOnlySession(); 152 } 153 }; 154 155 try { 156 sendOnlySessionCache.populateCache(); 157 } catch (Throwable t) { 158 LOG.log(Level.FINE, "JMS Session cache populate failed: " + t); 159 } 160 } else { 161 sendOnlySessionCache = 164 new AbstractTwoStageCache<PooledSession>( 165 PRIMARY_CACHE_MAX, 166 CACHE_HIGH_WATER_MARK, 167 0, 168 this) { 169 public final PooledSession create() throws JMSException { 170 return createPubSubSession(true, false, null); 171 } 172 }; 173 174 try { 175 sendOnlySessionCache.populateCache(); 176 } catch (Throwable t) { 177 LOG.log(Level.FINE, "JMS Session cache populate failed: " + t); 178 } 179 } 180 } 181 182 public String toString() { 184 return "JMSSessionFactory"; 185 } 186 187 188 protected Connection getConnection() { 190 return theConnection; 191 } 192 193 public Queue getQueueFromInitialContext(String queueName) 194 throws NamingException { 195 return (Queue ) initialContext.lookup(queueName); 196 } 197 198 public PooledSession get(boolean replyCapable) throws JMSException { 199 return get(null, replyCapable); 200 } 201 202 209 public PooledSession get(Destination replyDest, boolean replyCapable) throws JMSException { 210 PooledSession ret = null; 211 212 synchronized (this) { 213 if (replyCapable) { 214 ret = replyCapableSessionCache.poll(); 217 218 if (ret == null) { 219 ret = sendOnlySessionCache.poll(); 223 224 if (ret != null) { 225 QueueSession session = (QueueSession )ret.session(); 226 Queue destination = null; 227 String selector = null; 228 229 if (null != theReplyDestination || null != replyDest) { 230 destination = null != replyDest ? (Queue ) replyDest : (Queue )theReplyDestination; 231 232 selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'"; 233 } 234 235 ret.destination(destination); 236 MessageConsumer consumer = session.createReceiver(destination, selector); 237 ret.consumer(consumer); 238 } else { 239 try { 243 ret = replyCapableSessionCache.get(); 244 } catch (Throwable t) { 245 throw (JMSException )t; 248 } 249 } 250 } 251 } else { 252 ret = sendOnlySessionCache.poll(); 255 256 if (ret == null) { 257 if (replyCapableSessionCache != null) { 262 ret = replyCapableSessionCache.poll(); 263 } 264 265 if (ret == null) { 266 try { 270 ret = sendOnlySessionCache.get(); 271 } catch (Throwable t) { 272 throw (JMSException )t; 275 } 276 } 277 } 278 } 279 } 280 281 return ret; 282 } 283 284 291 public PooledSession get(Destination destination) throws JMSException { 292 PooledSession ret = null; 293 294 if (isQueueConnection) { 298 ret = createPointToPointServerSession(destination); 299 } else { 300 ret = createPubSubSession(false, true, destination); 301 } 302 303 return ret; 304 } 305 306 311 public void recycle(PooledSession pooledSession) { 312 final boolean replyCapable = pooledSession.destination() != null; 315 boolean discard = false; 316 317 synchronized (this) { 318 discard = replyCapable ? (!replyCapableSessionCache.recycle(pooledSession)) 321 : (!sendOnlySessionCache.recycle(pooledSession)); 322 } 323 324 if (discard) { 325 try { 326 pooledSession.close(); 327 } catch (JMSException e) { 328 LOG.log(Level.WARNING, "JMS Session discard failed: " + e); 329 } 330 } 331 } 332 333 334 337 public void shutdown() { 338 try { 339 PooledSession curr; 340 341 if (replyCapableSessionCache != null) { 342 curr = replyCapableSessionCache.poll(); 343 while (curr != null) { 344 curr.close(); 345 curr = replyCapableSessionCache.poll(); 346 } 347 } 348 349 if (sendOnlySessionCache != null) { 350 curr = sendOnlySessionCache.poll(); 351 while (curr != null) { 352 curr.close(); 353 curr = sendOnlySessionCache.poll(); 354 } 355 } 356 357 theConnection.close(); 358 } catch (JMSException e) { 359 LOG.log(Level.WARNING, "queue connection close failed: " + e); 360 } 361 362 replyCapableSessionCache = null; 365 sendOnlySessionCache = null; 366 } 367 368 369 377 PooledSession createPointToPointReplyCapableSession() throws JMSException { 378 QueueSession session = 379 ((QueueConnection )theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 380 Destination destination = null; 381 String selector = null; 382 383 if (null != theReplyDestination) { 384 destination = theReplyDestination; 385 386 selector = "JMSCorrelationID = '" + generateUniqueSelector(session) + "'"; 387 388 389 } else { 390 destination = session.createTemporaryQueue(); 391 } 392 393 MessageConsumer consumer = session.createReceiver((Queue )destination, selector); 394 return new PooledSession(session, 395 destination, 396 session.createSender(null), 397 consumer); 398 } 399 400 401 406 PooledSession createPointToPointSendOnlySession() throws JMSException { 407 QueueSession session = 408 ((QueueConnection )theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 409 410 return new PooledSession(session, null, session.createSender(null), null); 411 } 412 413 414 420 private PooledSession createPointToPointServerSession(Destination destination) throws JMSException { 421 QueueSession session = 422 ((QueueConnection )theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 423 424 425 return new PooledSession(session, destination, session.createSender(null), 426 session.createReceiver((Queue )destination, 427 jmsServerPolicy.getMessageSelector())); 428 } 429 430 431 439 PooledSession createPubSubSession(boolean producer, 440 boolean consumer, 441 Destination destination) throws JMSException { 442 TopicSession session = ((TopicConnection )theConnection).createTopicSession(false, 443 Session.AUTO_ACKNOWLEDGE); 444 TopicSubscriber sub = null; 445 if (consumer) { 446 String messageSelector = jmsServerPolicy.getMessageSelector(); 447 String durableName = jmsServerPolicy.getDurableSubscriberName(); 448 if (durableName != null) { 449 sub = session.createDurableSubscriber((Topic )destination, 450 durableName, 451 messageSelector, 452 false); 453 } else { 454 sub = session.createSubscriber((Topic )destination, 455 messageSelector, 456 false); 457 } 458 } 459 460 return new PooledSession(session, 461 null, 462 producer ? session.createPublisher(null) : null, 463 sub); 464 } 465 466 private String generateUniqueSelector(Object obj) { 467 String host = "localhost"; 468 469 try { 470 InetAddress addr = InetAddress.getLocalHost(); 471 host = addr.getHostName(); 472 } catch (UnknownHostException ukex) { 473 } 475 476 long time = Calendar.getInstance().getTimeInMillis(); 477 return host + "_" 478 + System.getProperty("user.name") + "_" 479 + obj + time; 480 } 481 } 482 | Popular Tags |