KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > persistence > PersistentQueue


1 /**
2  * This file is part of Presumo.
3  *
4  * Presumo is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * Presumo is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with Presumo; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * Copyright (c) 2001 Rob Cauble
19  */

20 package com.presumo.jms.persistence;
21
22 import com.presumo.jms.message.JmsMessage;
23 import com.presumo.jms.plugin.MessageQueue;
24 import com.presumo.jms.resources.Resources;
25 import com.presumo.util.log.Logger;
26 import com.presumo.util.log.LoggerFactory;
27
28 import java.io.BufferedInputStream JavaDoc;
29 import java.io.BufferedOutputStream JavaDoc;
30 import java.io.DataInputStream JavaDoc;
31 import java.io.DataOutputStream JavaDoc;
32 import java.io.EOFException JavaDoc;
33 import java.io.IOException JavaDoc;
34 import java.io.File JavaDoc;
35 import java.io.FileInputStream JavaDoc;
36 import java.io.FileOutputStream JavaDoc;
37
38 import java.util.HashSet JavaDoc;
39 import java.util.Iterator JavaDoc;
40 import java.util.LinkedList JavaDoc;
41
42
43 /**
44  *
45  *
46  */

47 public class PersistentQueue implements MessageQueue
48 {
49   private File JavaDoc directory;
50   private String JavaDoc file_prefix;
51   private int max_log_file_size;
52
53
54   private File JavaDoc checkPointFile;
55   private File JavaDoc tempCheckPointFile;
56   private File JavaDoc logFile;
57     
58   // Suffixes for the various files used in the persistent queue.
59
final static String JavaDoc CP_SUFFIX = "_CheckPoint.dat";
60   final static String JavaDoc TEMP_CP_SUFFIX = "_TempCheckPoint.dat";
61   final static String JavaDoc LOG_SUFFIX = "_LogFile.dat";
62
63   // Output stream for the log
64
private DataOutputStream JavaDoc log_file_dout;
65   private FileOutputStream JavaDoc log_file_fout;
66   private BufferedOutputStream JavaDoc log_file_bout;
67
68   // In memory data structures
69
private LinkedList JavaDoc mainQueue;
70   private LinkedList JavaDoc pendingDelete;
71   private HashSet JavaDoc persistentPendingDelete;
72
73   // Version of the PersistentQueue
74
private int version = 0;
75
76
77  
78     /////////////////////////////////////////////////////////////////////////
79
// Constructors //
80
/////////////////////////////////////////////////////////////////////////
81

82   public PersistentQueue(File JavaDoc directory,
83                          String JavaDoc file_prefix,
84                          int max_log_file_size)
85   {
86     logger.entry("PersistentQueue", new Object JavaDoc []
87                  { directory, file_prefix, new Integer JavaDoc(max_log_file_size)});
88
89     this.directory = directory;
90     this.file_prefix = file_prefix;
91     this.max_log_file_size = max_log_file_size;
92
93     this.checkPointFile = new File JavaDoc(directory,file_prefix+CP_SUFFIX);
94     this.tempCheckPointFile = new File JavaDoc(directory,file_prefix+TEMP_CP_SUFFIX);
95     this.logFile = new File JavaDoc(directory,file_prefix+LOG_SUFFIX);
96
97     logger.exit("PersistentQueue", this);
98   }
99
100
101     
102     /////////////////////////////////////////////////////////////////////////
103
// Public Methods //
104
/////////////////////////////////////////////////////////////////////////
105

106   /**
107    * @see MessageQueue#isPersistent
108    */

109   public boolean isPersistent()
110   {
111     return true;
112   }
113
114   /**
115    * Opens up the persistent queue and prepares it for operation. Any
116    * previous queue is read in, or a new queue is created.
117    */

118   public synchronized void open() throws IOException JavaDoc
119   {
120     try {
121       logger.entry("open");
122       if (!directory.exists())
123         directory.mkdirs();
124       runToSteadyState();
125     }
126     finally {
127       logger.exit("open");
128     }
129   }
130
131   /**
132    * Closes the persistent queue.
133    */

134   public synchronized void close() throws IOException JavaDoc
135   {
136     try {
137       logger.entry("close");
138       
139       //if both queues are empty, it's safe to delete the files
140
if (pendingDelete != null &&
141           mainQueue != null &&
142           log_file_dout != null &&
143           getNumPersistent(mainQueue) == 0 &&
144           getNumPersistent(pendingDelete) == 0) {
145         
146         if (persistentPendingDelete != null &&
147             persistentPendingDelete.size() != 0) {
148           logger.warn("PJMSW6002");
149         }
150
151         //will get to an empty check point file and empty log file
152
runToSteadyState();
153                 
154         //close the log file
155
closeLogFileIfOpen();
156                 
157         //delete the log file (gets us back to CREATING_CP)
158
if (!logFile.delete()) {
159           throw new IOException JavaDoc("Couldn't delete file: "+logFile);
160         }
161                 
162         //delete the checkpoint file (gets us back to FRESH_INIT)
163
if (!checkPointFile.delete()) {
164           throw new IOException JavaDoc("Couldn't delete file: "+checkPointFile);
165         }
166                 
167         mainQueue = null;
168         pendingDelete = null;
169       }
170       //otherwise let's just close everything (but not delete it)
171
else {
172         closeLogFileIfOpen();
173         mainQueue = null;
174         pendingDelete = null;
175       }
176     }
177     finally {
178       logger.exit("close");
179     }
180   } // end close()
181

182
183   /**
184    * Forces a checkpoint, which causes the current log to be checked
185    * to another file, minus all deleted files. Essentially flushes
186    * deleted files from the system completely.
187    */

188   public synchronized void forceCheckPoint() throws IOException JavaDoc
189   {
190     assertOpen();
191     runToSteadyState();
192   }
193
194   /**
195    * Set the maximum size of the the log file before it is sent to
196    * checkpoint.
197    */

198   public synchronized void setMaxLogFileSize(int size) throws IOException JavaDoc
199   {
200     max_log_file_size = size;
201     if (log_file_dout != null) {
202       checkPointIfNeeded();
203     }
204   }
205
206   /**
207    * Pushes the messages contained within the array onto the persistent
208    * queue. Persistent messages will be transactionally stored on disk,
209    * and non-peresistent messages will simply be maintained in memory.
210    */

211   public synchronized void push(JmsMessage [] msgs) throws IOException JavaDoc
212   {
213     assertOpen();
214     processOp(new LogFileEntryInsert(msgs));
215     checkPointIfNeeded();
216   }
217
218
219   /**
220    * Returns the number of messages that have been deleted but have not
221    * been flushed with a checkpoint.
222    */

223   public synchronized int getNumPendingDelete()
224   {
225     assertOpen();
226     return pendingDelete.size();
227   }
228
229   /**
230    * Get the number of messages on the queue that are not pending deletion
231    */

232   public synchronized int getNumNotPendingDelete()
233   {
234     assertOpen();
235     return mainQueue.size();
236   }
237
238   /**
239    * Return the size of queue in terms of valid messages not deleted
240    */

241   public int size()
242   {
243     return getNumNotPendingDelete();
244   }
245   
246
247   /**
248    * Deletes the messages identified by the given keys from the queue. The
249    * messages will still consume resources until the next checkpoint, but
250    * will be transactionally gone from the queue (even if the system crashes
251    * before the next checkpoint).
252    */

253   public synchronized void delete(String JavaDoc [] keys) throws IOException JavaDoc
254   {
255     assertOpen();
256     processOp(new LogFileEntryDelete(keys));
257     checkPointIfNeeded();
258   }
259
260   /**
261    * Returns messages from the queue, the messages must still be deleted.
262    */

263   public synchronized JmsMessage [] getNext(int max_num) throws IOException JavaDoc
264   {
265     assertOpen();
266     int rv_size = mainQueue.size();
267     if (rv_size > max_num) {
268       rv_size = max_num;
269     }
270
271     JmsMessage [] rv = new JmsMessage[rv_size];
272     Iterator JavaDoc it = mainQueue.iterator();
273     for (int i = 0; i < rv.length; i++) {
274       rv[i] = (JmsMessage)it.next();
275     }
276
277     processOp(new LogFileEntryGetNext(max_num));
278     checkPointIfNeeded();
279     return rv;
280   }
281   
282   
283   public synchronized JmsMessage [] getPendingDelete() throws IOException JavaDoc
284   {
285     assertOpen();
286     JmsMessage [] temp = new JmsMessage[pendingDelete.size()];
287     Iterator JavaDoc it = pendingDelete.iterator();
288     int pos = 0;
289     while(it.hasNext()) {
290       temp[pos++] = (JmsMessage)it.next();
291     }
292     return temp;
293   }
294
295   /**
296    *
297    */

298   public void push(JmsMessage msg) throws IOException JavaDoc
299   {
300     push(new JmsMessage[]{msg});
301   }
302     
303   /**
304    *
305    */

306   public void delete(String JavaDoc key) throws IOException JavaDoc
307   {
308     delete(new String JavaDoc[]{key});
309   }
310     
311
312   /**
313    *
314    */

315   public JmsMessage getNext() throws IOException JavaDoc
316   {
317     JmsMessage [] temp = getNext(1);
318     if (temp.length == 0) {
319       return null;
320     }
321     else {
322       return temp[0];
323     }
324   }
325   
326
327
328   /**
329    * Gets the directory on the filesystem being used by the persistent
330    * queue. All files being used will be in this directory and will
331    * start with the prefix returned by <code>getFilePrefix()</code>
332    */

333   public File JavaDoc getDirectory()
334   {
335     return directory;
336   }
337
338
339   /**
340    * Gets the prefix that will be in front of all files used by this
341    * instance.
342    */

343   public String JavaDoc getFilePrefix()
344   {
345     return file_prefix;
346   }
347
348
349   /**
350    * Gets the maximum size of a given log file on disk for this
351    * persistent queue.
352    */

353   public int getMaxLogFileSize()
354   {
355     return max_log_file_size;
356   }
357    
358
359
360     /////////////////////////////////////////////////////////////////////////
361
// Private Methods //
362
/////////////////////////////////////////////////////////////////////////
363

364   //State transitions are as follows:
365
//FRESH_INIT: !CP && !LF && !TEMP -> CREATING_CP
366
//CREATING_CP: CP && !LF && !TEMP -> READY_CP or FRESH_INIT
367
//READY_CP: CP && LF && !TEMP -> CREATING_TEMP
368
//CREATING_TEMP: CP && LF && TEMP -> TEMP_CORRECT_1 or READY_CP
369
//TEMP_CORRECT_1: !CP && LF && TEMP -> TEMP_CORRECT_2
370
//TEMP_CORRECT_2: !CP && !LF && TEMP -> COPYING_TO_CP
371
//COPYING_TO_CP: CP && !LF && TEMP -> TEMP_CORRECT_2 or CREATING_CP
372

373   private int getState()
374   {
375     int state = 0;
376     if (checkPointFile.exists()) {
377       state |= 0x4;
378     }
379     if (logFile.exists()) {
380       state |= 0x2;
381     }
382     if (tempCheckPointFile.exists()) {
383       state |= 0x1;
384     }
385     return state;
386   }
387
388   private static final int FRESH_INIT = 0x0; //000
389
private static final int CREATING_CP = 0x4; //100
390
private static final int READY_CP = 0x6; //110
391
private static final int CREATING_TEMP = 0x7; //111
392
private static final int TEMP_CORRECT_1 = 0x3; //011
393
private static final int TEMP_CORRECT_2 = 0x1; //001
394
private static final int COPYING_TO_CP = 0x5; //101
395

396
397   private void runToSteadyState() throws IOException JavaDoc
398   {
399     boolean done = false;
400     try {
401       logger.entry("runToSteadyState");
402       int init_state = getState();
403       boolean ready_cp_done = init_state != READY_CP;
404
405       while (!done) {
406         int state = getState();
407         logger.info("PJMSI6002", new Integer JavaDoc(state));
408         
409         switch(state) {
410         case FRESH_INIT:
411           handle_FRESH_INIT();
412           break;
413         case CREATING_CP:
414           handle_CREATING_CP();
415           break;
416         case READY_CP:
417           if (!ready_cp_done) {
418             handle_READY_CP();
419             ready_cp_done=true;
420           }
421           else {
422             done = true;
423           }
424           break;
425         case CREATING_TEMP:
426           handle_CREATING_TEMP();
427           if (getState() == READY_CP) {
428             ready_cp_done = false;
429           }
430           break;
431         case TEMP_CORRECT_1:
432           handle_TEMP_CORRECT_1();
433           break;
434         case TEMP_CORRECT_2:
435           handle_TEMP_CORRECT_2();
436           break;
437         case COPYING_TO_CP:
438           handle_COPYING_TO_CP();
439           break;
440         default:
441           RuntimeException JavaDoc re = new RuntimeException JavaDoc("Invalid state:"+state);
442           logger.exception(re);
443           throw re;
444         }
445       }
446     }
447     finally {
448       logger.exit("runToSteadyState");
449     }
450   }
451
452   //*************************************************************************
453
//******************** State handlers *************************************
454
//*************************************************************************
455
private void handle_FRESH_INIT() throws IOException JavaDoc
456   {
457     //this is the first time up, so just create an empty checkPointFile
458
//to go to CREATING_CP
459
mainQueue = new LinkedList JavaDoc();
460     pendingDelete = new LinkedList JavaDoc();
461     persistentPendingDelete = new HashSet JavaDoc();
462     writeCheckPoint(checkPointFile);
463   }
464     
465   private void handle_CREATING_CP() throws IOException JavaDoc
466   {
467     try {
468       //as long as the checkpoint file is correct proceed to READY_CP
469
//by creating an empty log file
470
loadFromCheckPointFileIfNeeded(checkPointFile);
471       openLogFile();
472     }
473     catch (EOFException JavaDoc e) {
474       //here the check point file is bad, let's go back to FRESH_INIT
475
logger.exception(e);
476       if (!checkPointFile.delete()) {
477         IOException JavaDoc ioe = new IOException JavaDoc("Couldn't delete "+checkPointFile);
478         logger.exception(ioe);
479         throw ioe;
480       }
481     }
482   }
483
484   private void handle_READY_CP() throws IOException JavaDoc
485   {
486     closeLogFileIfOpen();
487     loadFromLogFileIfNeeded();
488     writeCheckPoint(tempCheckPointFile);
489   }
490
491   private void handle_CREATING_TEMP() throws IOException JavaDoc
492   {
493     try {
494       loadFromCheckPointFileIfNeeded(tempCheckPointFile);
495       //if everything's ok, proceed
496
if (!checkPointFile.delete()) {
497         throw new IOException JavaDoc("Couldn't delete file: "+checkPointFile);
498       }
499     }
500     catch (EOFException JavaDoc e) {
501       logger.exception(e);
502       //temp check point is bad, so let's delete it and regenerate it
503
if (!tempCheckPointFile.delete()) {
504         throw new IOException JavaDoc("Couldn't delete file: "+tempCheckPointFile);
505       }
506     }
507   }
508
509   private void handle_TEMP_CORRECT_1() throws IOException JavaDoc
510   {
511     loadFromCheckPointFileIfNeeded(tempCheckPointFile);
512     if (!logFile.delete()) {
513       throw new IOException JavaDoc("Couldn't delete file: "+logFile);
514     }
515   }
516
517   private void handle_TEMP_CORRECT_2() throws IOException JavaDoc
518   {
519     loadFromCheckPointFileIfNeeded(tempCheckPointFile);
520     writeCheckPoint(checkPointFile);
521   }
522
523   private void handle_COPYING_TO_CP() throws IOException JavaDoc
524   {
525     try {
526       loadFromCheckPointFileIfNeeded(checkPointFile);
527       //if everything's ok, proceed to CREATING_CP
528
if (!tempCheckPointFile.delete()) {
529         throw new IOException JavaDoc("Couldn't delete file: "+tempCheckPointFile);
530       }
531     }
532     catch (EOFException JavaDoc e) {
533       logger.exception(e);
534       //check point is bad, so let's delete it and regenerate it
535
if (!checkPointFile.delete()) {
536         throw new IOException JavaDoc("Couldn't delete file: "+checkPointFile);
537       }
538     }
539   }
540
541   //*************************************************************************
542
//******************* Private utilities ***********************************
543
//*************************************************************************
544

545
546   private void writeCheckPoint(File JavaDoc cpf) throws IOException JavaDoc
547   {
548     CheckPointFile.write(cpf,
549                          mainQueue,
550                          pendingDelete);
551   
552   }
553
554
555   private void loadFromCheckPointFileIfNeeded(File JavaDoc cpf) throws IOException JavaDoc
556   {
557     if (this.mainQueue == null ||
558         this.pendingDelete == null ||
559         this.persistentPendingDelete == null) {
560       LinkedList JavaDoc tempMain = new LinkedList JavaDoc();
561       LinkedList JavaDoc tempPend = new LinkedList JavaDoc();
562       HashSet JavaDoc tempPer = new HashSet JavaDoc();
563       CheckPointFile.read(cpf,
564                           tempMain,
565                           tempPend,
566                           tempPer);
567
568       this.mainQueue = tempMain;
569       this.pendingDelete = tempPend;
570       this.persistentPendingDelete = tempPer;
571     }
572   }
573
574
575   private void loadFromLogFileIfNeeded() throws IOException JavaDoc
576   {
577     if (mainQueue == null ||
578         pendingDelete == null ||
579         persistentPendingDelete == null) {
580       
581       LinkedList JavaDoc tempMain = new LinkedList JavaDoc();
582       LinkedList JavaDoc tempPend = new LinkedList JavaDoc();
583       HashSet JavaDoc tempPer = new HashSet JavaDoc();
584
585       CheckPointFile.read(checkPointFile,
586                           tempMain,
587                           tempPend,
588                           tempPer);
589             
590       boolean done = false;
591       FileInputStream JavaDoc fin = null;
592       BufferedInputStream JavaDoc bin = null;
593       DataInputStream JavaDoc din = null;
594
595       try {
596         fin = new FileInputStream JavaDoc(logFile);
597         bin = new BufferedInputStream JavaDoc(fin,1024);
598         din = new DataInputStream JavaDoc(bin);
599
600         this.version = din.readInt();
601
602         while(!done) {
603           try {
604             LogFileEntry ent;
605             ent = LogFileEntry.deserialize(din);
606             ent.restore(tempMain,tempPend,tempPer);
607           }
608           catch (EOFException JavaDoc e) {
609             done = true;
610           }
611         }
612       }
613       finally {
614         if (din != null) {
615           din.close();
616         }
617         if (bin != null) {
618           bin.close();
619         }
620         if (fin != null) {
621           fin.close();
622         }
623       }
624       mainQueue = tempMain;
625       pendingDelete = tempPend;
626       persistentPendingDelete = tempPer;
627
628     }
629   }
630
631
632   private void checkPointIfNeeded() throws IOException JavaDoc
633   {
634     if (logFile.length() > max_log_file_size) {
635       runToSteadyState();
636     }
637   }
638
639   private void processOp(LogFileEntry ent) throws IOException JavaDoc
640   {
641     boolean written = ent.writeAndProcess(mainQueue,
642                                           pendingDelete,
643                                           persistentPendingDelete,
644                                           log_file_dout);
645     if (written) {
646       logger.debug("sync");
647       flushLogFile();
648     }
649   }
650
651
652   private void assertOpen()
653   {
654     if (log_file_dout == null) {
655       throw new IllegalArgumentException JavaDoc("Cannot use queue which isn't open");
656     }
657   }
658
659
660   private void openLogFile() throws IOException JavaDoc
661   {
662     try {
663       log_file_fout = new FileOutputStream JavaDoc(logFile);
664       log_file_bout = new BufferedOutputStream JavaDoc(log_file_fout,1024);
665       log_file_dout = new DataOutputStream JavaDoc(log_file_bout);
666       log_file_dout.writeInt(this.version); //version
667
flushLogFile();
668     }
669     catch (IOException JavaDoc io) {
670       closeLogFileIfOpen();
671       throw io;
672     }
673     catch (RuntimeException JavaDoc r) {
674       closeLogFileIfOpen();
675       throw r;
676     }
677     
678   }
679
680   private void closeLogFileIfOpen() throws IOException JavaDoc
681   {
682     if (log_file_dout != null) {
683       log_file_dout.close();
684       log_file_dout = null;
685     }
686     if (log_file_bout != null) {
687       log_file_bout.close();
688       log_file_bout = null;
689     }
690     if (log_file_fout != null) {
691       log_file_fout.close();
692       log_file_fout = null;
693     }
694   }
695
696   private void flushLogFile() throws IOException JavaDoc
697   {
698     log_file_dout.flush();
699     log_file_fout.getFD().sync();
700   }
701     
702
703   /**
704    * Convienance method for the package.
705    */

706   static int getNumPersistent(JmsMessage [] list)
707   {
708     int num = 0;
709     for (int i = 0; i < list.length; i++) {
710       if (isMessagePersistent(list[i])) {
711         num++;
712       }
713     }
714     return num;
715   }
716
717   /**
718    * Convienance method for the package.
719    */

720   static int getNumPersistent(LinkedList JavaDoc list)
721   {
722     Iterator JavaDoc it = list.iterator();
723     int num = 0;
724     while (it.hasNext()) {
725       JmsMessage msg = (JmsMessage)it.next();
726       if (isMessagePersistent(msg))
727         num++;
728      }
729     return num;
730   }
731
732   /**
733    * Convienance method for the package.
734    */

735   static boolean isMessagePersistent(JmsMessage msg)
736   {
737     return msg.getJMSDeliveryMode() == javax.jms.DeliveryMode.PERSISTENT;
738   }
739   
740   ////////////////////////////// Misc stuff ////////////////////////////////
741
private static Logger logger =
742     LoggerFactory.getLogger(PersistentQueue.class, Resources.getBundle());
743   ///////////////////////////////////////////////////////////////////////////
744

745 }
746
Popular Tags