package org.objectweb.cjdbc.controller.recoverylog;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedList;

import org.objectweb.cjdbc.common.i18n.Translate;
import org.objectweb.cjdbc.common.jmx.notifications.CjdbcNotificationList;
import org.objectweb.cjdbc.common.log.Trace;
import org.objectweb.cjdbc.common.shared.BackendState;
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.BeginTask;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;

/**
 * Thread that replays the entries of a {@link RecoveryLog} on one
 * {@link DatabaseBackend}, starting from a named checkpoint, and enables the
 * backend on the load balancer once the replay has completed.
 * <p>
 * The replay is done in two passes: a first pass while writes are still
 * flowing through the scheduler, then a second pass after
 * {@link AbstractScheduler#suspendWrites()} has been called. Any failure is
 * recorded and can be read back with {@link #getException()}.
 */
public class RecoverThread extends Thread
{
  static Trace                       logger = Trace
                                                .getLogger(RecoverThread.class
                                                    .getName());

  private final RecoveryLog          recoveryLog;
  private final DatabaseBackend      backend;
  private final AbstractLoadBalancer loadBalancer;

  /** Last failure recorded by this thread, <code>null</code> if none. */
  private SQLException               exception;

  /** Worker executing the replayed tasks; created by startRecovery(). */
  private BackendWorkerThread        bwt;

  /** Transaction ids (as Long) for which a BeginTask has been replayed. */
  private final ArrayList            tids;

  private final AbstractScheduler    scheduler;

  private final String               checkpointName;

  /**
   * Maximum number of posted-but-unfinished tasks tolerated before
   * recover() blocks waiting for the oldest one.
   */
  private final int                  recoveryBatchSize;

  /**
   * Creates a new <code>RecoverThread</code>. The batch size is read from
   * <code>recoveryLog.getRecoveryBatchSize()</code>.
   * 
   * @param scheduler scheduler whose writes are suspended for the final pass
   * @param recoveryLog recovery log to replay from
   * @param backend backend to replay the log on
   * @param loadBalancer load balancer on which the backend is enabled at the
   *          end of a successful recovery
   * @param checkpointName name of the checkpoint to start replaying from
   */
  public RecoverThread(AbstractScheduler scheduler, RecoveryLog recoveryLog,
      DatabaseBackend backend, AbstractLoadBalancer loadBalancer,
      String checkpointName)
  {
    this.scheduler = scheduler;
    this.recoveryLog = recoveryLog;
    this.backend = backend;
    this.loadBalancer = loadBalancer;
    this.checkpointName = checkpointName;
    this.recoveryBatchSize = recoveryLog.getRecoveryBatchSize();
    tids = new ArrayList();
  }

  /**
   * Returns the failure recorded by this thread.
   * 
   * @return the recorded <code>SQLException</code>, or <code>null</code> if
   *         no failure has been recorded
   */
  public SQLException getException()
  {
    return exception;
  }

  /**
   * Runs the whole recovery: puts the backend in REPLAYING state, initializes
   * its connections, replays the log from the checkpoint (once with writes
   * running, once with writes suspended), then enables the backend and
   * resumes writes. On any <code>SQLException</code> the failure is handled
   * by recoveryFailed() and the method returns.
   * 
   * @see java.lang.Runnable#run()
   */
  public void run()
  {
    backend.setState(BackendState.REPLAYING);
    try
    {
      backend.initializeConnections();
    }
    catch (SQLException e)
    {
      recoveryFailed(e);
      return;
    }
    recoveryLog.beginRecovery();

    // Resolve the checkpoint into the log index to start replaying from
    long logIdx;
    try
    {
      logIdx = recoveryLog.getCheckpointRequestId(checkpointName);
    }
    catch (SQLException e)
    {
      // Not inside the try/finally below, so balance beginRecovery() here
      recoveryLog.endRecovery();
      String msg = Translate.get("recovery.cannot.get.checkpoint", e);
      logger.error(msg);
      recoveryFailed(newSQLException(msg, e));
      return;
    }

    try
    {
      startRecovery();

      // First pass: replay while writes are still being accepted
      logIdx = recover(logIdx);

      scheduler.suspendWrites();

      // Second pass: replay what was logged during the first pass
      logIdx = recover(logIdx);
    }
    catch (SQLException e)
    {
      recoveryFailed(e);
      return;
    }
    finally
    {
      // Single place where the worker is stopped and the recovery log is
      // told the recovery is over, on success and on failure alike.
      endRecovery();
    }

    // Replay succeeded: enable the backend, then let writes flow again
    try
    {
      loadBalancer.enableBackend(backend, true);
      scheduler.resumeWrites();
    }
    catch (SQLException e)
    {
      recoveryFailed(e);
      return;
    }
    logger.info(Translate.get("backend.state.enabled", backend.getName()));
  }

