KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > recoverylog > RecoverThread


/**
 * C-JDBC: Clustered JDBC.
 * Copyright (C) 2002-2005 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Contact: c-jdbc@objectweb.org
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by the
 * Free Software Foundation; either version 2.1 of the License, or any later
 * version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *
 * Initial developer(s): Nicolas Modrzyk.
 * Contributor(s): Emmanuel Cecchet.
 */

24
25 package org.objectweb.cjdbc.controller.recoverylog;
26
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

import org.objectweb.cjdbc.common.i18n.Translate;
import org.objectweb.cjdbc.common.jmx.notifications.CjdbcNotificationList;
import org.objectweb.cjdbc.common.log.Trace;
import org.objectweb.cjdbc.common.shared.BackendState;
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.BeginTask;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
42
43 /**
44  * This class defines a RecoverThread that is in charge of replaying the
45  * recovery log on a given backend to re-synchronize it with the other nodes of
46  * the cluster.
47  *
48  * @author <a HREF="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
49  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
50  * @version 1.0
51  */

52 public class RecoverThread extends Thread JavaDoc
53 {
54   static Trace logger = Trace.getLogger(RecoverThread.class
55                                           .getName());
56
57   private RecoveryLog recoveryLog;
58   private DatabaseBackend backend;
59   private AbstractLoadBalancer loadBalancer;
60   private SQLException JavaDoc exception;
61
62   private BackendWorkerThread bwt;
63   private ArrayList JavaDoc tids;
64
65   private AbstractScheduler scheduler;
66
67   private String JavaDoc checkpointName;
68
69   /** Size of the pendingRecoveryTasks queue used during recovery */
70   private int recoveryBatchSize;
71
72   /**
73    * Creates a new <code>RecoverThread</code> object
74    *
75    * @param scheduler the currently used scheduler
76    * @param recoveryLog Recovery log that creates this thread
77    * @param backend database backend for logging
78    * @param loadBalancer index to start from for recovery
79    * @param checkpointName load balancer to use to create a BackendWorkerThread
80    */

81   public RecoverThread(AbstractScheduler scheduler, RecoveryLog recoveryLog,
82       DatabaseBackend backend, AbstractLoadBalancer loadBalancer,
83       String JavaDoc checkpointName)
84   {
85     this.scheduler = scheduler;
86     this.recoveryLog = recoveryLog;
87     this.backend = backend;
88     this.loadBalancer = loadBalancer;
89     this.checkpointName = checkpointName;
90     this.recoveryBatchSize = recoveryLog.getRecoveryBatchSize();
91     tids = new ArrayList JavaDoc();
92   }
93
94   /**
95    * Returns the exception value.
96    *
97    * @return Returns the exception.
98    */

99   public SQLException JavaDoc getException()
100   {
101     return exception;
102   }
103
104   /**
105    * @see java.lang.Runnable#run()
106    */

107   public void run()
108   {
109     backend.setState(BackendState.REPLAYING);
110     try
111     {
112       backend.initializeConnections();
113     }
114     catch (SQLException JavaDoc e)
115     {
116       recoveryFailed(e);
117       return;
118     }
119     recoveryLog.beginRecovery();
120
121     // Get the checkpoint from the recovery log
122
long logIdx;
123     try
124     {
125       logIdx = recoveryLog.getCheckpointRequestId(checkpointName);
126     }
127     catch (SQLException JavaDoc e)
128     {
129       recoveryLog.endRecovery();
130       String JavaDoc msg = Translate.get("recovery.cannot.get.checkpoint", e);
131       logger.error(msg);
132       recoveryFailed(new SQLException JavaDoc(msg));
133       return;
134     }
135
136     try
137     {
138       startRecovery();
139
140       // Play writes from the recovery log until the last possible transaction
141
// without blocking the scheduler
142
logIdx = recover(logIdx);
143
144       // Suspend the writes
145
scheduler.suspendWrites();
146
147       // Play the remaining writes that were pending
148
logIdx = recover(logIdx);
149
150     }
151     catch (SQLException JavaDoc e)
152     {
153       recoveryFailed(e);
154       return;
155     }
156     finally
157     {
158       endRecovery();
159     }
160
161     // Now enable it
162
try
163     {
164       loadBalancer.enableBackend(backend, true);
165       scheduler.resumeWrites();
166     }
167     catch (SQLException JavaDoc e)
168     {
169       recoveryFailed(e);
170       return;
171     }
172     logger.info(Translate.get("backend.state.enabled", backend.getName()));
173   }
174
175   /**
176    * Unset the last known checkpoint and set the backend to disabled state. This
177    * should be called when the recovery has failed.
178    *
179    * @param e cause of the recovery failure
180    */

181   private void recoveryFailed(SQLException JavaDoc e)
182   {
183     this.exception = e;
184
185     if (scheduler.isSuspendedWrites())
186       scheduler.resumeWrites();
187
188     backend.setLastKnownCheckpoint(null);
189     backend.setState(BackendState.DISABLED);
190     backend.notifyJmxError(
191         CjdbcNotificationList.VIRTUALDATABASE_BACKEND_REPLAYING_FAILED, e);
192   }
193
194   /**
195    * Replay the recovery log from the given logIdx index. Note that
196    * startRecovery() must have been called to fork and start the
197    * BackendWorkerThread before calling recover. endRecovery() must be called
198    * after recover() to terminate the thread.
199    *
200    * @param logIdx logIdx used to start the recovery
201    * @return last logIdx that was replayed.
202    * @throws SQLException if fails
203    * @see #startRecovery()
204    * @see #endRecovery()
205    */

206   private long recover(long logIdx) throws SQLException JavaDoc
207   {
208     if (bwt == null)
209       throw new RuntimeException JavaDoc(
210           "No BackendWorkerThread to recover, you should have called RecoveryLog.startRecovery()");
211     RecoveryTask recoveryTask = null;
212     AbstractTask abstractTask = null;
213
214     logger.info(Translate.get("recovery.start.process"));
215
216     long tid;
217     LinkedList JavaDoc pendingRecoveryTasks = new LinkedList JavaDoc();
218     // Replay the whole log
219
while (logIdx != -1)
220     {
221       try
222       {
223         recoveryTask = recoveryLog.recoverNextRequest(logIdx);
224       }
225       catch (SQLException JavaDoc e)
226       {
227         // Signal end of recovery and kill worker thread
228
recoveryLog.endRecovery();
229         addWorkerTask(bwt, new KillThreadTask(1, 1));
230         String JavaDoc msg = Translate.get("recovery.cannot.recover.from.index", e);
231         logger.error(msg, e);
232         throw new SQLException JavaDoc(msg);
233       }
234       if (recoveryTask == null)
235         break;
236       tid = recoveryTask.getTid();
237       if (tid != 0)
238       {
239         if (recoveryTask.getTask() instanceof BeginTask)
240           tids.add(new Long JavaDoc(tid));
241         else if (!tids.contains(new Long JavaDoc(tid)))
242         {
243           /*
244            * if the task transaction id does not have a corresponding begin (it
245            * is not in the tids arraylist), then this task has already been
246            * played when the backend was disabled. So we can skip it.
247            */

248           logIdx++;
249           continue;
250         }
251       } // else autocommit ok
252

253       abstractTask = recoveryTask.getTask();
254       logIdx = recoveryTask.getId();
255       // Add the task for execution by the BackendWorkerThread
256
addWorkerTask(bwt, abstractTask);
257       // Add it to the list of currently executing tasks
258
pendingRecoveryTasks.addLast(abstractTask);
259
260       // Now let's check which task have completed and remove them from the
261
// pending queue.
262
do
263       { // Take the first task of the list
264
abstractTask = (AbstractTask) pendingRecoveryTasks.getFirst();
265         if (abstractTask.hasFullyCompleted())
266         {
267           // Task has completed, remove it from the list
268
pendingRecoveryTasks.removeFirst();
269           if (abstractTask.getFailed() > 0)
270           { // We fail to recover that task. Signal end of recovery and kill
271
// worker thread
272
recoveryLog.endRecovery();
273             addWorkerTask(bwt, new KillThreadTask(1, 1));
274             pendingRecoveryTasks.clear();
275             String JavaDoc msg = Translate.get("recovery.failed.with.error",
276                 new String JavaDoc[]{
277                     abstractTask.toString(),
278                     ((Exception JavaDoc) abstractTask.getExceptions().get(0))
279                         .getMessage()});
280             logger.error(msg);
281             throw new SQLException JavaDoc(msg);
282           }
283         }
284         else
285         { // Task has not completed yet
286
if (pendingRecoveryTasks.size() > recoveryBatchSize)
287           { // Queue is full, wait for first query to complete
288
synchronized (abstractTask)
289             {
290               if (!abstractTask.hasFullyCompleted())
291                 try
292                 {
293                   abstractTask.wait();
294                 }
295                 catch (InterruptedException JavaDoc ignore)
296                 {
297                 }
298               // Let's check the task completion status by restarting the loop
299
continue;
300             }
301           }
302           else
303             // All current queries are executing but the queue is not full,
304
// let's add some more
305
break;
306         }
307       }
308       while (!pendingRecoveryTasks.isEmpty());
309     } // while we have not reached the last query
310

311     // Ok, now everything has been replayed but we have to wait for the last
312
// queries in the pending queue to complete.
313
while (!pendingRecoveryTasks.isEmpty())
314     {
315       abstractTask = (AbstractTask) pendingRecoveryTasks.remove(0);
316       synchronized (abstractTask)
317       {
318         // Wait for task completion if needed
319
while (!abstractTask.hasFullyCompleted())
320           try
321           {
322             abstractTask.wait();
323           }
324           catch (InterruptedException JavaDoc ignore)
325           {
326           }
327
328         // Check if tasks completed successfully
329
if (abstractTask.getFailed() > 0)
330         { // We fail to recover that task. Signal end of recovery and kill
331
// worker thread
332
recoveryLog.endRecovery();
333           addWorkerTask(bwt, new KillThreadTask(1, 1));
334           pendingRecoveryTasks.clear();
335           String JavaDoc msg = Translate.get("recovery.failed.with.error",
336               new String JavaDoc[]{
337                   abstractTask.toString(),
338                   ((Exception JavaDoc) abstractTask.getExceptions().get(0))
339                       .getMessage()});
340           logger.error(msg);
341           throw new SQLException JavaDoc(msg);
342         }
343       }
344     }
345     return logIdx;
346   }
347
348   /**
349    * Add a task to a BackendWorkerThread using the proper synchronization.
350    *
351    * @param bwt BackendWorkerThread to synchronize on
352    * @param task the task to add to the thread queue
353    */

354   private void addWorkerTask(BackendWorkerThread bwt, AbstractTask task)
355   {
356     synchronized (bwt)
357     {
358       bwt.addTask(task);
359       bwt.notify();
360     }
361   }
362
363   /**
364    * Properly end the recovery and kill the worker thread used for recovery if
365    * it exists.
366    *
367    * @see #startRecovery()
368    */

369   public void endRecovery()
370   {
371     // We are done with the recovery
372
logger.info(Translate.get("recovery.process.complete"));
373     if (bwt != null)
374     {
375       addWorkerTask(bwt, new KillThreadTask(1, 1));
376       try
377       {
378         bwt.join();
379       }
380       catch (InterruptedException JavaDoc e)
381       {
382         recoveryLog.endRecovery();
383         String JavaDoc msg = Translate.get("recovery.join.failed", e);
384         logger.error(msg, e);
385         exception = new SQLException JavaDoc(msg);
386       }
387     }
388
389     recoveryLog.endRecovery();
390   }
391
392   /**
393    * Start the recovery process by forking a BackendWorkerThread. You must call
394    * endRecovery() to terminate the thread.
395    *
396    * @throws SQLException if an error occurs
397    * @see #endRecovery()
398    */

399   public void startRecovery() throws SQLException JavaDoc
400   {
401     bwt = new BackendWorkerThread("Worker thread for recovery on backend:"
402         + backend.getName(), backend, loadBalancer);
403     bwt.start();
404   }
405 }
Popular Tags