1 22 package org.jboss.mq.pm; 23 24 import java.util.Iterator ; 25 import java.util.Map ; 26 import java.util.Set ; 27 28 import javax.jms.JMSException ; 29 import javax.transaction.xa.Xid ; 30 31 import org.jboss.logging.Logger; 32 import org.jboss.mq.ConnectionToken; 33 import org.jboss.mq.Recoverable; 34 import org.jboss.mq.SpyJMSException; 35 36 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 37 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet; 38 39 47 public class TxManager implements Recoverable 48 { 49 50 private static final Logger log = Logger.getLogger(TxManager.class); 51 52 53 PersistenceManager persistenceManager; 54 55 56 ConcurrentHashMap globalToLocal = new ConcurrentHashMap(); 57 58 59 ConcurrentHashMap prepared = new ConcurrentHashMap(); 60 61 66 public TxManager(PersistenceManager pm) 67 { 68 persistenceManager = pm; 69 } 70 71 80 public final Tx getPrepared(ConnectionToken dc, Object xid) throws JMSException 81 { 82 GlobalXID gxid = new GlobalXID(dc, xid); 83 Tx txid = (Tx) globalToLocal.get(gxid); 84 if (txid == null) 85 throw new SpyJMSException("Transaction does not exist from: " + dc.getClientID() + " xid=" + xid); 86 87 return txid; 88 } 89 90 96 public final Tx createTx() throws JMSException 97 { 98 Tx txId = persistenceManager.createPersistentTx(); 99 return txId; 100 } 101 102 108 public final void commitTx(Tx txId) throws JMSException 109 { 110 boolean trace = log.isTraceEnabled(); 111 if (trace) 112 log.trace("Commit branch=" + txId.longValue()); 113 txId.commit(persistenceManager); 114 } 115 116 123 public final void commitTx(ConnectionToken dc, Object xid) throws JMSException 124 { 125 boolean trace = log.isTraceEnabled(); 126 GlobalXID gxid = new GlobalXID(dc, xid); 127 Tx txid = (Tx) globalToLocal.get(gxid); 128 if (txid == null) 129 { 130 PreparedInfo preparedInfo = (PreparedInfo) prepared.get(xid); 131 if (preparedInfo == null) 132 throw new SpyJMSException("Transaction does not exist from: " + dc.getClientID() + " xid=" + xid); 133 Set txids = preparedInfo.getTxids(); 134 for (Iterator i = txids.iterator(); i.hasNext();) 135 { 136 txid = (Tx) i.next(); 137 if (trace) 138 log.trace("Commit xid=" + xid + " branch=" + txid.longValue()); 139 txid.commit(persistenceManager); 140 } 141 prepared.remove(xid); 142 } 143 else 144 { 145 if (trace) 146 log.trace("Commit xid=" + xid + " branch=" + txid.longValue()); 147 txid.commit(persistenceManager); 148 } 149 } 150 151 158 public void addPostCommitTask(Tx txId, Runnable task) throws JMSException 159 { 160 if (txId == null) 161 { 162 task.run(); 163 return; 164 } 165 166 txId.addPostCommitTask(task); 167 } 168 169 175 public void rollbackTx(Tx txId) throws JMSException 176 { 177 boolean trace = log.isTraceEnabled(); 178 if (trace) 179 log.trace("Rollback branch=" + txId.longValue()); 180 txId.rollback(persistenceManager); 181 } 182 183 190 public final void rollbackTx(ConnectionToken dc, Object xid) throws JMSException 191 { 192 boolean trace = log.isTraceEnabled(); 193 GlobalXID gxid = new GlobalXID(dc, xid); 194 Tx txid = (Tx) globalToLocal.get(gxid); 195 if (txid == null) 196 { 197 PreparedInfo preparedInfo = (PreparedInfo) prepared.get(xid); 198 if (preparedInfo == null) 199 throw new SpyJMSException("Transaction does not exist from: " + dc.getClientID() + " xid=" + xid); 200 Set txids = preparedInfo.getTxids(); 201 for (Iterator i = txids.iterator(); i.hasNext();) 202 { 203 txid = (Tx) i.next(); 204 if (trace) 205 log.trace("Rolling back xid=" + xid + " branch=" + txid.longValue()); 206 txid.rollback(persistenceManager); 207 } 208 prepared.remove(xid); 209 } 210 else 211 { 212 if (trace) 213 log.trace("Rolling back xid=" + xid + " branch=" + txid.longValue()); 214 txid.rollback(persistenceManager); 215 } 216 } 217 218 225 public void addPostRollbackTask(Tx txId, Runnable task) throws JMSException 226 { 227 if (txId == null) 228 return; 229 230 txId.addPostRollbackTask(task); 231 } 232 233 243 public Tx createTx(ConnectionToken dc, Object xid) throws JMSException 244 { 245 GlobalXID gxid = new GlobalXID(dc, xid); 246 if (globalToLocal.containsKey(gxid)) 247 throw new SpyJMSException("Duplicate transaction from: " + dc.getClientID() + " xid=" + xid); 248 249 Tx txId = createTx(); 250 if (xid != null && xid instanceof Xid ) 251 txId.setXid((Xid ) xid); 252 globalToLocal.put(gxid, txId); 253 254 txId.addPostCommitTask(gxid); 256 txId.addPostRollbackTask(gxid); 257 258 return txId; 259 } 260 261 267 public void restoreTx(Tx txId) throws JMSException 268 { 269 addPreparedTx(txId, txId.getXid(), true); 270 } 271 272 280 void addPreparedTx(Tx txId, Xid xid, boolean inDoubt) throws JMSException 281 { 282 PreparedInfo preparedInfo = (PreparedInfo) prepared.get(xid); 283 if (preparedInfo == null) 284 { 285 preparedInfo = new PreparedInfo(xid, false); 286 prepared.put(xid, preparedInfo); 287 } 288 preparedInfo.add(txId); 289 if (inDoubt) 290 preparedInfo.setInDoubt(true); 291 } 292 293 301 public void markPrepared(ConnectionToken dc, Object xid, Tx txId) throws JMSException 302 { 303 try 304 { 305 if (xid instanceof Xid ) 306 addPreparedTx(txId, (Xid ) xid, false); 307 } 308 catch (Throwable t) 309 { 310 SpyJMSException.rethrowAsJMSException("Error marking transaction as prepared xid=" + xid + " tx=" + txId, t); 311 } 312 } 313 314 public Xid [] recover(ConnectionToken dc, int flags) throws Exception 315 { 316 Set preparedXids = prepared.keySet(); 317 Xid [] xids = (Xid []) preparedXids.toArray(new Xid [preparedXids.size()]); 318 return xids; 319 } 320 321 326 public Map getPreparedTransactions() 327 { 328 return prepared; 329 } 330 331 334 class GlobalXID implements Runnable 335 { 336 ConnectionToken dc; 337 Object xid; 338 339 GlobalXID(ConnectionToken dc, Object xid) 340 { 341 this.dc = dc; 342 this.xid = xid; 343 } 344 345 public boolean equals(Object obj) 346 { 347 if (obj == null) 348 { 349 return false; 350 } 351 if (obj.getClass() != GlobalXID.class) 352 { 353 return false; 354 } 355 return ((GlobalXID) obj).xid.equals(xid) && ((GlobalXID) obj).dc.equals(dc); 356 } 357 358 public int hashCode() 359 { 360 return xid.hashCode(); 361 } 362 363 public void run() 364 { 365 Tx txId = (Tx) globalToLocal.remove(this); 366 if (txId != null) 368 { 369 PreparedInfo preparedInfo = (PreparedInfo) prepared.get(xid); 370 if (preparedInfo != null) 371 { 372 preparedInfo.remove(txId); 373 if (preparedInfo.isEmpty()) 374 prepared.remove(xid); 375 } 376 } 377 } 378 } 379 380 386 public static class PreparedInfo 387 { 388 389 private Xid xid; 390 391 392 private boolean inDoubt; 393 394 395 private Set txids = new CopyOnWriteArraySet(); 396 397 403 public PreparedInfo(Xid xid, boolean inDoubt) 404 { 405 this.xid = xid; 406 this.inDoubt = inDoubt; 407 } 408 409 414 public boolean isInDoubt() 415 { 416 return inDoubt; 417 } 418 419 424 public void setInDoubt(boolean inDoubt) 425 { 426 this.inDoubt = inDoubt; 427 } 428 429 434 public Xid getXid() 435 { 436 return xid; 437 } 438 439 444 public Set getTxids() 445 { 446 return txids; 447 } 448 449 454 public void add(Tx txid) 455 { 456 txids.add(txid); 457 } 458 459 464 public void remove(Tx txid) 465 { 466 txids.remove(txid); 467 } 468 469 474 public boolean isEmpty() 475 { 476 return txids.isEmpty(); 477 } 478 } 479 } | Popular Tags |