  /**
   * Records a recovery failure: stores the exception, resumes writes if the
   * scheduler reports them suspended, clears the backend's last known
   * checkpoint, sets the backend state to DISABLED and sends a JMX error
   * notification.
   * 
   * @param e the failure to record
   */
  private void recoveryFailed(SQLException e)
  {
    this.exception = e;

    if (scheduler.isSuspendedWrites())
      scheduler.resumeWrites();

    backend.setLastKnownCheckpoint(null);
    backend.setState(BackendState.DISABLED);
    backend.notifyJmxError(
        CjdbcNotificationList.VIRTUALDATABASE_BACKEND_REPLAYING_FAILED, e);
  }

  /**
   * Replays log entries starting at <code>logIdx</code> by posting them to
   * the worker thread, until the recovery log returns no more entry. At most
   * <code>recoveryBatchSize</code> unfinished tasks are left in flight while
   * reading; all posted tasks have completed when this method returns
   * normally.
   * <p>
   * This method does not call <code>recoveryLog.endRecovery()</code> on
   * failure: its only caller, {@link #run()}, always calls
   * {@link #endRecovery()} from a <code>finally</code> block, so the single
   * <code>beginRecovery()</code> is matched by a single
   * <code>endRecovery()</code>.
   * 
   * @param logIdx log index to start replaying from
   * @return the id of the last task posted to the worker, or the index
   *         reached if trailing entries were skipped
   * @throws SQLException if an entry cannot be read from the recovery log or
   *           if a replayed task reports a failure
   */
  private long recover(long logIdx) throws SQLException
  {
    // NOTE(review): bwt is created by startRecovery() of this class; the
    // message below names RecoveryLog instead. Left unchanged because it is
    // a runtime string.
    if (bwt == null)
      throw new RuntimeException(
          "No BackendWorkerThread to recover, you should have called RecoveryLog.startRecovery()");
    RecoveryTask recoveryTask = null;
    AbstractTask abstractTask = null;

    logger.info(Translate.get("recovery.start.process"));

    long tid;
    // Tasks posted to the worker and not yet checked for completion (FIFO)
    LinkedList pendingRecoveryTasks = new LinkedList();
    while (logIdx != -1)
    {
      try
      {
        recoveryTask = recoveryLog.recoverNextRequest(logIdx);
      }
      catch (SQLException e)
      {
        // Stop the worker right away; run() takes care of endRecovery()
        addWorkerTask(bwt, new KillThreadTask(1, 1));
        String msg = Translate.get("recovery.cannot.recover.from.index", e);
        logger.error(msg, e);
        throw newSQLException(msg, e);
      }
      if (recoveryTask == null)
        break; // nothing left to replay
      tid = recoveryTask.getTid();
      if (tid != 0)
      {
        if (recoveryTask.getTask() instanceof BeginTask)
          tids.add(new Long(tid));
        else if (!tids.contains(new Long(tid)))
        {
          // No BeginTask was replayed for this transaction id, so the entry
          // is skipped (presumably the transaction began before the
          // checkpoint -- TODO confirm).
          logIdx++;
          continue;
        }
      }

      abstractTask = recoveryTask.getTask();
      logIdx = recoveryTask.getId();
      addWorkerTask(bwt, abstractTask);
      pendingRecoveryTasks.addLast(abstractTask);

      // Reap completed tasks from the head of the queue. Only block when
      // more than recoveryBatchSize tasks are outstanding.
      do
      {
        abstractTask = (AbstractTask) pendingRecoveryTasks.getFirst();
        if (abstractTask.hasFullyCompleted())
        {
          pendingRecoveryTasks.removeFirst();
          if (abstractTask.getFailed() > 0)
            throw taskFailure(abstractTask);
        }
        else
        {
          if (pendingRecoveryTasks.size() > recoveryBatchSize)
          {
            synchronized (abstractTask)
            {
              if (!abstractTask.hasFullyCompleted())
                try
                {
                  abstractTask.wait();
                }
                catch (InterruptedException ignore)
                {
                  // Deliberately ignored: completion is re-checked on the
                  // next turn of the enclosing do/while loop.
                }
              // Re-examine the head of the queue
              continue;
            }
          }
          else
            break; // head still running but under the limit: read more log
        }
      }
      while (!pendingRecoveryTasks.isEmpty());
    }

    // Log exhausted: wait for every remaining task, in posting order
    while (!pendingRecoveryTasks.isEmpty())
    {
      abstractTask = (AbstractTask) pendingRecoveryTasks.removeFirst();
      synchronized (abstractTask)
      {
        // Loop guards against wake-ups that occur before completion
        while (!abstractTask.hasFullyCompleted())
          try
          {
            abstractTask.wait();
          }
          catch (InterruptedException ignore)
          {
            // Deliberately ignored: the loop condition is re-evaluated.
          }

        if (abstractTask.getFailed() > 0)
          throw taskFailure(abstractTask);
      }
    }
    return logIdx;
  }

