KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > tm > recovery > BatchWriter


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.tm.recovery;
23
24 import EDU.oswego.cs.dl.util.concurrent.Latch;
25
26 import java.io.File JavaDoc;
27 import java.io.IOException JavaDoc;
28 import java.nio.ByteBuffer JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.Collections JavaDoc;
31 import java.util.List JavaDoc;
32
33 import org.jboss.logging.Logger;
34
35 /**
36  * This class batches log write requests in order to minimize disk forcing
37  * activity. A <code>BatchWriter</code> has a current <code>BatchLog</code>
38  * instance, to which it writes log records, plus a pool of clean
39  * <code>BatchLog</code> instances, to be used when the current
40  * <code>BatchLog</code> gets full. It maintains a queue of write requests
41  * and implements a writer thread that takes write requests from the queue
42  * and batches them together in order to minimize disk forces.
43  * <p>
44  * The <code>BatchLog</code> instances owned by a <code>BatchWriter</code>
45  * know their owner <code>BatchWriter</code> and call the methods
46  * <code>restartBatchLog</code> and <code>getNextLogs</code> on their owner.
47  *
48  * @author <a HREF="mailto:bill@jboss.org">Bill Burke</a>
49  * @author <a HREF="mailto:reverbel@ime.usp.br">Francisco Reverbel</a>
50  * @version $Revision: 37459 $
51  */

