1 16 17 package org.springframework.jms.connection; 18 19 import javax.jms.Connection ; 20 import javax.jms.ConnectionFactory ; 21 import javax.jms.JMSException ; 22 import javax.jms.QueueConnection ; 23 import javax.jms.QueueConnectionFactory ; 24 import javax.jms.QueueSession ; 25 import javax.jms.Session ; 26 import javax.jms.TopicConnection ; 27 import javax.jms.TopicConnectionFactory ; 28 import javax.jms.TopicSession ; 29 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 33 import org.springframework.transaction.support.TransactionSynchronizationAdapter; 34 import org.springframework.transaction.support.TransactionSynchronizationManager; 35 import org.springframework.util.Assert; 36 37 49 public abstract class ConnectionFactoryUtils { 50 51 private static final Log logger = LogFactory.getLog(ConnectionFactoryUtils.class); 52 53 54 67 public static void releaseConnection(Connection con, ConnectionFactory cf, boolean started) { 68 if (con == null) { 69 return; 70 } 71 if (started && cf instanceof SmartConnectionFactory && ((SmartConnectionFactory) cf).shouldStop(con)) { 72 try { 73 con.stop(); 74 } 75 catch (Throwable ex) { 76 logger.debug("Could not stop JMS Connection before closing it", ex); 77 } 78 } 79 try { 80 con.close(); 81 } 82 catch (Throwable ex) { 83 logger.debug("Could not close JMS Connection", ex); 84 } 85 } 86 87 94 public static boolean isSessionTransactional(Session session, ConnectionFactory cf) { 95 if (session == null || cf == null) { 96 return false; 97 } 98 JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(cf); 99 return (resourceHolder != null && resourceHolder.containsSession(session)); 100 } 101 102 103 116 public static Session getTransactionalSession( 117 final ConnectionFactory cf, final Connection existingCon, final boolean synchedLocalTransactionAllowed) 118 throws JMSException { 119 120 return doGetTransactionalSession(cf, new ResourceFactory() { 121 public Session getSession(JmsResourceHolder holder) { 122 return holder.getSession(Session .class, existingCon); 123 } 124 public Connection getConnection(JmsResourceHolder holder) { 125 return (existingCon != null ? existingCon : holder.getConnection()); 126 } 127 public Connection createConnection() throws JMSException { 128 return cf.createConnection(); 129 } 130 public Session createSession(Connection con) throws JMSException { 131 return con.createSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE); 132 } 133 public boolean isSynchedLocalTransactionAllowed() { 134 return synchedLocalTransactionAllowed; 135 } 136 }); 137 } 138 139 153 public static QueueSession getTransactionalQueueSession( 154 final QueueConnectionFactory cf, final QueueConnection existingCon, final boolean synchedLocalTransactionAllowed) 155 throws JMSException { 156 157 return (QueueSession ) doGetTransactionalSession(cf, new ResourceFactory() { 158 public Session getSession(JmsResourceHolder holder) { 159 return holder.getSession(QueueSession .class, existingCon); 160 } 161 public Connection getConnection(JmsResourceHolder holder) { 162 return (existingCon != null ? existingCon : holder.getConnection(QueueConnection .class)); 163 } 164 public Connection createConnection() throws JMSException { 165 return cf.createQueueConnection(); 166 } 167 public Session createSession(Connection con) throws JMSException { 168 return ((QueueConnection ) con).createQueueSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE); 169 } 170 public boolean isSynchedLocalTransactionAllowed() { 171 return synchedLocalTransactionAllowed; 172 } 173 }); 174 } 175 176 190 public static TopicSession getTransactionalTopicSession( 191 final TopicConnectionFactory cf, final TopicConnection existingCon, final boolean synchedLocalTransactionAllowed) 192 throws JMSException { 193 194 return (TopicSession ) doGetTransactionalSession(cf, new ResourceFactory() { 195 public Session getSession(JmsResourceHolder holder) { 196 return holder.getSession(TopicSession .class, existingCon); 197 } 198 public Connection getConnection(JmsResourceHolder holder) { 199 return (existingCon != null ? existingCon : holder.getConnection(TopicConnection .class)); 200 } 201 public Connection createConnection() throws JMSException { 202 return cf.createTopicConnection(); 203 } 204 public Session createSession(Connection con) throws JMSException { 205 return ((TopicConnection ) con).createTopicSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE); 206 } 207 public boolean isSynchedLocalTransactionAllowed() { 208 return synchedLocalTransactionAllowed; 209 } 210 }); 211 } 212 213 222 public static Session doGetTransactionalSession( 223 ConnectionFactory connectionFactory, ResourceFactory resourceFactory) 224 throws JMSException { 225 226 Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); 227 Assert.notNull(resourceFactory, "ResourceFactory must not be null"); 228 229 JmsResourceHolder resourceHolder = 230 (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); 231 if (resourceHolder != null) { 232 Session session = resourceFactory.getSession(resourceHolder); 233 if (session != null || resourceHolder.isFrozen()) { 234 return session; 235 } 236 } 237 if (!TransactionSynchronizationManager.isSynchronizationActive()) { 238 return null; 239 } 240 JmsResourceHolder resourceHolderToUse = resourceHolder; 241 if (resourceHolderToUse == null) { 242 resourceHolderToUse = new JmsResourceHolder(connectionFactory); 243 } 244 Connection con = resourceFactory.getConnection(resourceHolderToUse); 245 Session session = null; 246 try { 247 boolean isExistingCon = (con != null); 248 if (!isExistingCon) { 249 con = resourceFactory.createConnection(); 250 resourceHolderToUse.addConnection(con); 251 } 252 session = resourceFactory.createSession(con); 253 resourceHolderToUse.addSession(session, con); 254 if (!isExistingCon) { 255 con.start(); 256 } 257 } 258 catch (JMSException ex) { 259 if (session != null) { 260 try { 261 session.close(); 262 } 263 catch (Throwable ex2) { 264 } 266 } 267 if (con != null) { 268 try { 269 con.close(); 270 } 271 catch (Throwable ex2) { 272 } 274 } 275 throw ex; 276 } 277 if (resourceHolderToUse != resourceHolder) { 278 TransactionSynchronizationManager.registerSynchronization( 279 new JmsResourceSynchronization( 280 connectionFactory, resourceHolderToUse, resourceFactory.isSynchedLocalTransactionAllowed())); 281 resourceHolderToUse.setSynchronizedWithTransaction(true); 282 TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolderToUse); 283 } 284 return session; 285 } 286 287 288 292 public interface ResourceFactory { 293 294 300 Session getSession(JmsResourceHolder holder); 301 302 308 Connection getConnection(JmsResourceHolder holder); 309 310 315 Connection createConnection() throws JMSException ; 316 317 323 Session createSession(Connection con) throws JMSException ; 324 325 332 boolean isSynchedLocalTransactionAllowed(); 333 } 334 335 336 341 private static class JmsResourceSynchronization extends TransactionSynchronizationAdapter { 342 343 private final Object resourceKey; 344 345 private final JmsResourceHolder resourceHolder; 346 347 private final boolean transacted; 348 349 private boolean holderActive = true; 350 351 public JmsResourceSynchronization(Object resourceKey, JmsResourceHolder resourceHolder, boolean transacted) { 352 this.resourceKey = resourceKey; 353 this.resourceHolder = resourceHolder; 354 this.transacted = transacted; 355 } 356 357 public void suspend() { 358 if (this.holderActive) { 359 TransactionSynchronizationManager.unbindResource(this.resourceKey); 360 } 361 } 362 363 public void resume() { 364 if (this.holderActive) { 365 TransactionSynchronizationManager.bindResource(this.resourceKey, this.resourceHolder); 366 } 367 } 368 369 public void beforeCompletion() { 370 TransactionSynchronizationManager.unbindResource(this.resourceKey); 371 this.holderActive = false; 372 if (!this.transacted) { 373 this.resourceHolder.closeAll(); 374 } 375 } 376 377 public void afterCommit() { 378 if (this.transacted) { 379 try { 380 this.resourceHolder.commitAll(); 381 } 382 catch (JMSException ex) { 383 throw new SynchedLocalTransactionFailedException("Local JMS transaction failed to commit", ex); 384 } 385 } 386 } 387 388 public void afterCompletion(int status) { 389 if (this.transacted) { 390 this.resourceHolder.closeAll(); 391 } 392 } 393 } 394 395 } 396 | Popular Tags |