KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > recoverylog > RecoverThread


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Copyright (C) 2005-2006 Continuent, Inc.
7  * Contact: sequoia@continuent.org
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Initial developer(s): Nicolas Modrzyk.
22  * Contributor(s): Emmanuel Cecchet.
23  */

24
25 package org.continuent.sequoia.controller.recoverylog;
26
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.HashMap JavaDoc;
30 import java.util.Iterator JavaDoc;
31 import java.util.LinkedList JavaDoc;
32 import java.util.List JavaDoc;
33
34 import javax.management.ObjectName JavaDoc;
35
36 import org.continuent.sequoia.common.i18n.Translate;
37 import org.continuent.sequoia.common.jmx.JmxConstants;
38 import org.continuent.sequoia.common.jmx.management.BackendState;
39 import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
40 import org.continuent.sequoia.common.log.Trace;
41 import org.continuent.sequoia.controller.backend.DatabaseBackend;
42 import org.continuent.sequoia.controller.jmx.MBeanServerManager;
43 import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
44 import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
45 import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
46 import org.continuent.sequoia.controller.loadbalancer.tasks.BeginTask;
47 import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
48 import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
49 import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask;
50 import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
51 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
52 import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
53 import org.continuent.sequoia.controller.requestmanager.RequestManager;
54 import org.continuent.sequoia.controller.requests.AbstractRequest;
55 import org.continuent.sequoia.controller.requests.StoredProcedure;
56 import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
57
58 /**
59  * This class defines a RecoverThread that is in charge of replaying the
60  * recovery log on a given backend to re-synchronize it with the other nodes of
61  * the cluster.
62  *
63  * @author <a HREF="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
64  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
65  * @version 1.0
66  */

