KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > recoverylog > LoggerThread


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Contact: sequoia@continuent.org
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * Initial developer(s): Emmanuel Cecchet.
21  * Contributor(s): ______________________.
22  */

23
24 package org.continuent.sequoia.controller.recoverylog;
25
26 import java.sql.PreparedStatement JavaDoc;
27 import java.sql.ResultSet JavaDoc;
28 import java.sql.SQLException JavaDoc;
29 import java.util.Iterator JavaDoc;
30 import java.util.LinkedList JavaDoc;
31
32 import org.continuent.sequoia.common.i18n.Translate;
33 import org.continuent.sequoia.common.log.Trace;
34 import org.continuent.sequoia.controller.recoverylog.events.LogEvent;
35 import org.continuent.sequoia.controller.recoverylog.events.LogRequestEvent;
36
37 /**
38  * Logger thread for the RecoveryLog.
39  *
40  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
41  * @version 1.0
42  */

43 public class LoggerThread extends Thread JavaDoc
44 {
45   private boolean killed = false; // Control thread death
46
/*
47    * the only place where we must remove the first element of the queue is in
48    * the run() method. do note remove its first element anywhere else!
49    */

50   private LinkedList JavaDoc logQueue;
51   private Trace logger;
52   private PreparedStatement JavaDoc logStmt;
53   private PreparedStatement JavaDoc updateStmt;
54   private RecoveryLog recoveryLog;
55   private LogEvent currentEvent = null;
56   private LogEvent lastFailed;
57
58   /**
59    * Creates a new <code>LoggerThread</code> object
60    *
61    * @param log the RecoveryLog that instanciates this thread
62    */

63   public LoggerThread(RecoveryLog log)
64   {
65     super("LoggerThread");
66     this.recoveryLog = log;
67     this.logger = RecoveryLog.logger;
68     logStmt = null;
69     updateStmt = null;
70     logQueue = new LinkedList JavaDoc();
71   }
72
73   /**
74    * Returns the logger value.
75    *
76    * @return Returns the logger.
77    */

78   public Trace getLogger()
79   {
80     return logger;
81   }
82
83   /**
84    * Tells whether there are pending logs
85    *
86    * @return true if no more jobs in the log queue
87    */

88   public synchronized boolean getLogQueueIsEmpty()
89   {
90     if (logQueue.isEmpty())
91     {
92       // Notifies the Recovery log that the queue is empty.
93
notify();
94       return true;
95     }
96     else
97     {
98       return false;
99     }
100   }
101
102   /**
103    * Return a PreparedStatement to log an entry as follows:
104    * <p>
105    * INSERT INTO LogTableName VALUES(?,?,?,?,?,?,?)
106    *
107    * @return a PreparedStatement
108    * @throws SQLException if an error occurs
109    */

110   public PreparedStatement JavaDoc getLogPreparedStatement() throws SQLException JavaDoc
111   {
112     if (logStmt == null)
113     {
114       logStmt = recoveryLog.getDatabaseConnection().prepareStatement(
115           "INSERT INTO " + recoveryLog.getLogTableName()
116               + " VALUES(?,?,?,?,?,?,?,?,?,?,?)");
117     }
118     return logStmt;
119   }
120
121   /**
122    * Return a PreparedStatement to update an entry as follows:
123    * <p>
124    * UPDATE LogTableName SET exec_status=?,exec_time=? WHERE log_id=?
125    *
126    * @return a PreparedStatement
127    * @throws SQLException if an error occurs
128    */

129   public PreparedStatement JavaDoc getUpdatePreparedStatement() throws SQLException JavaDoc
130   {
131     if (updateStmt == null)
132     {
133       updateStmt = recoveryLog
134           .getDatabaseConnection()
135           .prepareStatement(
136               "UPDATE "
137                   + recoveryLog.getLogTableName()
138                   + " SET exec_status=?,update_count=?,exec_time=?,completion_log_id=? WHERE log_id=?");
139     }
140     return updateStmt;
141   }
142
143   /**
144    * Returns the recoveryLog value.
145    *
146    * @return Returns the recoveryLog.
147    */

148   public RecoveryLog getRecoveryLog()
149   {
150     return recoveryLog;
151   }
152
153   /**
154    * Returns true if there is any log event in the queue that belongs to the
155    * given transaction.
156    *
157    * @param tid transaction id to look for
158    * @return true if a log entry belongs to this transaction
159    */

160   public synchronized boolean hasLogEntryForTransaction(long tid)
161   {
162     for (Iterator JavaDoc iter = logQueue.iterator(); iter.hasNext();)
163     {
164       LogEvent logEvent = (LogEvent) iter.next();
165       if (logEvent.belongToTransaction(tid))
166         return true;
167     }
168     return false;
169   }
170
171   /**
172    * Invalidate both logStmt and unlogStmt so that they can be renewed from a
173    * fresh connection.
174    *
175    * @see #getLogPreparedStatement()
176    * @see #getUpdatePreparedStatement()
177    */

178   public void invalidateLogStatements()
179   {
180     try
181     {
182       logStmt.close();
183     }
184     catch (Exception JavaDoc ignore)
185     {
186     }
187     try
188     {
189       updateStmt.close();
190     }
191     catch (Exception JavaDoc ignore)
192     {
193     }
194     logStmt = null;
195     updateStmt = null;
196     recoveryLog.invalidateInternalConnection();
197   }
198
199   /**
200    * Log a write-query into the recovery log. This posts the specified logObject
201    * (query) into this loggerThread queue. The actual write to the recoverly-log
202    * db is performed asynchronously by the thread.
203    *
204    * @param logObject the log event to be processed
205    */

206   public synchronized void log(LogEvent logObject)
207   {
208     logQueue.addLast(logObject);
209     notify();
210   }
211
212   /**
213    * Put back a log entry at the head of the queue in case a problem happened
214    * with this entry and we need to retry it right away.
215    *
216    * @param event the event to be used next by the logger thread.
217    * @param e exception causing the event to fail and to be retried
218    */

219   public synchronized void putBackAtHeadOfQueue(LogEvent event, Exception JavaDoc e)
220   {
221     if (lastFailed != event)
222     {
223       logQueue.addFirst(event);
224       notify();
225       lastFailed = event;
226     }
227     else
228     {
229       if (event instanceof LogRequestEvent)
230         logger
231             .error("WARNING! Your recovery log is probably corrupted, you should perform a restore log operation");
232       logger.error("Logger thread was unable to log " + event.toString()
233           + " because of " + e, e);
234     }
235   }
236
237   /**
238    * Remove all queries that have not been logged yet and belonging to the
239    * specified transaction.
240    *
241    * @param tid transaction id to rollback
242    */

243   public synchronized void removeQueriesOfTransactionFromQueue(long tid)
244   {
245     if (logger.isDebugEnabled())
246       logger.debug(Translate.get("recovery.jdbc.loggerthread.removing", tid));
247     Iterator JavaDoc iter = logQueue.iterator();
248     // do not remove the first element of the queue
249
// (must only be done by the run() method)
250
if (iter.hasNext())
251     {
252       iter.next();
253     }
254     while (iter.hasNext())
255     {
256       LogEvent event = (LogEvent) iter.next();
257       if (event.belongToTransaction(tid))
258       {
259         iter.remove();
260       }
261     }
262   }
263
264   /**
265    * Remove a possibly empty transaction from the recovery log. This method
266    * returns true if no entry or just a begin is found for that transaction. If
267    * a begin was found it will be removed from the log.
268    *
269    * @param transactionId the id of the transaction
270    * @return true if the transaction was empty
271    * @throws SQLException if an error occurs
272    */

273   public boolean removeEmptyTransaction(long transactionId) throws SQLException JavaDoc
274   {
275     if (hasLogEntryForTransaction(transactionId))
276       return false;
277
278     PreparedStatement JavaDoc stmt = null;
279     ResultSet JavaDoc rs = null;
280     try
281     {
282       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
283           "SELECT * FROM " + recoveryLog.getLogTableName()
284               + " WHERE transaction_id=?");
285       stmt.setLong(1, transactionId);
286       rs = stmt.executeQuery();
287       if (!rs.next())
288         return true; // no entry for that transaction
289

290       // Check if the first entry found is a begin
291
String JavaDoc sql = rs.getString(recoveryLog.getLogTableSqlColumnName());
292       if ((sql == null) || !sql.startsWith(RecoveryLog.BEGIN))
293         return false;
294
295       if (rs.next())
296         return false; // multiple entries in this transaction
297

298       rs.close();
299       stmt.close();
300
301       // There is a single BEGIN in the log for that transaction, remove it.
302
stmt = recoveryLog.getDatabaseConnection().prepareStatement(
303           "DELETE FROM " + recoveryLog.getLogTableName()
304               + " WHERE transaction_id=?");
305       stmt.setLong(1, transactionId);
306       stmt.executeUpdate();
307       return true;
308     }
309     catch (SQLException JavaDoc e)
310     {
311       throw new SQLException JavaDoc(Translate.get(
312           "recovery.jdbc.transaction.remove.failed", new String JavaDoc[]{
313               String.valueOf(transactionId), e.getMessage()}));
314     }
315     finally
316     {
317       try
318       {
319         if (rs != null)
320           rs.close();
321       }
322       catch (Exception JavaDoc ignore)
323       {
324       }
325       try
326       {
327         if (stmt != null)
328           stmt.close();
329       }
330       catch (Exception JavaDoc ignore)
331       {
332       }
333     }
334   }
335
336   /**
337    * Delete all entries from the CheckpointTable.
338    *
339    * @throws SQLException if an error occurs
340    */

