KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > recoverylog > LoggerThread


/**
 * C-JDBC: Clustered JDBC.
 * Copyright (C) 2002-2005 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Contact: c-jdbc@objectweb.org
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by the
 * Free Software Foundation; either version 2.1 of the License, or any later
 * version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): ______________________.
 */

24
25 package org.objectweb.cjdbc.controller.recoverylog;
26
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.LinkedList;

import org.objectweb.cjdbc.common.i18n.Translate;
import org.objectweb.cjdbc.common.log.Trace;
import org.objectweb.cjdbc.controller.recoverylog.events.LogEvent;
34
35 /**
36  * Logger thread for the RecoveryLog.
37  *
38  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
39  * @version 1.0
40  */

41 public class LoggerThread extends Thread JavaDoc
42 {
43   private boolean killed = false; // Control thread death
44
private LinkedList JavaDoc logQueue;
45   private Trace logger;
46   private PreparedStatement JavaDoc logStmt;
47   private PreparedStatement JavaDoc unlogStmt;
48   private RecoveryLog recoveryLog;
49
50   /**
51    * Creates a new <code>LoggerThread</code> object
52    *
53    * @param log the RecoveryLog that instanciates this thread
54    */

55   public LoggerThread(RecoveryLog log)
56   {
57     super("LoggerThread");
58     this.recoveryLog = log;
59     this.logger = RecoveryLog.logger;
60     logStmt = null;
61     unlogStmt = null;
62     logQueue = new LinkedList JavaDoc();
63   }
64
65   /**
66    * Returns the logger value.
67    *
68    * @return Returns the logger.
69    */

70   public Trace getLogger()
71   {
72     return logger;
73   }
74
75   /**
76    * Tells whether there are pending logs
77    *
78    * @return true if no more jobs in the log queue
79    */

80   public synchronized boolean getLogQueueIsEmpty()
81   {
82     if (logQueue.isEmpty())
83     {
84       // Notifies the Recovery log that the queue is empty.
85
notify();
86       return true;
87     }
88     else
89     {
90       return false;
91     }
92   }
93
94   /**
95    * Return a PreparedStatement to log an entry as follows:
96    * <p>
97    * INSERT INTO LogTableName VALUES(?,?,?,?)
98    *
99    * @return a PreparedStatement
100    * @throws SQLException if an error occurs
101    */

102   public PreparedStatement JavaDoc getLogPreparedStatement() throws SQLException JavaDoc
103   {
104     if (logStmt == null)
105     {
106       logStmt = recoveryLog.getDatabaseConnection().prepareStatement(
107           "INSERT INTO " + recoveryLog.getLogTableName() + " VALUES(?,?,?,?)");
108     }
109     return logStmt;
110   }
111
112   /**
113    * Returns the recoveryLog value.
114    *
115    * @return Returns the recoveryLog.
116    */

117   public RecoveryLog getRecoveryLog()
118   {
119     return recoveryLog;
120   }
121
122   /**
123    * Return a PreparedStatement to unlog an entry as follows:
124    * <p>
125    * DELETE FROM LogTableName WHERE id=? AND vlogin=? AND SqlColumnName=? AND
126    * transaction_id=?
127    *
128    * @return a PreparedStatement
129    * @throws SQLException if an error occurs
130    */

131   public PreparedStatement JavaDoc getUnlogPreparedStatement() throws SQLException JavaDoc
132   {
133     if (unlogStmt == null)
134     {
135       unlogStmt = recoveryLog.getDatabaseConnection().prepareStatement(
136           "DELETE FROM " + recoveryLog.getLogTableName()
137               + " WHERE id=? AND vlogin=? AND "
138               + recoveryLog.getLogTableSqlColumnName()
139               + "=? AND transaction_id=?");
140     }
141     return unlogStmt;
142   }
143
144   /**
145    * Invalidate both logStmt and unlogStmt so that they can be renewed from a
146    * fresh connection.
147    *
148    * @see #getLogPreparedStatement()
149    * @see #getUnlogPreparedStatement()
150    */

151   public void invalidateLogStatements()
152   {
153     try
154     {
155       logStmt.close();
156     }
157     catch (Exception JavaDoc ignore)
158     {
159     }
160     try
161     {
162       unlogStmt.close();
163     }
164     catch (Exception JavaDoc ignore)
165     {
166     }
167     logStmt = null;
168     unlogStmt = null;
169     recoveryLog.invalidateInternalConnection();
170   }
171
172   /**
173    * Log a write-query into the recovery log. This posts the specified logObject
174    * (query) into this loggerThread queue. The actual write to the recoverly-log
175    * db is performed asynchronously by the thread.
176    *
177    * @param logObject the log event to be processed
178    */

179   public synchronized void log(LogEvent logObject)
180   {
181     logQueue.addLast(logObject);
182     notify();
183   }
184
185   /**
186    * Put back a log entry at the head of the queue in case a problem happened
187    * with this entry and we need to retry it right away.
188    *
189    * @param event the event to be used next by the logger thread.
190    */

191   public synchronized void putBackAtHeadOfQueue(LogEvent event)
192   {
193     logQueue.addFirst(event);
194     notify();
195   }
196
197   /**
198    * Remove all queries that have not been logged yet and belonging to the
199    * specified transaction.
200    *
201    * @param tid transaction id to rollback
202    */

203   public synchronized void removeQueriesOfTransactionFromQueue(long tid)
204   {
205     if (logger.isDebugEnabled())
206       logger.debug(Translate.get("recovery.jdbc.loggerthread.removing", tid));
207     LogEvent logEvent;
208     for (int i = 0; i < logQueue.size(); i++)
209     {
210       logEvent = (LogEvent) logQueue.get(i);
211       if (logEvent.belongToTransaction(tid))
212       {
213         logQueue.remove(i);
214         i--;
215       }
216     }
217   }
218
219   /**
220    * Remove a transaction that has rollbacked (no check is made if the
221    * transaction has really rollbacked or not).
222    *
223    * @param transactionId the id of the transaction
224    * @throws SQLException if an error occurs
225    */

226   public void removeRollbackedTransaction(long transactionId)
227       throws SQLException JavaDoc
228   {
229     // The transaction failed
230
// Remove the requests with this transactionId from the database
231
removeQueriesOfTransactionFromQueue(transactionId);
232     PreparedStatement JavaDoc stmt = null;
233     try
234     {
235       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
236           "DELETE FROM " + recoveryLog.getLogTableName()
237               + " WHERE transaction_id=?");
238       stmt.setLong(1, transactionId);
239       stmt.executeUpdate();
240     }
241     catch (SQLException JavaDoc e)
242     {
243       throw new SQLException JavaDoc(Translate.get(
244           "recovery.jdbc.transaction.remove.failed", new String JavaDoc[]{
245               String.valueOf(transactionId), e.getMessage()}));
246     }
247     finally
248     {
249       try
250       {
251         if (stmt != null)
252           stmt.close();
253       }
254       catch (Exception JavaDoc ignore)
255       {
256       }
257     }
258   }
259
260   /**
261    * Delete all entries from the CheckpointTable.
262    *
263    * @throws SQLException if an error occurs
264    */