  /**
   * Handles a replayed task that reported a failure: asks the worker thread
   * to stop, logs the error and builds the exception to throw.
   * 
   * @param failedTask task for which <code>getFailed()</code> is positive
   * @return the exception to throw, chained to the task's first exception
   */
  private SQLException taskFailure(AbstractTask failedTask)
  {
    addWorkerTask(bwt, new KillThreadTask(1, 1));
    Exception firstError = (Exception) failedTask.getExceptions().get(0);
    String msg = Translate.get("recovery.failed.with.error", new String[]{
        failedTask.toString(), firstError.getMessage()});
    logger.error(msg);
    return newSQLException(msg, firstError);
  }

  /**
   * Builds a <code>SQLException</code> carrying <code>msg</code> and chained
   * to <code>cause</code>, so the original stack trace is not lost.
   * 
   * @param msg exception message
   * @param cause underlying failure, may be <code>null</code>
   * @return the new exception
   */
  private static SQLException newSQLException(String msg, Throwable cause)
  {
    SQLException wrapped = new SQLException(msg);
    if (cause != null)
      wrapped.initCause(cause);
    return wrapped;
  }

  /**
   * Posts a task to the given worker thread and notifies the worker, both
   * under the worker's monitor.
   * 
   * @param bwt worker thread receiving the task
   * @param task task to post
   */
  private void addWorkerTask(BackendWorkerThread bwt, AbstractTask task)
  {
    synchronized (bwt)
    {
      bwt.addTask(task);
      bwt.notify();
    }
  }

  /**
   * Ends the recovery: if a worker thread was started, posts a
   * <code>KillThreadTask</code> to it and waits for it to terminate, then
   * calls <code>recoveryLog.endRecovery()</code> exactly once. If the wait
   * is interrupted, the failure is logged and stored so that
   * {@link #getException()} returns it.
   */
  public void endRecovery()
  {
    logger.info(Translate.get("recovery.process.complete"));

    if (bwt != null)
    {
      addWorkerTask(bwt, new KillThreadTask(1, 1));
      try
      {
        bwt.join();
      }
      catch (InterruptedException e)
      {
        // recoveryLog.endRecovery() is not called here: the unconditional
        // call below already covers this path.
        String msg = Translate.get("recovery.join.failed", e);
        logger.error(msg, e);
        exception = newSQLException(msg, e);
      }
    }

    recoveryLog.endRecovery();
  }

  /**
   * Creates and starts the worker thread that executes the replayed tasks on
   * the backend. Must be called before recover().
   * 
   * @throws SQLException declared for callers; not thrown by the visible
   *           implementation
   */
  public void startRecovery() throws SQLException
  {
    bwt = new BackendWorkerThread("Worker thread for recovery on backend:"
        + backend.getName(), backend, loadBalancer);
    bwt.start();
  }
}