341   public void deleteCheckpointTable() throws SQLException JavaDoc
342   {
343     // First delete from the checkpoint table
344
PreparedStatement JavaDoc stmt = null;
345     try
346     {
347       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
348           "DELETE FROM " + recoveryLog.getCheckpointTableName());
349       stmt.executeUpdate();
350     }
351     catch (SQLException JavaDoc e)
352     {
353       String JavaDoc msg = "Failed to delete checkpoint table";
354       logger.warn(msg, e);
355       throw new SQLException JavaDoc(msg);
356     }
357     finally
358     {
359       try
360       {
361         if (stmt != null)
362           stmt.close();
363       }
364       catch (Exception JavaDoc ignore)
365       {
366       }
367     }
368   }
369
370   /**
371    * Store a checkpoint in the recovery log using the provided local log id.<br>
372    * Moreover, in case of error, additionally closes and invalidates log and
373    * unlog statements (internal) before calling
374    * RecoveryLog#invalidateInternalConnection().
375    *
376    * @param checkpointName checkpoint name to insert
377    * @param checkpointLogId checkpoint log identifier
378    * @throws SQLException if a database access error occurs
379    * @see RecoveryLog#storeCheckpoint(String)
380    * @see #invalidateLogStatements()
381    */

382   public void storeCheckpointWithLogId(String JavaDoc checkpointName,
383       long checkpointLogId) throws SQLException JavaDoc
384   {
385     PreparedStatement JavaDoc stmt = null;
386     try
387     {
388       if (logger.isDebugEnabled())
389         logger.debug("Storing checkpoint " + checkpointName + " at request id "
390             + checkpointLogId);
391       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
392           "INSERT INTO " + recoveryLog.getCheckpointTableName()
393               + " VALUES(?,?)");
394       stmt.setString(1, checkpointName);
395       stmt.setLong(2, checkpointLogId);
396       stmt.executeUpdate();
397     }
398     catch (SQLException JavaDoc e)
399     {
400       invalidateLogStatements();
401       throw new SQLException JavaDoc(Translate.get(
402           "recovery.jdbc.checkpoint.store.failed", new String JavaDoc[]{checkpointName,
403               e.getMessage()}));
404     }
405     finally
406     {
407       try
408       {
409         if (stmt != null)
410           stmt.close();
411       }
412       catch (Exception JavaDoc ignore)
413       {
414       }
415     }
416   }
417
418   /**
419    * Remove a checkpoint in the recovery log.<br />
420    * In case of error, additionely close and invalidates log and unlog
421    * statements (internal) before calling
422    * RecoveryLog#invalidateInternalConnection().
423    *
424    * @param checkpointName name of the checkpoint to remove
425    * @throws SQLException if a database access error occurs
426    * @see org.continuent.sequoia.controller.recoverylog.events.RemoveCheckpointEvent
427    */