265   public void deleteCheckpointTable() throws SQLException JavaDoc
266   {
267     // First delete from the checkpoint table
268
PreparedStatement JavaDoc stmt = null;
269     try
270     {
271       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
272           "DELETE FROM " + recoveryLog.getCheckpointTableName());
273       stmt.executeUpdate();
274     }
275     catch (SQLException JavaDoc e)
276     {
277       String JavaDoc msg = "Failed to delete checkpoint table";
278       logger.warn(msg, e);
279       throw new SQLException JavaDoc(msg);
280     }
281     finally
282     {
283       try
284       {
285         if (stmt != null)
286           stmt.close();
287       }
288       catch (Exception JavaDoc ignore)
289       {
290       }
291     }
292   }
293
294   /**
295    * Looks like a copy paste from RecoveryLog#storeCheckpoint(String, long), but
296    * does not wait for queue completion to store the checkpoint. Moreover, in
297    * case of error, additionely closes and invalidates log and unlog statements
298    * (internal) before calling RecoveryLog#invalidateInternalConnection().
299    *
300    * @param checkpointName checkpoint name to insert
301    * @param checkpointId checkpoint request identifier
302    * @throws SQLException if a database access error occurs
303    * @see RecoveryLog#storeCheckpoint(String, long)
304    * @see #invalidateLogStatements()
305    */

