KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > kaha > impl > async > NIODataFileAppender


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.kaha.impl.async;
19
20 import java.io.IOException JavaDoc;
21 import java.io.RandomAccessFile JavaDoc;
22 import java.nio.ByteBuffer JavaDoc;
23 import java.nio.channels.FileChannel JavaDoc;
24
25 /**
26  * An AsyncDataFileAppender that uses NIO ByteBuffers and file channels to more efficiently
27  * copy data to files.
28  *
29  * @version $Revision: 1.1.1.1 $
30  */

31 class NIODataFileAppender extends DataFileAppender {
32     
33     public NIODataFileAppender(AsyncDataManager fileManager) {
34         super(fileManager);
35     }
36
37     /**
38      * The async processing loop that writes to the data files and
39      * does the force calls.
40      *
41      * Since the file sync() call is the slowest of all the operations,
42      * this algorithm tries to 'batch' or group together several file sync() requests
43      * into a single file sync() call. The batching is accomplished attaching the
44      * same CountDownLatch instance to every force request in a group.
45      *
46      */

47     protected void processQueue() {
48         DataFile dataFile=null;
49         RandomAccessFile JavaDoc file=null;
50         FileChannel JavaDoc channel=null;
51
52         try {
53             
54             ByteBuffer JavaDoc header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
55             ByteBuffer JavaDoc footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
56             ByteBuffer JavaDoc buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE);
57             
58             // Populate the static parts of the headers and footers..
59
header.putInt(0); // size
60
header.put((byte) 0); // type
61
header.put(RESERVED_SPACE); // reserved
62
header.put(AsyncDataManager.ITEM_HEAD_SOR);
63             footer.put(AsyncDataManager.ITEM_HEAD_EOR);
64             
65             while( true ) {
66                 
67                 Object JavaDoc o = null;
68
69                 // Block till we get a command.
70
synchronized(enqueueMutex) {
71                     while( true ) {
72                         if( shutdown ) {
73                             o = SHUTDOWN_COMMAND;
74                             break;
75                         }
76                         if( nextWriteBatch!=null ) {
77                             o = nextWriteBatch;
78                             nextWriteBatch=null;
79                             break;
80                         }
81                         enqueueMutex.wait();
82                     }
83                     enqueueMutex.notify();
84                 }
85                 
86                 
87                 if( o == SHUTDOWN_COMMAND ) {
88                     break;
89                 }
90                 
91                 WriteBatch wb = (WriteBatch) o;
92                 if( dataFile != wb.dataFile ) {
93                     if( file!=null ) {
94                         dataFile.closeRandomAccessFile(file);
95                     }
96                     dataFile = wb.dataFile;
97                     file = dataFile.openRandomAccessFile(true);
98                     channel = file.getChannel();
99                 }
100                 
101                 WriteCommand write = wb.first;
102                 
103                 // Write all the data.
104
// Only need to seek to first location.. all others
105
// are in sequence.
106
file.seek(write.location.getOffset());
107      
108                 //
109
// is it just 1 big write?
110
if( wb.size == write.location.getSize() ) {
111                     
112                     header.clear();
113                     header.putInt(write.location.getSize());
114                     header.put(write.location.getType());
115                     header.clear();
116                     transfer(header, channel);
117                     ByteBuffer JavaDoc source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
118                     transfer(source, channel);
119                     footer.clear();
120                     transfer(footer, channel);
121                     
122                 } else {
123                     
124                     // Combine the smaller writes into 1 big buffer
125
while( write!=null ) {
126                     
127                         header.clear();
128                         header.putInt(write.location.getSize());
129                         header.put(write.location.getType());
130                         header.clear();
131                         copy(header, buffer);
132                         assert !header.hasRemaining();
133                         
134                         ByteBuffer JavaDoc source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
135                         copy(source, buffer);
136                         assert !source.hasRemaining();
137
138                         footer.clear();
139                         copy(footer, buffer);
140                         assert !footer.hasRemaining();
141     
142                         write = (WriteCommand) write.getNext();
143                     }
144                     
145                     // Fully write out the buffer..
146
buffer.flip();
147                     transfer(buffer, channel);
148                     buffer.clear();
149                 }
150                     
151                 file.getChannel().force(false);
152
153                 WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
154                 dataManager.setLastAppendLocation( lastWrite.location );
155
156                 // Signal any waiting threads that the write is on disk.
157
if( wb.latch!=null ) {
158                     wb.latch.countDown();
159                 }
160                 
161                 // Now that the data is on disk, remove the writes from the in flight
162
// cache.
163
write = wb.first;
164                 while( write!=null ) {
165                     if( !write.sync ) {
166                         inflightWrites.remove(new WriteKey(write.location));
167                     }
168                     write = (WriteCommand) write.getNext();
169                 }
170             }
171             
172         } catch (IOException JavaDoc e) {
173             synchronized( enqueueMutex ) {
174                 firstAsyncException = e;
175             }
176         } catch (InterruptedException JavaDoc e) {
177         } finally {
178             try {
179                 if( file!=null ) {
180                     dataFile.closeRandomAccessFile(file);
181                 }
182             } catch (IOException JavaDoc e) {
183             }
184             shutdownDone.countDown();
185         }
186     }
187
188     /**
189      * Copy the bytes in header to the channel.
190      * @param header - source of data
191      * @param channel - destination where the data will be written.
192      * @throws IOException
193      */

194     private void transfer(ByteBuffer JavaDoc header, FileChannel JavaDoc channel) throws IOException JavaDoc {
195         while (header.hasRemaining()) {
196             channel.write(header);
197         }
198     }
199
200     private int copy(ByteBuffer JavaDoc src, ByteBuffer JavaDoc dest) {
201         int rc = Math.min(dest.remaining(), src.remaining());
202         if( rc > 0 ) {
203             // Adjust our limit so that we don't overflow the dest buffer.
204
int limit = src.limit();
205             src.limit(src.position()+rc);
206             dest.put(src);
207             // restore the limit.
208
src.limit(limit);
209         }
210         return rc;
211     }
212         
213 }
214
Popular Tags