428   public void removeCheckpoint(String JavaDoc checkpointName) throws SQLException JavaDoc
429   {
430     PreparedStatement JavaDoc stmt = null;
431
432     try
433     {
434       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
435           "DELETE FROM " + recoveryLog.getCheckpointTableName()
436               + " WHERE name like ?");
437       stmt.setString(1, checkpointName);
438       stmt.executeUpdate();
439       stmt.close();
440     }
441     catch (SQLException JavaDoc e)
442     {
443       invalidateLogStatements();
444       throw new SQLException JavaDoc(Translate.get(
445           "recovery.jdbc.checkpoint.remove.failed", new String JavaDoc[]{
446               checkpointName, e.getMessage()}));
447     }
448     finally
449     {
450       try
451       {
452         if (stmt != null)
453           stmt.close();
454       }
455       catch (Exception JavaDoc ignore)
456       {
457       }
458     }
459   }
460
461   /**
462    * Delete all LogEntries with an identifier lower than oldId (inclusive).
463    * oldId is normally derived from a checkpoint name, which marks the last
464    * request before the checkpoint.
465    *
466    * @param oldId the id up to which entries should be removed.
467    * @throws SQLException if an error occurs
468    */

469   public void deleteLogEntriesBeforeId(long oldId) throws SQLException JavaDoc
470   {
471     PreparedStatement JavaDoc stmt = null;
472     try
473     {
474       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
475           "DELETE FROM " + recoveryLog.getLogTableName() + " WHERE log_id<=?");
476       stmt.setLong(1, oldId);
477       stmt.executeUpdate();
478     }
479     catch (SQLException JavaDoc e)
480     {
481       // TODO: Check error message below
482
throw new SQLException JavaDoc(Translate.get(
483           "recovery.jdbc.transaction.remove.failed", new String JavaDoc[]{
484               String.valueOf(oldId), e.getMessage()}));
485     }
486     finally
487     {
488       try
489       {
490         if (stmt != null)
491           stmt.close();
492       }
493       catch (Exception JavaDoc ignore)
494       {
495       }
496     }
497   }
498
499   /**
500    * Return the real number of log entries between 2 log ids (usually matching
501    * checkpoint indices). The SELECT excludes both boundaries.
502    *
503    * @param lowerLogId the lower log id
504    * @param upperLogId the upper log id
505    * @return the number of entries between the 2 ids
506    * @throws SQLException if an error occurs querying the recovery log
507    */

