KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > rift > coad > util > change > ChangeLog


1 /*
2  * CoadunationLib: The Coadunation implementation library.
3  * Copyright (C) 2007 Rift IT Contracting
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * ChangeLog.java
20  */

21
22 // package path
23
package com.rift.coad.util.change;
24
25 // java imports
26
import java.io.Serializable JavaDoc;
27 import java.io.File JavaDoc;
28 import java.io.InputStream JavaDoc;
29 import java.io.IOException JavaDoc;
30 import java.io.FileInputStream JavaDoc;
31 import java.io.FileOutputStream JavaDoc;
32 import java.io.ObjectInputStream JavaDoc;
33 import java.io.ObjectOutputStream JavaDoc;
34 import java.io.ObjectStreamClass JavaDoc;
35 import java.util.ArrayList JavaDoc;
36 import java.util.HashMap JavaDoc;
37 import java.util.Iterator JavaDoc;
38 import java.util.List JavaDoc;
39 import java.util.Map JavaDoc;
40 import java.util.Queue JavaDoc;
41 import java.util.concurrent.ConcurrentHashMap JavaDoc;
42 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
43 import javax.naming.Context JavaDoc;
44 import javax.naming.InitialContext JavaDoc;
45 import javax.transaction.UserTransaction JavaDoc;
46 import javax.transaction.xa.XAException JavaDoc;
47 import javax.transaction.xa.XAResource JavaDoc;
48 import javax.transaction.xa.Xid JavaDoc;
49
50 // logging import
51
import org.apache.log4j.Logger;
52
53 // coadunation imports
54
import com.rift.coad.lib.configuration.Configuration;
55 import com.rift.coad.lib.configuration.ConfigurationException;
56 import com.rift.coad.lib.configuration.ConfigurationFactory;
57 import com.rift.coad.lib.thread.CoadunationThread;
58 import com.rift.coad.lib.thread.ThreadStateMonitor;
59 import com.rift.coad.util.transaction.TransactionManager;
60 import com.rift.coad.util.transaction.UserTransactionWrapper;
61
62 /**
63  * This object records the changes made within XA transactions and applies
64  * them asynchronously once the owning transaction has been committed.
65  *
66  * @author Brett Chaldecott
67  */

