KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > util > DBTransaction


1 /*
2  * Copyright (C) 2006 ScalAgent Distributed Technologies
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17  * USA.
18  *
19  * Initial developer(s): ScalAgent Distributed Technologies
20  * Contributor(s):
21  */

22 package fr.dyade.aaa.util;
23
24 import java.io.*;
25 import java.util.*;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.DriverManager JavaDoc;
29 import java.sql.Statement JavaDoc;
30 import java.sql.PreparedStatement JavaDoc;
31 import java.sql.ResultSet JavaDoc;
32 import java.sql.SQLException JavaDoc;
33
34 import org.objectweb.util.monolog.api.BasicLevel;
35 import org.objectweb.util.monolog.api.Logger;
36
37 import fr.dyade.aaa.agent.Debug;
38
39 public final class DBTransaction implements Transaction, DBTransactionMBean {
40   // Logging monitor
41
private static Logger logmon = null;
42
43   File dir = null;
44
45   /** Log context associated with each Thread using DBTransaction. */
46   private class Context {
47     Hashtable log = null;
48     ByteArrayOutputStream bos = null;
49     ObjectOutputStream oos = null;
50
51     Context() {
52       log = new Hashtable(15);
53       bos = new ByteArrayOutputStream(256);
54     }
55   }
56
57   /**
58    * ThreadLocal variable used to get the log to associate state with each
59    * thread. The log contains all operations do by the current thread since
60    * the last <code>commit</code>. On commit, its content is added to current
61    * log (memory + disk), then it is freed.
62    */

63   private ThreadLocal JavaDoc perThreadContext = null;
64
65   /**
66    * Number of pooled operation, by default 1000.
67    * This value can be adjusted for a particular server by setting
68    * <code>DBLogThresholdOperation</code> specific property.
69    * <p>
70    * These property can be fixed either from <code>java</code> launching
71    * command, or in <code>a3servers.xml</code> configuration file.
72    */

73   static int LogThresholdOperation = 1000;
74
75   /**
76    * Returns the pool size for <code>operation</code> objects, by default 1000.
77    *
78    * @return The pool size for <code>operation</code> objects.
79    */

80   public int getLogThresholdOperation() {
81     return LogThresholdOperation;
82   }
83
84   long startTime = 0L;
85
86   /**
87    * Returns the starting time.
88    *
89    * @return The starting time.
90    */

91   public long getStartTime() {
92     return startTime;
93   }
94
95   String JavaDoc driver = "org.apache.derby.jdbc.EmbeddedDriver";
96   String JavaDoc connurl = "jdbc:derby:";
97
98   Connection JavaDoc conn = null;
99
100   PreparedStatement JavaDoc insertStmt = null;
101   PreparedStatement JavaDoc updateStmt = null;
102   PreparedStatement JavaDoc deleteStmt = null;
103
104   public DBTransaction() {}
105
106   public void init(String JavaDoc path) throws IOException {
107     phase = INIT;
108
109     logmon = Debug.getLogger(Debug.A3Debug + ".Transaction");
110     if (logmon.isLoggable(BasicLevel.INFO))
111       logmon.log(BasicLevel.INFO, "DBTransaction, init()");
112
113     dir = new File(path);
114     if (!dir.exists()) dir.mkdir();
115     if (!dir.isDirectory())
116       throw new FileNotFoundException(path + " is not a directory.");
117
118     // Saves the transaction classname in order to prevent use of a
119
// different one after restart (see AgentServer.init).
120
DataOutputStream ldos = null;
121     try {
122       File tfc = new File(dir, "TFC");
123       if (! tfc.exists()) {
124         ldos = new DataOutputStream(new FileOutputStream(tfc));
125         ldos.writeUTF(getClass().getName());
126         ldos.flush();
127       }
128     } finally {
129       if (ldos != null) ldos.close();
130     }
131
132     try {
133       Class.forName(driver).newInstance();
134
135       Properties props = new Properties();
136       props.put("user", "user1");
137       props.put("password", "user1");
138
139       conn = DriverManager.getConnection(connurl + new File(dir, "JoramDB").getPath() + ";create=true", props);
140       conn.setAutoCommit(false);
141 // break;
142
} catch (IllegalAccessException JavaDoc exc) {
143         throw new IOException(exc.getMessage());
144       } catch (ClassNotFoundException JavaDoc exc) {
145         throw new IOException(exc.getMessage());
146       } catch (InstantiationException JavaDoc exc) {
147         throw new IOException(exc.getMessage());
148       } catch (SQLException JavaDoc sqle) {
149         throw new IOException(sqle.getMessage());
150     }
151
152     try {
153         // Creating a statement lets us issue commands against the connection.
154
Statement JavaDoc s = conn.createStatement();
155         // We create the table.
156
s.execute("CREATE TABLE JoramDB (name VARCHAR(256), content LONG VARCHAR FOR BIT DATA, PRIMARY KEY(name))");
157         s.close();
158         conn.commit();
159     } catch (SQLException JavaDoc sqle) {
160       if (logmon.isLoggable(BasicLevel.INFO))
161         logmon.log(BasicLevel.INFO, "DBTransaction, init() DB already exists");
162     }
163
164     try {
165       insertStmt = conn.prepareStatement("INSERT INTO JoramDB VALUES (?, ?)");
166       updateStmt = conn.prepareStatement("UPDATE JoramDB SET content=? WHERE name=?");
167       deleteStmt = conn.prepareStatement("DELETE FROM JoramDB WHERE name=?");
168     } catch (SQLException JavaDoc sqle) {
169       sqle.printStackTrace();
170       throw new IOException(sqle.getMessage());
171     }
172
173     perThreadContext = new ThreadLocal JavaDoc() {
174         protected synchronized Object JavaDoc initialValue() {
175           return new Context();
176         }
177       };
178
179     startTime = System.currentTimeMillis();
180
181     if (logmon.isLoggable(BasicLevel.INFO))
182       logmon.log(BasicLevel.INFO, "DBTransaction, initialized " + startTime);
183
184     /* The Transaction subsystem is ready */
185     setPhase(FREE);
186   }
187
188   public final File getDir() {
189     return dir;
190   }
191
192   /**
193    * Returns the path of persistence directory.
194    *
195    * @return The path of persistence directory.
196    */

197   public String JavaDoc getPersistenceDir() {
198     return dir.getPath();
199   }
200
201   // State of the transaction monitor.
202
private int phase = INIT;
203   String JavaDoc phaseInfo = PhaseInfo[phase];
204
205   /**
206    *
207    */

208   public int getPhase() {
209     return phase;
210   }
211
212   public String JavaDoc getPhaseInfo() {
213     return phaseInfo;
214   }
215
216   private final void setPhase(int newPhase) {
217     phase = newPhase;
218     phaseInfo = PhaseInfo[phase];
219   }
220
221   public final synchronized void begin() throws IOException {
222     while (phase != FREE) {
223       try {
224     wait();
225       } catch (InterruptedException JavaDoc exc) {
226       }
227     }
228     // Change the transaction state.
229
setPhase(RUN);
230   }
231
232   /**
233    * Returns an array of strings naming the persistent objects denoted by
234    * a name that satisfy the specified prefix. Each string is an object name.
235    *
236    * @param prefix the prefix
237    * @return An array of strings naming the persistent objects
238    * denoted by a name that satisfy the specified prefix. The
239    * array will be empty if no names match.
240    */

241   public final synchronized String JavaDoc[] getList(String JavaDoc prefix) {
242     try {
243       // Creating a statement lets us issue commands against the connection.
244
Statement JavaDoc s = conn.createStatement();
245       ResultSet JavaDoc rs = s.executeQuery("SELECT name FROM JoramDB WHERE name LIKE '" + prefix + "%'");
246
247       Vector v = new Vector();
248       while (rs.next()) {
249         v.add(rs.getString(1));
250       }
251       rs.close();
252       s.close();
253
254       String JavaDoc[] result = new String JavaDoc[v.size()];
255       result = (String JavaDoc[]) v.toArray(result);
256
257       if (logmon.isLoggable(BasicLevel.DEBUG))
258         logmon.log(BasicLevel.DEBUG, "DBTransaction, getList: " + v);
259
260       return result;
261     } catch (SQLException JavaDoc sqle) {
262       // TODO: AF
263
}
264     return null;
265   }
266
267   /**
268    * Tests if the Transaction component is persistent.
269    *
270    * @return true.
271    */

272   public boolean isPersistent() {
273     return true;
274   }
275
276   final String JavaDoc fname(String JavaDoc dirName, String JavaDoc name) {
277     if (dirName == null) {
278       return name;
279     } else {
280       return new StringBuffer JavaDoc(dirName).append('/').append(name).toString();
281     }
282   }
283
284   static private final byte[] OOS_STREAM_HEADER = {
285     (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF),
286     (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF),
287     (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF),
288     (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF)
289   };
290
291   public void save(Serializable obj,
292                    String JavaDoc dirName, String JavaDoc name) throws IOException {
293     save(obj, fname(dirName, name));
294   }
295
296   public void save(Serializable obj, String JavaDoc name) throws IOException{
297     if (logmon.isLoggable(BasicLevel.DEBUG))
298       logmon.log(BasicLevel.DEBUG, "DBTransaction, save(" + name + ")");
299
300     Context ctx = (Context) perThreadContext.get();
301     if (ctx.oos == null) {
302       ctx.bos.reset();
303       ctx.oos = new ObjectOutputStream(ctx.bos);
304     } else {
305       ctx.oos.reset();
306       ctx.bos.reset();
307       ctx.bos.write(OOS_STREAM_HEADER, 0, 4);
308     }
309     ctx.oos.writeObject(obj);
310     ctx.oos.flush();
311
312     saveInLog(ctx.bos.toByteArray(), name, ctx.log, false);
313   }
314
315   public void saveByteArray(byte[] buf,
316                             String JavaDoc dirName, String JavaDoc name) throws IOException {
317     save(buf, fname(dirName, name));
318   }
319
320   public void saveByteArray(byte[] buf, String JavaDoc name) throws IOException{
321     if (logmon.isLoggable(BasicLevel.DEBUG))
322       logmon.log(BasicLevel.DEBUG, "DBTransaction, saveByteArray(" + name + ")");
323
324     Context ctx = (Context) perThreadContext.get();
325     saveInLog(buf, name, ((Context) perThreadContext.get()).log, true);
326   }
327
328   private final void saveInLog(byte[] buf,
329                                String JavaDoc name,
330                                Hashtable log,
331                                boolean copy) throws IOException {
332     DBOperation op = DBOperation.alloc(DBOperation.SAVE, name, buf);
333     DBOperation old = (DBOperation) log.put(name, op);
334     if (copy) {
335       if ((old != null) &&
336           (old.type == DBOperation.SAVE) &&
337           (old.value.length == buf.length)) {
338         // reuse old buffer
339
op.value = old.value;
340       } else {
341         // alloc a new one
342
op.value = new byte[buf.length];
343       }
344       System.arraycopy(buf, 0, op.value, 0, buf.length);
345     }
346     if (old != null) old.free();
347   }
348
349   public Object JavaDoc load(String JavaDoc dirName, String JavaDoc name) throws IOException, ClassNotFoundException JavaDoc {
350     return load(fname(dirName, name));
351   }
352
353   public Object JavaDoc load(String JavaDoc name) throws IOException, ClassNotFoundException JavaDoc {
354     byte[] buf = loadByteArray(name);
355     if (buf != null) {
356       ByteArrayInputStream bis = new ByteArrayInputStream(buf);
357       ObjectInputStream ois = new ObjectInputStream(bis);
358       return ois.readObject();
359     }
360     return null;
361   }
362
363   public byte[] loadByteArray(String JavaDoc dirName, String JavaDoc name) throws IOException {
364     return loadByteArray(fname(dirName, name));
365   }
366
367   public synchronized byte[] loadByteArray(String JavaDoc name) throws IOException {
368     if (logmon.isLoggable(BasicLevel.DEBUG))
369       logmon.log(BasicLevel.DEBUG, "DBTransaction, loadByteArray(" + name + ")");
370
371     // Searchs in the log a new value for the object.
372
Hashtable log = ((Context) perThreadContext.get()).log;
373     DBOperation op = (DBOperation) log.get(name);
374     if (op != null) {
375       if (op.type == DBOperation.SAVE) {
376     return op.value;
377       } else if (op.type == DBOperation.DELETE) {
378     // The object was deleted.
379
return null;
380       }
381     }
382
383     try {
384       // Creating a statement lets us issue commands against the connection.
385
Statement JavaDoc s = conn.createStatement();
386       //
387
ResultSet JavaDoc rs = s.executeQuery("SELECT content FROM JoramDB WHERE name='" + name + "'");
388
389       if (!rs.next()) return null;
390
391        byte[] content = rs.getBytes(1);
392
393        rs.close();
394        s.close();
395
396        return content;
397     } catch (SQLException JavaDoc sqle) {
398       throw new IOException(sqle.getMessage());
399     }
400   }
401
402   public void delete(String JavaDoc dirName, String JavaDoc name) {
403     delete(fname(dirName, name));
404   }
405
406   public void delete(String JavaDoc name) {
407     if (logmon.isLoggable(BasicLevel.DEBUG))
408       logmon.log(BasicLevel.DEBUG,
409                  "DBTransaction, delete(" + name + ")");
410
411     Hashtable log = ((Context) perThreadContext.get()).log;
412     DBOperation op = DBOperation.alloc(DBOperation.DELETE, name);
413     op = (DBOperation) log.put(name, op);
414     if (op != null) op.free();
415   }
416
417   public final synchronized void commit() throws IOException {
418     if (phase != RUN)
419       throw new IllegalStateException JavaDoc("Can not commit.");
420
421     if (logmon.isLoggable(BasicLevel.DEBUG))
422       logmon.log(BasicLevel.DEBUG, "DBTransaction, commit");
423     
424     Hashtable log = ((Context) perThreadContext.get()).log;
425     if (! log.isEmpty()) {
426       DBOperation op = null;
427       for (Enumeration e = log.elements(); e.hasMoreElements(); ) {
428         op = (DBOperation) e.nextElement();
429         if (op.type == DBOperation.SAVE) {
430           if (logmon.isLoggable(BasicLevel.DEBUG))
431             logmon.log(BasicLevel.DEBUG,
432                        "DBTransaction, commit.save (" + op.name + ')');
433
434           try {
435             insertStmt.setString(1, op.name);
436             insertStmt.setBytes(2, op.value);
437             insertStmt.executeUpdate();
438           } catch (SQLException JavaDoc sqle1) {
439             try {
440               updateStmt.setBytes(1, op.value);
441               updateStmt.setString(2, op.name);
442               updateStmt.executeUpdate();
443             } catch (SQLException JavaDoc sqle) {
444               throw new IOException(sqle.getMessage());
445             }
446           }
447         } else if (op.type == DBOperation.DELETE) {
448           if (logmon.isLoggable(BasicLevel.DEBUG))
449             logmon.log(BasicLevel.DEBUG,
450                        "DBTransaction, commit.delete (" + op.name + ')');
451
452           try {
453             deleteStmt.setString(1, op.name);
454             deleteStmt.executeUpdate();
455           } catch (SQLException JavaDoc sqle) {
456             throw new IOException(sqle.getMessage());
457           }
458         }
459         op.free();
460       }
461       log.clear();
462
463       try {
464         conn.commit();
465       } catch (SQLException JavaDoc sqle) {
466         throw new IOException(sqle.getMessage());
467       }
468     }
469
470     if (logmon.isLoggable(BasicLevel.DEBUG))
471       logmon.log(BasicLevel.DEBUG, "DBTransaction, committed");
472
473     setPhase(COMMIT);
474   }
475
476   public final synchronized void rollback() throws IOException {
477     if (phase != RUN)
478       throw new IllegalStateException JavaDoc("Can not rollback.");
479
480     if (logmon.isLoggable(BasicLevel.DEBUG))
481       logmon.log(BasicLevel.DEBUG, "DBTransaction, rollback");
482
483     setPhase(ROLLBACK);
484     ((Context) perThreadContext.get()).log.clear();
485   }
486
487   public final synchronized void release() throws IOException {
488     if ((phase != RUN) && (phase != COMMIT) && (phase != ROLLBACK))
489       throw new IllegalStateException JavaDoc("Can not release transaction.");
490
491     // Change the transaction state.
492
setPhase(FREE);
493     // wake-up an eventually user's thread in begin
494
notify();
495   }
496
497   /**
498    * Stops the transaction module.
499    * It waits all transactions termination, then the module is kept
500    * in a FREE 'ready to use' state.
501    */

502   public synchronized void stop() {
503     if (logmon.isLoggable(BasicLevel.INFO))
504       logmon.log(BasicLevel.INFO, "DBTransaction, stops");
505
506     while (phase != FREE) {
507       // Wait for the transaction subsystem to be free
508
try {
509         wait();
510       } catch (InterruptedException JavaDoc exc) {
511       }
512     }
513     setPhase(FINALIZE);
514
515 // server.shutdown();
516
try {
517 // org.hsqldb.DatabaseManager.closeDatabases(0);
518
// Creating a statement lets us issue commands against the connection.
519
Statement JavaDoc s = conn.createStatement();
520       // .
521
// s.execute("CHECKPOINT DEFRAG");
522
conn.commit();
523     logmon.log(BasicLevel.INFO, "DBTransaction, TBR stop#3");
524       s.executeUpdate("SHUTDOWN COMPACT");
525     logmon.log(BasicLevel.INFO, "DBTransaction, TBR stop#4");
526       s.close();
527     logmon.log(BasicLevel.INFO, "DBTransaction, TBR stop#5");
528     } catch (SQLException JavaDoc sqle) {
529 // AF: TODO
530
// throw new IOException(sqle.getMessage());
531
logmon.log(BasicLevel.ERROR, "DBTransaction, stop#6", sqle);
532     } catch (Throwable JavaDoc t) {
533       logmon.log(BasicLevel.ERROR, "DBTransaction, stop#7", t);
534     } finally {
535       logmon.log(BasicLevel.INFO, "DBTransaction, stop#8");
536     }
537     logmon.log(BasicLevel.INFO, "DBTransaction, TBR stop#9");
538     setPhase(FREE);
539
540     if (logmon.isLoggable(BasicLevel.INFO)) {
541       logmon.log(BasicLevel.INFO, "NTransaction, stopped: ");
542     }
543   }
544
545   /**
546    * Close the transaction module.
547    * It waits all transactions termination, the module will be initialized
548    * anew before reusing it.
549    */

550   public synchronized void close() {
551     if (logmon.isLoggable(BasicLevel.INFO))
552       logmon.log(BasicLevel.INFO, "DBTransaction, close");
553
554     if (phase == INIT) return;
555
556     while (phase != FREE) {
557       // Wait for the transaction subsystem to be free
558
try {
559         wait();
560       } catch (InterruptedException JavaDoc exc) {
561       }
562     }
563
564     setPhase(FINALIZE);
565     try {
566       // Creating a statement lets us issue commands against the connection.
567
Statement JavaDoc s = conn.createStatement();
568       // .
569
s.execute("SHUTDOWN COMPACT");
570       s.close();
571     } catch (SQLException JavaDoc sqle) {
572 // AF: TODO
573
// throw new IOException(sqle.getMessage());
574
logmon.log(BasicLevel.ERROR, "DBTransaction, close", sqle);
575     }
576     setPhase(INIT);
577
578     if (logmon.isLoggable(BasicLevel.INFO)) {
579       logmon.log(BasicLevel.INFO, "DBTransaction, closed: ");
580     }
581   }
582 }
583
584 final class DBOperation implements Serializable {
585   static final int SAVE = 1;
586   static final int DELETE = 2;
587   static final int COMMIT = 3;
588   static final int END = 127;
589  
590   int type;
591   String JavaDoc name;
592   byte[] value;
593
594   private DBOperation(int type, String JavaDoc name, byte[] value) {
595     this.type = type;
596     this.name = name;
597     this.value = value;
598   }
599
600   /**
601    * Returns a string representation for this object.
602    *
603    * @return A string representation of this object.
604    */

605   public String JavaDoc toString() {
606     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
607
608     strbuf.append('(').append(super.toString());
609     strbuf.append(",type=").append(type);
610     strbuf.append(",name=").append(name);
611     strbuf.append(')');
612     
613     return strbuf.toString();
614   }
615
616   private static Pool pool = null;
617
618   static {
619     pool = new Pool("DBTransaction$Operation",
620                     Integer.getInteger("DBLogThresholdOperation",
621                                        DBTransaction.LogThresholdOperation).intValue());
622   }
623
624   static DBOperation alloc(int type, String JavaDoc name) {
625     return alloc(type, name, null);
626   }
627
628   static DBOperation alloc(int type, String JavaDoc name, byte[] value) {
629     DBOperation op = null;
630     
631     try {
632       op = (DBOperation) pool.allocElement();
633     } catch (Exception JavaDoc exc) {
634       return new DBOperation(type, name, value);
635     }
636     op.type = type;
637     op.name = name;
638     op.value = value;
639
640     return op;
641   }
642
643   void free() {
644     /* to let gc do its work */
645     name = null;
646     value = null;
647     pool.freeElement(this);
648   }
649 }
650
Popular Tags