508   public long getNumberOfLogEntries(long lowerLogId, long upperLogId)
509       throws SQLException JavaDoc
510   {
511     ResultSet JavaDoc rs = null;
512     PreparedStatement JavaDoc stmt = null;
513     try
514     {
515       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
516           "SELECT COUNT(*) FROM " + recoveryLog.getLogTableName()
517               + " WHERE log_id>? AND log_id<?");
518       // Note that the statement is closed in the finally block
519
stmt.setLong(1, lowerLogId);
520       stmt.setLong(2, upperLogId);
521       rs = stmt.executeQuery();
522       if (!rs.next())
523         throw new SQLException JavaDoc(
524             "Failed to retrieve number of log entries (no rows returned)");
525
526       return rs.getLong(1);
527     }
528     catch (SQLException JavaDoc e)
529     {
530       throw e;
531     }
532     finally
533     {
534       try
535       {
536         if (rs != null)
537           rs.close();
538       }
539       catch (Exception JavaDoc ignore)
540       {
541       }
542       try
543       {
544         if (stmt != null)
545           stmt.close();
546       }
547       catch (Exception JavaDoc ignore)
548       {
549       }
550     }
551   }
552
553   /**
554    * Shift LogEntries identifiers from the specified value (value is added to
555    * existing identifiers).
556    *
557    * @param shiftValue the value to shift
558    * @throws SQLException if an error occurs
559    */

560   public void shiftLogEntriesIds(long shiftValue) throws SQLException JavaDoc
561   {
562     PreparedStatement JavaDoc stmt = null;
563     try
564     {
565       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
566           "UPDATE " + recoveryLog.getLogTableName() + " SET log_id=log_id+?");
567       stmt.setLong(1, shiftValue);
568       stmt.executeUpdate();
569     }
570     catch (SQLException JavaDoc e)
571     {
572       throw new SQLException JavaDoc(Translate.get(
573           "recovery.jdbc.loggerthread.shift.failed", e.getMessage()));
574     }
575     finally
576     {
577       try
578       {
579         if (stmt != null)
580           stmt.close();
581       }
582       catch (Exception JavaDoc ignore)
583       {
584       }
585     }
586   }
587
588   /**
589    * Shift LogEntries identifiers from the specified shiftValue (value is added
590    * to existing identifiers) starting with identifier with a value strictly
591    * greater than the given id.
592    *
593    * @param fromId id to start shifting from
594    * @param shiftValue the value to shift
595    * @throws SQLException if an error occurs
596    */

597   public void shiftLogEntriesAfterId(long fromId, long shiftValue)
598       throws SQLException JavaDoc
599   {
600     PreparedStatement JavaDoc stmt = null;
601     try
602     {
603       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
604           "UPDATE " + recoveryLog.getLogTableName()
605               + " SET log_id=log_id+? WHERE log_id>?");
606       stmt.setLong(1, shiftValue);
607       stmt.setLong(2, fromId);
608       stmt.executeUpdate();
609     }
610     catch (SQLException JavaDoc e)
611     {
612       throw new SQLException JavaDoc(Translate.get(
613           "recovery.jdbc.loggerthread.shift.failed", e.getMessage()));
614     }
615     finally
616     {
617       try
618       {
619         if (stmt != null)
620           stmt.close();
621       }
622       catch (Exception JavaDoc ignore)
623       {
624       }
625     }
626   }
627
628   /**
629    * Delete all log entries that have an id strictly between the 2 given
630    * boundaries (commonCheckpointId<id<nowCheckpointId). All checkpoints
631    * pointing to an id in the wiped zone will be deleted as well.
632    *
633    * @param commonCheckpointId lower id bound
634    * @param nowCheckpointId upper id bound
635    * @throws SQLException if an error occurs accessing the log
636    */