68 public class ChangeLog implements XAResource JavaDoc {
69     
70     /**
71      * This class overrides the resolve to use the class loader on the
72      * thread to find the specified class.
73      */

74     public static class ClassLoaderObjectInputStream extends ObjectInputStream JavaDoc {
75         /**
76          * This default constructor of the class loader object input stream.
77          *
78          * @exception IOException
79          */

80         public ClassLoaderObjectInputStream() throws IOException JavaDoc {
81             super();
82         }
83         
84         
85         /**
86          * This default constructor of the class loader object input stream.
87          *
88          * @param in The input stream for this object.
89          * @exception IOException
90          */

91         public ClassLoaderObjectInputStream(InputStream JavaDoc in) throws IOException JavaDoc {
92             super(in);
93         }
94         
95         
96         /**
97          * This method returns the class definition for the requested object.
98          *
99          * @return The class definition for the requested object.
100          * @param desc The description of the object.
101          * @exception IOException
102          * @exception ClassNotFoundException
103          */

104         protected Class JavaDoc resolveClass(ObjectStreamClass JavaDoc desc) throws IOException JavaDoc,
105                 ClassNotFoundException JavaDoc {
106             try {
107                 return Class.forName(desc.getName());
108             } catch (Exception JavaDoc ex) {
109                 return Thread.currentThread().getContextClassLoader().loadClass(
110                         desc.getName());
111             }
112         }
113         
114     }
115     
116     
117     /**
118      * This object is responsible for processing entries in the change log.
119      */

120     public class ChangeLogProcessor extends CoadunationThread {
121         
122         // private member variables
123
private ThreadStateMonitor state = new ThreadStateMonitor();
124         private Context JavaDoc context = null;
125         private UserTransactionWrapper utw = null;
126         private boolean process = false;
127         
128         /**
129          * The contructor of the change log processor.
130          *
131          * @exception Exception
132          */

133         public ChangeLogProcessor() throws Exception JavaDoc {
134             try {
135                 utw = new UserTransactionWrapper();
136             } catch (Exception JavaDoc ex) {
137                 throw new ChangeException(
138                         "Failed to init the change log processor : " +
139                         ex.getMessage(),ex);
140             }
141         }
142         
143         
144         /**
145          * This method replaces the run method in the BasicThread.
146          *
147          * @exception Exception
148          */

149         public void process() throws Exception JavaDoc {
150             synchronized(this) {
151                 if (process == false) {
152                     try {
153                         wait();
154                     } catch(Exception JavaDoc ex) {
155                         log.error("Wait threw and exception : " +
156                                 ex.getMessage(),ex);
157                     }
158                 }
159             }
160             while(!state.isTerminated()) {
161                 ChangeEntry change = null;
162                 synchronized(changes) {
163                     change = (ChangeEntry)changes.poll();
164                     if (change == null) {
165                         try {
166                             changes.wait(500);
167                         } catch (Exception JavaDoc ex) {
168                             log.error("Failed to wait : " + ex.getMessage(),ex);
169                         }
170                         continue;
171                     }
172                 }
173                 while(true) {
174                     try {
175                         utw.begin();
176                         change.applyChanges();
177                         utw.commit();
178                         break;
179                     } catch (Exception JavaDoc ex) {
180                         log.error("Failed to apply the changes : " +
181                                 ex.getMessage(),ex);
182                     } finally {
183                         utw.release();
184                     }
185                     try {
186                         Thread.sleep(1000);
187                     } catch(Exception JavaDoc ex2) {
188                         log.error("Failed to back off : " +
189                                 ex2.getMessage(),ex2);
190                     }
191                 }
192             }
193         }
194         
195         
196         /**
197          * This method will be implemented by child objects to terminate the
198          * processing of this thread.
199          */

200         public void terminate() {
201             state.terminate(true);
202             synchronized(this) {
203                 notify();
204             }
205         }
206         
207         
208         /**
209          * This method starts the processing
210          */

211         public synchronized void startProcessing() {
212             process = true;
213             notify();
214         }
215     }
216     
217     /**
218      * This object tracks the changes to do with a transaction.
219      */

220     public static class ChangeEntry implements Serializable JavaDoc {
221         
222         // private member variables
223
private List JavaDoc changes = new ArrayList JavaDoc();
224         
225         
226         /**
227          * The constructor of
228          */

229         public ChangeEntry() {
230             
231         }
232         
233         
234         /**
235          * This method adds a change to the list of changes for this change
236          * change entry.
237          *
238          * @param change The object representing the change object.
239          */

240         public void addChange(Change change) {
241             changes.add(change);
242         }
243         
244         
245         /**
246          * This method applys the list of changes.
247          *
248          * @exception ChangeException
249          */

250         public void applyChanges() throws ChangeException {
251             for (Iterator JavaDoc iter = changes.iterator(); iter.hasNext();) {
252                 Change change = (Change)iter.next();
253                 change.applyChanges();
254             }
255         }
256     }
257     
258     // class constants
259
private final static String JavaDoc USERNAME = "changelog_username";
260     private final static String JavaDoc DATA_DIR = "changelog_data_dir";
261     private final static String JavaDoc DATA_FILE = "changelog.dmp";
262     
263     // the logger reference
264
protected Logger log =
265             Logger.getLogger(ChangeLog.class.getName());
266     
267     // class singleton
268
private static Map JavaDoc singletons = new HashMap JavaDoc();
269     
270     // class member variables
271
private ThreadStateMonitor state = new ThreadStateMonitor();
272     private Map JavaDoc changesMap = new ConcurrentHashMap JavaDoc();
273     private ThreadLocal JavaDoc currentChange = new ThreadLocal JavaDoc();
274     private ChangeLogProcessor processor = null;
275     private Queue JavaDoc changes = new ConcurrentLinkedQueue JavaDoc();
276     private String JavaDoc dataDirectory = null;
277     private UserTransactionWrapper utw = null;
278     
279     
280     /**
281      * Creates a new instance of MessageChangeLog
282      *
283      * @param username The name of the user that this object will run as.
284      */

285     private ChangeLog(Class JavaDoc configInfo) throws ChangeException {
286         try {
287             utw = new UserTransactionWrapper();
288             Configuration configuration = ConfigurationFactory.getInstance().
289                     getConfig(configInfo);
290             dataDirectory = configuration.getString(DATA_DIR);
291             loadData();
292             applyChanges();
293             processor = new ChangeLogProcessor();
294             processor.start(configuration.getString(USERNAME));
295         } catch (Exception JavaDoc ex) {
296             log.error("Failed to instanciate the change " +
297                     "log object : " + ex.getMessage(),ex);
298             throw new ChangeException("Failed to instanciate the change " +
299                     "log object : " + ex.getMessage(),ex);
300         }
301     }
302     
303     
304     /**
305      * This method is responsible for instanciating the MessageChangeLog.
306      *
307      * @param username The name of the user to this object will run as.
308      * @exception ChangeException
309      */

310     public synchronized static void init(Class JavaDoc configInfo) throws
311             ChangeException {
312         synchronized (singletons) {
313             ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
314             if (!singletons.containsKey(loader)) {
315                 ChangeLog changeLog = new ChangeLog(configInfo);
316                 singletons.put(loader,changeLog);
317             }
318         }
319     }
320     
321     
322     /**
323      * This method returns a reference to the singelton instance.
324      *
325      * @return A reference to the singleton instance.
326      * @throws ChangeException
327      */

328     public static ChangeLog getInstance() throws
329             ChangeException {
330         synchronized(singletons) {
331             ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
332             ChangeLog changeLog = (ChangeLog)singletons.get(loader);
333             if (changeLog == null) {
334                 throw new ChangeException(
335                         "The change log has not been instanciated.");
336             }
337             return changeLog;
338         }
339     }
340     
341     
342     /**
343      * The singleton method used to terminate the change log.
344      */

345     public static void terminate() throws
346             ChangeException {
347         ChangeLog changeLog = null;
348         synchronized(singletons) {
349             ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
350             changeLog = (ChangeLog)singletons.get(loader);
351             if (changeLog == null) {
352                 throw new ChangeException(
353                         "The change log has not been instanciated.");
354             }
355             singletons.remove(loader);
356         }
357         changeLog.terminateChangeLog();
358         
359     }
360     
361     /**
362      * This method is called to terminate the change log.
363      */

364     protected void terminateChangeLog() {
365         try {
366             state.terminate(false);
367             processor.terminate();
368             processor.join();
369             storeData();
370         } catch (Exception JavaDoc ex) {
371             log.error("Failed to terminate the change log object : " +
372                     ex.getMessage(),ex);
373         }
374     }
375     
376     
377     /**
378      * This method starts the change log processing the changes.
379      */

380     public void start() throws ChangeException {
381         processor.startProcessing();
382     }
383     
384     
385     /**
386      * This method adds an object to the list of changes.
387      *
388      * @param change The object containing the changes to apply.
389      * @exception ChangeException
390      */

391     public void addChange(Change change) throws ChangeException {
392         if (state.isTerminated()) {
393             log.error("The change log has been terminated cannot accept " +
394                     "anymore changes.");
395             throw new ChangeException(
396                     "The change log has been terminated cannot accept " +
397                     "anymore changes.");
398         }
399         try {
400             TransactionManager.getInstance().bindResource(this,false);
401             ChangeEntry changeEntry = (ChangeEntry)currentChange.get();
402             changeEntry.addChange(change);
403         } catch (Exception JavaDoc ex) {
404             log.error("Failed to add the change to the list :"
405                     + ex.getMessage(),ex);
406             throw new ChangeException("Failed to add the change to the list :"
407                     + ex.getMessage(),ex);
408         }
409     }
410     
411     
412     /**
413      * This method is called to commit the specified transaction.
414      *
415      * @param xid The id of the transaction to commit.
416      * @param onePhase If true a one phase commit should be used.
417      * @exception XAException
418      */

419     public void commit(Xid JavaDoc xid, boolean b) throws XAException JavaDoc {
420         synchronized (changes) {
421             changes.add(changesMap.remove(xid));
422             changes.notify();
423         }
424     }
425     
426     
427     /**
428      * The resource manager has dissociated this object from the transaction.
429      *
430      * @param xid The id of the transaction that is getting ended.
431      * @param flags The flags associated with this operation.
432      * @exception XAException
433      */

434     public void end(Xid JavaDoc xid, int i) throws XAException JavaDoc {
435     }
436     
437     
438     /**
439      * The transaction has been completed and must be forgotten.
440      *
441      * @param xid The id of the transaction to forget.
442      * @exception XAException
443      */

444     public void forget(Xid JavaDoc xid) throws XAException JavaDoc {
445         changesMap.remove(xid);
446     }
447     
448     
449     /**
450      * This method returns the transaction timeout for this object.
451      *
452      * @return The int containing the transaction timeout.
453      * @exception XAException
454      */

455     public int getTransactionTimeout() throws XAException JavaDoc {
456         return -1;
457     }
458     
459     
460     /**
461      * This method returns true if this object is the resource manager getting
462      * queried.
463      *
464      * @return TRUE if this is the resource manager, FALSE if not.
465      * @param xaResource The resource to perform the check against.
466      * @exception XAException
467      */

468     public boolean isSameRM(XAResource JavaDoc xAResource) throws XAException JavaDoc {
469         return this == xAResource;
470     }
471     
472     
473     /**
474      * This is called before a transaction is committed.
475      *
476      * @return The results of the transaction.
477      * @param xid The id of the transaction to check against.
478      * @exception XAException
479      */

480     public int prepare(Xid JavaDoc xid) throws XAException JavaDoc {
481         return XAResource.XA_OK;
482     }
483     
484     
485     /**
486      * This method returns the list of transaction branches for this resource
487      * manager.
488      *
489      * @return The list of resource branches.
490      * @param flags The flags
491      * @exception XAException
492      */

493     public Xid JavaDoc[] recover(int i) throws XAException JavaDoc {
494         return null;
495     }
496     
497     
498     /**
499      * This method is called to roll back the specified transaction.
500      *
501      * @param xid The id of the transaction to roll back.
502      * @exception XAException
503      */

504     public void rollback(Xid JavaDoc xid) throws XAException JavaDoc {
505         changesMap.remove(xid);
506     }
507     
508     
509     /**
510      * This method sets the transaction timeout for this resource manager.
511      *
512      * @return TRUE if the transaction timeout can be set successfully.
513      * @param transactionTimeout The new transaction timeout value.
514      * @exception XAException
515      */

516     public boolean setTransactionTimeout(int i) throws XAException JavaDoc {
517         return true;
518     }
519     
520     
521     /**
522      * This method is called to start a transaction on a resource manager.
523      *
524      * @param xid The id of the new transaction.
525      * @param flags The flags associated with the transaction.
526      * @exception XAException
527      */

528     public void start(Xid JavaDoc xid, int i) throws XAException JavaDoc {
529         if (changesMap.containsKey(xid)) {
530             currentChange.set(changesMap.get(xid));
531         } else {
532             ChangeEntry changeEntry = new ChangeEntry();
533             changesMap.put(xid,changeEntry);
534             currentChange.set(changeEntry);
535         }
536     }
537     
538     
539     /**
540      * This method loads the data
541      */

542     private void loadData() throws ChangeException {
543         try {
544             File JavaDoc dataFile = new File JavaDoc(dataDirectory,DATA_FILE);
545             if (!dataFile.exists()) {
546                 return;
547             }
548             FileInputStream JavaDoc in = new FileInputStream JavaDoc(dataFile);
549             ClassLoaderObjectInputStream ois = new
550                     ClassLoaderObjectInputStream(in);
551             changes = (ConcurrentLinkedQueue JavaDoc)ois.readObject();
552             ois.close();
553             in.close();
554         } catch (Exception JavaDoc ex) {
555             log.error("Failed to load the data : " +
556                     ex.getMessage(),ex);
557             throw new ChangeException("Failed to load the data : " +
558                     ex.getMessage(),ex);
559         }
560     }
561     
562     
563     /**
564      * This method stores the data
565      */

566     private void storeData() throws ChangeException {
567         try {
568             File JavaDoc dataFile = new File JavaDoc(dataDirectory,DATA_FILE);
569             if (changes.size() == 0) {
570                 // no data to delete the file if one exists
571
if (dataFile.exists()) {
572                     dataFile.delete();
573                 }
574                 return;
575             }
576             FileOutputStream JavaDoc out = new FileOutputStream JavaDoc(dataFile);
577             ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(out);
578             oos.writeObject(changes);
579             oos.close();
580             out.close();
581         } catch (Exception JavaDoc ex) {
582             log.error("Failed to store the data : " +
583                     ex.getMessage(),ex);
584             throw new ChangeException("Failed to store the data : " +
585                     ex.getMessage(),ex);
586         }
587     }
588     
589     
590     /**
591      * This method is responsible for applying all the changes.
592      */

593     private void applyChanges() throws ChangeException {
594         log.info("Applying changes from change log");
595         while(changes.size() > 0) {
596             ChangeEntry change = (ChangeEntry)changes.poll();
597             while(true) {
598                 try {
599                     utw.begin();
600                     change.applyChanges();
601                     utw.commit();
602                     break;
603                 } catch (Exception JavaDoc ex) {
604                     log.error("Failed to apply the changes : " +
605                             ex.getMessage(),ex);
606                 } finally {
607                     utw.release();
608                 }
609                 try {
610                     Thread.sleep(1000);
611                 } catch(Exception JavaDoc ex2) {
612                     log.error("Failed to back off : " +
613                             ex2.getMessage(),ex2);
614                 }
615             }
616         }
617         log.info("After applying changes from change log");
618     }
619 }
620
Popular Tags