1 22 package org.jboss.jms.asf; 23 24 import javax.jms.JMSException ; 25 import javax.jms.Message ; 26 import javax.jms.MessageListener ; 27 import javax.jms.ServerSession ; 28 import javax.jms.Session ; 29 import javax.jms.XASession ; 30 import javax.naming.InitialContext ; 31 import javax.transaction.Status ; 32 import javax.transaction.Transaction ; 33 import javax.transaction.TransactionManager ; 34 import javax.transaction.xa.XAResource ; 35 import javax.transaction.xa.Xid ; 36 import org.jboss.logging.Logger; 37 import org.jboss.tm.TransactionManagerService; 38 import org.jboss.tm.XidFactoryMBean; 39 40 49 public class StdServerSession implements Runnable , ServerSession , MessageListener 50 { 51 52 static Logger log = Logger.getLogger(StdServerSession.class); 53 54 55 private StdServerSessionPool serverSessionPool; 56 57 58 private Session session; 59 60 61 private XASession xaSession; 62 63 64 private TransactionManager tm; 65 66 71 private boolean useLocalTX; 72 73 74 private MessageListener delegateListener; 75 76 private XidFactoryMBean xidFactory; 77 78 82 public TransactionManager getTransactionManager() 83 { 84 return tm; 85 } 86 87 91 public void setTransactionManager(TransactionManager transactionManager) 92 { 93 this.tm = transactionManager; 94 } 95 96 106 StdServerSession(final StdServerSessionPool pool, 107 final Session session, 108 final XASession xaSession, 109 final MessageListener delegateListener, 110 boolean useLocalTX, 111 final XidFactoryMBean xidFactory, 112 final TransactionManager tm) 113 throws JMSException 114 { 115 this.serverSessionPool = pool; 116 this.session = session; 117 this.xaSession = xaSession; 118 this.delegateListener = delegateListener; 119 if (xaSession == null) 120 useLocalTX = false; 121 this.useLocalTX = useLocalTX; 122 this.xidFactory = xidFactory; 123 this.tm = tm; 124 125 log.trace(this + " initializing (pool, session, xaSession, useLocalTX): " + 126 pool + ", " + session + ", " + xaSession + ", " + useLocalTX); 127 128 if (StdServerSessionPoolFactory.USE_OLD && xaSession != null) 130 xaSession.setMessageListener(this); 131 else 132 session.setMessageListener(this); 133 134 if (tm == null) 135 { 136 InitialContext ctx = null; 137 try 138 { 139 ctx = new InitialContext (); 140 this.tm = (TransactionManager ) ctx.lookup(TransactionManagerService.JNDI_NAME); 141 } 142 catch (Exception e) 143 { 144 throw new JMSException ("Transation manager was not found"); 145 } 146 finally 147 { 148 if (ctx != null) 149 { 150 try 151 { 152 ctx.close(); 153 } 154 catch (Exception ignore) 155 { 156 } 157 } 158 } 159 } 160 } 161 162 172 public Session getSession() throws JMSException 173 { 174 if (StdServerSessionPoolFactory.USE_OLD && xaSession != null) 175 return xaSession; 176 else 177 return session; 178 } 179 180 186 public void run() 187 { 188 boolean trace = log.isTraceEnabled(); 189 190 TransactionDemarcation td = null; 191 if (StdServerSessionPoolFactory.USE_OLD == false) 192 { 193 td = createTransactionDemarcation(); 194 if (td == null) 195 return; 196 } 197 try 198 { 199 if (trace) 200 log.trace(this + " running..."); 201 202 if (StdServerSessionPoolFactory.USE_OLD && xaSession != null) 203 xaSession.run(); 204 else 205 session.run(); 206 207 if (trace) 208 log.trace(this + " run."); 209 } 210 catch (Throwable t) 211 { 212 log.error(this + " onMessage failed to run; setting rollback only", t); 213 if (td != null) 214 td.error(); 215 } 216 finally 217 { 218 if (td != null) 219 td.end(); 220 221 recycle(); 222 } 223 } 224 225 237 public void onMessage(Message msg) 238 { 239 boolean trace = log.isTraceEnabled(); 240 241 TransactionDemarcation td = null; 242 if (StdServerSessionPoolFactory.USE_OLD) 243 { 244 td = createTransactionDemarcation(); 245 if (td == null) 246 return; 247 } 248 try 249 { 250 if (trace) 251 log.trace(this + " onMessage running (pool, session, xaSession, useLocalTX): " + 252 ", " + session + ", " + xaSession + ", " + useLocalTX); 253 254 delegateListener.onMessage(msg); 256 257 if (trace) 258 log.trace(this + " onMessage finished"); 259 } 260 catch (Throwable t) 261 { 262 log.error(this + " onMessage failed to run; setting rollback only", t); 263 if (td != null) 264 td.error(); 265 } 266 finally 267 { 268 if (td != null) 269 td.end(); 270 } 271 if (trace) 272 log.trace(this + " onMessage done"); 273 } 274 275 280 public void start() throws JMSException 281 { 282 log.trace(this + " starting invokes on server session"); 283 284 if (session != null) 285 { 286 try 287 { 288 serverSessionPool.getExecutor().execute(this); 289 } 290 catch (InterruptedException ignore) 291 { 292 } 293 } 294 else 295 { 296 throw new JMSException (this + " no listener has been specified"); 297 } 298 } 299 300 303 void close() 304 { 305 log.trace(this + " closing."); 306 307 if (session != null) 308 { 309 try 310 { 311 session.close(); 312 } 313 catch (Exception ignore) 314 { 315 } 316 317 session = null; 318 } 319 320 if (xaSession != null) 321 { 322 try 323 { 324 xaSession.close(); 325 } 326 catch (Exception ignore) 327 { 328 } 329 xaSession = null; 330 } 331 332 log.debug("closed"); 333 } 334 335 339 void recycle() 340 { 341 boolean trace = log.isTraceEnabled(); 342 if (trace) 343 log.trace(this + " recycling"); 344 serverSessionPool.recycle(this); 345 if (trace) 346 log.trace(this + " recycled"); 347 } 348 349 TransactionDemarcation createTransactionDemarcation() 350 { 351 try 352 { 353 return new TransactionDemarcation(); 354 } 355 catch (Throwable t) 356 { 357 log.error(this + " error creating transaction demarcation ", t); 358 return null; 359 } 360 } 361 362 private class TransactionDemarcation 363 { 364 boolean trace = log.isTraceEnabled(); 365 366 Xid localXid = null; 368 boolean localRollbackFlag = false; 369 Transaction trans = null; 371 372 public TransactionDemarcation() throws Throwable 373 { 374 if (useLocalTX) 375 { 376 localXid = xidFactory.newXid(); XAResource res = xaSession.getXAResource(); 379 res.start(localXid, XAResource.TMNOFLAGS); 380 381 if (trace) 382 log.trace(StdServerSession.this + " using optimized 1p commit to control TX. xid=" + localXid); 383 } 384 else 385 { 386 387 tm.begin(); 389 try 390 { 391 trans = tm.getTransaction(); 392 393 if (trace) 394 log.trace(StdServerSession.this + " using tx=" + trans); 395 396 if (xaSession != null) 397 { 398 XAResource res = xaSession.getXAResource(); 399 if (!trans.enlistResource(res)) 400 { 401 throw new JMSException ("could not enlist resource"); 402 } 403 if (trace) 404 log.trace(StdServerSession.this + " XAResource '" + res + "' enlisted."); 405 } 406 } 407 catch (Throwable t) 408 { 409 try 410 { 411 tm.rollback(); 412 } 413 catch (Throwable ignored) 414 { 415 log.trace(StdServerSession.this + " ignored error rolling back after failed enlist", ignored); 416 } 417 throw t; 418 } 419 } 420 } 421 422 public void error() 423 { 424 if (useLocalTX) 425 { 426 localRollbackFlag = true; 428 } 429 else 430 { 431 try 433 { 434 if (trace) 436 log.trace(StdServerSession.this + " using TM to mark TX for rollback tx=" + trans); 437 trans.setRollbackOnly(); 438 } 439 catch (Throwable t) 440 { 441 log.error(StdServerSession.this + " failed to set rollback only", t); 442 } 443 } 444 } 445 446 public void end() 447 { 448 try 449 { 450 if (useLocalTX) 451 { 452 if (localRollbackFlag == true) 453 { 454 if (trace) 455 log.trace(StdServerSession.this + " using optimized 1p commit to rollback TX xid=" + localXid); 456 457 XAResource res = xaSession.getXAResource(); 458 res.end(localXid, XAResource.TMSUCCESS); 459 res.rollback(localXid); 460 461 } 462 else 463 { 464 if (trace) 465 log.trace(StdServerSession.this + " using optimized 1p commit to commit TX xid=" + localXid); 466 467 XAResource res = xaSession.getXAResource(); 468 res.end(localXid, XAResource.TMSUCCESS); 469 res.commit(localXid, true); 470 } 471 } 472 else 473 { 474 Transaction currentTx = tm.getTransaction(); 476 if (trans.equals(currentTx) == false) 477 throw new IllegalStateException ("Wrong tx association: expected " + trans + " was " + currentTx); 478 479 if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) 481 { 482 if (trace) 483 log.trace(StdServerSession.this + " rolling back JMS transaction tx=" + trans); 484 tm.rollback(); 486 487 if (xaSession == null && serverSessionPool.isTransacted()) 491 { 492 session.rollback(); 493 } 494 } 495 else if (trans.getStatus() == Status.STATUS_ACTIVE) 496 { 497 if (trace) 502 log.trace(StdServerSession.this + " commiting the JMS transaction tx=" + trans); 503 tm.commit(); 504 505 if (xaSession == null && serverSessionPool.isTransacted()) 508 { 509 session.commit(); 510 } 511 } 512 } 513 } 514 catch (Throwable t) 515 { 516 log.error(StdServerSession.this + " failed to commit/rollback", t); 517 } 518 } 519 } 520 } 521 | Popular Tags |