637   public void deleteLogEntriesAndCheckpointBetween(long commonCheckpointId,
638       long nowCheckpointId) throws SQLException JavaDoc
639   {
640     PreparedStatement JavaDoc stmt = null;
641     try
642     {
643       // Delete log entries first
644
stmt = recoveryLog.getDatabaseConnection().prepareStatement(
645           "DELETE FROM " + recoveryLog.getLogTableName()
646               + " WHERE ?<log_id AND log_id<?");
647       stmt.setLong(1, commonCheckpointId);
648       stmt.setLong(2, nowCheckpointId);
649       int rows = stmt.executeUpdate();
650       stmt.close();
651
652       if (logger.isInfoEnabled())
653       {
654         logger.info(rows
655             + " outdated log entries have been removed from the recovery log");
656
657         // Print checkpoints that will be deleted
658
stmt = recoveryLog.getDatabaseConnection().prepareStatement(
659             "SELECT * FROM " + recoveryLog.getCheckpointTableName()
660                 + " WHERE ?<log_id AND log_id<?");
661         stmt.setLong(1, commonCheckpointId);
662         stmt.setLong(2, nowCheckpointId);
663         ResultSet JavaDoc rs = stmt.executeQuery();
664         while (rs.next())
665         {
666           logger.info("Checkpoint " + rs.getString(1) + " (" + rs.getLong(2)
667               + ") will be deleted.");
668         }
669         if (rs != null)
670           rs.close();
671         stmt.close();
672       }
673
674       // Now delete checkpoints
675
stmt = recoveryLog.getDatabaseConnection().prepareStatement(
676           "DELETE FROM " + recoveryLog.getCheckpointTableName()
677               + " WHERE ?<log_id AND log_id<?");
678       stmt.setLong(1, commonCheckpointId);
679       stmt.setLong(2, nowCheckpointId);
680       rows = stmt.executeUpdate();
681
682       if (logger.isInfoEnabled())
683         logger
684             .info(rows
685                 + " out of sync checkpoints have been removed from the recovery log");
686
687     }
688     catch (SQLException JavaDoc e)
689     {
690       throw new SQLException JavaDoc(Translate.get(
691           "recovery.jdbc.entries.remove.failed", e.getMessage()));
692     }
693     finally
694     {
695       try
696       {
697         if (stmt != null)
698           stmt.close();
699       }
700       catch (Exception JavaDoc ignore)
701       {
702       }
703     }
704   }
705
706   /**
707    * Log the requests from queue until the thread is explicetly killed. The
708    * logger used is the one of the RecoveryLog.
709    */

710   public void run()
711   {
712     while (!killed)
713     {
714       synchronized (this)
715       {
716         while (getLogQueueIsEmpty() && !killed)
717         {
718           try
719           {
720             wait();
721           }
722           catch (InterruptedException JavaDoc e)
723           {
724             logger.warn(Translate.get("recovery.jdbc.loggerthread.awaken"), e);
725           }
726         }
727         if (killed)
728           break;
729         // Pump first log entry from the queue but leave it in the queue to show
730
// that we are processing it
731
currentEvent = (LogEvent) logQueue.getFirst();
732       }
733       try
734       {
735         currentEvent.execute(this);
736       }
737       finally
738       { // Remove from the queue anyway
739
synchronized (this)
740         {
741           logQueue.removeFirst();
742         }
743       }
744     }
745
746     // Ensure that the log is empty.
747
int finalLogSize = logQueue.size();
748     if (finalLogSize > 0)
749     {
750       logger.warn("Log queue contains requests following shutdown: "
751           + finalLogSize);
752     }
753     logger.info("Logger thread ending: " + this.getName());
754   }
755
756   /**
757    * Shutdown the current thread. This will cause the log to terminate as soon
758    * as the current event is finished processing. Any remaining events in the
759    * log queue will be discarded.
760    */

761   public synchronized void shutdown()
762   {
763     killed = true;
764     logger.info("Log shutdown method has been invoked");
765     notify();
766   }
767
768 }
769
Popular Tags