67 public class RecoverThread extends Thread JavaDoc
68 {
69   static Trace logger = Trace.getLogger(RecoverThread.class
70                                               .getName());
71   /** end user logger */
72   static Trace endUserLogger = Trace
73                                               .getLogger("org.continuent.sequoia.enduser");
74   private RecoveryLog recoveryLog;
75   private DatabaseBackend backend;
76   private RequestManager requestManager;
77
78   // an exception thrown while recovering
79
private SQLException JavaDoc exception;
80
81   /**
82    * a List&lt;Long&gt; of persistent connection IDs that are re-opened during
83    * recovery
84    */

85   private List JavaDoc persistentConnections;
86
87   /**
88    * HashMap of transaction IDs which are replayed during recovery (key is
89    * transaction id, value is login)
90    */

91   private HashMap JavaDoc tids;
92
93   /**
94    * The scheduler used to suspend writes during the recovery process
95    */

96   private AbstractScheduler scheduler;
97
98   private String JavaDoc checkpointName;
99
100   /** Size of the pendingRecoveryTasks queue used during recovery */
101   private int recoveryBatchSize;
102
103   /**
104    * Creates a new <code>RecoverThread</code> object
105    *
106    * @param scheduler the currently used scheduler
107    * @param recoveryLog Recovery log that creates this thread
108    * @param backend database backend for logging
109    * @param requestManager the request manager to use for recovery
110    * @param checkpointName the checkpoint from which is started the recovery
111    */

112   public RecoverThread(AbstractScheduler scheduler, RecoveryLog recoveryLog,
113       DatabaseBackend backend, RequestManager requestManager,
114       String JavaDoc checkpointName)
115   {
116     super("RecoverThread for backend " + backend.getName());
117     this.scheduler = scheduler;
118     this.recoveryLog = recoveryLog;
119     this.backend = backend;
120     this.requestManager = requestManager;
121     this.checkpointName = checkpointName;
122     this.recoveryBatchSize = recoveryLog.getRecoveryBatchSize();
123     tids = new HashMap JavaDoc();
124     persistentConnections = new ArrayList JavaDoc();
125   }
126
127   /**
128    * Returns the exception value.
129    *
130    * @return Returns the exception.
131    */

132   public SQLException JavaDoc getException()
133   {
134     return exception;
135   }
136
137   /**
138    * @see java.lang.Runnable#run()
139    */

140   public void run()
141   {
142     backend.setState(BackendState.REPLAYING);
143     try
144     {
145       if (!backend.isInitialized())
146         backend.initializeConnections();
147     }
148     catch (SQLException JavaDoc e)
149     {
150       recoveryFailed(e);
151       return;
152     }
153     // notify the recovery log that a new
154
// recovery is about to begin
155
recoveryLog.beginRecovery();
156
157     // Get the checkpoint from the recovery log
158
long logIdx;
159     try
160     {
161       logIdx = recoveryLog.getCheckpointLogId(checkpointName);
162     }
163     catch (SQLException JavaDoc e)
164     {
165       recoveryLog.endRecovery();
166       String JavaDoc msg = Translate.get("recovery.cannot.get.checkpoint", e);
167       logger.error(msg, e);
168       recoveryFailed(new SQLException JavaDoc(msg));
169       return;
170     }
171
172     try
173     {
174       startRecovery();
175
176       logger.info(Translate.get("recovery.start.process"));
177
178       // Play write queries from the recovery log until the last entry or the
179
// first executing entry
180
LinkedList JavaDoc pendingRecoveryTasks = new LinkedList JavaDoc();
181       try
182       {
183         logIdx = recover(logIdx, pendingRecoveryTasks);
184       }
185       catch (EndOfRecoveryLogException e)
186       {
187         logIdx = e.getLogIdx();
188       }
189
190       requestManager.suspendActivity();
191
192       /*
193        * We need to make sure that the logger thread queue has been flushed to
194        * the database, so we need a synchronization event that will make sure
195        * that this happens. The getCheckpointLogId method posts a new object in
196        * the logger thread queue and waits for it to be processed before
197        * returning the result. We don't care about the result that is even
198        * supposed to throw an exception but at least we are sure that the whole
199        * queue has been flushed to disk.
200        */

201       try
202       {
203         recoveryLog
204             .getCheckpointLogId("Just a big hack to synchronize the logger thread queue. Expected to fail ...");
205       }
206       catch (SQLException JavaDoc ignore)
207       {
208       }
209
210       // Play the remaining writes that were pending and which have been logged
211
boolean replayedAllLog = false;
212       do
213       { // Loop until the whole recovery log has been replayed
214
try
215         {
216           logIdx = recover(logIdx, pendingRecoveryTasks);
217           // The status update for the last request (probably a commit/rollback)
218
// is not be there yet. Wait for it to be flushed to the log and
219
// retry.
220
try
221           {
222             recoveryLog
223                 .getCheckpointLogId("Just a big hack to synchronize the logger thread queue. Expected to fail ...");
224           }
225           catch (SQLException JavaDoc ignore)
226           {
227           }
228         }
229         catch (EndOfRecoveryLogException e)
230         {
231           replayedAllLog = true;
232         }
233       }
234       while (!replayedAllLog);
235       waitForAllTasksCompletion(pendingRecoveryTasks);
236     }
237     catch (SQLException JavaDoc e)
238     {
239       recoveryFailed(e);
240       // Resume writes, transactions and persistent connections
241
requestManager.resumeActivity();
242       return;
243     }
244     finally
245     {
246       endRecovery();
247     }
248
249     // Now enable it
250
try
251     {
252       requestManager.getLoadBalancer().enableBackend(backend, true);
253     }
254     catch (SQLException JavaDoc e)
255     {
256       recoveryFailed(e);
257       return;
258     }
259     finally
260     {
261       // Resume writes, transactions and persistent connections
262
requestManager.resumeActivity();
263     }
264     logger.info(Translate.get("backend.state.enabled", backend.getName()));
265   }
266
267   /**
268    * Unset the last known checkpoint and set the backend to disabled state. This
269    * should be called when the recovery has failed.
270    *
271    * @param e cause of the recovery failure
272    */

273   private void recoveryFailed(SQLException JavaDoc e)
274   {
275     this.exception = e;
276
277     if (scheduler.isSuspendedWrites())
278       scheduler.resumeWrites();
279
280     backend.setLastKnownCheckpoint(null);
281     backend.setState(BackendState.DISABLED);
282     try
283     {
284       backend.finalizeConnections();
285     }
286     catch (SQLException JavaDoc ignore)
287     {
288     }
289     backend.notifyJmxError(
290         SequoiaNotificationList.VIRTUALDATABASE_BACKEND_REPLAYING_FAILED, e);
291   }
292
293   /**
294    * Replay the recovery log from the given logIdx index. Note that
295    * startRecovery() must have been called to fork and start the
296    * BackendWorkerThread before calling recover. endRecovery() must be called
297    * after recover() to terminate the thread.
298    *
299    * @param logIdx logIdx used to start the recovery
300    * @param pendingRecoveryTasks
301    * @return last logIdx that was replayed.
302    * @throws SQLException if fails
303    * @see #startRecovery()
304    * @see #endRecovery()
305    */

306   private long recover(long logIdx, LinkedList JavaDoc pendingRecoveryTasks)
307       throws SQLException JavaDoc, EndOfRecoveryLogException
308   {
309     RecoveryTask recoveryTask = null;
310     AbstractTask abstractTask = null;
311
312     Long JavaDoc tid = null;
313     long previousRemaining = 0;
314     // Replay the whole log
315
do
316     {
317       try
318       {
319         recoveryTask = recoveryLog.recoverNextRequest(logIdx, scheduler);
320       }
321       catch (SQLException JavaDoc e)
322       {
323         // Signal end of recovery and kill worker thread
324
recoveryLog.endRecovery();
325         addWorkerTask(new KillThreadTask(1, 1));
326         String JavaDoc msg = Translate.get("recovery.cannot.recover.from.index", e);
327         logger.error(msg, e);
328         throw new SQLException JavaDoc(msg);
329       }
330       if (recoveryTask == null)
331         throw new EndOfRecoveryLogException(logIdx);
332
333       abstractTask = recoveryTask.getTask();
334       if (abstractTask == null)
335         throw new SQLException JavaDoc(
336             "Unexpected null abstract task in recovery task " + recoveryTask);
337
338       if (LogEntry.EXECUTING.equals(recoveryTask.getStatus()))
339       {
340         // Ok, wait for current tasks to complete and notify the recovery that
341
// we stopped on this entry
342
break;
343       }
344
345       if (!LogEntry.SUCCESS.equals(recoveryTask.getStatus()))
346       { // Ignore failed queries unless they are stored procedures that could
347
// have some side effect
348
if (!(abstractTask.getRequest() instanceof StoredProcedure))
349         {
350           logIdx++;
351           continue;
352         }
353       }
354       if ((logIdx % 1000) == 0)
355       {
356         long remaining = recoveryLog.getCurrentLogId() - logIdx;
357         endUserLogger.info("Recovering log entry " + logIdx
358             + " remaining entries " + remaining);
359         if (previousRemaining > 0 && remaining > previousRemaining)
360         {
361           endUserLogger.warn("Recovery falling behind pending requests ="
362               + pendingRecoveryTasks.size());
363         }
364         previousRemaining = remaining;
365       }
366       if (abstractTask.isPersistentConnection())
367       {
368         long cid = abstractTask.getPersistentConnectionId();
369         if (abstractTask instanceof OpenPersistentConnectionTask)
370           persistentConnections.add(new Long JavaDoc(cid));
371         else if (abstractTask instanceof ClosePersistentConnectionTask)
372           persistentConnections.remove(new Long JavaDoc(cid));
373         else if (!persistentConnections.contains(new Long JavaDoc(cid)))
374         {
375           /**
376            * If the task persistent connection id does not have a corresponding
377            * connection opening (it is not in the persistent connections list),
378            * then this task has already been played when the backend was
379            * disabled. So we can skip it.
380            * <p>
381            * Note that if the task is a BeginTask, skipping the begin will skip
382            * all requests in the transaction which is the expected behavior on a
383            * persistent connection (transaction has been played before the
384            * connection was closed, i.e. the backend was disabled).
385            */

386           logIdx++;
387           continue;
388         }
389       }
390
391       // Used to retrieve login and persistent connection id
392
AbstractRequest request = null;
393       boolean mustCheckOrder = false;
394       if (!abstractTask.isAutoCommit())
395       {
396         tid = new Long JavaDoc(recoveryTask.getTid());
397         if (abstractTask instanceof BeginTask)
398         {
399           mustCheckOrder = true;
400           if (tids.containsKey(tid))
401           {
402             // Skip multiple begins of the same transaction if exists (this is
403
// possible !!!)
404
logIdx++;
405             continue;
406           }
407           tids.put(tid, abstractTask.getRequest());
408         }
409         else
410         {
411           request = (AbstractRequest) tids.get(tid);
412           if (request == null)
413           {
414             /*
415              * if the task transaction id does not have a corresponding begin
416              * (it is not in the tids list), then this task has already been
417              * played when the backend was disabled. So we can skip it.
418              */

419             logIdx++;
420             continue;
421           }
422           if (abstractTask instanceof RollbackTask)
423           {
424             // Override login in case it was logged with UNKNOWN_USER
425
((RollbackTask) abstractTask).getTransactionMetaData().setLogin(
426                 request.getLogin());
427           }
428           // Restore persistent connection id information
429
abstractTask
430               .setPersistentConnection(request.isPersistentConnection());
431           abstractTask.setPersistentConnectionId(request
432               .getPersistentConnectionId());
433         }
434       } // else autocommit ok
435
else
436         mustCheckOrder = true;
437
438       if ((abstractTask instanceof CommitTask)
439           || (abstractTask instanceof RollbackTask))
440       {
441         tids.remove(tid);
442       }
443
444       logIdx = recoveryTask.getId();
445       // Add the task for execution by the BackendWorkerThread
446
/*
447        * We must wait for all transactions that completed before this
448        * transaction started to complete. This guarentees that 2 requests from
449        * the same application client will not be executed in parallel.
450        */

451       if (mustCheckOrder)
452       {
453         for (Iterator JavaDoc iter = pendingRecoveryTasks.iterator(); iter.hasNext();)
454         {
455           RecoveryTask blocker = (RecoveryTask) iter.next();
456           if (blocker.getCompletionLogId() > 0
457               && blocker.getCompletionLogId() < logIdx)
458           {
459             AbstractTask blockerTask = blocker.getTask();
460             if (blockerTask.isAutoCommit()
461                 || (blockerTask instanceof CommitTask)
462                 || (blockerTask instanceof RollbackTask))
463             {
464               blockerTask = blocker.getTask();
465               synchronized (blockerTask)
466               {
467                 while (!blockerTask.hasFullyCompleted())
468                   try
469                   {
470                     blockerTask.wait();
471                   }
472                   catch (InterruptedException JavaDoc e)
473                   {
474                   }
475               }
476             }
477           }
478         }
479       }
480
481       addWorkerTask(abstractTask);
482
483       // Add it to the list of currently executing tasks
484
pendingRecoveryTasks.addLast(recoveryTask);
485
486       do
487       {
488         // Now let's check which tasks have completed and remove them from the
489
// pending queue.
490
for (Iterator JavaDoc iter = pendingRecoveryTasks.iterator(); iter.hasNext();)
491         {
492           recoveryTask = (RecoveryTask) iter.next();
493           abstractTask = recoveryTask.getTask();
494           if (abstractTask.hasFullyCompleted())
495           { // Task has completed, remove it from the list
496
iter.remove();
497
498             if (LogEntry.SUCCESS.equals(recoveryTask.getStatus()))
499             { // Only deal with successful tasks
500

501               if (abstractTask.getFailed() > 0)
502               { // We fail to recover that task. Signal end of recovery and kill
503
// worker thread
504
String JavaDoc msg;
505                 if (abstractTask.isAutoCommit())
506                   msg = Translate.get("recovery.failed.with.error",
507                       new Object JavaDoc[]{
508                           abstractTask,
509                           ((Exception JavaDoc) abstractTask.getExceptions().get(0))
510                               .getMessage()});
511                 else
512                   msg = Translate.get("recovery.failed.with.error.transaction",
513                       new Object JavaDoc[]{
514                           Long.toString(abstractTask.getTransactionId()),
515                           abstractTask,
516                           ((Exception JavaDoc) abstractTask.getExceptions().get(0))
517                               .getMessage()});
518                 recoveryLog.endRecovery();
519                 addWorkerTask(new KillThreadTask(1, 1));
520                 pendingRecoveryTasks.clear();
521                 logger.error(msg);
522                 throw new SQLException JavaDoc(msg);
523               }
524             }
525           }
526         }
527
528         /*
529          * Transactions and persistentConnections limit by the number of pending
530          * requests at the backend total order queue. Only one request per
531          * transaction or persistent connection will be move from the total
532          * order queue to the task queues. When all the requests are auto
533          * commit, we need to limit the number of requests here. Otherwise, the
534          * conflicting queue can grow indefinitely large.
535          */

536         if (tids.isEmpty() && persistentConnections.isEmpty()
537             && pendingRecoveryTasks.size() >= recoveryBatchSize)
538           try
539           {
540             recoveryTask = (RecoveryTask) pendingRecoveryTasks.getFirst();
541             abstractTask = recoveryTask.getTask();
542             synchronized (abstractTask)
543             {
544               if (!abstractTask.hasFullyCompleted())
545                 abstractTask.wait();
546             }
547           }
548           catch (InterruptedException JavaDoc e)
549           {
550             break;
551           }
552         else
553           break;
554       }
555       while (true);
556     }
557     while (logIdx != -1); // while we have not reached the last querys
558

559     return logIdx;
560   }
561
562   /**
563    * Wait for all tasks in the given list to complete. Note that endRecovery()
564    * is called upon failure.
565    *
566    * @param pendingRecoveryTasks list of <code>RecoveryTask</code> currently
567    * executing tasks
568    * @throws SQLException if a failure occurs
569    */

570   private void waitForAllTasksCompletion(LinkedList JavaDoc pendingRecoveryTasks)
571       throws SQLException JavaDoc
572   {
573     RecoveryTask recoveryTask;
574     AbstractTask abstractTask;
575
576     while (!pendingRecoveryTasks.isEmpty())
577     {
578       recoveryTask = (RecoveryTask) pendingRecoveryTasks.removeFirst();
579       abstractTask = recoveryTask.getTask();
580       synchronized (abstractTask)
581       {
582         // Wait for task completion if needed
583
while (!abstractTask.hasFullyCompleted())
584           try
585           {
586             abstractTask.wait();
587           }
588           catch (InterruptedException JavaDoc ignore)
589           {
590           }
591
592         if (LogEntry.SUCCESS.equals(recoveryTask.getStatus()))
593         { // Only deal with successful tasks
594
if (abstractTask.getFailed() > 0)
595           { // We fail to recover that task. Signal end of recovery and kill
596
// worker thread
597
recoveryLog.endRecovery();
598             addWorkerTask(new KillThreadTask(1, 1));
599             pendingRecoveryTasks.clear();
600             String JavaDoc msg;
601             if (abstractTask.isAutoCommit())
602               msg = Translate.get("recovery.failed.with.error", new Object JavaDoc[]{
603                   abstractTask,
604                   ((Exception JavaDoc) abstractTask.getExceptions().get(0))
605                       .getMessage()});
606             else
607               msg = Translate.get("recovery.failed.with.error.transaction",
608                   new Object JavaDoc[]{
609                       Long.toString(abstractTask.getTransactionId()),
610                       abstractTask,
611                       ((Exception JavaDoc) abstractTask.getExceptions().get(0))
612                           .getMessage()});
613             logger.error(msg);
614             throw new SQLException JavaDoc(msg);
615           }
616         }
617       }
618     }
619   }
620
621   /**
622    * Add a task to a DatabaseBackend using the proper synchronization.
623    *
624    * @param task the task to add to the thread queue
625    */

626   private void addWorkerTask(AbstractTask task)
627   {
628     backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task,
629         recoveryBatchSize);
630   }
631
632   /**
633    * Properly end the recovery and kill the worker thread used for recovery if
634    * it exists.
635    *
636    * @see #startRecovery()
637    */

