1 5 package com.tc.object.tx; 6 7 import com.tc.exception.TCClassNotFoundException; 8 import com.tc.logging.TCLogger; 9 import com.tc.logging.TCLogging; 10 import com.tc.management.beans.tx.ClientTxMonitorMBean; 11 import com.tc.net.protocol.tcm.ChannelIDProvider; 12 import com.tc.object.ClientObjectManager; 13 import com.tc.object.LiteralValues; 14 import com.tc.object.ObjectID; 15 import com.tc.object.TCObject; 16 import com.tc.object.dmi.DmiDescriptor; 17 import com.tc.object.dna.api.DNA; 18 import com.tc.object.dna.api.DNAException; 19 import com.tc.object.loaders.Namespace; 20 import com.tc.object.lockmanager.api.LockID; 21 import com.tc.object.lockmanager.api.LockLevel; 22 import com.tc.object.lockmanager.api.ThreadLockManager; 23 import com.tc.object.lockmanager.api.WaitListener; 24 import com.tc.object.logging.RuntimeLogger; 25 import com.tc.object.session.SessionID; 26 import com.tc.text.Banner; 27 import com.tc.util.Assert; 28 import com.tc.util.ClassUtils; 29 30 import java.util.Collection ; 31 import java.util.Iterator ; 32 import java.util.LinkedList ; 33 import java.util.List ; 34 import java.util.Map ; 35 import java.util.Set ; 36 import java.util.Map.Entry; 37 38 41 public class ClientTransactionManagerImpl implements ClientTransactionManager { 42 private static final TCLogger logger = TCLogging.getLogger(ClientTransactionManagerImpl.class); 43 44 private final ThreadLocal transaction = new ThreadLocal () { 45 protected synchronized Object initialValue() { 46 return new ThreadTransactionContext(); 47 } 48 }; 49 50 private final ThreadLocal txnLogging = new ThreadLocal () { 51 protected Object initialValue() { 52 return new ThreadTransactionLoggingStack(); 53 } 54 55 }; 56 57 private final ClientTransactionFactory txFactory; 58 private final RemoteTransactionManager remoteTxManager; 59 private final ClientObjectManager objectManager; 60 private final ThreadLockManager lockManager; 61 private final LiteralValues literalValues = new LiteralValues(); 62 63 private final WaitListener waitListener = new WaitListener() { 64 public void handleWaitEvent() { 65 return; 66 } 67 }; 68 69 private final ChannelIDProvider cidProvider; 70 71 private final ClientTxMonitorMBean txMonitor; 72 73 public ClientTransactionManagerImpl(ChannelIDProvider cidProvider, ClientObjectManager objectManager, 74 ThreadLockManager lockManager, ClientTransactionFactory txFactory, 75 RemoteTransactionManager remoteTxManager, RuntimeLogger runtimeLogger, 76 final ClientTxMonitorMBean txMonitor) { 77 this.cidProvider = cidProvider; 78 this.txFactory = txFactory; 79 this.remoteTxManager = remoteTxManager; 80 this.objectManager = objectManager; 81 this.objectManager.setTransactionManager(this); 82 this.lockManager = lockManager; 83 this.txMonitor = txMonitor; 84 } 85 86 public int queueLength(String lockName) { 87 final LockID lockID = lockManager.lockIDFor(lockName); 88 return lockManager.queueLength(lockID); 89 } 90 91 public int waitLength(String lockName) { 92 final LockID lockID = lockManager.lockIDFor(lockName); 93 return lockManager.waitLength(lockID); 94 } 95 96 private int localHeldCount(String lockName, int lockLevel) { 97 final LockID lockID = lockManager.lockIDFor(lockName); 98 return lockManager.localHeldCount(lockID, lockLevel); 99 } 100 101 public boolean isHeldByCurrentThread(String lockName, int lockLevel) { 102 if (isTransactionLoggingDisabled()) { return true; } 103 return localHeldCount(lockName, lockLevel) > 0; 104 } 105 106 public boolean isLocked(String lockName) { 107 final LockID lockID = lockManager.lockIDFor(lockName); 108 return lockManager.isLocked(lockID); 109 } 110 111 public void lock(String lockName, int lockLevel) { 112 final LockID lockID = lockManager.lockIDFor(lockName); 113 lockManager.lock(lockID, lockLevel); 114 } 115 116 public void unlock(String lockName) { 117 final LockID lockID = lockManager.lockIDFor(lockName); 118 if (lockID != null) { 119 lockManager.unlock(lockID); 120 getThreadTransactionContext().removeLock(lockID); 121 } 122 } 123 124 public boolean tryBegin(String lockName, int lockLevel) { 125 logTryBegin0(lockName, lockLevel); 126 127 if (isTransactionLoggingDisabled() || objectManager.isCreationInProgress()) { return true; } 128 129 final TxnType txnType = getTxnTypeFromLockLevel(lockLevel); 130 ClientTransaction currentTransaction = getTransactionOrNull(); 131 132 if ((currentTransaction != null) && lockLevel == LockLevel.CONCURRENT) { 133 throw new AssertionError ("Can't acquire concurrent locks in a nested lock context."); 135 } 136 137 final LockID lockID = lockManager.lockIDFor(lockName); 138 boolean isLocked = lockManager.tryLock(lockID, lockLevel); 139 if (!isLocked) { return isLocked; } 140 141 pushTxContext(lockID, txnType); 142 143 if (currentTransaction == null) { 144 createTxAndInitContext(); 145 } else { 146 currentTransaction.setTransactionContext(this.peekContext()); 147 } 148 149 return isLocked; 150 } 151 152 public void begin(String lockName, int lockLevel) { 153 logBegin0(lockName, lockLevel); 154 155 if (isTransactionLoggingDisabled() || objectManager.isCreationInProgress()) { return; } 156 157 final TxnType txnType = getTxnTypeFromLockLevel(lockLevel); 158 ClientTransaction currentTransaction = getTransactionOrNull(); 159 160 final LockID lockID = lockManager.lockIDFor(lockName); 161 162 pushTxContext(lockID, txnType); 163 164 if (currentTransaction == null) { 165 createTxAndInitContext(); 166 } else { 167 currentTransaction.setTransactionContext(this.peekContext()); 168 } 169 170 lockManager.lock(lockID, lockLevel); 171 } 172 173 private TxnType getTxnTypeFromLockLevel(int lockLevel) { 174 switch (lockLevel) { 175 case LockLevel.READ: 176 return TxnType.READ_ONLY; 177 case LockLevel.CONCURRENT: 178 return TxnType.CONCURRENT; 179 case LockLevel.WRITE: 180 return TxnType.NORMAL; 181 case LockLevel.SYNCHRONOUS_WRITE: 182 return TxnType.NORMAL; 183 default: 184 throw Assert.failure("don't know how to translate lock level " + lockLevel); 185 } 186 } 187 188 public void wait(String lockName, WaitInvocation call, Object object) throws UnlockedSharedObjectException, 189 InterruptedException { 190 final ClientTransaction topTxn = getTransaction(); 191 192 LockID lockID = lockManager.lockIDFor(lockName); 193 194 if (lockID == null || lockID.isNull()) { 195 lockID = topTxn.getLockID(); 196 } 197 198 commit(lockID, topTxn, true); 199 200 try { 201 lockManager.wait(lockID, call, object, waitListener); 202 } finally { 203 createTxAndInitContext(); 204 } 205 } 206 207 public void notify(String lockName, boolean all, Object object) throws UnlockedSharedObjectException { 208 final ClientTransaction currentTxn = getTransaction(); 209 LockID lockID = lockManager.lockIDFor(lockName); 210 211 if (lockID == null || lockID.isNull()) { 212 lockID = currentTxn.getLockID(); 213 } 214 currentTxn.addNotify(lockManager.notify(lockID, all)); 215 } 216 217 private void logTryBegin0(String lockID, int type) { 218 if (logger.isDebugEnabled()) { 219 logger.debug("tryBegin(): lockID=" + (lockID == null ? "null" : lockID) + ", type = " + type); 220 } 221 } 222 223 private void logBegin0(String lockID, int type) { 224 if (logger.isDebugEnabled()) { 225 logger.debug("begin(): lockID=" + (lockID == null ? "null" : lockID) + ", type = " + type); 226 } 227 } 228 229 private ClientTransaction getTransactionOrNull() { 230 ThreadTransactionContext tx = getThreadTransactionContext(); 231 return tx.getCurrentTransaction(); 232 } 233 234 private ThreadTransactionContext getThreadTransactionContext() { 235 return (ThreadTransactionContext) this.transaction.get(); 236 } 237 238 public ClientTransaction getTransaction() throws UnlockedSharedObjectException { 239 return getTransaction(null); 240 } 241 242 private ClientTransaction getTransaction(Object context) throws UnlockedSharedObjectException { 243 ClientTransaction tx = getTransactionOrNull(); 244 if (tx == null) { 245 246 String type = context == null ? null : context.getClass().getName(); 247 String errorMsg = "Attempt to access a shared object outside the scope of a shared lock. " 248 + "\nAll access to shared objects must be within the scope of one or more shared locks defined in your Terracotta configuration. " 249 + "\nPlease alter the locks section of your Terracotta configuration so that this access is auto-locked or protected by a named lock."; 250 String details = ""; 251 if (type != null) { 252 details += "Shared Object Type: " + type; 253 } 254 throw new UnlockedSharedObjectException(errorMsg, Thread.currentThread().getName(), cidProvider.getChannelID() 255 .toLong(), details); 256 } 257 return tx; 258 } 259 260 public void checkWriteAccess(Object context) { 261 if (isTransactionLoggingDisabled()) { return; } 262 263 ClientTransaction txn = getTransaction(context); 265 266 if (txn.getTransactionType() == TxnType.READ_ONLY) { throw new ReadOnlyException( 269 "Current transaction with read-only access attempted to modify a shared object. " 270 + "\nPlease alter the locks section of your Terracotta configuration so that the methods involved in this transaction have read/write access.", 271 Thread.currentThread().getName(), 272 cidProvider.getChannelID() 273 .toLong()); } 274 } 275 276 285 public void commit(String lockName) throws UnlockedSharedObjectException { 286 logCommit0(); 287 if (isTransactionLoggingDisabled() || objectManager.isCreationInProgress()) { return; } 288 289 ClientTransaction tx = getTransaction(); 291 LockID lockID = lockManager.lockIDFor(lockName); 292 if (lockID == null || lockID.isNull()) { 293 lockID = tx.getLockID(); 294 } 295 boolean hasCommitted = commit(lockID, tx, false); 296 297 popTransaction(lockManager.lockIDFor(lockName)); 298 299 if (peekContext() != null) { 300 if (hasCommitted) { 301 createTxAndInitContext(); 302 } else { 303 tx.setTransactionContext(peekContext()); 306 setTransaction(tx); 307 } 308 } 309 } 310 311 private void createTxAndInitContext() { 312 ClientTransaction ctx = txFactory.newInstance(); 313 ctx.setTransactionContext(peekContext()); 314 setTransaction(ctx); 315 } 316 317 private ClientTransaction popTransaction() { 318 ThreadTransactionContext ttc = getThreadTransactionContext(); 319 return ttc.popCurrentTransaction(); 320 } 321 322 private ClientTransaction popTransaction(LockID lockID) { 323 if (lockID == null || lockID.isNull()) { return popTransaction(); } 324 ThreadTransactionContext ttc = getThreadTransactionContext(); 325 return ttc.popCurrentTransaction(lockID); 326 } 327 328 private TransactionContext peekContext(LockID lockID) { 329 ThreadTransactionContext ttc = getThreadTransactionContext(); 330 return ttc.peekContext(lockID); 331 } 332 333 private TransactionContext peekContext() { 334 ThreadTransactionContext ttc = getThreadTransactionContext(); 335 return ttc.peekContext(); 336 } 337 338 private void pushTxContext(LockID lockID, TxnType txnType) { 339 ThreadTransactionContext ttc = getThreadTransactionContext(); 340 ttc.pushContext(lockID, txnType); 341 } 342 343 private void logCommit0() { 344 if (logger.isDebugEnabled()) logger.debug("commit()"); 345 } 346 347 private boolean commit(LockID lockID, ClientTransaction currentTransaction, boolean isWaitContext) { 348 try { 349 return commitInternal(lockID, currentTransaction, isWaitContext); 350 } catch (Throwable t) { 351 remoteTxManager.stopProcessing(); 352 Banner.errorBanner("Terracotta client shutting down due to error " + t); 353 logger.error(t); 354 if (t instanceof Error ) { throw (Error ) t; } 355 if (t instanceof RuntimeException ) { throw (RuntimeException ) t; } 356 throw new RuntimeException (t); 357 } 358 } 359 360 private boolean commitInternal(LockID lockID, ClientTransaction currentTransaction, boolean isWaitContext) { 361 Assert.assertNotNull("transaction", currentTransaction); 362 363 try { 364 disableTransactionLogging(); 365 366 TransactionContext tc = peekContext(lockID); 368 if (tc.getType().equals(TxnType.READ_ONLY)) { 369 txMonitor.committedReadTransaction(); 370 return false; 371 } 372 373 boolean hasPendingCreateObjects = objectManager.hasPendingCreateObjects(); 374 if (hasPendingCreateObjects) { 375 objectManager.addPendingCreateObjectsToTransaction(); 376 } 377 378 currentTransaction.setAlreadyCommitted(); 379 if (currentTransaction.hasChangesOrNotifies() || hasPendingCreateObjects) { 380 if (txMonitor.isEnabled()) { 381 currentTransaction.updateMBean(txMonitor); 382 } 383 remoteTxManager.commit(currentTransaction); 384 } 385 return true; 386 } finally { 387 enableTransactionLogging(); 388 389 if (!isWaitContext && !currentTransaction.isNull()) { 394 if (lockID != null && !lockID.isNull()) { 395 lockManager.unlock(lockID); 396 } else { 397 throw new AssertionError ("Trying to unlock with lockID = null!"); 398 } 399 } 400 } 401 } 402 403 private void basicApply(Collection objectChanges, Map newRoots, boolean force) throws DNAException { 404 405 List l = new LinkedList (); 406 407 for (Iterator i = objectChanges.iterator(); i.hasNext();) { 408 DNA dna = (DNA) i.next(); 409 TCObject tcobj = null; 410 Assert.assertTrue(dna.isDelta()); 411 try { 412 objectManager.getClassFor(Namespace.parseClassNameIfNecessary(dna.getTypeName()), dna 416 .getDefiningLoaderDescription()); 417 tcobj = objectManager.lookup(dna.getObjectID()); 418 } catch (ClassNotFoundException cnfe) { 419 logger.warn("Could not apply change because class not local:" + dna.getTypeName()); 420 continue; 421 } 422 Object obj = tcobj == null ? null : tcobj.getPeerObject(); 425 l.add(obj); 426 if (obj != null) { 427 try { 428 tcobj.hydrate(dna, force); 429 } catch (ClassNotFoundException cnfe) { 430 logger.warn("Could not apply change because class not local:" + cnfe.getMessage()); 431 throw new TCClassNotFoundException(cnfe); 432 } 433 } 434 } 435 436 for (Iterator i = newRoots.entrySet().iterator(); i.hasNext();) { 437 Entry entry = (Entry) i.next(); 438 String rootName = (String ) entry.getKey(); 439 ObjectID newRootID = (ObjectID) entry.getValue(); 440 objectManager.replaceRootIDIfNecessary(rootName, newRootID); 441 } 442 } 443 444 public void receivedAcknowledgement(SessionID sessionID, TransactionID transactionID) { 445 this.remoteTxManager.receivedAcknowledgement(sessionID, transactionID); 446 } 447 448 public void receivedBatchAcknowledgement(TxnBatchID batchID) { 449 this.remoteTxManager.receivedBatchAcknowledgement(batchID); 450 } 451 452 public void apply(TxnType txType, LockID[] lockIDs, Collection objectChanges, Set lookupObjectIDs, Map newRoots) { 453 try { 455 disableTransactionLogging(); 456 basicApply(objectChanges, newRoots, false); 457 } finally { 458 enableTransactionLogging(); 460 } 461 } 462 463 471 480 public void literalValueChanged(TCObject source, Object newValue, Object oldValue) { 481 if (isTransactionLoggingDisabled()) { return; } 482 483 try { 484 disableTransactionLogging(); 485 486 Object pojo = source.getPeerObject(); 487 ClientTransaction tx = getTransaction(pojo); 488 489 tx.literalValueChanged(source, newValue, oldValue); 490 } finally { 491 enableTransactionLogging(); 492 } 493 494 } 495 496 public void fieldChanged(TCObject source, String classname, String fieldname, Object newValue, int index) { 497 if (isTransactionLoggingDisabled()) { return; } 498 499 try { 500 disableTransactionLogging(); 501 502 Object pojo = source.getPeerObject(); 503 504 ClientTransaction tx = getTransaction(pojo); 505 logFieldChanged0(source, classname, fieldname, newValue, tx); 506 507 if (newValue != null && literalValues.isLiteralInstance(newValue)) { 508 tx.fieldChanged(source, classname, fieldname, newValue, index); 509 } else { 510 if (newValue != null) { 511 objectManager.checkPortabilityOfField(newValue, fieldname, pojo.getClass()); 512 } 513 514 TCObject tco = objectManager.lookupOrCreate(newValue); 515 tx.fieldChanged(source, classname, fieldname, tco.getObjectID(), index); 516 517 if (newValue != null) { 520 tx.createObject(tco); 521 } 522 } 523 } finally { 524 enableTransactionLogging(); 525 } 526 } 527 528 public void arrayChanged(TCObject source, int startPos, Object array, int length) { 529 if (isTransactionLoggingDisabled()) { return; } 530 try { 531 disableTransactionLogging(); 532 Object pojo = source.getPeerObject(); 533 ClientTransaction tx = getTransaction(pojo); 534 535 if (!ClassUtils.isPrimitiveArray(array)) { 536 Object [] objArray = (Object []) array; 537 for (int i = 0; i < length; i++) { 538 539 Object element = objArray[i]; 540 if (!literalValues.isLiteralInstance(element)) { 541 if (element != null) objectManager.checkPortabilityOfField(element, String.valueOf(i), pojo.getClass()); 542 543 TCObject tco = objectManager.lookupOrCreate(element); 544 objArray[i] = tco.getObjectID(); 545 if (element != null) tx.createObject(tco); 548 } 549 } 550 } 551 tx.arrayChanged(source, startPos, array, length); 552 553 } finally { 554 enableTransactionLogging(); 555 } 556 } 557 558 private void logFieldChanged0(TCObject source, String classname, String fieldname, Object newValue, 559 ClientTransaction tx) { 560 if (logger.isDebugEnabled()) logger.debug("fieldChanged(source=" + source + ", classname=" + classname 561 + ", fieldname=" + fieldname + ", newValue=" + newValue + ", tx=" + tx); 562 } 563 564 public void logicalInvoke(TCObject source, int method, String methodName, Object [] parameters) { 565 if (isTransactionLoggingDisabled()) { return; } 566 567 try { 568 disableTransactionLogging(); 569 570 Object pojo = source.getPeerObject(); 571 ClientTransaction tx = getTransaction(pojo); 572 573 for (int i = 0; i < parameters.length; i++) { 574 Object p = parameters[i]; 575 boolean isLiteral = literalValues.isLiteralInstance(p); 576 if (!isLiteral) { 577 if (p != null) { 578 objectManager.checkPortabilityOfLogicalAction(p, methodName, pojo.getClass()); 579 } 580 581 TCObject tco = objectManager.lookupOrCreate(p); 582 parameters[i] = tco.getObjectID(); 583 if (p != null) { 584 tx.createObject(tco); 587 } 588 } 589 } 590 591 tx.logicalInvoke(source, method, parameters, methodName); 592 } finally { 593 enableTransactionLogging(); 594 } 595 } 596 597 private void setTransaction(ClientTransaction tx) { 598 getThreadTransactionContext().setCurrentTransaction(tx); 599 } 600 601 public void createObject(TCObject source) { 602 getTransaction().createObject(source); 603 } 604 605 public void createRoot(String name, ObjectID rootID) { 606 getTransaction().createRoot(name, rootID); 607 } 608 609 public void addReference(TCObject tco) { 610 ClientTransaction txn = getTransactionOrNull(); 611 if (txn != null) { 612 txn.createObject(tco); 613 } 614 } 615 616 public ChannelIDProvider getChannelIDProvider() { 617 return cidProvider; 618 } 619 620 public void disableTransactionLogging() { 621 ThreadTransactionLoggingStack txnStack = (ThreadTransactionLoggingStack) txnLogging.get(); 622 txnStack.increament(); 623 } 624 625 public void enableTransactionLogging() { 626 ThreadTransactionLoggingStack txnStack = (ThreadTransactionLoggingStack) txnLogging.get(); 627 final int size = txnStack.decrement(); 628 Assert.assertTrue("size=" + size, size >= 0); 629 } 630 631 public boolean isTransactionLoggingDisabled() { 632 ThreadTransactionLoggingStack txnStack = (ThreadTransactionLoggingStack) txnLogging.get(); 633 return (txnStack.get() > 0); 634 } 635 636 public static class ThreadTransactionLoggingStack { 637 int callCount = 0; 638 639 int increament() { 640 return ++callCount; 641 } 642 643 int decrement() { 644 return --callCount; 645 } 646 647 int get() { 648 return callCount; 649 } 650 } 651 652 public void addDmiDescriptor(DmiDescriptor dd) { 653 getTransaction().addDmiDescritor(dd); 654 } 655 656 } 657 | Popular Tags |