1 package org.apache.ojb.broker.core; 2 3 17 18 import javax.transaction.RollbackException ; 19 import javax.transaction.Status ; 20 import javax.transaction.Synchronization ; 21 import javax.transaction.SystemException ; 22 import javax.transaction.Transaction ; 23 import javax.transaction.TransactionManager ; 24 import java.lang.reflect.Field ; 25 import java.util.ArrayList ; 26 import java.util.Collections ; 27 import java.util.HashMap ; 28 import java.util.Iterator ; 29 import java.util.List ; 30 import java.util.Map ; 31 import java.util.WeakHashMap ; 32 33 import org.apache.commons.pool.KeyedObjectPool; 34 import org.apache.ojb.broker.PBFactoryException; 35 import org.apache.ojb.broker.PBKey; 36 import org.apache.ojb.broker.PersistenceBrokerInternal; 37 import org.apache.ojb.broker.TransactionAbortedException; 38 import org.apache.ojb.broker.TransactionInProgressException; 39 import org.apache.ojb.broker.TransactionNotInProgressException; 40 import org.apache.ojb.broker.accesslayer.ConnectionManagerIF; 41 import org.apache.ojb.broker.transaction.tm.TransactionManagerFactoryException; 42 import org.apache.ojb.broker.transaction.tm.TransactionManagerFactoryFactory; 43 import org.apache.ojb.broker.util.BrokerHelper; 44 import org.apache.ojb.broker.util.logging.Logger; 45 import org.apache.ojb.broker.util.logging.LoggerFactory; 46 47 64 public class PersistenceBrokerFactorySyncImpl extends PersistenceBrokerFactoryDefaultImpl 65 { 66 private Logger log = LoggerFactory.getLogger(PersistenceBrokerFactorySyncImpl.class); 67 private TransactionManager txMan; 68 private TxRegistry txRegistry; 69 70 public PersistenceBrokerFactorySyncImpl() 71 { 72 super(); 73 try 74 { 75 txMan = TransactionManagerFactoryFactory.instance().getTransactionManager(); 76 } 77 catch (TransactionManagerFactoryException e) 78 { 79 throw new PBFactoryException("Can't instantiate TransactionManager of managed environment", e); 80 } 81 txRegistry = new TxRegistry(); 82 } 83 84 public PersistenceBrokerInternal createPersistenceBroker(PBKey pbKey) throws PBFactoryException 85 { 86 89 pbKey = BrokerHelper.crossCheckPBKey(pbKey); 90 98 Transaction tx; 99 try 100 { 101 tx = searchForValidTx(); 103 } 104 catch (SystemException e) 105 { 106 throw new PBFactoryException("Can't create PB instance, failure while lookup" + 107 " running JTA transaction",e); 108 } 109 PersistenceBrokerSyncImpl obtainedBroker = null; 110 PersistenceBrokerSyncHandle result; 111 if (tx != null) 112 { 113 obtainedBroker = txRegistry.findBroker(tx, pbKey); 115 } 116 117 if(obtainedBroker == null || obtainedBroker.isClosed()) 118 { 119 result = (PersistenceBrokerSyncHandle) super.createPersistenceBroker(pbKey); 123 } 124 else 125 { 126 result = new PersistenceBrokerSyncHandle(obtainedBroker); 129 } 130 return result; 131 } 132 133 protected PersistenceBrokerInternal wrapBrokerWithPoolingHandle(PersistenceBrokerInternal broker, KeyedObjectPool pool) 134 { 135 return new PersistenceBrokerSyncImpl(broker, pool); 137 } 138 139 protected PersistenceBrokerInternal wrapRequestedBrokerInstance(PersistenceBrokerInternal broker) 140 { 141 if (!(broker instanceof PersistenceBrokerSyncImpl)) 143 { 144 throw new PBFactoryException("Expect instance of " + PersistenceBrokerSyncImpl.class 145 + ", found " + broker.getClass()); 146 } 147 150 PersistenceBrokerSyncImpl pb = (PersistenceBrokerSyncImpl) broker; 151 try 152 { 153 Transaction tx = searchForValidTx(); 155 if (tx != null) 156 { 157 txRegistry.register(tx, pb); 158 try 159 { 160 pb.internBegin(); 161 } 162 catch (Exception e) 163 { 164 168 log.error("Unexpected exception when start intern pb-tx", e); 169 try 170 { 171 tx.setRollbackOnly(); 172 } 173 catch (Throwable ignore) 174 { 175 } 176 throw new PBFactoryException("Unexpected exception when start intern pb-tx", e); 177 } 178 } 179 } 180 catch (Exception e) 181 { 182 if(e instanceof PBFactoryException) 183 { 184 throw (PBFactoryException) e; 185 } 186 else 187 { 188 throw new PBFactoryException("Error while try to participate in JTA transaction", e); 189 } 190 } 191 return new PersistenceBrokerSyncHandle(pb); 192 } 193 194 private Transaction searchForValidTx() throws SystemException 195 { 196 Transaction tx = txMan.getTransaction(); 197 if (tx != null) 198 { 199 int status = tx.getStatus(); 200 if (status != Status.STATUS_ACTIVE && status != Status.STATUS_NO_TRANSACTION) 201 { 202 throw new PBFactoryException("Transaction synchronization failed - wrong" + 203 " status of external JTA tx. Expected was an 'active' or 'no transaction'" 204 + ", found status is '" + getStatusFlagAsString(status) + "'"); 205 } 206 } 207 return tx; 208 } 209 210 214 private static String getStatusFlagAsString(int status) 215 { 216 String statusName = "no match, unknown status!"; 217 try 218 { 219 Field [] fields = Status .class.getDeclaredFields(); 220 for (int i = 0; i < fields.length; i++) 221 { 222 if (fields[i].getInt(null) == status) 223 { 224 statusName = fields[i].getName(); 225 break; 226 } 227 } 228 } 229 catch (Exception e) 230 { 231 statusName = "no match, unknown status!"; 232 } 233 return statusName; 234 } 235 236 public static class PersistenceBrokerSyncImpl extends PoolablePersistenceBroker implements Synchronization 240 { 241 private Logger log = LoggerFactory.getLogger(PersistenceBrokerSyncImpl.class); 242 245 private List handleList = new ArrayList (); 246 247 public PersistenceBrokerSyncImpl(PersistenceBrokerInternal broker, KeyedObjectPool pool) 248 { 249 super(broker, pool); 250 } 251 252 public void beforeCompletion() 253 { 254 if (log.isDebugEnabled()) log.debug("beforeCompletion was called, nothing to do"); 255 if(handleList.size() > 0) 256 { 257 for(int i = 0; i < handleList.size(); i++) 258 { 259 log.warn("Found unclosed PersistenceBroker handle, will do automatic close. Please make" + 260 " sure that all used PB instances will be closed."); 261 PersistenceBrokerHandle pbh = (PersistenceBrokerHandle) handleList.get(i); 262 pbh.close(); 263 } 264 handleList.clear(); 265 } 266 ConnectionManagerIF cm = serviceConnectionManager(); 267 if(cm.isBatchMode()) cm.executeBatch(); 268 if(cm.isInLocalTransaction()) 270 { 271 log.warn("Seems the used PersistenceBroker handle wasn't closed, close the used" + 273 " handle before the transaction completes."); 274 cm.localCommit(); 277 } 278 cm.releaseConnection(); 279 } 280 281 public void afterCompletion(int status) 282 { 283 if (log.isDebugEnabled()) log.debug("afterCompletion was called"); 284 287 try 288 { 289 if (status != Status.STATUS_COMMITTED) 290 { 291 if (status == Status.STATUS_ROLLEDBACK || status == Status.STATUS_ROLLING_BACK) 292 { 293 if (log.isDebugEnabled()) log.debug("Aborting PB-tx due to JTA initiated Rollback: " 294 + getStatusFlagAsString(status)); 295 } 296 else 297 { 298 log.error("Aborting PB-tx due to inconsistent, and unexpected, status of JTA tx: " 299 + getStatusFlagAsString(status)); 300 } 301 internAbort(); 302 } 303 else 304 { 305 if (log.isDebugEnabled()) log.debug("Commit PB-tx"); 306 internCommit(); 307 } 308 } 309 finally 310 { 311 doRealClose(); 313 } 314 } 315 316 private void internBegin() 317 { 318 setManaged(true); 319 super.beginTransaction(); 320 } 321 322 private void internCommit() 323 { 324 super.commitTransaction(); 325 } 326 327 private void internAbort() 328 { 329 super.abortTransaction(); 330 } 331 332 private void doRealClose() 333 { 334 if (log.isDebugEnabled()) log.debug("Now do real close of PB instance"); 335 super.close(); 336 } 337 338 public boolean close() 339 { 340 if(!isInTransaction()) 341 { 342 if (log.isDebugEnabled()) 343 log.debug("PB close was called, pass the close call to underlying PB instance"); 344 348 doRealClose(); 349 } 350 else 351 { 352 if(handleList.size() > 0) 355 { 356 if(log.isEnabledFor(Logger.INFO)) log.info("PB.close(): Active used by " + handleList.size() 357 + " handle objects, will skip close call"); 358 } 359 else 360 { 361 371 if (log.isDebugEnabled()) 372 log.debug("PB close was called, only close the PB handle when in JTA-tx"); 373 374 381 PersistenceBrokerImpl pb = ((PersistenceBrokerImpl) getInnermostDelegate()); 382 pb.fireBrokerEvent(pb.BEFORE_CLOSE_EVENT); 383 384 ConnectionManagerIF cm = serviceConnectionManager(); 385 if(cm.isInLocalTransaction()) 386 { 387 394 cm.localCommit(); 395 } 396 cm.releaseConnection(); 397 } 398 } 399 return true; 400 } 401 402 void registerHandle(PersistenceBrokerHandle handle) 403 { 404 handleList.add(handle); 405 } 406 407 void deregisterHandle(PersistenceBrokerHandle handle) 408 { 409 handleList.remove(handle); 410 } 411 412 public void beginTransaction() throws TransactionInProgressException, TransactionAbortedException 413 { 414 throw new UnsupportedOperationException ("In managed environments only JTA transaction demarcation allowed"); 415 } 416 417 public void commitTransaction() throws TransactionNotInProgressException, TransactionAbortedException 418 { 419 throw new UnsupportedOperationException ("In managed environments only JTA transaction demarcation allowed"); 420 } 421 422 public void abortTransaction() throws TransactionNotInProgressException 423 { 424 throw new UnsupportedOperationException ("In managed environments only JTA transaction demarcation allowed"); 425 } 426 } 427 428 434 class TransactionBox implements Synchronization 435 { 436 Transaction jtaTx; 437 Map syncMap = new HashMap (); 438 boolean isLocked = false; 439 boolean isClosed = false; 440 441 public TransactionBox(Transaction tx) 442 { 443 this.jtaTx = tx; 444 } 445 446 PersistenceBrokerSyncImpl find(PBKey key) 447 { 448 return (PersistenceBrokerSyncImpl) syncMap.get(key); 449 } 450 451 void add(PersistenceBrokerSyncImpl syncObj) 452 { 453 if (isLocked) 454 { 455 throw new PBFactoryException("Can't associate object with JTA transaction, because tx-completion started"); 456 } 457 syncMap.put(syncObj.getPBKey(), syncObj); 458 } 459 460 public void afterCompletion(int status) 461 { 462 boolean failures = false; 463 Synchronization synchronization = null; 464 for (Iterator iterator = syncMap.values().iterator(); iterator.hasNext();) 465 { 466 try 467 { 468 synchronization = (Synchronization ) iterator.next(); 469 synchronization.afterCompletion(status); 470 } 471 catch (Exception e) 472 { 473 failures = true; 474 log.error("Unexpected error when perform Synchronization#afterCompletion method" + 475 " call on object " + synchronization, e); 476 } 477 } 478 isClosed = true; 479 txRegistry.removeTxBox(jtaTx); 481 if (failures) 482 { 483 throw new PBFactoryException("Unexpected error occured while performing" + 484 " Synchronization#afterCompletion method"); 485 } 486 } 487 488 public void beforeCompletion() 489 { 490 boolean failures = false; 491 Synchronization synchronization = null; 492 for (Iterator iterator = syncMap.values().iterator(); iterator.hasNext();) 493 { 494 try 495 { 496 synchronization = (Synchronization ) iterator.next(); 497 synchronization.beforeCompletion(); 498 } 499 catch (Exception e) 500 { 501 failures = true; 502 log.error("Unexpected error when perform Synchronization#beforeCompletion method" + 503 " call on object " + synchronization, e); 504 } 505 } 506 isLocked = true; 507 if (failures) 508 { 509 throw new PBFactoryException("Unexpected error occured while performing" + 510 " Synchronization#beforeCompletion method"); 511 } 512 } 513 } 514 515 525 class TxRegistry 526 { 527 Map txBoxMap; 528 529 public TxRegistry() 530 { 531 txBoxMap = Collections.synchronizedMap(new WeakHashMap ()); 532 } 533 534 void register(Transaction tx, PersistenceBrokerSyncImpl syncObject) throws RollbackException , SystemException 535 { 536 TransactionBox txBox = (TransactionBox) txBoxMap.get(tx); 537 if (txBox == null || txBox.isClosed) 538 { 539 if (txBox != null) txBoxMap.remove(tx); 541 txBox = new TransactionBox(tx); 542 tx.registerSynchronization(txBox); 543 txBoxMap.put(tx, txBox); 544 } 545 txBox.add(syncObject); 546 } 547 548 PersistenceBrokerSyncImpl findBroker(Transaction tx, PBKey pbKey) 549 { 550 PersistenceBrokerSyncImpl result = null; 551 TransactionBox txBox = (TransactionBox) txBoxMap.get(tx); 552 if(txBox != null) 553 { 554 result = txBox.find(pbKey); 555 } 556 return result; 557 } 558 559 TransactionBox findTxBox(Transaction tx) 560 { 561 return (TransactionBox) txBoxMap.get(tx); 562 } 563 564 void removeTxBox(Transaction tx) 565 { 566 txBoxMap.remove(tx); 567 } 568 } 569 570 577 class PersistenceBrokerSyncHandle extends PersistenceBrokerHandle 578 { 579 583 public PersistenceBrokerSyncHandle(PersistenceBrokerSyncImpl broker) 584 { 585 super(broker); 586 broker.registerHandle(this); 588 } 589 590 public boolean isClosed() 591 { 592 return super.isClosed(); 593 } 594 595 public boolean close() 596 { 597 if(getDelegate() != null) 598 { 599 ((PersistenceBrokerSyncImpl) getDelegate()).deregisterHandle(this); 601 } 602 return super.close(); 603 } 604 } 605 } 606 | Popular Tags |