306   public void storeCheckpoint(String JavaDoc checkpointName, long checkpointId)
307       throws SQLException JavaDoc
308   {
309     PreparedStatement JavaDoc stmt = null;
310     try
311     {
312       if (logger.isDebugEnabled())
313         logger.debug("Storing checkpoint " + checkpointName + " at request id "
314             + checkpointId);
315       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
316           "INSERT INTO " + recoveryLog.getCheckpointTableName()
317               + " VALUES(?,?)");
318       stmt.setString(1, checkpointName);
319       stmt.setLong(2, checkpointId);
320       stmt.executeUpdate();
321     }
322     catch (SQLException JavaDoc e)
323     {
324       invalidateLogStatements();
325       throw new SQLException JavaDoc(Translate.get(
326           "recovery.jdbc.checkpoint.store.failed", new String JavaDoc[]{checkpointName,
327               e.getMessage()}));
328     }
329     finally
330     {
331       try
332       {
333         if (stmt != null)
334           stmt.close();
335       }
336       catch (Exception JavaDoc ignore)
337       {
338       }
339     }
340   }
341
342   /**
343    * Delete all LogEntries with an identifier lower than oldId (inclusive).
344    * oldId is normally derived from a checkpoint name, which marks the last
345    * request before the checkpoint.
346    *
347    * @param oldId the id up to which entries should be removed.
348    * @throws SQLException if an error occurs
349    */

350   public void deleteLogEntriesBeforeId(long oldId) throws SQLException JavaDoc
351   {
352     PreparedStatement JavaDoc stmt = null;
353     try
354     {
355       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
356           "DELETE FROM " + recoveryLog.getLogTableName() + " WHERE id<=?");
357       stmt.setLong(1, oldId);
358       stmt.executeUpdate();
359     }
360     catch (SQLException JavaDoc e)
361     {
362       // TODO: Check error message below
363
throw new SQLException JavaDoc(Translate.get(
364           "recovery.jdbc.transaction.remove.failed", new String JavaDoc[]{
365               String.valueOf(oldId), e.getMessage()}));
366     }
367     finally
368     {
369       try
370       {
371         if (stmt != null)
372           stmt.close();
373       }
374       catch (Exception JavaDoc ignore)
375       {
376       }
377     }
378   }
379
380   /**
381    * Shift LogEntries identifiers from the specified value (value is added to
382    * existing identifiers).
383    *
384    * @param shiftValue the value to shift
385    * @throws SQLException if an error occurs
386    */

387   public void shiftLogEntriesIds(long shiftValue) throws SQLException JavaDoc
388   {
389     PreparedStatement JavaDoc stmt = null;
390     try
391     {
392       stmt = recoveryLog.getDatabaseConnection().prepareStatement(
393           "UPDATE " + recoveryLog.getLogTableName() + " SET id=id+?");
394       stmt.setLong(1, shiftValue);
395       stmt.executeUpdate();
396     }
397     catch (SQLException JavaDoc e)
398     {
399       // TODO: Check error message below
400
throw new SQLException JavaDoc(Translate.get(
401           "recovery.jdbc.transaction.remove.failed", new String JavaDoc[]{
402               String.valueOf(shiftValue), e.getMessage()}));
403     }
404     finally
405     {
406       try
407       {
408         if (stmt != null)
409           stmt.close();
410       }
411       catch (Exception JavaDoc ignore)
412       {
413       }
414     }
415   }
416
417   /**
418    * Log the requests from queue until the thread is explicetly killed. The
419    * logger used is the one of the RecoveryLog.
420    */

421   public void run()
422   {
423     LogEvent event;
424
425     while (!killed)
426     {
427       synchronized (this)
428       {
429         while (getLogQueueIsEmpty() && !killed)
430         {
431           try
432           {
433             wait();
434           }
435           catch (InterruptedException JavaDoc e)
436           {
437             logger.warn(Translate.get("recovery.jdbc.loggerthread.awaken"), e);
438           }
439         }
440         if (killed)
441           break;
442         // Pump first log entry from the queue
443
event = (LogEvent) logQueue.remove(0);
444         event.execute(this);
445       }
446     }
447     logger.debug("JDBC Logger thread ending");
448     invalidateLogStatements();
449   }
450
451   /**
452    * Shutdown the current thread.
453    */

454   public synchronized void shutdown()
455   {
456     killed = true;
457     notify();
458   }
459
460 }
461
Popular Tags