|                                                                                                              1
 9   package org.ozoneDB.core;
 10
 11  import org.ozoneDB.*;
 12  import org.ozoneDB.DxLib.*;
 13  import org.ozoneDB.core.DbRemote.*;
 14  import org.ozoneDB.core.dr.DeadlockRecognition;
 15  import org.ozoneDB.util.LogWriter;
 16
 17  import java.util.Random
  ; 18
 19
 20
 28  public final class TransactionManager extends ServerComponent {
 29
 30
 33      protected DxMap taTable;
 34
 35
 38      protected DxMap threadTable;
 39
 40
 43      protected Transaction deadlockTA;
 44
 45
 48      protected Thread
  exclusiveThread; 49
 50      protected long acquireCount;
 51
 52      public TransactionManager( Env _env ) {
 53          super( _env );
 54          taTable = new DxHashMap( 32 );
 56          threadTable = new DxHashMap( 32 );
 57      }
 58
 59
 60      public void startup() throws Exception
  { 61          env.logWriter.newEntry( this, "startup...", LogWriter.INFO );
 62      }
 63
 64
 65      public void shutdown() throws Exception
  { 66          env.logWriter.newEntry( this, "shutdown...", LogWriter.INFO );
 67
 68          env.logWriter.newEntry( this, "    there are " + taTable.count() + " pending transaction(s)", LogWriter.INFO );
 69          if (!taTable.isEmpty()) {
 70              env.logWriter.newEntry( this, "    aborting pending transactions...", LogWriter.INFO );
 71              DxIterator it = taTable.iterator();
 72              while (it.next() != null) {
 73                  ((Transaction)it.object()).stop();
 74              }
 75
 76              env.logWriter.newEntry( this, "    waiting for transactions to end...", LogWriter.INFO );
 77              for (int sec = 10; !taTable.isEmpty(); sec--) {
 78                  Thread.sleep( 1000 );
 79              }
 80          }
 81
 82          env.logWriter.newEntry( this, "acquire count total:" + acquireCount, LogWriter.INFO );
 83      }
 84
 85
 86      public void save() throws Exception
  { 87      }
 88
 89
 90
 95      public Lock newLock() {
 96          return new MROWLock();
 97      }
 98
 99
 100     public int taTableCount() {
 101         return taTable.count();
 102     }
 103
 104
 105     public Transaction taForID( TransactionID taID ) {
 106         return (Transaction)taTable.elementForKey( taID );
 107     }
 108
 109
 110
 114     public Transaction currentTA() {
 115         Thread
  thread = Thread.currentThread(); 116
 117                 if (thread instanceof CommandThread) {
 119             return ((CommandThread) thread).ta;
 120         } else {
 121             Transaction result = (Transaction) threadTable.elementForKey(thread);
 122             return result;
 123         }
 124     }
 125
 126
 127     public Transaction newTransaction( User owner ) throws TransactionException {
 128         if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 129             env.logWriter.newEntry( this, "newTransaction() *****************************", LogWriter.DEBUG3 );
 130         }
 131
 132         if (currentTA() != null) {
 133             throw new TransactionException( "Thread is already joined to a transaction.", TransactionException.STATE );
 134         }
 135
 136         Transaction ta = env.storeManager.createTransaction(env, owner);
 137         Thread
  thread = Thread.currentThread(); 138
 139                         if (thread instanceof CommandThread) {
 142             CommandThread commandThread = (CommandThread)Thread.currentThread();
 143             commandThread.setTransaction( ta );
 144         }
 145
 146         if (env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
 147             env.logWriter.newEntry( this, "newTransaction(): ta="+ta+", thread="+thread+".", LogWriter.DEBUG2 );
 148         }
 149
 150         synchronized (this) {
 151                                     threadTable.addForKey( ta, thread );
 154
 155             taTable.addForKey( ta, ta.taID() );
 156
 157         }
 158         return ta;
 159     }
 160
 161
 162
 165     public void deleteTransaction() {
 166         if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 167             env.logWriter.newEntry( this, "deleteTransaction() --------------------------", LogWriter.DEBUG3 );
 168         }
 169
 170         Transaction ta = currentTA();
 171         if (ta == null) {
 172             env.logWriter.newEntry( this, "deleteTransaction(): thread is not joined to a transaction.",
 173                     LogWriter.WARN );
 174         } else {
 175             if (env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
 176                 env.logWriter.newEntry( this, "deleteTransaction() ta="+ta+".", LogWriter.DEBUG2 );
 177             }
 178             synchronized (this) {
 179                 acquireCount += ta.acquireCount;
 180                 if (false&&env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
 181                     env.logWriter.newEntry( this, "    acquire count:" + ta.acquireCount, LogWriter.DEBUG2 );
 182                     env.logWriter.newEntry( this, "    total        :" + acquireCount, LogWriter.DEBUG2 );
 183                 }
 184
 185                 Thread
  thread = Thread.currentThread(); 186                 if (thread instanceof CommandThread) {
 187                     ((CommandThread)thread).setTransaction( null );
 188                 }
 189
 190                 threadTable.removeForKey( thread );
 191                 taTable.removeForKey( ta.taID() );
 192
 193                 env.getGarbageCollector().removeTransactionRequiredToComplete(ta);
 194             }
 195         }
 196     }
 197
 198
 201     protected boolean alsoNotifySomeSleepingTransactions = true;
 202
 203
 207     public void notifyWaitingTransactions() {
 208         if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 209             env.logWriter.newEntry( this, "notifyWaitingTransactions()", LogWriter.DEBUG3 );
 210         }
 211
 212         DxArrayBag tas = new DxArrayBag( taTable.count() );
 213
 214                                 synchronized (this) {
 218             DxIterator it = taTable.iterator();
 219             Transaction ta;
 220             while ((ta = (Transaction)it.next()) != null) {
 221                 if ((ta.blocker != null)) {
 222                     tas.add( ta );
 223                 }
 224             }
 225
 226             if (alsoNotifySomeSleepingTransactions) {
 227                 if (tas.isEmpty()) {
 228                     it.reset();
 229                     while ((ta = (Transaction)it.next()) != null) {
 230                         if (ta.isSleeping()) {                             tas.add(ta);
 232                             break;
 233                         }
 234                     }
 235                 }
 236             }
 237         }
 238
 239         DxIterator it = tas.iterator();
 240         Transaction ta;
 241         while ((ta = (Transaction)it.next()) != null) {
 242             if (false&&env.logWriter.hasTarget( LogWriter.DEBUG )) {
 243                 env.logWriter.newEntry( this, "    notify: " + ta, LogWriter.DEBUG );
 244             }
 245             synchronized (ta) {
 246                 ta.notifyAll();
 247             }
 248         }
 249     }
 250
 251
 252
 257     public void handleCommand(DbCommand command,DbInvokeClient client) {
 258         handleCommand(command,client.getUser());
 259     }
 260
 261
 266     public void handleCommand(DbCommand command,User user) {
 267         if (env.logWriter.hasTarget( LogWriter.DEBUG )) {
 268             env.logWriter.newEntry( this, "handleCommand(): " + command.toString(), LogWriter.DEBUG );
 269         }
 270
 271         CurrentDatabase.register(env.getDatabase());
 272
 273         try {
 274             command.result = null;
 275             try {
 276                                 if (command instanceof DbCloseConn) {
 278                     Transaction ta = currentTA();
 279                     if (ta != null) {
 280                         abortTransaction( ta, command );
 281                         deleteTransaction();
 282                     }
 283                 } else if (!(command instanceof DbTransaction)) {
 284
 285                                         Transaction ta = currentTA();
 287                     if (ta == null) {
 288                         completeTransaction( command,  user );
 289                     } else {
 290                                                                         performCommand( ta, command );
 293                     }
 294                 } else {
 295                                                                                 switch (((DbTransaction)command).mode()) {
 299                         case DbTransaction.MODE_BEGIN: {
 300                             if (currentTA() != null) {
 301                                 command.result = new TransactionException( "Thread is already joined to a transaction.",
 302                                         TransactionException.STATE );
 303                             } else {
 304                                 Transaction ta = newTransaction(user );
 305                                 command.result = ta.taID();
 306                             }
 307                         } break;
 308                         case DbTransaction.MODE_PREPARE: {
 309                             prepareTransaction( currentTA(), command );
 310                         } break;
 311                         case DbTransaction.MODE_COMMIT_TWOPHASE: {
 312                             commitTransaction( currentTA(), command );
 313                             deleteTransaction();
 314                         } break;
 315                         case DbTransaction.MODE_COMMIT_ONEPHASE: {
 316                             Transaction ta = currentTA();
 317                             if (prepareTransaction( ta, command )) {
 318                                 commitTransaction( ta, command );
 319                             }
 320                             deleteTransaction();
 321                         } break;
 322                         case DbTransaction.MODE_ABORT: {
 323                             abortTransaction( currentTA(), command );
 324                             deleteTransaction();
 325                         } break;
 326                         case DbTransaction.MODE_CHECKPOINT: {
 327                             command.result = new OzoneInternalException( "External CHECKPOINT is not implemented yet." );
 328                         } break;
 329                         case DbTransaction.MODE_STATUS: {
 330                             command.result = new Integer
  ( currentTA().status() ); 331                         } break;
 332                     }
 333                 }
 334             } catch (Throwable
  e) { 335                                                 env.logWriter.newEntry( this, "handleCommand(): " + e, e, LogWriter.ERROR );
 338                 command.result = new OzoneInternalException(e.toString(),e);
 339                 deleteTransaction();
 340             }
 341         } finally {
 342             CurrentDatabase.unregister();
 343         }
 344     }
 345
 346
 347
 350     protected void completeTransaction( DbCommand command,User user ) throws Exception
  { 351         if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 352             env.logWriter.newEntry( this, "completeTransaction(): " + command, LogWriter.DEBUG3 );
 353         }
 354         if (user == null) {
 355             throw new TransactionException( "No owner set for current transaction." );
 356         }
 357
 358         Transaction ta = newTransaction( user);
 359
 360                 boolean alright = false;
 362
 363         try {
 364             if (alright = performCommand( ta, command )) {
 365                 if (prepareTransaction( ta, command )) {
 366                     commitTransaction( ta, command );
 367                 }
 368             }
 369         } catch (Exception
  e) { 370             e.printStackTrace();
 371             if (env.logWriter.hasTarget(LogWriter.WARN)) {
 372                 env.logWriter.newEntry(this,"completeTransaction("+command+"): We've caught an exception (following) and that's why we'll abort now.",e,LogWriter.WARN);
 373             }
 374             throw e;
 375         } catch (Error
  e) { 376             e.printStackTrace();
 377             if (env.logWriter.hasTarget(LogWriter.WARN)) {
 378                 env.logWriter.newEntry(this,"completeTransaction("+command+"): We've caught an error (following) and that's why we'll abort now.",e,LogWriter.WARN);
 379             }
 380             throw e;
 381         } finally {
 382             if (!alright) {
 383                 abortTransaction(ta,command);
 384             }
 385         }
 386         deleteTransaction();
 387     }
 388
 389
 390
 401     protected boolean performCommand( Transaction ta, DbCommand command ) throws Exception
  { 402         if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 403             env.logWriter.newEntry( this, "performCommand(): start: " + ta.toString() + ", " + command.toString(),LogWriter.DEBUG3);
 404         }
 405
 406         boolean result = true;
 407
 408         try {
 409                         result = ta.performCommand( command );
 411
 412                         } catch (TransactionError e) {
 415             if (e.code() == TransactionError.DEADLOCK) {
 416
 417                 if (ta.commandCount > 1) {
 418                     env.logWriter.newEntry( this, ta.toString() + " deadlocked; throwing exception...",
 419                             LogWriter.WARN );
 420                     command.result = new DeadlockException( "" );
 421                     result = false;
 422                 } else {
 423                     Random
  rand = new Random  (); 424                     boolean deadlocked = true;
 425                     while (deadlocked) {
 426                         try {
 427                             env.logWriter.newEntry( this, ta.toString() + " aborting... (DEADLOCK)", LogWriter.WARN );
 428                             abortTransaction( ta, command );
 429
 430                             ta.setDeadlocked(false);
 431
 432                             long millis = (long)(rand.nextDouble() * ta.increaseDeadlockWaitTimeMaximum());
 433                             env.logWriter.newEntry( this, ta.toString() + " sleeping " + millis + " milliseconds...",
 434                                     LogWriter.WARN );
 435                             ta.sleep(millis);
 436
 437                             env.logWriter.newEntry( this, ta.toString() + " re-run...", LogWriter.WARN );
 438                             ta.reset();
 439                             result = ta.performCommand( command );
 440
 441                             deadlocked = false;
 442                         }
 443                         catch (TransactionError ee) {
 444                             if (ee.code() == TransactionError.DEADLOCK) {
 445                                                         } else {
 447                                 throw ee;
 448                             }
 449                         }
 450                     }
 451                 }
 452             } else {
 453                                 throw e;
 455             }
 456         } finally {
 457             if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 458                 env.logWriter.newEntry( this, "performCommand(): end: " + ta.toString() + ", " + command.toString(),LogWriter.DEBUG3);
 459             }
 460         }
 461         return result;
 462     }
 463
 464
 465
 470     protected boolean prepareTransaction( Transaction ta, DbCommand command ) throws Exception
  { 471         if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 472             env.logWriter.newEntry( this, "prepareTransaction()", LogWriter.DEBUG3 );
 473         }
 474
 475                         if (ta == null) {
 478             env.logWriter.newEntry( this, "prepareTransaction(): Thread is not joined to a transaction.",
 479                     LogWriter.WARN );
 480             command.result = new TransactionException( "Thread is not joined to a transaction.", TransactionException.STATE );
 481             return false;
 482         } else if (ta.status != Transaction.STATUS_STARTED) {
 483             env.logWriter.newEntry( this,
 484                     "prepareTransaction():" + ta.toString() + ": Transaction has inproper status.", LogWriter.WARN );
 485             command.result = new TransactionException( "Transaction has inproper status.", TransactionException.STATE );
 486             return false;
 487         } else if (ta.rollbackOnly) {
 488             env.logWriter.newEntry( this, "prepareTransaction():" + ta.toString() + ": rollback only.",
 489                     LogWriter.WARN );
 490             abortTransaction( ta, command );
 491             command.result = new TransactionException( "Transaction is in rollback only status.", TransactionException.ROLLBACK );
 492             return false;
 493         } else {
 494             try {
 495                                 ta.prepareCommit();
 497                 return true;
 498             } catch (Throwable
  e) { 499                 env.logWriter.newEntry( this, "Prepare transaction failed: " + ta.toString() + "; aborting...", e,
 500                         LogWriter.WARN );
 501                 abortTransaction( ta, command );
 502
 503                 command.result = new TransactionException( e.toString(), TransactionException.ROLLBACK );
 504                 return false;
 505             } finally {
 506                         }
 508         }
 509     }
 510
 511
 512     protected void commitTransaction( Transaction ta, DbCommand command ) throws Exception
  { 513         if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 514             env.logWriter.newEntry( this, "commitTransaction()", LogWriter.DEBUG3 );
 515         }
 516
 517                         if (ta == null) {
 520             env.logWriter.newEntry( this, "commitTransaction(): Thread is not joined to a transaction.", LogWriter.WARN );
 521             command.result = new TransactionException( "Thread is not joined to a transaction.", TransactionException.STATE );
 522         } else  if (ta.status != Transaction.STATUS_PREPARED) {
 523             env.logWriter.newEntry( this, "commitTransaction(): " + ta.toString() + ": Transaction has inproper status.", LogWriter.WARN );
 524             command.result = new TransactionException( "Transaction has inproper status.", TransactionException.STATE );
 525         } else {
 526             try {
 527                 beginExclusion();
 528                 ta.commit();
 529             } catch (Throwable
  e) { 530                 env.logWriter.newEntry( this, "Commit transaction failed: " + ta.toString(), e, LogWriter.WARN );
 531                 command.result = e;
 532                 if (e instanceof Error
  ) { 533                     throw (Error
  )e; 534                 } else {
 535                     throw (Exception
  )e; 536                 }
 537             } finally {
 538                 endExclusion();
 539                 notifyWaitingTransactions();
 540             }
 541         }
 542     }
 543
 544
 545     protected void abortTransaction( Transaction ta, DbCommand command ) throws Exception
  { 546         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
 547             env.logWriter.newEntry(this, "abortTransaction()",LogWriter.DEBUG3);
 548         }
 549
 550                         if (ta == null) {
 553             env.logWriter.newEntry( this, "abortTransaction(): Thread is not joined to a transaction.",
 554                     LogWriter.WARN );
 555             command.result = new TransactionException( "Thread is not joined to a transaction.", TransactionException.STATE );
 556         } else if (ta.status >= Transaction.STATUS_COMMITING) {
 557             env.logWriter.newEntry( this, "abortTransaction(): " + ta.toString() + ": Transaction has inproper status.",
 558                     LogWriter.WARN );
 559             command.result = new TransactionException( "Transaction has inproper status.", TransactionException.STATE );
 560         } else {
 561             try {
 562                 beginExclusion();
 563                 ta.abort( command );
 564             } catch (Throwable
  e) { 565                 env.logWriter.newEntry( this, "Aborting transaction failed: " + ta.toString(), e, LogWriter.WARN );
 566                 command.result = e;
 567                 if (e instanceof Error
  ) { 568                     throw (Error
  )e; 569                 } else {
 570                     throw (Exception
  )e; 571                 }
 572             } finally {
 573                 endExclusion();
 574                 notifyWaitingTransactions();
 575             }
 576         }
 577     }
 578
 579
 580
 584     public void checkExclusion() {
 585                         if (exclusiveThread != null && exclusiveThread != Thread.currentThread()) {
 588             synchronized (this) {
 589                 while (exclusiveThread != null && exclusiveThread != Thread.currentThread()) {
 590                     try {
 591                         if (false) {
 592                             env.logWriter.newEntry(this,"checkExclusion(): waiting... ("+currentTA()+")",LogWriter.DEBUG2);
 593                         }
 594                         wait();
 595                         if (false) {
 596                             env.logWriter.newEntry(this,"checkExclusion(): notified... ("+currentTA()+")",LogWriter.DEBUG2);
 597                         }
 598                     } catch (InterruptedException
  e) { 599                     }
 600                 }
 601             }
 602         }
 603     }
 604
 605
 606     protected synchronized void beginExclusion() {
 607         checkExclusion();
 608
 609         Thread
  currentThread = Thread.currentThread(); 610         if (exclusiveThread != null && exclusiveThread != currentThread) {
 611             throw new RuntimeException
  ( "Another thread already runs exclusively." ); 612         }
 613
 614         exclusiveThread = currentThread;
 615
 616         if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 617             env.logWriter.newEntry( this, "beginExclusion(): ", LogWriter.DEBUG3 );
 618         }
 619     }
 620
 621
 622     protected synchronized void endExclusion() {
 623         if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
 624             env.logWriter.newEntry( this, "endExclusion(): ", LogWriter.DEBUG3 );
 625         }
 626
 627         Thread
  currentThread = Thread.currentThread(); 628         if (exclusiveThread != currentThread) {
 629             throw new RuntimeException
  ( "Current thread does not run exclusively." ); 630         }
 631
 632         exclusiveThread = null;
 633         notifyAll();
 634     }
 635
 636
 637
 640     public synchronized void checkDeadlocks() {
 641         DeadlockRecognition dr = env.deadlockRecognition();
 642
 643                 int blockedCount = 0;
 645         DxIterator it = taTable.iterator();
 646         for (Transaction ta; (ta = (Transaction)it.next()) != null;) {
 647             if (ta.isBlocked()) {
 648                 blockedCount++;
 649             }
 650         }
 651
 652
 654                 if (blockedCount >= 2) {
 656
 657             try {
 658                                 beginExclusion();
 660
 661                 it = taTable.iterator();
 662                 for (Transaction ta; (ta = (Transaction)it.next()) != null;) {
 663                     Transaction candidate = (Transaction)dr.detectDeadlock( ta );
 664                     if (candidate != null) {
 665                         env.logWriter.newEntry( this, "*** *** DEADLOCK DETECTED: ta=" + ta, LogWriter.WARN );
 666
 667
 668                         if (false) {                         } else {
 671
 675
 676
 681                         }
 682
 683                                                                         deadlockTA = candidate;
 686                                                 synchronized (deadlockTA) {
 688                             deadlockTA.notifyAll();
 689                         }
 690
 691                                                                                                                         return;
 696                     }
 697                 }
 698             } finally {
 699                 endExclusion();
 700             }
 701         }
 702     }
 703
 704
 705
 709     public synchronized boolean isDeadlockTA( Transaction ta ) {
 710         if (ta == deadlockTA) {
 711             deadlockTA = null;
 712             return true;
 713         }
 714         return false;
 715     }
 716
 717
 721     public void startGarbageCollectionWaitForCurrentTransactionsToCompletePhase(GarbageCollector garbageCollector) {
 722         synchronized (this) {
 723             DxIterator i = taTable.elementSet().iterator();
 724
 725             while (i.next()!=null) {
 726                 garbageCollector.addTransactionRequiredToComplete(i.object());
 727             }
 728             garbageCollector.checkForEndOfWaitForCurrentTransactionsToCompletePhase();
 729         }
 730     }
 731 }
 732
 733
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |