KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > kaha > impl > async > DataFileAppender


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.kaha.impl.async;
19
20 import java.io.IOException JavaDoc;
21 import java.io.InterruptedIOException JavaDoc;
22 import java.io.RandomAccessFile JavaDoc;
23 import java.util.concurrent.ConcurrentHashMap JavaDoc;
24 import java.util.concurrent.CountDownLatch JavaDoc;
25
26 import org.apache.activemq.util.ByteSequence;
27 import org.apache.activemq.util.DataByteArrayOutputStream;
28 import org.apache.activemq.util.LinkedNode;
29
30 /**
31  * An optimized writer to do batch appends to a data file. This object is thread safe
32  * and gains throughput as you increase the number of concurrent writes it does.
33  *
34  * @version $Revision: 1.1.1.1 $
35  */

36 class DataFileAppender {
37     
38     protected static final byte []RESERVED_SPACE= new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
39     protected static final String JavaDoc SHUTDOWN_COMMAND = "SHUTDOWN";
40     int MAX_WRITE_BATCH_SIZE = 1024*1024*4;
41     
42     static public class WriteKey {
43         private final int file;
44         private final long offset;
45         private final int hash;
46
47         public WriteKey(Location item){
48             file = item.getDataFileId();
49             offset = item.getOffset();
50             // TODO: see if we can build a better hash
51
hash = (int) (file ^ offset);
52         }
53      
54         public int hashCode() {
55             return hash;
56         }
57         
58         public boolean equals(Object JavaDoc obj) {
59             WriteKey di = (WriteKey)obj;
60             return di.file == file && di.offset == offset;
61         }
62     }
63     
64     public class WriteBatch {
65         
66         public final DataFile dataFile;
67         public final WriteCommand first;
68         public final CountDownLatch JavaDoc latch = new CountDownLatch JavaDoc(1);
69         public int size;
70         
71         public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException JavaDoc {
72             this.dataFile=dataFile;
73             this.first=write;
74             size+=write.location.getSize();
75         }
76         
77         public boolean canAppend(DataFile dataFile, WriteCommand write) {
78             if( dataFile != this.dataFile )
79                 return false;
80             if( size+write.location.getSize() >= MAX_WRITE_BATCH_SIZE )
81                 return false;
82             return true;
83         }
84         
85         public void append(WriteCommand write) throws IOException JavaDoc {
86             this.first.getTailNode().linkAfter(write);
87             size+=write.location.getSize();
88         }
89     }
90     
91     public static class WriteCommand extends LinkedNode {
92         public final Location location;
93         public final ByteSequence data;
94         final boolean sync;
95         
96         public WriteCommand(Location location, ByteSequence data, boolean sync) {
97             this.location = location;
98             this.data = data;
99             this.sync = sync;
100         }
101     }
102     
103     protected final AsyncDataManager dataManager;
104     
105     protected final ConcurrentHashMap JavaDoc<WriteKey, WriteCommand> inflightWrites;
106     
107     protected final Object JavaDoc enqueueMutex = new Object JavaDoc();
108     protected WriteBatch nextWriteBatch;
109     
110     private boolean running;
111     protected boolean shutdown;
112     protected IOException JavaDoc firstAsyncException;
113     protected final CountDownLatch JavaDoc shutdownDone = new CountDownLatch JavaDoc(1);
114     private Thread JavaDoc thread;
115     
116     /**
117      * Construct a Store writer
118      *
119      * @param fileId
120      */

121     public DataFileAppender(AsyncDataManager dataManager){
122         this.dataManager=dataManager;
123         this.inflightWrites = this.dataManager.getInflightWrites();
124     }
125     
126     /**
127      * @param type
128      * @param marshaller
129      * @param payload
130      * @param type
131      * @param sync
132      * @return
133      * @throws IOException
134      * @throws
135      * @throws
136      */

137     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException JavaDoc {
138                 
139         // Write the packet our internal buffer.
140
int size = data.getLength()+AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
141         
142         final Location location=new Location();
143         location.setSize(size);
144         location.setType(type);
145         
146         WriteBatch batch;
147         WriteCommand write = new WriteCommand(location, data, sync);
148
149         // Locate datafile and enqueue into the executor in sychronized block so that
150
// writes get equeued onto the executor in order that they were assigned by
151
// the data manager (which is basically just appending)
152

153         synchronized(this) {
154             // Find the position where this item will land at.
155
DataFile dataFile=dataManager.allocateLocation(location);
156             batch = enqueue(dataFile, write);
157         }
158         location.setLatch(batch.latch);
159         if( sync ) {
160             try {
161                 batch.latch.await();
162             } catch (InterruptedException JavaDoc e) {
163                 throw new InterruptedIOException JavaDoc();
164             }
165         } else {
166             inflightWrites.put(new WriteKey(location), write);
167         }
168         
169         return location;
170     }
171
172     private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException JavaDoc {
173         synchronized(enqueueMutex) {
174             WriteBatch rc=null;
175             if( shutdown ) {
176                 throw new IOException JavaDoc("Async Writter Thread Shutdown");
177             }
178             if( firstAsyncException !=null )
179                 throw firstAsyncException;
180             
181             if( !running ) {
182                 running=true;
183                 thread = new Thread JavaDoc() {
184                     public void run() {
185                         processQueue();
186                     }
187                 };
188                 thread.setPriority(Thread.MAX_PRIORITY);
189                 thread.setDaemon(true);
190                 thread.setName("ActiveMQ Data File Writer");
191                 thread.start();
192             }
193             
194             if( nextWriteBatch == null ) {
195                 nextWriteBatch = new WriteBatch(dataFile,write);
196                 rc = nextWriteBatch;
197                 enqueueMutex.notify();
198             } else {
199                 // Append to current batch if possible..
200
if( nextWriteBatch.canAppend(dataFile, write) ) {
201                     nextWriteBatch.append(write);
202                     rc = nextWriteBatch;
203                 } else {
204                     // Otherwise wait for the queuedCommand to be null
205
try {
206                         while( nextWriteBatch!=null ) {
207                             enqueueMutex.wait();
208                         }
209                     } catch (InterruptedException JavaDoc e) {
210                         throw new InterruptedIOException JavaDoc();
211                     }
212                     if( shutdown ) {
213                         throw new IOException JavaDoc("Async Writter Thread Shutdown");
214                     }
215                     
216                     // Start a new batch.
217
nextWriteBatch = new WriteBatch(dataFile,write);
218                     rc = nextWriteBatch;
219                     enqueueMutex.notify();
220                 }
221             }
222             return rc;
223         }
224     }
225
226     public void close() throws IOException JavaDoc {
227         synchronized( enqueueMutex ) {
228             if( shutdown == false ) {
229                 shutdown = true;
230                 if( running ) {
231                     enqueueMutex.notifyAll();
232                 } else {
233                     shutdownDone.countDown();
234                 }
235             }
236         }
237         
238         try {
239             shutdownDone.await();
240         } catch (InterruptedException JavaDoc e) {
241             throw new InterruptedIOException JavaDoc();
242         }
243         
244     }
245
246     /**
247      * The async processing loop that writes to the data files and
248      * does the force calls.
249      *
250      * Since the file sync() call is the slowest of all the operations,
251      * this algorithm tries to 'batch' or group together several file sync() requests
252      * into a single file sync() call. The batching is accomplished attaching the
253      * same CountDownLatch instance to every force request in a group.
254      *
255      */

256     protected void processQueue() {
257         DataFile dataFile=null;
258         RandomAccessFile JavaDoc file=null;
259         try {
260             
261             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE);
262             while( true ) {
263                 
264                 Object JavaDoc o = null;
265
266                 // Block till we get a command.
267
synchronized(enqueueMutex) {
268                     while( true ) {
269                         if( shutdown ) {
270                             o = SHUTDOWN_COMMAND;
271                             break;
272                         }
273                         if( nextWriteBatch!=null ) {
274                             o = nextWriteBatch;
275                             nextWriteBatch=null;
276                             break;
277                         }
278                         enqueueMutex.wait();
279                     }
280                     enqueueMutex.notify();
281                 }
282                 
283                 
284                 if( o == SHUTDOWN_COMMAND ) {
285                     break;
286                 }
287                 
288                 WriteBatch wb = (WriteBatch) o;
289                 if( dataFile != wb.dataFile ) {
290                     if( file!=null ) {
291                         dataFile.closeRandomAccessFile(file);
292                     }
293                     dataFile = wb.dataFile;
294                     file = dataFile.openRandomAccessFile(true);
295                 }
296                 
297                 WriteCommand write = wb.first;
298                 
299                 // Write all the data.
300
// Only need to seek to first location.. all others
301
// are in sequence.
302
file.seek(write.location.getOffset());
303                 
304                 //
305
// is it just 1 big write?
306
if( wb.size == write.location.getSize() ) {
307                     
308                     // Just write it directly..
309
file.writeInt(write.location.getSize());
310                     file.writeByte(write.location.getType());
311                     file.write(RESERVED_SPACE);
312                     file.write(AsyncDataManager.ITEM_HEAD_SOR);
313                     file.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
314                     file.write(AsyncDataManager.ITEM_HEAD_EOR);
315                     
316                 } else {
317                     
318                     // Combine the smaller writes into 1 big buffer
319
while( write!=null ) {
320     
321                         buff.writeInt(write.location.getSize());
322                         buff.writeByte(write.location.getType());
323                         buff.write(RESERVED_SPACE);
324                         buff.write(AsyncDataManager.ITEM_HEAD_SOR);
325                         buff.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
326                         buff.write(AsyncDataManager.ITEM_HEAD_EOR);
327                         
328                         write = (WriteCommand) write.getNext();
329                     }
330                     
331                     // Now do the 1 big write.
332
ByteSequence sequence = buff.toByteSequence();
333                     file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
334                     buff.reset();
335                 }
336                 
337                 file.getFD().sync();
338                 
339                 WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
340                 dataManager.setLastAppendLocation( lastWrite.location );
341                 
342                 // Signal any waiting threads that the write is on disk.
343
wb.latch.countDown();
344                 
345                 // Now that the data is on disk, remove the writes from the in flight
346
// cache.
347
write = wb.first;
348                 while( write!=null ) {
349                     if( !write.sync ) {
350                         inflightWrites.remove(new WriteKey(write.location));
351                     }
352                     write = (WriteCommand) write.getNext();
353                 }
354             }
355             
356         } catch (IOException JavaDoc e) {
357             synchronized( enqueueMutex ) {
358                 firstAsyncException = e;
359             }
360         } catch (InterruptedException JavaDoc e) {
361         } finally {
362             try {
363                 if( file!=null ) {
364                     dataFile.closeRandomAccessFile(file);
365                 }
366             } catch (IOException JavaDoc e) {
367             }
368             shutdownDone.countDown();
369         }
370     }
371         
372 }
373
Popular Tags