638   private void endRecovery()
639   {
640     // We are done with the recovery
641
logger.info(Translate.get("recovery.process.complete"));
642     backend.terminateWorkerThreads();
643
644     recoveryLog.endRecovery();
645   }
646
647   /**
648    * Start the recovery process by forking a BackendWorkerThread. <br />
649    * You must call endRecovery() to terminate the thread.
650    * <p>
651    * when starting the recovery, we create a new BackendTaskQueues for the
652    * backend but only its non-conflicting queue will be used.<br />
653    * We also use only one BackendWorkerThread to ensure that the request will be
654    * replayed serially in the same order they were logged.
655    * </p>
656    * <p>
657    * A new BackendTaskQueues will be set on the backend when it is enabled in
658    * the endRecovery() method.
659    * </p>
660    *
661    * @see #endRecovery()
662    * @see #addWorkerTask(AbstractTask)
663    */

664   private void startRecovery()
665   {
666     try
667     {
668       ObjectName JavaDoc taskQueuesObjectName = JmxConstants
669           .getBackendTaskQueuesObjectName(backend.getVirtualDatabaseName(),
670               backend.getName());
671       if (MBeanServerManager.getInstance().isRegistered(taskQueuesObjectName))
672       {
673         MBeanServerManager.unregister(JmxConstants
674             .getBackendTaskQueuesObjectName(backend.getVirtualDatabaseName(),
675                 backend.getName()));
676       }
677     }
678     catch (Exception JavaDoc e)
679     {
680       if (logger.isWarnEnabled())
681       {
682         logger.warn("Exception while unregistering backend task queues mbean",
683             e);
684       }
685     }
686     // find the correct enforceTableLocking option for this backend
687
boolean enforceTableLocking = requestManager.getLoadBalancer().waitForCompletionPolicy
688         .isEnforceTableLocking();
689     backend.setTaskQueues(new BackendTaskQueues(backend,
690         new WaitForCompletionPolicy(WaitForCompletionPolicy.FIRST,
691             enforceTableLocking, 0), requestManager));
692     backend.startWorkerThreads(requestManager.getLoadBalancer());
693   }
694
695   /*
696    * Used to signal that we have reached the end of the recovery log during the
697    * recovery process. There are other conditions that interrupt the recovery
698    * process, such as finding a request which is still executing. In such case
699    * we will not throw this exception.
700    */

701   private class EndOfRecoveryLogException extends Exception JavaDoc
702   {
703     private static final long serialVersionUID = 2826202288239306426L;
704     private long logIdx;
705
706     /**
707      * Creates a new <code>EndOfRecoveryLogException</code> object
708      *
709      * @param logIdx recovery log index we stopped at
710      */

711     public EndOfRecoveryLogException(long logIdx)
712     {
713       this.logIdx = logIdx;
714     }
715
716     /**
717      * Return the last recovery log index reached
718      *
719      * @return last recovery log index
720      */

721     public long getLogIdx()
722     {
723       return logIdx;
724     }
725   }
726
727 }
728
Popular Tags