package org.ozoneDB.core;

import org.ozoneDB.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.core.DbRemote.*;
import org.ozoneDB.core.dr.DeadlockRecognition;
import org.ozoneDB.util.LogWriter;

import java.util.Random;


/**
 * Server component that creates, tracks and finishes transactions and that
 * dispatches incoming {@link DbCommand}s to the transaction of the calling
 * thread.
 * <p>
 * The manager keeps two tables: one from transaction ID to transaction and one
 * from thread to transaction. Both are modified while holding the monitor of
 * this object. The manager also implements a simple "exclusive thread"
 * protocol ({@link #beginExclusion()}, {@link #endExclusion()},
 * {@link #checkExclusion()}) which is used while committing, aborting and
 * while searching for deadlocks.
 */
public final class TransactionManager extends ServerComponent {

    /** Upper bound, in seconds, that {@link #shutdown()} waits for stopped transactions to end. */
    private static final int SHUTDOWN_WAIT_SECONDS = 10;

    /**
     * All currently registered transactions, keyed by their transaction ID.
     */
    protected DxMap taTable;

    /**
     * The transaction each thread is joined to, keyed by the thread.
     */
    protected DxMap threadTable;

    /**
     * The transaction most recently selected by {@link #checkDeadlocks()}, or
     * <code>null</code>. It is cleared again by {@link #isDeadlockTA(Transaction)}.
     */
    protected Transaction deadlockTA;

    /**
     * The thread that currently runs exclusively, or <code>null</code> if
     * there is none. Written only while holding the monitor of this object;
     * declared volatile because {@link #checkExclusion()} reads it once
     * without the monitor as a fast path.
     */
    protected volatile Thread exclusiveThread;

    /**
     * Sum of the <code>acquireCount</code> values of all transactions that
     * have been deleted so far.
     */
    protected long acquireCount;

    /**
     * If <code>true</code>, {@link #notifyWaitingTransactions()} wakes up one
     * sleeping transaction when it finds no transaction with a blocker.
     */
    protected boolean alsoNotifySomeSleepingTransactions = true;


    public TransactionManager( Env _env ) {
        super( _env );
        taTable = new DxHashMap( 32 );
        threadTable = new DxHashMap( 32 );
    }


    public void startup() throws Exception {
        env.logWriter.newEntry( this, "startup...", LogWriter.INFO );
    }


    /**
     * Stops all pending transactions and waits up to
     * {@link #SHUTDOWN_WAIT_SECONDS} seconds for them to disappear from the
     * transaction table.
     */
    public void shutdown() throws Exception {
        env.logWriter.newEntry( this, "shutdown...", LogWriter.INFO );

        env.logWriter.newEntry( this, " there are " + taTable.count() + " pending transaction(s)", LogWriter.INFO );
        if (!taTable.isEmpty()) {
            env.logWriter.newEntry( this, " aborting pending transactions...", LogWriter.INFO );

            // Take a snapshot while holding the monitor: transactions that end
            // concurrently remove themselves from taTable (see
            // deleteTransaction()), so the table must not be iterated unlocked.
            DxArrayBag pending;
            synchronized (this) {
                pending = new DxArrayBag( taTable.count() );
                DxIterator it = taTable.iterator();
                Transaction ta;
                while ((ta = (Transaction)it.next()) != null) {
                    pending.add( ta );
                }
            }

            // Stop the transactions outside the monitor, as before.
            DxIterator it = pending.iterator();
            Transaction ta;
            while ((ta = (Transaction)it.next()) != null) {
                ta.stop();
            }

            env.logWriter.newEntry( this, " waiting for transactions to end...", LogWriter.INFO );
            // The countdown is part of the loop condition so that a transaction
            // which never ends cannot block the shutdown forever.
            for (int sec = SHUTDOWN_WAIT_SECONDS; sec > 0 && !taTable.isEmpty(); sec--) {
                Thread.sleep( 1000 );
            }
            if (!taTable.isEmpty()) {
                env.logWriter.newEntry( this, " giving up waiting: " + taTable.count()
                        + " transaction(s) still pending", LogWriter.WARN );
            }
        }

        env.logWriter.newEntry( this, "acquire count total:" + acquireCount, LogWriter.INFO );
    }


    /** Nothing to save for this component. */
    public void save() throws Exception {
    }


