package org.continuent.sequoia.controller.recoverylog;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import javax.management.ObjectName;

import org.continuent.sequoia.common.i18n.Translate;
import org.continuent.sequoia.common.jmx.JmxConstants;
import org.continuent.sequoia.common.jmx.management.BackendState;
import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.controller.backend.DatabaseBackend;
import org.continuent.sequoia.controller.jmx.MBeanServerManager;
import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.BeginTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
import org.continuent.sequoia.controller.requestmanager.RequestManager;
import org.continuent.sequoia.controller.requests.AbstractRequest;
import org.continuent.sequoia.controller.requests.StoredProcedure;
import org.continuent.sequoia.controller.scheduler.AbstractScheduler;

/**
 * Thread that replays the entries of a {@link RecoveryLog} on a backend,
 * starting from a named checkpoint, and enables the backend once the whole log
 * has been replayed.
 * <p>
 * The outcome of the replay can be queried with {@link #getException()} once
 * the thread has terminated.
 */
public class RecoverThread extends Thread
{
  static Trace                    logger        = Trace
                                                    .getLogger(RecoverThread.class
                                                        .getName());

  static Trace                    endUserLogger = Trace
                                                    .getLogger("org.continuent.sequoia.enduser");

  /**
   * Name of a checkpoint that is looked up only for its side effect. Going by
   * the text of the marker itself, the lookup is expected to fail and serves to
   * synchronize with the recovery log's logger thread queue.
   */
  private static final String     LOGGER_QUEUE_SYNC_MARKER = "Just a big hack to synchronize the logger thread queue. Expected to fail ...";

  private final RecoveryLog       recoveryLog;
  private final DatabaseBackend   backend;
  private final RequestManager    requestManager;

  /** Failure that aborted the recovery, or <code>null</code> if none. */
  private SQLException            exception;

  /**
   * Ids (as <code>Long</code>) of the persistent connections that have been
   * opened during the replay and not closed yet.
   */
  private final List              persistentConnections;

  /**
   * Transactions started during the replay and not terminated yet: maps the
   * transaction id (<code>Long</code>) to the request carried by the begin
   * task.
   */
  private final HashMap           tids;

  /** Scheduler handed to the recovery log when fetching log entries. */
  private final AbstractScheduler scheduler;

  private final String            checkpointName;

  /** Batch size as reported by the recovery log at construction time. */
  private final int               recoveryBatchSize;

  /**
   * Creates a new <code>RecoverThread</code> object.
   * 
   * @param scheduler scheduler passed to the recovery log when reading entries;
   *          its writes are resumed if the recovery fails while they are
   *          suspended
   * @param recoveryLog recovery log to replay
   * @param backend backend on which the log is replayed
   * @param requestManager request manager whose activity is suspended while
   *          the tail of the log is replayed and whose load balancer enables
   *          the backend
   * @param checkpointName name of the checkpoint to start the replay from
   */
  public RecoverThread(AbstractScheduler scheduler, RecoveryLog recoveryLog,
      DatabaseBackend backend, RequestManager requestManager,
      String checkpointName)
  {
    super("RecoverThread for backend " + backend.getName());
    this.scheduler = scheduler;
    this.recoveryLog = recoveryLog;
    this.backend = backend;
    this.requestManager = requestManager;
    this.checkpointName = checkpointName;
    this.recoveryBatchSize = recoveryLog.getRecoveryBatchSize();
    tids = new HashMap();
    persistentConnections = new ArrayList();
  }

  /**
   * Returns the exception that made the recovery fail.
   * 
   * @return the failure recorded by this thread, or <code>null</code> if no
   *         failure has been recorded
   */
  public SQLException getException()
  {
    return exception;
  }

  /**
   * Replays the recovery log on the backend and enables it.
   * <p>
   * The log is first replayed while the request manager is still active, then
   * the activity is suspended and the remaining entries are replayed until the
   * end of the log is reached. Any failure is recorded (see
   * {@link #getException()}) and leaves the backend disabled.
   * 
   * @see java.lang.Runnable#run()
   */
  public void run()
  {
    backend.setState(BackendState.REPLAYING);
    try
    {
      if (!backend.isInitialized())
        backend.initializeConnections();
    }
    catch (SQLException e)
    {
      recoveryFailed(e);
      return;
    }

    recoveryLog.beginRecovery();

    long logIdx;
    try
    {
      logIdx = recoveryLog.getCheckpointLogId(checkpointName);
    }
    catch (SQLException e)
    {
      // Nothing has been started yet, so the matching endRecovery() has to be
      // issued here rather than by the finally block below.
      recoveryLog.endRecovery();
      String msg = Translate.get("recovery.cannot.get.checkpoint", e);
      logger.error(msg, e);
      recoveryFailed(sqlExceptionWithCause(msg, e));
      return;
    }

    try
    {
      startRecovery();

      logger.info(Translate.get("recovery.start.process"));

      LinkedList pendingRecoveryTasks = new LinkedList();

      // First pass: replay while the request manager is still active.
      try
      {
        logIdx = recover(logIdx, pendingRecoveryTasks);
      }
      catch (EndOfRecoveryLogException e)
      {
        logIdx = e.getLogIdx();
      }

      requestManager.suspendActivity();

      synchronizeLoggerThreadQueue();

      // Second pass: replay whatever is left until the end of the log.
      boolean replayedAllLog = false;
      do
      {
        try
        {
          logIdx = recover(logIdx, pendingRecoveryTasks);
          synchronizeLoggerThreadQueue();
        }
        catch (EndOfRecoveryLogException e)
        {
          replayedAllLog = true;
        }
      }
      while (!replayedAllLog);
      waitForAllTasksCompletion(pendingRecoveryTasks);
    }
    catch (SQLException e)
    {
      recoveryFailed(e);
      requestManager.resumeActivity();
      return;
    }
    finally
    {
      // Single place where the worker threads are stopped and the recovery
      // log is told that this recovery is over.
      endRecovery();
    }

    try
    {
      requestManager.getLoadBalancer().enableBackend(backend, true);
    }
    catch (SQLException e)
    {
      recoveryFailed(e);
      return;
    }
    finally
    {
      requestManager.resumeActivity();
    }
    logger.info(Translate.get("backend.state.enabled", backend.getName()));
  }

  /**
   * Looks up the {@link #LOGGER_QUEUE_SYNC_MARKER} checkpoint and ignores the
   * resulting failure; the call is made only for its side effect on the
   * recovery log.
   */
  private void synchronizeLoggerThreadQueue()
  {
    try
    {
      recoveryLog.getCheckpointLogId(LOGGER_QUEUE_SYNC_MARKER);
    }
    catch (SQLException ignored)
    {
      // The marker text states that this lookup is expected to fail.
    }
  }

  /**
   * Records the failure, resumes the scheduler writes if they are suspended,
   * disables the backend, releases its connections and sends the JMX
   * notification.
   * 
   * @param e cause of the recovery failure
   */
  private void recoveryFailed(SQLException e)
  {
    this.exception = e;

    if (scheduler.isSuspendedWrites())
      scheduler.resumeWrites();

    backend.setLastKnownCheckpoint(null);
    backend.setState(BackendState.DISABLED);
    try
    {
      backend.finalizeConnections();
    }
    catch (SQLException ignored)
    {
      // Best effort: the backend is being disabled anyway.
    }
    backend.notifyJmxError(
        SequoiaNotificationList.VIRTUALDATABASE_BACKEND_REPLAYING_FAILED, e);
  }

