KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > server > journal > impl > SimpleJournal


1 package com.ubermq.jms.server.journal.impl;
2
3 import com.ubermq.jms.common.datagram.*;
4 import com.ubermq.kernel.*;
5 import java.io.*;
6 import java.nio.*;
7 import java.util.*;
8
9 import com.ubermq.jms.server.journal.IJournal;
10 import com.ubermq.kernel.overflow.DropIncoming;
11 import java.nio.channels.FileChannel JavaDoc;
12
13 /**
14  * A simple implementation of a message journal that
15  * keeps track of the current checkpoint and limit of valid data
16  * as two separate integer offsets at the beginning of the log file
17  * provided to us by the durable proxy.
18  * <P>
19  * This is not a rolling log implementation and thus creates a lot of
20  * disk seeks because it continually writes to the same
21  * file location.
22  * <P>
23  */

24 public final class SimpleJournal
25     implements IJournal
26 {
27     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(SimpleJournal.class);
28     
29     private RandomAccessFile logFile;
30     private FileChannel JavaDoc journalChannel;
31     private MappedByteBuffer logfileBuffer;
32     private IntBuffer offsetBuffer;
33     private ByteBuffer journalBuffer;
34     private Map messageOffsets;
35
36     private IDatagramFactory factory;
37
38     private static final int POSITION_INDEX = 0;
39     private static final int LIMIT_INDEX = 1;
40     private static final int OFFSET_DATA_SIZE = 8;
41
42     private static final int OFFSET_INITIAL_MAP_SIZE = 50;
43
44     /**
45      * Creates a journalling system using the specified
46      * file and a maximum size that the file can become.
47      * This implementation immediately creates the file
48      * to be maximum size if it does not exist.
49      * @param f a file, that may or may not exist
50      * @param factory the datagram factory used to interpret
51      * the contents of the journal file.
52      * @param maximumSize the largest size that the file may
53      * occupy.
54      */

55     public SimpleJournal(File f,
56                          IDatagramFactory factory,
57                          long maximumSize)
58         throws IOException
59     {
60         this.factory = factory;
61         init(f, maximumSize);
62     }
63
64     private void init(File f, long size)
65         throws IOException
66     {
67         this.logFile = new RandomAccessFile(f, "rw");
68
69         // open the log file channel and the checkpoint file channel.
70
// the log file contains datagrams, one after another.
71
this.journalChannel = logFile.getChannel();
72
73         // map the journal file in its entirety.
74
// the first OFFSET_DATA_SIZE bytes are header bytes.
75
this.logfileBuffer = journalChannel.map(FileChannel.MapMode.READ_WRITE, 0, size);
76         this.offsetBuffer = ((ByteBuffer)logfileBuffer.slice().limit(OFFSET_DATA_SIZE)).asIntBuffer();
77         logfileBuffer.position(OFFSET_DATA_SIZE);
78         this.journalBuffer = logfileBuffer.slice();
79
80         // go back to where we were (if anywhere).
81
restoreWritePosition();
82
83         // create offset map
84
messageOffsets = new HashMap(OFFSET_INITIAL_MAP_SIZE);
85     }
86
87     /**
88      * Permanently destroys the journal infrastructure and eliminates
89      * all data stored in it.
90      */

91     public void destroy()
92     {
93         checkpoint(0);
94         limit(0);
95     }
96
97     public void close()
98     {
99         try
100         {
101             journalChannel.close();
102             logFile.close();
103         }
104         catch (IOException e) {
105             log.error("", e);
106         }
107     }
108
109     /**
110      * This method is key important. If a datagram takes us past the EOF,
111      * we undo the put & compact.
112      *
113      * If the compaction made no difference in
114      * our file position, we call the overflow handler.
115      */

116     private synchronized int handleOverflow(IDatagram d, int position, IOverflowHandler h)
117     {
118         log.debug("handleOverflow enetered.");
119
120         // backoff the last output and compact the buffer.
121
restoreCheckpoint();
122         journalBuffer.compact();
123
124         // check if we made any room
125
int action = IOverflowHandler.ACTION_RETRY;
126         int moved = journalBuffer.position() - position;
127         if (moved >= 0 && position > 0)
128             action = h.overflow(d);
129
130         // ok we now have room. in the process though, we've
131
// changed the offsets of all of the items in the offset map.
132
Iterator iter = messageOffsets.keySet().iterator();
133         while (iter.hasNext())
134         {
135             Object JavaDoc mid = iter.next();
136
137             // the new offset is the difference in offsets (a negative number) plus the
138
// old offset.
139
int oldOffset = ((Integer JavaDoc)messageOffsets.get(mid)).intValue();
140             int newOffset = oldOffset + moved;
141
142             // if the new offset is still inside the buffer,
143
// keep it around.
144
if (newOffset >= 0)
145                 messageOffsets.put(mid, new Integer JavaDoc(newOffset));
146             else iter.remove();
147         }
148
149         compact(moved);
150         return action;
151     }
152
153     private synchronized void restoreCheckpoint()
154     {
155         int position = offsetBuffer.get(POSITION_INDEX);
156         int limit = offsetBuffer.get(LIMIT_INDEX);
157
158         journalBuffer.position(position);
159         journalBuffer.limit(limit);
160
161         log.debug("restored checkpoint to " + journalBuffer.position() + " to " + journalBuffer.limit());
162     }
163
164     private synchronized void restoreWritePosition()
165     {
166         int limit = offsetBuffer.get(LIMIT_INDEX);
167
168         journalBuffer.limit(journalBuffer.capacity());
169         journalBuffer.position(limit);
170
171         log.debug("restored write position to " + journalBuffer.position());
172     }
173
174     private synchronized void checkpoint(int checkpt)
175     {
176         int writing = Math.min(offsetBuffer.get(LIMIT_INDEX),
177                                Math.max(offsetBuffer.get(POSITION_INDEX), checkpt));
178         log.debug("checkpointing " + writing + " requested = " + checkpt);
179         offsetBuffer.put(POSITION_INDEX, writing);
180         logfileBuffer.force();
181     }
182
183     private synchronized void limit(int limit)
184     {
185         log.debug("limit is " + limit);
186         offsetBuffer.put(LIMIT_INDEX, limit);
187         logfileBuffer.force();
188     }
189
190     private synchronized void compact(int delta)
191     {
192         log.debug("compacting by " + delta);
193         offsetBuffer.put(POSITION_INDEX, offsetBuffer.get(POSITION_INDEX) + delta);
194         offsetBuffer.put(LIMIT_INDEX, offsetBuffer.get(LIMIT_INDEX) + delta);
195         logfileBuffer.force();
196     }
197
198     /**
199      * Acknowledges a message that was previously output.
200      */

201     public void ack(MessageId id)
202     {
203         log.debug("waiting for ack on: " + messageOffsets );
204
205         // we just got an ack for this message.
206
// if we care about it, we can advance the last known good
207
// offset to whatever we stored when we send this message to the client.
208
Integer JavaDoc offset = (Integer JavaDoc)messageOffsets.get(id);
209         if (offset != null) {
210             checkpoint(offset.intValue());
211
212             // remove the offset from the set
213
messageOffsets.remove(id);
214         }
215     }
216
217     /**
218      * Determines whether the destination node is available for output.
219      * @return true if the output dest node is available
220      */

221     public boolean isOpen()
222     {
223         return true;
224     }
225
226     /**
227      * Writes the current committed state out to the journal
228      * so that resumption from a failure does not require full analysis
229      * of all transactions that occurred on the system.
230      * <P>
231      * The current implementation is constantly checkpointed due to its
232      * design, so this does not do anything.
233      */

234     public void checkpoint()
235     {
236     }
237
238     /**
239      * Outputs a datagram. If the datagram causes an overflow,
240      * the given overflow handler is called to determine whether the
241      * operation should be retried.
242      * @param d a datagram
243      * @param h the overflow handler to use
244      */

245     public void output(IDatagram d, IOverflowHandler h)
246     {
247         if (d instanceof IMessageDatagram)
248         {
249             final int position = journalBuffer.position();
250             try {
251                 ByteBuffer write = journalBuffer.slice();
252                 factory.outgoing(write, d);
253                 journalBuffer.position(journalBuffer.position() + write.position());
254                 limit(journalBuffer.position());
255
256                 // save the offset in our local cache.
257
if (d instanceof IMessageDatagram)
258                     messageOffsets.put(((IMessageDatagram)d).getMessageId(), new Integer JavaDoc(journalBuffer.position()));
259             } catch(BufferOverflowException boe) {
260                 // try again.
261
if (handleOverflow(d, position, h) == IOverflowHandler.ACTION_RETRY)
262                     output(d, h);
263             }
264         }
265     }
266
267     public void recover(final IMessageProcessor p)
268     {
269         // get the previous checkpointed state
270
int position = offsetBuffer.get(POSITION_INDEX);
271         int limit = offsetBuffer.get(LIMIT_INDEX);
272         final ByteBuffer recoveryBuffer = journalBuffer.duplicate();
273         recoveryBuffer.position(position).limit(limit);
274
275         // create a recovery manager that forwards messages onto the
276
// message processor specified.
277
TrivialConnectionInfo recoveryManager = new TrivialConnectionInfo(
278             new IMessageProcessor() {
279                 public void remove(IConnectionInfo conn) {p.remove(conn);}
280                 public void accept(IConnectionInfo conn) {p.accept(conn);}
281                 public void process(IConnectionInfo conn, IDatagram read)
282                 {
283                     // save the offset in our local cache.
284
// the actual offset after the datagram is the position in the r/w buffer.
285
// Note: we set the redelivery flag here, even if the messages
286
// were never sent to the disconnected durable subscriber. this is because
287
// we don't reliably know if we tried to recover once before and failed.
288
// so we set the flag to be safe. JP 8/23/02.
289
if (read instanceof IMessageDatagram) {
290                         IMessageDatagram md = (IMessageDatagram)read;
291
292                         messageOffsets.put(md.getMessageId(),
293                                            new Integer JavaDoc(recoveryBuffer.position()));
294
295                     }
296
297                     // deliver to ultimate destination
298
p.process(conn, read);
299                 }
300             },
301             factory,
302             recoveryBuffer);
303
304         // do the recovery
305
recoveryManager.processData();
306
307         // restore the write position
308
restoreWritePosition();
309     }
310
311     /**
312      * The connection descendant that manages recovery (read-only).
313      */

314     private final static class TrivialConnectionInfo
315         extends AbstractConnectionInfo
316     {
317         private ByteBuffer theBuffer;
318
319         private TrivialConnectionInfo(IMessageProcessor mp,
320                                       IDatagramFactory f,
321                                       ByteBuffer b)
322         {
323             super(mp, f, b, b);
324             this.theBuffer = b;
325         }
326
327         public void start()
328         {
329         }
330
331         public void stop()
332         {
333         }
334
335         public void close()
336         {
337         }
338
339         public final int doWrite(ByteBuffer writeBuffer) throws IOException
340         {
341             // NOOP.
342
// we are saving these messages in the memory mapped file buffer
343
// for later delivery. no compaction or flushing is supported.
344
return 0;
345         }
346
347         protected void preProcessData()
348         {
349             // do nothing here. super class flips the buffer, but we
350
// already do that when we call restore checkpoint
351
// we go into the processData(), the only method
352
// that ever reads data from the log file,
353
// with position = lastCheckpt and limit = lastWritten
354
// as we should.
355
}
356
357         protected void postProcessData()
358         {
359         }
360     }
361
362
363 }
364
Popular Tags