52 class BatchWriter implements Runnable JavaDoc
53 {
54
55    /** Class <code>Logger</code> for trace messages. */
56    private static Logger errorLog = Logger.getLogger(BatchWriter.class);
57
58    /** True if trace messages should be logged. */
59    private static boolean traceEnabled = errorLog.isTraceEnabled();
60
61    /** The directory in which log files will be created. */
62    private final File JavaDoc dir;
63    
64    /** The initial capacity of the queue of write requests. */
65    private final int initialCapacity;
66    
67    /** The current <code>BatchLog</code> instance. */
68    private BatchLog log;
69    
70    /** The constant size of a recovery log file. */
71    private final int fileSize;
72    
73    /** The header object written to the beginning of a log file. */
74    private Object JavaDoc header;
75
76    /** Exception thrown when switching the current log file. */
77    private Exception JavaDoc abort;
78    
79    /** This flag can be set to false to stop the writer thread. */
80    private boolean running = true;
81    
82    /** Pool of clean <code>BatchLog</code> instances. */
83    private final List JavaDoc nextLogs = Collections.synchronizedList(new ArrayList JavaDoc());
84    
85    /** Synchronizes accesses to the <code>currentQueue</code> of requests. */
86    private Object JavaDoc batchLock = new Object JavaDoc();
87    
88    /** The current queue of write requests */
89    private ArrayList JavaDoc currentQueue;
90    
91    /** Latch released when a batch of write requests are performed. */
92    private Latch currentLatch;
93    
94    /** For asynchronously restarting <code>BatchLog</code> instances. */
95    private LogRestarter logRestarter;
96
97    /**
98     * Constructs a <code>BatchWriter</code>.
99     *
100     * @param header the header object to be placed at the beginning of the
101     * <code>BatchLog</code>s owned by this
102     * <code>BatchWriter</code>
103     * @param initialCapacity the initial capacity (in bytes) of the queue of
104     * write requests
105     * @param dir the directory in which log files will be created
106     * @param fileSize the constant size (in bytes) of a log file
107     * @param logRestarter <code>LogRestarter</code> instance that the
108     * <code>BatchWriter</code> will use for asynchronously
109     * restarting <code>BatchLog</code> instances.
110     * @throws IOException if a log file could not be created or initialized.
111     */

112    BatchWriter(Object JavaDoc header, int initialCapacity,
113                File JavaDoc dir, int fileSize, LogRestarter logRestarter)
114          throws IOException JavaDoc
115    {
116       this.header = header;
117       this.fileSize = fileSize;
118       this.initialCapacity = initialCapacity;
119       this.currentQueue = new ArrayList JavaDoc(initialCapacity);
120       this.currentLatch = new Latch();
121       this.dir = dir;
122       this.logRestarter = logRestarter;
123       log = new BatchLog(this, header, dir, fileSize);
124       nextLogs.add(new BatchLog(this, header, dir, fileSize));
125    }
126
127    /**
128     * Asynchronously restarts a given <code>BatchLog</code>. The
129     * <code>BatchLog</code> instances owned by this <code>BatchWriter</code>
130     * call this method to asynchronously restart themselves.
131     *
132     * @param logToRestart the <code>BatchLog</code> to be asynchronously
133     * restarted.
134     */

135    void restartBatchLog(BatchLog logToRestart)
136    {
137       logRestarter.add(logToRestart);
138    }
139
140    /**
141     * Gets the <code>List</code> that implements the pool of clean
142     * <code>BatchLog</code> instances of this <code>BatchWriter</code>.
143     * The <code>BatchLog</code> instances owned by this
144     * <code>BatchWriter</code> call this method after clean up, to add
145     * themselves to pool of clean <code>BatchLog</code> instances.
146     *
147     * @return the <code>List</code> of clean <code>BatchLog</code> instances
148     * kept by this <code>BatchWriter</code>.
149     */

150    List JavaDoc getNextLogs()
151    {
152       return nextLogs;
153    }
154
155    // FIXME: nobody calls this method
156
void clearAbort()
157    {
158       abort = null;
159    }
160
161    /**
162     * Stops the writer thread.
163     */

164    void stop()
165    {
166       synchronized (batchLock)
167       {
168          running = false;
169          batchLock.notifyAll();
170       }
171    }
172
173    /**
174     * Force-writes a "transaction committed" or "transaction prepared" log
175     * record to the current <code>BatchLog</code>.
176     *
177     * @param buffer a <code>ByteBuffer</code> containing a log record to
178     * be written to the current <code>BatchLog</code>
179     * @param expectEndRecord true if the log record requires another log record
180     * (a <code>TX_END</code> record) to be eventually written
181     * to the same <code>BatchLog</code>.
182     * @return a <code>TxCompletionHandler</code> that should be invoked at the
183     * end of the second phase of the two-phase commit protocol
184     * for the transaction that generated the log record.
185     */

186    TxCompletionHandler addBatch(ByteBuffer JavaDoc buffer, boolean expectEndRecord)
187    {
188       PendingWriteRequest request = null;
189       
190       if (traceEnabled)
191       {
192          errorLog.trace("Transaction log record:" +
193                         HexDump.fromBuffer(buffer.array()));
194          errorLog.trace(LogRecord.toString(buffer));
195       }
196       
197       synchronized (batchLock)
198       {
199          request = new PendingWriteRequest(buffer,
200                                            currentLatch,
201                                            expectEndRecord);
202          currentQueue.add(request);
203          batchLock.notify();
204       }
205       
206       TxCompletionHandler completionHandler = request.waitTilDone();
207       
208       if (!expectEndRecord)
209          return completionHandler;
210       else
211          return new TransactionCompletionLogger((BatchLog) completionHandler);
212    }
213
214    /**
215     * Writes a "transaction completed" (<code>TX_END</code>) record to the
216     * specified <code>BatchLog</code>. Each <code>TX_END</code> record is
217     * paired with a "transaction committed" or with a "transaction prepared"
218     * record and should be written to the same <code>BatchLog</code> as the
219     * record it is paired with. This method does not necessarily force the
220     * <code>TX_END</code> record to disk, but it might do so if that record
221     * is batched together with other records, which may require a forced
222     * write.
223     *
224     * @param buffer a <code>ByteBuffer</code> containing a <code>TX_END</code>
225     * record to be written to a given <code>BatchLog</code>
226     * @param destinationLog the <code>BatchLog</code> to which the
227     * <code>TX_END</code> record will be written.
228     */

229    void addBatch(ByteBuffer JavaDoc buffer, BatchLog destinationLog)
230    {
231       PendingWriteRequest receipt = null;
232       
233       if (traceEnabled)
234       {
235          errorLog.trace("Transaction Log record:" +
236                         HexDump.fromBuffer(buffer.array()));
237          errorLog.trace(LogRecord.toString(buffer));
238       }
239    
240       synchronized (batchLock)
241       {
242          receipt = new PendingWriteRequest(buffer,
243                                            currentLatch,
244                                            destinationLog);
245          currentQueue.add(receipt);
246          batchLock.notify();
247       }
248    }
249
250    /**
251     * The writer thread body.
252     */

253    public void run()
254    {
255       ArrayList JavaDoc work = null;
256       Latch workLatch = null;
257
258       while (running)
259       {
260          synchronized (batchLock)
261          {
262             if (currentQueue.size() > 0)
263             {
264                if (work == null)
265                   work = new ArrayList JavaDoc(initialCapacity);
266
267                // Switch currentQueue and work. All write requests that
268
// were in currentQueue will be handled as a batch of work.
269

270                ArrayList JavaDoc tmp = work;
271                work = currentQueue;
272                currentQueue = tmp;
273                workLatch = currentLatch;
274                currentLatch = new Latch();
275             }
276             else
277             {
278                // Wait for a write request.
279
try
280                {
281                   batchLock.wait();
282                }
283                catch (InterruptedException JavaDoc ignored)
284                {
285                   if (!running) break;
286                   Thread.interrupted(); // clear interrupted
287
}
288                continue;
289             }
290          }
291
292          try
293          {
294             // Perform the batch of work.
295
doWork(work);
296          }
297          catch (Exception JavaDoc e)
298          {
299             break;
300          }
301          finally
302          {
303             // Let
304
workLatch.release();
305             work.clear();
306          }
307       }
308       cleanup();
309    }
310
311    /**
312     * Performs a batch of write requests.
313     *
314     * @param work an <code>ArrayList</code> of
315     * <code>PendingWriteRequest</code>s.
316     */

317    private void doWork(ArrayList JavaDoc work)
318    {
319       if (abort != null)
320       {
321          for (int i = 0; i < work.size(); i++)
322          {
323             PendingWriteRequest request = (PendingWriteRequest) work.get(i);
324             request.setFailure(abort);
325          }
326          return;
327       }
328
329       ByteBuffer JavaDoc[] records = new ByteBuffer JavaDoc[work.size()];
330       int offset = 0;
331       try
332       {
333          int length = records.length;
334          int usedSize = log.getPosition();
335          int numTransactions;
336          boolean done = false;
337          
338          while (!done)
339          {
340             int j;
341             numTransactions = 0;
342             
343             for (int i = j = offset; i < length; i++)
344             {
345                PendingWriteRequest request = (PendingWriteRequest) work.get(i);
346                int type = request.getType();
347                records[j] = request.getBuffer();
348                if (type != PendingWriteRequest.TYPE_END)
349                {
350                   usedSize += records[j].remaining();
351                   if (type == PendingWriteRequest.TYPE_TX_MULTI_TM)
352                      usedSize += LogRecord.TX_END_LEN;
353                   
354                   if (usedSize > fileSize)
355                   {
356                      // will leave the for loop with work to do after restart
357
length = i;
358                   }
359                   else
360                   {
361                      numTransactions++;
362                      j++;
363                   }
364                }
365                else
366                {
367                   // TX_END record: which log should we use?
368
BatchLog requestedLog = request.getLogger();
369                   
370                   if (requestedLog != log)
371                   {
372                      // Use requestedLog instead of the current log
373
requestedLog.write(records[j], true);
374                      // (Do not increment j in this case!)
375
}
376                   else
377                   {
378                      // Let the record go to the current log
379
j++;
380                   }
381                }
382                   
383             }
384             done = (length == records.length);
385             length = length - offset;
386             log.write(records, offset, j - offset, numTransactions);
387             setCompletionHandler(offset, length, work);
388             if (!done)
389             {
390                restart();
391                offset = offset + length;
392                length = records.length;
393                usedSize = log.getPosition();
394             }
395          }
396       }
397       catch (IOException JavaDoc failure)
398       {
399          for (int i = offset; i < records.length - offset; i++)
400          {
401             PendingWriteRequest request = (PendingWriteRequest) work.get(i);
402             request.setFailure(failure);
403          }
404          if (abort == null)
405             restart();
406       }
407    }
408
409    /**
410     * Sets to the current <code>BatchLog</code> the completion handler of a
411     * range of requests in an <code>ArrayList</code> of
412     * <code>PendingWriteRequest</code>s.
413     *
414     * @param offset index of the first request in the range
415     * @param length number of requests in the range
416     * @param work an <code>ArrayList</code> of
417     * <code>PendingWriteRequest</code>s
418     */

419    private void setCompletionHandler(int offset, int length, ArrayList JavaDoc work)
420    {
421       for (int i = offset; i < length; i++)
422       {
423          PendingWriteRequest request = (PendingWriteRequest) work.get(i);
424          if (request.getType() != PendingWriteRequest.TYPE_END)
425             request.setCompletionHandler(log);
426       }
427    }
428
429    /**
430     * Closes all <code>BatchLog</code>s owned by this <code>BatchWriter</code>.
431     */

432    private void cleanup()
433    {
434       synchronized (nextLogs)
435       {
436
437          for (int i = 0; i < nextLogs.size(); i++)
438          {
439             BatchLog nextLog = (BatchLog) nextLogs.get(i);
440             nextLog.close();
441          }
442       }
443       log.close();
444    }
445
446    /**
447     * Switch the current <code>BatchLog</code>.
448     */

449    private void restart()
450    {
451       log.markForRestart();
452
453       if (nextLogs.size() > 0)
454          log = (BatchLog) nextLogs.remove(0);
455       else
456       {
457          try
458          {
459             log = new BatchLog(this, header, dir, fileSize);
460          }
461          catch (IOException JavaDoc e)
462          {
463             abort = new Exception JavaDoc("FAILED TO RESTART RECOVERY LOG " +
464                                   "AFTER BEING FULL", e);
465          }
466       }
467    }
468    
469 }
470
Popular Tags