  /**
   * Replays log entries starting at <code>logIdx</code>.
   * <p>
   * Entries that are neither successful nor stored procedures are skipped, as
   * are entries that belong to a transaction or a persistent connection whose
   * start has not been replayed. The method returns when it meets an entry
   * whose status is {@link LogEntry#EXECUTING}.
   * <p>
   * On failure the caller is responsible for calling
   * {@link #endRecovery()}; this method only posts a {@link KillThreadTask}.
   * 
   * @param logIdx index of the log entry to start from
   * @param pendingRecoveryTasks tasks posted to the backend that have not been
   *          seen as completed yet; updated by this method
   * @return the index at which the replay stopped
   * @throws SQLException if an entry cannot be read or a replayed task failed
   * @throws EndOfRecoveryLogException when no entry is left in the log
   */
  private long recover(long logIdx, LinkedList pendingRecoveryTasks)
      throws SQLException, EndOfRecoveryLogException
  {
    // Kept across iterations on purpose: this mirrors the original scoping of
    // the variable.
    Long tid = null;
    long previousRemaining = 0;

    do
    {
      RecoveryTask recoveryTask;
      try
      {
        recoveryTask = recoveryLog.recoverNextRequest(logIdx, scheduler);
      }
      catch (SQLException e)
      {
        addWorkerTask(new KillThreadTask(1, 1));
        String msg = Translate.get("recovery.cannot.recover.from.index", e);
        logger.error(msg, e);
        throw sqlExceptionWithCause(msg, e);
      }
      if (recoveryTask == null)
        throw new EndOfRecoveryLogException(logIdx);

      AbstractTask abstractTask = recoveryTask.getTask();
      if (abstractTask == null)
        throw new SQLException(
            "Unexpected null abstract task in recovery task " + recoveryTask);

      if (LogEntry.EXECUTING.equals(recoveryTask.getStatus()))
      {
        // Stop here; the caller decides when to try this entry again.
        return logIdx;
      }

      if (!LogEntry.SUCCESS.equals(recoveryTask.getStatus())
          && !(abstractTask.getRequest() instanceof StoredProcedure))
      {
        logIdx++;
        continue;
      }

      if ((logIdx % 1000) == 0)
      {
        long remaining = recoveryLog.getCurrentLogId() - logIdx;
        endUserLogger.info("Recovering log entry " + logIdx
            + " remaining entries " + remaining);
        if (previousRemaining > 0 && remaining > previousRemaining)
        {
          endUserLogger.warn("Recovery falling behind pending requests ="
              + pendingRecoveryTasks.size());
        }
        previousRemaining = remaining;
      }

      if (abstractTask.isPersistentConnection())
      {
        Long cid = new Long(abstractTask.getPersistentConnectionId());
        if (abstractTask instanceof OpenPersistentConnectionTask)
          persistentConnections.add(cid);
        else if (abstractTask instanceof ClosePersistentConnectionTask)
          persistentConnections.remove(cid);
        else if (!persistentConnections.contains(cid))
        {
          // The opening of this connection has not been replayed: skip.
          logIdx++;
          continue;
        }
      }

      boolean mustCheckOrder = false;
      if (!abstractTask.isAutoCommit())
      {
        tid = new Long(recoveryTask.getTid());
        if (abstractTask instanceof BeginTask)
        {
          mustCheckOrder = true;
          if (tids.containsKey(tid))
          {
            // This transaction has already been started: skip.
            logIdx++;
            continue;
          }
          tids.put(tid, abstractTask.getRequest());
        }
        else
        {
          AbstractRequest request = (AbstractRequest) tids.get(tid);
          if (request == null)
          {
            // The begin of this transaction has not been replayed: skip.
            logIdx++;
            continue;
          }
          if (abstractTask instanceof RollbackTask)
          {
            ((RollbackTask) abstractTask).getTransactionMetaData().setLogin(
                request.getLogin());
          }
          // Propagate the connection settings recorded with the begin.
          abstractTask
              .setPersistentConnection(request.isPersistentConnection());
          abstractTask.setPersistentConnectionId(request
              .getPersistentConnectionId());
        }
      }
      else
        mustCheckOrder = true;

      if ((abstractTask instanceof CommitTask)
          || (abstractTask instanceof RollbackTask))
      {
        tids.remove(tid);
      }

      logIdx = recoveryTask.getId();

      if (mustCheckOrder)
        waitForBlockingTasks(logIdx, pendingRecoveryTasks);

      addWorkerTask(abstractTask);
      pendingRecoveryTasks.addLast(recoveryTask);

      drainCompletedTasks(pendingRecoveryTasks);
    }
    while (logIdx != -1);

    return logIdx;
  }