    /**
     * Factory method for locks.
     *
     * @return a new {@link MROWLock}
     */
    public Lock newLock() {
        return new MROWLock();
    }


    /**
     * @return the number of entries in the transaction table
     */
    public int taTableCount() {
        return taTable.count();
    }


    /**
     * @return the transaction registered under the given ID, as returned by
     *         the transaction table lookup
     */
    public Transaction taForID( TransactionID taID ) {
        return (Transaction)taTable.elementForKey( taID );
    }


    /**
     * Returns the transaction the current thread is joined to. For a
     * {@link CommandThread} this is the thread's own <code>ta</code> field,
     * for any other thread it is the entry of the thread table.
     */
    public Transaction currentTA() {
        Thread thread = Thread.currentThread();

        if (thread instanceof CommandThread) {
            return ((CommandThread)thread).ta;
        }
        return (Transaction)threadTable.elementForKey( thread );
    }


    /**
     * Creates a new transaction for the given owner, joins the current thread
     * to it and registers it in both tables.
     *
     * @throws TransactionException if the current thread is already joined to
     *         a transaction
     */
    public Transaction newTransaction( User owner ) throws TransactionException {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "newTransaction() *****************************", LogWriter.DEBUG3 );
        }

        if (currentTA() != null) {
            throw new TransactionException( "Thread is already joined to a transaction.", TransactionException.STATE );
        }

        Transaction ta = env.storeManager.createTransaction( env, owner );
        Thread thread = Thread.currentThread();

        if (thread instanceof CommandThread) {
            ((CommandThread)thread).setTransaction( ta );
        }

        if (env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
            env.logWriter.newEntry( this, "newTransaction(): ta="+ta+", thread="+thread+".", LogWriter.DEBUG2 );
        }

        synchronized (this) {
            threadTable.addForKey( ta, thread );
            taTable.addForKey( ta, ta.taID() );
        }
        return ta;
    }


    /**
     * Removes the transaction of the current thread from both tables, adds its
     * acquire count to the total and tells the garbage collector that it no
     * longer has to wait for this transaction. Only logs a warning if the
     * current thread is not joined to a transaction.
     */
    public void deleteTransaction() {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "deleteTransaction() --------------------------", LogWriter.DEBUG3 );
        }

        Transaction ta = currentTA();
        if (ta == null) {
            env.logWriter.newEntry( this, "deleteTransaction(): thread is not joined to a transaction.",
                    LogWriter.WARN );
            return;
        }

        if (env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
            env.logWriter.newEntry( this, "deleteTransaction() ta="+ta+".", LogWriter.DEBUG2 );
        }

        synchronized (this) {
            acquireCount += ta.acquireCount;

            Thread thread = Thread.currentThread();
            if (thread instanceof CommandThread) {
                ((CommandThread)thread).setTransaction( null );
            }

            threadTable.removeForKey( thread );
            taTable.removeForKey( ta.taID() );

            env.getGarbageCollector().removeTransactionRequiredToComplete( ta );
        }
    }


    /**
     * Wakes up (via <code>notifyAll()</code> on the transaction object) every
     * transaction that currently has a blocker. If there is none and
     * {@link #alsoNotifySomeSleepingTransactions} is set, the first sleeping
     * transaction found is woken up instead.
     */
    public void notifyWaitingTransactions() {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "notifyWaitingTransactions()", LogWriter.DEBUG3 );
        }

        DxArrayBag tas = new DxArrayBag( taTable.count() );

        // Collect the candidates under the monitor, notify them afterwards so
        // that the monitor of this object and of a transaction are never held
        // at the same time here.
        synchronized (this) {
            DxIterator it = taTable.iterator();
            Transaction ta;
            while ((ta = (Transaction)it.next()) != null) {
                if (ta.blocker != null) {
                    tas.add( ta );
                }
            }

            if (alsoNotifySomeSleepingTransactions && tas.isEmpty()) {
                it.reset();
                while ((ta = (Transaction)it.next()) != null) {
                    if (ta.isSleeping()) {
                        tas.add( ta );
                        break;
                    }
                }
            }
        }

        DxIterator it = tas.iterator();
        Transaction ta;
        while ((ta = (Transaction)it.next()) != null) {
            synchronized (ta) {
                ta.notifyAll();
            }
        }
    }


    /**
     * Handles a command on behalf of the user of the given client.
     *
     * @see #handleCommand(DbCommand, User)
     */
    public void handleCommand(DbCommand command,DbInvokeClient client) {
        handleCommand(command,client.getUser());
    }


    /**
     * Handles a command in the context of the current thread's transaction.
     * <ul>
     * <li>{@link DbCloseConn}: aborts and deletes the current transaction, if any.</li>
     * <li>{@link DbTransaction}: begins, prepares, commits, aborts or queries
     *     the current transaction depending on the mode.</li>
     * <li>Any other command: is performed in the current transaction, or, if
     *     the thread is not joined to one, in a new transaction that is
     *     completed immediately.</li>
     * </ul>
     * Nothing is thrown to the caller: any Throwable is logged, stored in
     * <code>command.result</code> wrapped in an {@link OzoneInternalException}
     * and the current transaction is deleted.
     */
    public void handleCommand(DbCommand command,User user) {
        if (env.logWriter.hasTarget( LogWriter.DEBUG )) {
            env.logWriter.newEntry( this, "handleCommand(): " + command.toString(), LogWriter.DEBUG );
        }

        CurrentDatabase.register(env.getDatabase());

        try {
            command.result = null;
            try {
                if (command instanceof DbCloseConn) {
                    Transaction ta = currentTA();
                    if (ta != null) {
                        abortTransaction( ta, command );
                        deleteTransaction();
                    }
                } else if (command instanceof DbTransaction) {
                    handleTransactionCommand( command, user );
                } else {
                    Transaction ta = currentTA();
                    if (ta == null) {
                        completeTransaction( command, user );
                    } else {
                        performCommand( ta, command );
                    }
                }
            } catch (Throwable e) {
                env.logWriter.newEntry( this, "handleCommand(): " + e, e, LogWriter.ERROR );
                command.result = new OzoneInternalException(e.toString(),e);
                deleteTransaction();
            }
        } finally {
            CurrentDatabase.unregister();
        }
    }


    /**
     * Executes a transaction control command. The command must be a
     * {@link DbTransaction}; its mode selects the operation.
     */
    private void handleTransactionCommand( DbCommand command, User user ) throws Exception {
        switch (((DbTransaction)command).mode()) {
            case DbTransaction.MODE_BEGIN: {
                if (currentTA() != null) {
                    command.result = new TransactionException( "Thread is already joined to a transaction.",
                            TransactionException.STATE );
                } else {
                    Transaction ta = newTransaction( user );
                    command.result = ta.taID();
                }
            } break;
            case DbTransaction.MODE_PREPARE: {
                prepareTransaction( currentTA(), command );
            } break;
            case DbTransaction.MODE_COMMIT_TWOPHASE: {
                commitTransaction( currentTA(), command );
                deleteTransaction();
            } break;
            case DbTransaction.MODE_COMMIT_ONEPHASE: {
                Transaction ta = currentTA();
                if (prepareTransaction( ta, command )) {
                    commitTransaction( ta, command );
                }
                deleteTransaction();
            } break;
            case DbTransaction.MODE_ABORT: {
                abortTransaction( currentTA(), command );
                deleteTransaction();
            } break;
            case DbTransaction.MODE_CHECKPOINT: {
                command.result = new OzoneInternalException( "External CHECKPOINT is not implemented yet." );
            } break;
            case DbTransaction.MODE_STATUS: {
                Transaction ta = currentTA();
                if (ta == null) {
                    // Report the missing transaction the same way the other
                    // modes do instead of running into a NullPointerException.
                    command.result = new TransactionException( "Thread is not joined to a transaction.",
                            TransactionException.STATE );
                } else {
                    command.result = new Integer( ta.status() );
                }
            } break;
        }
    }


    /**
     * Runs the given command in a transaction of its own: creates the
     * transaction, performs the command, prepares and commits. If performing
     * the command fails or throws, the transaction is aborted. On normal
     * return the transaction has been deleted; if an exception or error
     * propagates, deleting is left to the caller.
     *
     * @throws TransactionException if <code>user</code> is <code>null</code>
     */
    protected void completeTransaction( DbCommand command,User user ) throws Exception {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "completeTransaction(): " + command, LogWriter.DEBUG3 );
        }
        if (user == null) {
            throw new TransactionException( "No owner set for current transaction." );
        }

        Transaction ta = newTransaction( user );

        // Stays false if performCommand() reports failure or throws; in both
        // cases the transaction is aborted in the finally clause.
        boolean alright = false;

        try {
            alright = performCommand( ta, command );
            if (alright && prepareTransaction( ta, command )) {
                commitTransaction( ta, command );
            }
        } catch (Exception e) {
            if (env.logWriter.hasTarget(LogWriter.WARN)) {
                env.logWriter.newEntry(this,"completeTransaction("+command+"): We've caught an exception (following) and that's why we'll abort now.",e,LogWriter.WARN);
            }
            throw e;
        } catch (Error e) {
            if (env.logWriter.hasTarget(LogWriter.WARN)) {
                env.logWriter.newEntry(this,"completeTransaction("+command+"): We've caught an error (following) and that's why we'll abort now.",e,LogWriter.WARN);
            }
            throw e;
        } finally {
            if (!alright) {
                abortTransaction(ta,command);
            }
        }
        deleteTransaction();
    }


    /**
     * Performs the command in the given transaction and handles deadlocks
     * signalled by a {@link TransactionError} with code
     * <code>TransactionError.DEADLOCK</code>:
     * <ul>
     * <li>If the transaction's <code>commandCount</code> is greater than one,
     *     a {@link DeadlockException} is stored in <code>command.result</code>
     *     and <code>false</code> is returned.</li>
     * <li>Otherwise the transaction is aborted and the command is re-run after
     *     a random pause until it no longer deadlocks.</li>
     * </ul>
     * Any other TransactionError is rethrown.
     *
     * @return the result of <code>ta.performCommand()</code>, or
     *         <code>false</code> in the deadlock case described above
     */
    protected boolean performCommand( Transaction ta, DbCommand command ) throws Exception {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "performCommand(): start: " + ta.toString() + ", " + command.toString(),LogWriter.DEBUG3);
        }

        boolean result = true;

        try {
            result = ta.performCommand( command );
        } catch (TransactionError e) {
            if (e.code() != TransactionError.DEADLOCK) {
                throw e;
            }

            if (ta.commandCount > 1) {
                env.logWriter.newEntry( this, ta.toString() + " deadlocked; throwing exception...",
                        LogWriter.WARN );
                command.result = new DeadlockException( "" );
                result = false;
            } else {
                result = retryAfterDeadlock( ta, command );
            }
        } finally {
            if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
                env.logWriter.newEntry( this, "performCommand(): end: " + ta.toString() + ", " + command.toString(),LogWriter.DEBUG3);
            }
        }
        return result;
    }


    /**
     * Aborts the deadlocked transaction, lets it sleep for a random time and
     * re-runs the command; repeats this as long as the re-run deadlocks again.
     *
     * @return the result of the first re-run that did not deadlock
     */
    private boolean retryAfterDeadlock( Transaction ta, DbCommand command ) throws Exception {
        Random rand = new Random();

        while (true) {
            try {
                env.logWriter.newEntry( this, ta.toString() + " aborting... (DEADLOCK)", LogWriter.WARN );
                abortTransaction( ta, command );

                ta.setDeadlocked(false);

                long millis = (long)(rand.nextDouble() * ta.increaseDeadlockWaitTimeMaximum());
                env.logWriter.newEntry( this, ta.toString() + " sleeping " + millis + " milliseconds...",
                        LogWriter.WARN );
                ta.sleep(millis);

                env.logWriter.newEntry( this, ta.toString() + " re-run...", LogWriter.WARN );
                ta.reset();
                return ta.performCommand( command );
            } catch (TransactionError ee) {
                if (ee.code() != TransactionError.DEADLOCK) {
                    throw ee;
                }
                // Deadlocked again: go around once more.
            }
        }
    }


    /**
     * Prepares the given transaction for commit. Problems are reported through
     * <code>command.result</code> rather than by throwing: a missing
     * transaction or a status other than <code>STATUS_STARTED</code> yields a
     * STATE exception; a rollback-only transaction or a failing
     * <code>prepareCommit()</code> aborts the transaction and yields a
     * ROLLBACK exception.
     *
     * @return <code>true</code> if the transaction has been prepared
     */
    protected boolean prepareTransaction( Transaction ta, DbCommand command ) throws Exception {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "prepareTransaction()", LogWriter.DEBUG3 );
        }

        if (ta == null) {
            env.logWriter.newEntry( this, "prepareTransaction(): Thread is not joined to a transaction.",
                    LogWriter.WARN );
            command.result = new TransactionException( "Thread is not joined to a transaction.", TransactionException.STATE );
            return false;
        }

        if (ta.status != Transaction.STATUS_STARTED) {
            env.logWriter.newEntry( this,
                    "prepareTransaction():" + ta.toString() + ": Transaction has inproper status.", LogWriter.WARN );
            command.result = new TransactionException( "Transaction has inproper status.", TransactionException.STATE );
            return false;
        }

        if (ta.rollbackOnly) {
            env.logWriter.newEntry( this, "prepareTransaction():" + ta.toString() + ": rollback only.",
                    LogWriter.WARN );
            abortTransaction( ta, command );
            command.result = new TransactionException( "Transaction is in rollback only status.", TransactionException.ROLLBACK );
            return false;
        }

        try {
            ta.prepareCommit();
            return true;
        } catch (Throwable e) {
            env.logWriter.newEntry( this, "Prepare transaction failed: " + ta.toString() + "; aborting...", e,
                    LogWriter.WARN );
            abortTransaction( ta, command );

            command.result = new TransactionException( e.toString(), TransactionException.ROLLBACK );
            return false;
        }
    }


    /**
     * Commits the given transaction while running exclusively and notifies
     * waiting transactions afterwards. A missing transaction or a status other
     * than <code>STATUS_PREPARED</code> is reported through
     * <code>command.result</code>. A failing commit is stored in
     * <code>command.result</code> and rethrown.
     */
    protected void commitTransaction( Transaction ta, DbCommand command ) throws Exception {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "commitTransaction()", LogWriter.DEBUG3 );
        }

        if (ta == null) {
            env.logWriter.newEntry( this, "commitTransaction(): Thread is not joined to a transaction.", LogWriter.WARN );
            command.result = new TransactionException( "Thread is not joined to a transaction.", TransactionException.STATE );
        } else if (ta.status != Transaction.STATUS_PREPARED) {
            env.logWriter.newEntry( this, "commitTransaction(): " + ta.toString() + ": Transaction has inproper status.", LogWriter.WARN );
            command.result = new TransactionException( "Transaction has inproper status.", TransactionException.STATE );
        } else {
            // Acquire before the try block: if acquiring fails, endExclusion()
            // must not run, because it would throw and hide the real problem.
            beginExclusion();
            try {
                ta.commit();
            } catch (Throwable e) {
                env.logWriter.newEntry( this, "Commit transaction failed: " + ta.toString(), e, LogWriter.WARN );
                command.result = e;
                rethrow( e );
            } finally {
                endExclusion();
                notifyWaitingTransactions();
            }
        }
    }


    /**
     * Aborts the given transaction while running exclusively and notifies
     * waiting transactions afterwards. A missing transaction or a status of
     * <code>STATUS_COMMITING</code> or higher is reported through
     * <code>command.result</code>. A failing abort is stored in
     * <code>command.result</code> and rethrown.
     */
    protected void abortTransaction( Transaction ta, DbCommand command ) throws Exception {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "abortTransaction()",LogWriter.DEBUG3);
        }

        if (ta == null) {
            env.logWriter.newEntry( this, "abortTransaction(): Thread is not joined to a transaction.",
                    LogWriter.WARN );
            command.result = new TransactionException( "Thread is not joined to a transaction.", TransactionException.STATE );
        } else if (ta.status >= Transaction.STATUS_COMMITING) {
            env.logWriter.newEntry( this, "abortTransaction(): " + ta.toString() + ": Transaction has inproper status.",
                    LogWriter.WARN );
            command.result = new TransactionException( "Transaction has inproper status.", TransactionException.STATE );
        } else {
            // See commitTransaction() for why this is outside the try block.
            beginExclusion();
            try {
                ta.abort( command );
            } catch (Throwable e) {
                env.logWriter.newEntry( this, "Aborting transaction failed: " + ta.toString(), e, LogWriter.WARN );
                command.result = e;
                rethrow( e );
            } finally {
                endExclusion();
                notifyWaitingTransactions();
            }
        }
    }


    /**
     * Rethrows the given Throwable without losing its type: Errors and
     * Exceptions are thrown as they are, anything else is wrapped in a
     * RuntimeException (a plain cast to Exception would fail with a
     * ClassCastException).
     */
    private static void rethrow( Throwable e ) throws Exception {
        if (e instanceof Error) {
            throw (Error)e;
        }
        if (e instanceof Exception) {
            throw (Exception)e;
        }
        throw new RuntimeException( e.toString(), e );
    }


    /**
     * Blocks the calling thread for as long as a different thread runs
     * exclusively. Returns immediately if no thread or the calling thread
     * itself is the exclusive one.
     */
    public void checkExclusion() {
        Thread current = Thread.currentThread();

        // Fast path without the monitor; the field is volatile and read once.
        Thread owner = exclusiveThread;
        if (owner == null || owner == current) {
            return;
        }

        synchronized (this) {
            while (exclusiveThread != null && exclusiveThread != current) {
                try {
                    wait();
                } catch (InterruptedException ignored) {
                    // Kept from the original code: an interrupt does not end
                    // the wait, the condition is simply checked again.
                    // NOTE(review): the interrupt status is dropped here;
                    // confirm that no caller relies on it.
                }
            }
        }
    }


    /**
     * Makes the calling thread the exclusive thread, waiting first until no
     * other thread runs exclusively. Must be paired with
     * {@link #endExclusion()}.
     *
     * @throws RuntimeException if another thread is still exclusive after
     *         waiting
     */
    protected synchronized void beginExclusion() {
        checkExclusion();

        Thread currentThread = Thread.currentThread();
        if (exclusiveThread != null && exclusiveThread != currentThread) {
            throw new RuntimeException( "Another thread already runs exclusively." );
        }

        exclusiveThread = currentThread;

        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "beginExclusion(): ", LogWriter.DEBUG3 );
        }
    }


    /**
     * Ends the exclusive phase of the calling thread and wakes up all threads
     * waiting on this object.
     *
     * @throws RuntimeException if the calling thread is not the exclusive
     *         thread
     */
    protected synchronized void endExclusion() {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "endExclusion(): ", LogWriter.DEBUG3 );
        }

        Thread currentThread = Thread.currentThread();
        if (exclusiveThread != currentThread) {
            throw new RuntimeException( "Current thread does not run exclusively." );
        }

        exclusiveThread = null;
        notifyAll();
    }


    /**
     * Searches for a deadlock if at least two transactions are blocked. The
     * search runs exclusively and asks the deadlock recognition for each
     * transaction in turn; the first candidate it returns is remembered in
     * {@link #deadlockTA} and notified, then the search stops.
     */
    public synchronized void checkDeadlocks() {
        DeadlockRecognition dr = env.deadlockRecognition();

        int blockedCount = 0;
        DxIterator it = taTable.iterator();
        for (Transaction ta; (ta = (Transaction)it.next()) != null;) {
            if (ta.isBlocked()) {
                blockedCount++;
            }
        }

        if (blockedCount < 2) {
            return;
        }

        // See commitTransaction() for why this is outside the try block.
        beginExclusion();
        try {
            it = taTable.iterator();
            for (Transaction ta; (ta = (Transaction)it.next()) != null;) {
                Transaction candidate = (Transaction)dr.detectDeadlock( ta );
                if (candidate != null) {
                    env.logWriter.newEntry( this, "*** *** DEADLOCK DETECTED: ta=" + ta, LogWriter.WARN );

                    deadlockTA = candidate;
                    synchronized (deadlockTA) {
                        deadlockTA.notifyAll();
                    }

                    return;
                }
            }
        } finally {
            endExclusion();
        }
    }


    /**
     * Tells whether the given transaction is the one selected by the last
     * deadlock check. A positive answer is given only once: the remembered
     * transaction is cleared when it matches.
     */
    public synchronized boolean isDeadlockTA( Transaction ta ) {
        if (ta == deadlockTA) {
            deadlockTA = null;
            return true;
        }
        return false;
    }


    /**
     * Registers every element of the transaction table with the garbage
     * collector as a transaction that is required to complete, then lets the
     * garbage collector check whether that phase is already over. Both steps
     * happen while holding the monitor of this object.
     */
    public void startGarbageCollectionWaitForCurrentTransactionsToCompletePhase(GarbageCollector garbageCollector) {
        synchronized (this) {
            DxIterator i = taTable.elementSet().iterator();

            while (i.next()!=null) {
                garbageCollector.addTransactionRequiredToComplete(i.object());
            }
            garbageCollector.checkForEndOfWaitForCurrentTransactionsToCompletePhase();
        }
    }
}