1 23 24 28 29 package com.sun.jts.CosTransactions; 30 31 import java.util.*; 32 import java.io.*; 33 34 import org.omg.CORBA.*; 35 import org.omg.CosTransactions.*; 36 37 import com.sun.jts.jtsxa.*; 38 39 import javax.transaction.xa.*; 40 import com.sun.jts.jta.TransactionManagerImpl; 41 42 import com.sun.jts.trace.*; 43 import java.util.logging.Logger ; 44 import java.util.logging.Level ; 45 import com.sun.logging.LogDomains; 46 import com.sun.jts.utils.LogFormatter; 47 59 60 public class DelegatedRecoveryManager { 61 62 private static Hashtable recoveryStatetable = new Hashtable(); 63 private static Hashtable tmoutMgrtable = new Hashtable(); 64 65 synchronized static DelegatedTimeoutManager getTimeoutManager(String logPath) { 66 DelegatedTimeoutManager tmoutMgr = (DelegatedTimeoutManager)tmoutMgrtable.get(logPath); 67 if (tmoutMgr != null) 68 return tmoutMgr; 69 tmoutMgr = new DelegatedTimeoutManager(logPath); 70 tmoutMgrtable.put(logPath,tmoutMgr); 71 return tmoutMgr; 72 } 73 74 static Logger _logger = LogDomains.getLogger(LogDomains.TRANSACTION_LOGGER); 75 76 static boolean addCoordinator(GlobalTID globalTID, 77 Long localTID, CoordinatorImpl coord, int timeout, String logPath) { 78 79 80 boolean result = true; 81 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 82 83 86 state.coordsByGlobalTID.put(globalTID,coord); 87 state.coordsByLocalTID.put(localTID,coord); 88 89 94 if (timeout != 0) { 95 DelegatedTimeoutManager tmoutMgr = getTimeoutManager(logPath); 96 tmoutMgr.setTimeout(localTID, DelegatedTimeoutManager.ACTIVE_TIMEOUT, 97 timeout); 98 } 99 100 return result; 101 } 102 103 static boolean removeCoordinator(GlobalTID globalTID, 104 Long localTID, boolean aborted, String logPath) { 105 106 boolean result = false; 107 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 108 109 111 CoordinatorImpl coord = null; 112 result = (state.coordsByGlobalTID.remove(globalTID) != null); 113 114 116 if (result) { 117 coord = (CoordinatorImpl) state.coordsByLocalTID.remove(localTID); 118 result = (coord != null); 119 } 120 121 126 if (coord != null) { 127 try { 128 if (coord.is_top_level_transaction()) { 129 CoordinatorLog.removeLog(localTID, logPath); 130 } 131 } catch(SystemException exc) { 132 result = false; 133 } 134 } 135 136 139 DelegatedTimeoutManager tmoutMgr = getTimeoutManager(logPath); 140 tmoutMgr.setTimeout(localTID, DelegatedTimeoutManager.CANCEL_TIMEOUT, 0); 141 142 143 144 147 148 149 155 158 if (state.resyncCoords > 0) { 159 160 state.resyncCoords--; 161 162 165 if (state.resyncCoords == 0) { 166 try { 167 resyncComplete(true, true, logPath); 168 } catch (Throwable exc) {} 169 } 170 } 171 172 return result; 173 } 174 175 176 static CoordinatorImpl getCoordinator(GlobalTID globalTID, String logPath) { 177 178 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 179 CoordinatorImpl result = (CoordinatorImpl) 180 state.coordsByGlobalTID.get(globalTID); 181 182 return result; 183 } 184 185 186 public static boolean delegated_recover(String logPath, XAResource[] resources) throws Exception { 187 try { 188 File recoveryFile = new File(logPath,LogControl.RECOVERY_STRING_FILE_NAME); 189 RandomAccessFile raf = new RandomAccessFile(recoveryFile,"r"); 190 long length = raf.length(); 191 byte b1[] = new byte[(int)length]; raf.readFully(b1); 193 String serverName = new String (b1); 194 raf.close(); 195 return delegated_recover(serverName,logPath,resources); 196 } catch (IOException ex) { 197 _logger.log(Level.WARNING,"jts.exception_in_recovery_file_handling",ex); 198 throw ex; 199 } 201 } 202 public static boolean delegated_recover(String serverName, String logPath, XAResource[] resources) throws Exception { 203 if (logPath == null || serverName == null) { 204 return false; 205 } 206 Configuration.setServerName(logPath,serverName); 207 boolean result = false; 208 boolean keypointRequired = false; 209 RecoveryStateHolder state = new RecoveryStateHolder(); 210 recoveryStatetable.put(logPath,state); 211 Enumeration logRecords = CoordinatorLog.getLogged(logPath); 212 while (logRecords.hasMoreElements()) { 213 keypointRequired = true; 214 try { 215 (new TopCoordinator()).delegated_reconstruct((CoordinatorLog) logRecords.nextElement(), logPath); 216 } catch(Exception exc) { 217 _logger.log(Level.SEVERE,"jts.recovery_in_doubt_exception",exc); 218 _logger.log(Level.SEVERE,"jts.recovery_in_doubt",exc.toString()); 219 String msg = LogFormatter.getLocalizedMessage(_logger, "jts.recovery_in_doubt", 220 new java.lang.Object [] {exc.toString()}); 221 throw new org.omg.CORBA.INTERNAL (msg); 222 } 223 } 224 225 226 int size = resources.length; 227 Vector v = new Vector(); 228 for (int i=0; i<size; i++) { 229 v.addElement(resources[i]); 230 } 231 state.uniqueRMSet = getUniqueRMSet(v.elements()); 232 proceedWithXARecovery(logPath); 233 state.recoveryInProgress.post(); 234 235 238 result = state.coordsByGlobalTID.size() > 0; 239 if (!result) { 240 try { 241 resyncComplete(false,keypointRequired,logPath); 242 } catch(Throwable exc) {} 243 } 244 245 if (result) 246 resync(logPath); 247 return true; 248 } 249 250 static void resync(String logPath) { 251 252 258 265 RecoveryStateHolder recoveryState = (RecoveryStateHolder)recoveryStatetable.get(logPath); 266 267 recoveryState.resyncCoords = recoveryState.coordsByGlobalTID.size(); 268 Enumeration resyncList = 269 ((Hashtable) recoveryState.coordsByGlobalTID.clone()).elements(); 270 271 boolean isRoot[] = new boolean[1]; 272 273 276 while (resyncList.hasMoreElements()) { 277 278 TopCoordinator coord = (TopCoordinator)resyncList.nextElement(); 279 280 try { 281 282 284 synchronized (coord) { 285 286 Status state = coord.recover(isRoot); 287 288 if (state == Status.StatusUnknown) { 289 290 301 DelegatedTimeoutManager tmoutMgr = getTimeoutManager(logPath); 302 tmoutMgr.setTimeout( 303 new Long (coord.getLocalTID()), 304 DelegatedTimeoutManager.IN_DOUBT_TIMEOUT, 305 60); 306 307 } else if (state == Status.StatusCommitted) { 308 309 if(_logger.isLoggable(Level.FINE)) { 317 _logger.logp(Level.FINE,"DelegatedRecoveryManager","resync()", 318 "Before invoking commit on the reconstructed coordinator"+ 319 "GTID is: "+ 320 ((TopCoordinator)coord).superInfo.globalTID.toString()); 321 322 } 323 324 325 try { 326 coord.commit(); 327 } catch (Throwable exc) { 328 _logger.log(Level.WARNING,"jts.exception_during_resync", 329 new java.lang.Object [] {exc.toString(),"commit"}); 330 } 331 332 if (isRoot[0]) { 333 try { 334 coord.afterCompletion(state); 335 } catch (Throwable exc) { 336 _logger.log(Level.WARNING,"jts.exception_during_resync", 337 new java.lang.Object [] {exc.toString(), 338 "after_completion"}); 339 } 340 } 341 342 } else { 343 344 346 try { 347 if(_logger.isLoggable(Level.FINE)) { 348 _logger.logp(Level.FINE,"DelegatedRecoveryManager","resync()", 349 "Before invoking rollback on the"+ 350 "reconstructed coordinator :"+ 351 "GTID is : "+ 352 ((TopCoordinator)coord).superInfo.globalTID.toString()); 353 354 } 355 coord.rollback(true); 356 } catch (Throwable exc) { 357 _logger.log(Level.WARNING,"jts.resync_failed", 358 new java.lang.Object [] {exc.toString(),"rollback"}); 359 } 360 361 if (isRoot[0]) { 362 try { 363 coord.afterCompletion(Status.StatusRolledBack); 364 } catch (Throwable exc) { 365 _logger.log(Level.WARNING,"jts.resync_failed", 366 new java.lang.Object [] 367 { exc.toString(), "after_completion"}); 368 } 369 } 370 } 371 } 372 } catch (Throwable exc) {} 373 } 374 375 } 379 380 381 private static void resyncComplete(boolean resynced, 382 boolean keypointRequired, String logPath) throws LogicErrorException { 383 384 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 385 388 391 393 if (keypointRequired) { 394 CoordinatorLog.keypoint(logPath); 395 } 396 397 399 state.resyncInProgress.post(); 400 state.resyncInProgress = null; 401 } 402 403 415 416 static CoordinatorImpl getLocalCoordinator(Long localTID, String logPath) { 417 418 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 419 CoordinatorImpl result = (CoordinatorImpl) 420 state.coordsByLocalTID.get(localTID); 421 422 return result; 423 } 424 425 437 438 static boolean validLocalTID(Long localTID, String logPath) { 439 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 440 441 boolean result = state.coordsByLocalTID.containsKey(localTID); 442 443 return result; 444 } 445 446 460 static void shutdown(boolean immediate) { 461 462 463 Enumeration keys = recoveryStatetable.keys(); 464 if (keys.hasMoreElements()) { 465 String logPath = (String )keys.nextElement(); 466 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 467 if (immediate) { 468 470 } else { 471 472 474 if (state.resyncInProgress != null) { 475 try { 476 state.resyncInProgress.waitEvent(); 477 } catch (InterruptedException exc) {} 478 } 479 } 480 481 484 487 if (!immediate) { 488 CoordinatorLog.keypoint(logPath); 489 CoordinatorLog.finalizeAll(logPath); 490 } 491 492 } 494 } 495 496 501 private static String stringifyXid(Xid xid) { 502 int glen = xid.getGlobalTransactionId().length; 503 int blen = xid.getBranchQualifier().length; 504 byte[] xidRep = new byte[glen + 1 + blen]; 505 506 System.arraycopy(xid.getGlobalTransactionId(), 0, xidRep, 0, glen); 507 xidRep[glen] = (byte) ','; 508 System.arraycopy(xid.getBranchQualifier(), 0, xidRep, glen + 1, blen); 509 510 return new String (xidRep); 511 } 512 513 517 private static Enumeration getUniqueRMSet(Enumeration xaResourceList){ 518 519 Vector uniqueRMList = new Vector(); 520 521 while (xaResourceList.hasMoreElements()) { 522 XAResource xaRes = (XAResource) xaResourceList.nextElement(); 523 int size = uniqueRMList.size(); 524 boolean match = false; 525 for (int i = 0; i < size; i++) { XAResource uniqueXaRes = (XAResource) uniqueRMList.elementAt(i); 527 try { 528 if (xaRes.isSameRM(uniqueXaRes)) { 529 match = true; 530 break; 531 } 532 } catch (XAException xe) {} 533 } 534 if (!match) { 535 uniqueRMList.add(xaRes); 536 } 537 } 538 539 return uniqueRMList.elements(); 540 } 541 542 543 550 private static void proceedWithXARecovery(String logPath) { 551 552 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 553 554 555 556 Enumeration xaResources = state.uniqueRMSet; 557 558 if (xaResources == null) { 560 return; 561 } 562 563 Vector otsResources = new Vector(); 564 Map uniqueXids = new Hashtable(); 565 566 while (xaResources.hasMoreElements()) { 567 568 XAResource xaResource = (XAResource) xaResources.nextElement(); 569 570 573 Xid[] inDoubtXids = RecoveryManager.getInDoubtXids(xaResource); 574 uniqueXids.clear(); 575 if (inDoubtXids == null || inDoubtXids.length == 0) { 576 break; } 578 579 for (int i = 0; i < inDoubtXids.length; i++) { 580 581 583 String branchQualifier = 584 new String (inDoubtXids[i].getBranchQualifier()); 585 String serverName = Configuration.getServerName(logPath); 586 587 if (branchQualifier.startsWith(serverName)) { 588 589 597 String xidStr = stringifyXid(inDoubtXids[i]); 598 if (uniqueXids.get(xidStr) == null) { 600 uniqueXids.put(xidStr, xidStr); 602 otsResources.addElement( 606 new OTSResourceImpl(inDoubtXids[i], 607 xaResource, null 608 ).getCORBAObjReference()); 609 } 610 } 611 } 612 } 613 614 617 for (int i = 0; i < otsResources.size(); i++) { 618 619 OTSResource otsResource = (OTSResource) otsResources.elementAt(i); 620 GlobalTID globalTID = new GlobalTID(otsResource.getGlobalTID()); 621 TopCoordinator coord = 622 (TopCoordinator) state.coordsByGlobalTID.get(globalTID); 623 624 if (coord == null) { 625 if(_logger.isLoggable(Level.FINE)) { 629 _logger.logp(Level.FINE,"DelegatedRecoveryManager","proceedWithXARecovery()", 630 "Could not recognize OTSResource: "+otsResource + 631 " with tid: " + 632 LogFormatter.convertToString(globalTID.realTID.tid)+ 633 ";Hence rolling this resource back..."); 634 } 635 boolean infiniteRetry = true; 636 int commitRetries = Configuration.getRetries(); 637 if (commitRetries >= 0) 638 infiniteRetry = false; 639 int commitRetriesLeft = commitRetries; 640 boolean exceptionisThrown = true; 641 while (exceptionisThrown) { 642 try { 643 otsResource.rollback(); 644 exceptionisThrown = false; 645 } catch (Throwable exc) { 646 if ((exc instanceof COMM_FAILURE) || (exc instanceof TRANSIENT)) { 647 if (commitRetriesLeft > 0 || infiniteRetry) { 648 if (!infiniteRetry) { 651 commitRetriesLeft--; 652 } 653 654 try { 655 Thread.sleep(Configuration.COMMIT_RETRY_WAIT); 656 } catch( Throwable e ) {} 657 } 658 else { 659 _logger.log(Level.WARNING,"jts.exception_during_resync", 660 new java.lang.Object [] {exc.toString(),"OTSResource rollback"}); 661 exceptionisThrown = false; 662 } 663 } 664 else { 665 _logger.log(Level.WARNING,"jts.exception_during_resync", 666 new java.lang.Object [] {exc.toString(),"OTSResource rollback"}); 667 exceptionisThrown = false; 668 } 669 } 670 } 671 } else { 672 681 if(_logger.isLoggable(Level.FINE)) { 684 _logger.logp(Level.FINE,"DelegatedRecoveryManager", 685 "proceedWithXARecovery()", 686 "Recognized OTSResource: " + otsResource + 687 " with tid: " + 688 LogFormatter.convertToString(globalTID.realTID.tid) + 689 ";Hence registering this resource with coordinator..."); 690 } 691 coord.directRegisterResource(otsResource); 692 } 693 } 694 } 695 696 697 706 static CoordinatorImpl[] getCoordinators(String logPath) { 707 708 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 709 int size = state.coordsByGlobalTID.size(); 710 CoordinatorImpl[] result = new CoordinatorImpl[size]; 711 712 Enumeration coords = state.coordsByGlobalTID.elements(); 713 714 for(int pos = 0;pos<size;){ 715 result[pos++] = (CoordinatorImpl) coords.nextElement(); 716 } 717 718 return result; 719 } 720 721 722 static Hashtable getCoordsByGlobalTID(String logPath) { 723 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 724 return state.coordsByGlobalTID; 725 } 726 727 728 729 public static void waitForRecovery(String logPath) { 730 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 731 732 if (state.recoveryInProgress != null) { 733 try { 734 state.recoveryInProgress.waitEvent(); 735 } catch (InterruptedException exc) { 736 _logger.log(Level.SEVERE,"jts.wait_for_resync_complete_interrupted"); 737 String msg = LogFormatter.getLocalizedMessage(_logger, 738 "jts.wait_for_resync_complete_interrupted"); 739 throw new org.omg.CORBA.INTERNAL (msg); 740 } 741 } 742 } 743 744 753 public static void waitForResync(String logPath) { 754 RecoveryStateHolder state = (RecoveryStateHolder)recoveryStatetable.get(logPath); 755 if (state.resyncInProgress != null) { 756 try { 757 state.resyncInProgress.waitEvent(); 758 } catch (InterruptedException exc) { 759 _logger.log(Level.SEVERE,"jts.wait_for_resync_complete_interrupted"); 760 String msg = LogFormatter.getLocalizedMessage(_logger, 761 "jts.wait_for_resync_complete_interrupted"); 762 throw new org.omg.CORBA.INTERNAL (msg); 763 } 764 } 765 } 766 767 } 768 769 class RecoveryStateHolder { 770 771 774 Enumeration uniqueRMSet = null; 775 776 781 int resyncCoords = 0; 782 783 787 EventSemaphore resyncInProgress = new EventSemaphore(); 788 789 793 EventSemaphore recoveryInProgress = new EventSemaphore(); 794 795 Hashtable coordsByGlobalTID = new Hashtable(); 796 Hashtable coordsByLocalTID = new Hashtable(); 797 Hashtable transactionIds = new Hashtable(); 798 } 799 | Popular Tags |