  /**
   * Waits for the pending tasks that must be over before the entry at
   * <code>logIdx</code> is posted: auto-commit, commit and rollback tasks
   * whose completion log id is set (&gt; 0) and lower than
   * <code>logIdx</code>.
   * 
   * @param logIdx id of the log entry about to be posted
   * @param pendingRecoveryTasks tasks already posted to the backend
   */
  private void waitForBlockingTasks(long logIdx,
      LinkedList pendingRecoveryTasks)
  {
    for (Iterator iter = pendingRecoveryTasks.iterator(); iter.hasNext();)
    {
      RecoveryTask blocker = (RecoveryTask) iter.next();
      long completionLogId = blocker.getCompletionLogId();
      if (completionLogId <= 0 || completionLogId >= logIdx)
        continue;

      AbstractTask blockerTask = blocker.getTask();
      if (blockerTask.isAutoCommit() || (blockerTask instanceof CommitTask)
          || (blockerTask instanceof RollbackTask))
      {
        waitUntilCompleted(blockerTask);
      }
    }
  }

  /**
   * Removes the completed tasks from the pending list, failing on the first
   * one whose replay went wrong, and throttles the replay: as long as no
   * transaction and no persistent connection is open and at least
   * <code>recoveryBatchSize</code> tasks are pending, waits on the oldest
   * pending task before checking again.
   * 
   * @param pendingRecoveryTasks tasks already posted to the backend
   * @throws SQLException if a completed task failed
   */
  private void drainCompletedTasks(LinkedList pendingRecoveryTasks)
      throws SQLException
  {
    while (true)
    {
      for (Iterator iter = pendingRecoveryTasks.iterator(); iter.hasNext();)
      {
        RecoveryTask pending = (RecoveryTask) iter.next();
        if (pending.getTask().hasFullyCompleted())
        {
          iter.remove();
          failIfReplayFailed(pending, pendingRecoveryTasks);
        }
      }

      // The emptiness test guards getFirst() below in case the batch size is
      // not strictly positive.
      boolean mustThrottle = tids.isEmpty() && persistentConnections.isEmpty()
          && !pendingRecoveryTasks.isEmpty()
          && pendingRecoveryTasks.size() >= recoveryBatchSize;
      if (!mustThrottle)
        return;

      AbstractTask oldest = ((RecoveryTask) pendingRecoveryTasks.getFirst())
          .getTask();
      try
      {
        synchronized (oldest)
        {
          if (!oldest.hasFullyCompleted())
            oldest.wait();
        }
      }
      catch (InterruptedException e)
      {
        // Stop throttling and go on with the next log entry.
        return;
      }
    }
  }

  /**
   * Waits for every pending task to complete.
   * 
   * @param pendingRecoveryTasks tasks already posted to the backend; emptied
   *          by this method
   * @throws SQLException if one of the tasks failed
   */
  private void waitForAllTasksCompletion(LinkedList pendingRecoveryTasks)
      throws SQLException
  {
    while (!pendingRecoveryTasks.isEmpty())
    {
      RecoveryTask recoveryTask = (RecoveryTask) pendingRecoveryTasks
          .removeFirst();
      waitUntilCompleted(recoveryTask.getTask());
      failIfReplayFailed(recoveryTask, pendingRecoveryTasks);
    }
  }

  /**
   * Blocks until the given task reports that it has fully completed.
   * 
   * @param task task to wait for
   */
  private static void waitUntilCompleted(AbstractTask task)
  {
    synchronized (task)
    {
      while (!task.hasFullyCompleted())
      {
        try
        {
          task.wait();
        }
        catch (InterruptedException ignored)
        {
          // Interruptions do not abort the wait: the loop condition is
          // checked again.
        }
      }
    }
  }

  /**
   * Aborts the replay if a completed task that was logged as successful
   * reports a failure: posts a {@link KillThreadTask}, clears the pending
   * list, logs the error and throws.
   * 
   * @param recoveryTask completed recovery task to check
   * @param pendingRecoveryTasks pending list to clear on failure
   * @throws SQLException if the task failed; its cause is the first exception
   *           reported by the task
   */
  private void failIfReplayFailed(RecoveryTask recoveryTask,
      LinkedList pendingRecoveryTasks) throws SQLException
  {
    AbstractTask task = recoveryTask.getTask();
    if (!LogEntry.SUCCESS.equals(recoveryTask.getStatus())
        || task.getFailed() <= 0)
      return;

    Exception firstError = (Exception) task.getExceptions().get(0);
    String msg;
    if (task.isAutoCommit())
      msg = Translate.get("recovery.failed.with.error", new Object[]{task,
          firstError.getMessage()});
    else
      msg = Translate.get("recovery.failed.with.error.transaction",
          new Object[]{Long.toString(task.getTransactionId()), task,
              firstError.getMessage()});

    addWorkerTask(new KillThreadTask(1, 1));
    pendingRecoveryTasks.clear();
    logger.error(msg);
    throw sqlExceptionWithCause(msg, firstError);
  }

  /**
   * Builds a <code>SQLException</code> carrying the given message and cause.
   * 
   * @param msg exception message
   * @param cause underlying failure
   * @return the new exception
   */
  private static SQLException sqlExceptionWithCause(String msg,
      Throwable cause)
  {
    SQLException sqle = new SQLException(msg);
    sqle.initCause(cause);
    return sqle;
  }

  /**
   * Adds a task to the total order queue of the backend.
   * 
   * @param task the task to post
   */
  private void addWorkerTask(AbstractTask task)
  {
    backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task,
        recoveryBatchSize);
  }

  /**
   * Ends the recovery: terminates the worker threads of the backend and
   * notifies the recovery log.
   */
  private void endRecovery()
  {
    logger.info(Translate.get("recovery.process.complete"));

    backend.terminateWorkerThreads();

    recoveryLog.endRecovery();
  }

  /**
   * Prepares the backend for the replay: unregisters the task queues MBean if
   * one is registered, installs fresh task queues and starts the worker
   * threads of the backend.
   */
  private void startRecovery()
  {
    try
    {
      ObjectName taskQueuesObjectName = JmxConstants
          .getBackendTaskQueuesObjectName(backend.getVirtualDatabaseName(),
              backend.getName());
      if (MBeanServerManager.getInstance().isRegistered(taskQueuesObjectName))
      {
        MBeanServerManager.unregister(taskQueuesObjectName);
      }
    }
    catch (Exception e)
    {
      if (logger.isWarnEnabled())
      {
        logger.warn("Exception while unregistering backend task queues mbean",
            e);
      }
    }

    boolean enforceTableLocking = requestManager.getLoadBalancer().waitForCompletionPolicy
        .isEnforceTableLocking();
    backend.setTaskQueues(new BackendTaskQueues(backend,
        new WaitForCompletionPolicy(WaitForCompletionPolicy.FIRST,
            enforceTableLocking, 0), requestManager));
    backend.startWorkerThreads(requestManager.getLoadBalancer());
  }

  /**
   * Signals that no entry is left in the recovery log and carries the index
   * that was being read.
   */
  private static class EndOfRecoveryLogException extends Exception
  {
    private static final long serialVersionUID = 2826202288239306426L;

    private final long        logIdx;

    /**
     * Creates a new <code>EndOfRecoveryLogException</code> object.
     * 
     * @param logIdx index for which no log entry was found
     */
    public EndOfRecoveryLogException(long logIdx)
    {
      this.logIdx = logIdx;
    }

    /**
     * Returns the index for which no log entry was found.
     * 
     * @return the log index
     */
    public long getLogIdx()
    {
      return logIdx;
    }
  }

}