package fr.dyade.aaa.util;

import java.io.*;
import java.util.*;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.agent.Debug;

/**
 * {@link Transaction} implementation that keeps named byte arrays in a
 * single JDBC table, <code>JoramDB(name, content)</code>.
 * <p>
 * Pending <code>save</code>/<code>delete</code> requests are recorded in a
 * per-thread log and only written to the database by {@link #commit()}.
 * The phase constants (<code>INIT</code>, <code>FREE</code>, <code>RUN</code>,
 * <code>COMMIT</code>, <code>ROLLBACK</code>, <code>FINALIZE</code>) and the
 * <code>PhaseInfo</code> table are inherited, presumably from
 * {@link Transaction}.
 */
public final class DBTransaction implements Transaction, DBTransactionMBean {
  /** Logger, obtained in {@link #init(String)}. */
  private static Logger logmon = null;

  /** Directory holding the database and the "TFC" marker file. */
  File dir = null;

  /**
   * Per-thread working state: the pending operations indexed by object
   * name, and the reusable serialization buffer and stream.
   */
  private static class Context {
    Hashtable log = null;
    ByteArrayOutputStream bos = null;
    ObjectOutputStream oos = null;

    Context() {
      log = new Hashtable(15);
      bos = new ByteArrayOutputStream(256);
    }
  }

  /** Holds one {@link Context} per calling thread. */
  private ThreadLocal perThreadContext = null;

  /**
   * Default for the <code>DBLogThresholdOperation</code> system property;
   * the resulting value is handed to the <code>Pool</code> of
   * {@link DBOperation} objects (presumably as its capacity).
   */
  static int LogThresholdOperation = 1000;

  /**
   * Returns the value of {@link #LogThresholdOperation}.
   *
   * @return the operation threshold.
   */
  public int getLogThresholdOperation() {
    return LogThresholdOperation;
  }

  /** Time in milliseconds at which {@link #init(String)} completed. */
  long startTime = 0L;

  /**
   * Returns the time at which this transaction manager was initialized.
   *
   * @return the value of <code>System.currentTimeMillis()</code> recorded by
   *         {@link #init(String)}, or 0 if not yet initialized.
   */
  public long getStartTime() {
    return startTime;
  }

  String driver = "org.apache.derby.jdbc.EmbeddedDriver";
  String connurl = "jdbc:derby:";

  Connection conn = null;

  PreparedStatement insertStmt = null;
  PreparedStatement updateStmt = null;
  PreparedStatement deleteStmt = null;

  public DBTransaction() {}

  /**
   * Builds an <code>IOException</code> carrying the message of the given
   * exception and keeping it as cause.
   */
  private static IOException toIOException(Exception cause) {
    IOException ioe = new IOException(cause.getMessage());
    ioe.initCause(cause);
    return ioe;
  }

  /** Closes the statement, if any; a failure is only logged. */
  private static void closeQuietly(Statement stmt) {
    if (stmt == null) return;
    try {
      stmt.close();
    } catch (SQLException exc) {
      // Nothing useful can be done if a close fails.
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, "DBTransaction, cannot close statement", exc);
    }
  }

  /** Closes the result set, if any; a failure is only logged. */
  private static void closeQuietly(ResultSet rs) {
    if (rs == null) return;
    try {
      rs.close();
    } catch (SQLException exc) {
      // Nothing useful can be done if a close fails.
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, "DBTransaction, cannot close result set", exc);
    }
  }

  /**
   * Initializes the transaction manager: creates the directory and its
   * "TFC" marker file if needed, opens the database connection, creates the
   * <code>JoramDB</code> table if it cannot be found, and prepares the
   * statements used by {@link #commit()}.
   *
   * @param path the persistence directory.
   * @throws IOException if the path is not a directory, the marker file
   *         cannot be written, the driver cannot be loaded or the database
   *         cannot be opened.
   */
  public void init(String path) throws IOException {
    setPhase(INIT);

    logmon = Debug.getLogger(Debug.A3Debug + ".Transaction");
    if (logmon.isLoggable(BasicLevel.INFO))
      logmon.log(BasicLevel.INFO, "DBTransaction, init()");

    dir = new File(path);
    if (!dir.exists()) dir.mkdir();
    if (!dir.isDirectory())
      throw new FileNotFoundException(path + " is not a directory.");

    // Records the name of this class in the "TFC" file on first use.
    DataOutputStream ldos = null;
    try {
      File tfc = new File(dir, "TFC");
      if (!tfc.exists()) {
        ldos = new DataOutputStream(new FileOutputStream(tfc));
        ldos.writeUTF(getClass().getName());
        ldos.flush();
      }
    } finally {
      if (ldos != null) ldos.close();
    }

    try {
      Class.forName(driver).newInstance();

      Properties props = new Properties();
      props.put("user", "user1");
      props.put("password", "user1");

      conn = DriverManager.getConnection(
        connurl + new File(dir, "JoramDB").getPath() + ";create=true", props);
      conn.setAutoCommit(false);
    } catch (IllegalAccessException exc) {
      throw toIOException(exc);
    } catch (ClassNotFoundException exc) {
      throw toIOException(exc);
    } catch (InstantiationException exc) {
      throw toIOException(exc);
    } catch (SQLException sqle) {
      throw toIOException(sqle);
    }

    Statement s = null;
    try {
      s = conn.createStatement();
      s.execute("CREATE TABLE JoramDB (name VARCHAR(256), content LONG VARCHAR FOR BIT DATA, PRIMARY KEY(name))");
      conn.commit();
    } catch (SQLException sqle) {
      // Any failure here is treated as "the table already exists".
      if (logmon.isLoggable(BasicLevel.INFO))
        logmon.log(BasicLevel.INFO, "DBTransaction, init() DB already exists");
    } finally {
      closeQuietly(s);
    }

    try {
      insertStmt = conn.prepareStatement("INSERT INTO JoramDB VALUES (?, ?)");
      updateStmt = conn.prepareStatement("UPDATE JoramDB SET content=? WHERE name=?");
      deleteStmt = conn.prepareStatement("DELETE FROM JoramDB WHERE name=?");
    } catch (SQLException sqle) {
      logmon.log(BasicLevel.ERROR, "DBTransaction, init() cannot prepare statements", sqle);
      throw toIOException(sqle);
    }

    perThreadContext = new ThreadLocal () {
      protected synchronized Object initialValue() {
        return new Context();
      }
    };

    startTime = System.currentTimeMillis();

    if (logmon.isLoggable(BasicLevel.INFO))
      logmon.log(BasicLevel.INFO, "DBTransaction, initialized " + startTime);

    setPhase(FREE);
  }

  /**
   * Returns the persistence directory.
   *
   * @return the directory given to {@link #init(String)}.
   */
  public final File getDir() {
    return dir;
  }

  /**
   * Returns the path of the persistence directory.
   *
   * @return the path of the directory given to {@link #init(String)}.
   */
  public String getPersistenceDir() {
    return dir.getPath();
  }

  /** Current phase of the transaction manager. */
  private int phase = INIT;
  /** Textual form of the current phase, taken from <code>PhaseInfo</code>. */
  String phaseInfo = PhaseInfo[phase];

  /**
   * Returns the current phase.
   *
   * @return the current phase constant.
   */
  public int getPhase() {
    return phase;
  }

  /**
   * Returns the textual form of the current phase.
   *
   * @return the <code>PhaseInfo</code> entry of the current phase.
   */
  public String getPhaseInfo() {
    return phaseInfo;
  }

  /** Sets the phase and keeps {@link #phaseInfo} in step with it. */
  private final void setPhase(int newPhase) {
    phase = newPhase;
    phaseInfo = PhaseInfo[phase];
  }

  /**
   * Starts a transaction: blocks until the phase is <code>FREE</code>, then
   * moves to <code>RUN</code>.
   */
  public final synchronized void begin() throws IOException {
    while (phase != FREE) {
      try {
        wait();
      } catch (InterruptedException exc) {
        // Deliberately ignored: keep waiting until the phase is FREE.
      }
    }
    setPhase(RUN);
  }

  /**
   * Returns the names stored in the database that start with the given
   * prefix. Only the database is consulted; operations still pending in the
   * calling thread's log are not taken into account.
   *
   * @param prefix the beginning of the searched names; it is used as a SQL
   *        <code>LIKE</code> pattern followed by <code>%</code>.
   * @return the matching names, or <code>null</code> if the query fails.
   */
  public final synchronized String [] getList(String prefix) {
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
      // Bound parameter instead of string concatenation: a quote in the
      // prefix can no longer alter the statement.
      ps = conn.prepareStatement("SELECT name FROM JoramDB WHERE name LIKE ?");
      ps.setString(1, prefix + "%");
      rs = ps.executeQuery();

      Vector v = new Vector();
      while (rs.next()) {
        v.add(rs.getString(1));
      }

      String [] result = new String [v.size()];
      result = (String []) v.toArray(result);

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, "DBTransaction, getList: " + v);

      return result;
    } catch (SQLException sqle) {
      logmon.log(BasicLevel.ERROR, "DBTransaction, getList(" + prefix + ")", sqle);
      return null;
    } finally {
      closeQuietly(rs);
      closeQuietly(ps);
    }
  }

  /**
   * Tells whether this implementation is persistent.
   *
   * @return always <code>true</code>.
   */
  public boolean isPersistent() {
    return true;
  }

  /**
   * Builds the stored name of an object from an optional directory name.
   *
   * @return <code>name</code> if <code>dirName</code> is null,
   *         <code>dirName + '/' + name</code> otherwise.
   */
  final String fname(String dirName, String name) {
    if (dirName == null) {
      return name;
    } else {
      return new StringBuffer (dirName).append('/').append(name).toString();
    }
  }

  /** The 4 bytes (magic number and version) that start a serialization stream. */
  static private final byte[] OOS_STREAM_HEADER = {
    (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF),
    (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF),
    (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF),
    (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF)
  };

  /**
   * Serializes the object and records it for the next commit.
   *
   * @see #save(Serializable, String)
   */
  public void save(Serializable obj,
                   String dirName, String name) throws IOException {
    save(obj, fname(dirName, name));
  }

  /**
   * Serializes the object and records the resulting bytes under the given
   * name in the calling thread's log; nothing is written to the database
   * before {@link #commit()}.
   *
   * @param obj  the object to save.
   * @param name the name under which it is stored.
   * @throws IOException if the serialization fails.
   */
  public void save(Serializable obj, String name) throws IOException{
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, "DBTransaction, save(" + name + ")");

    Context ctx = (Context) perThreadContext.get();
    if (ctx.oos == null) {
      // First use by this thread: the new stream writes its header itself.
      ctx.bos.reset();
      ctx.oos = new ObjectOutputStream(ctx.bos);
    } else {
      // Reuse of the stream: forget the objects already written, empty the
      // buffer, then put the stream header back by hand so that the saved
      // bytes can be read by a fresh ObjectInputStream.
      ctx.oos.reset();
      ctx.bos.reset();
      ctx.bos.write(OOS_STREAM_HEADER, 0, 4);
    }
    ctx.oos.writeObject(obj);
    ctx.oos.flush();

    saveInLog(ctx.bos.toByteArray(), name, ctx.log, false);
  }

  /**
   * Records the raw byte array for the next commit.
   *
   * @see #saveByteArray(byte[], String)
   */
  public void saveByteArray(byte[] buf,
                            String dirName, String name) throws IOException {
    // Must not go through save(Serializable, String): that overload would
    // Java-serialize the array instead of storing its bytes as they are.
    saveByteArray(buf, fname(dirName, name));
  }

  /**
   * Records a copy of the byte array under the given name in the calling
   * thread's log; nothing is written to the database before
   * {@link #commit()}.
   *
   * @param buf  the bytes to save.
   * @param name the name under which they are stored.
   */
  public void saveByteArray(byte[] buf, String name) throws IOException{
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, "DBTransaction, saveByteArray(" + name + ")");

    saveInLog(buf, name, ((Context) perThreadContext.get()).log, true);
  }

  /**
   * Registers a SAVE operation in the log, replacing any operation already
   * pending for the same name.
   *
   * @param buf  the bytes to save.
   * @param name the name under which they are stored.
   * @param log  the log of the calling thread.
   * @param copy if true the operation holds a private copy of
   *        <code>buf</code>, otherwise it holds <code>buf</code> itself.
   */
  private final void saveInLog(byte[] buf,
                               String name,
                               Hashtable log,
                               boolean copy) throws IOException {
    DBOperation op = DBOperation.alloc(DBOperation.SAVE, name, buf);
    DBOperation old = (DBOperation) log.put(name, op);
    if (copy) {
      if ((old != null) &&
          (old.type == DBOperation.SAVE) &&
          (old.value.length == buf.length)) {
        // Same size: recycle the array of the replaced operation.
        op.value = old.value;
      } else {
        op.value = new byte[buf.length];
      }
      System.arraycopy(buf, 0, op.value, 0, buf.length);
    }
    if (old != null) old.free();
  }

  /**
   * Loads and deserializes the named object.
   *
   * @see #load(String)
   */
  public Object load(String dirName, String name) throws IOException, ClassNotFoundException {
    return load(fname(dirName, name));
  }

  /**
   * Loads and deserializes the named object.
   *
   * @param name the name of the object.
   * @return the object, or <code>null</code> if no bytes are stored under
   *         that name.
   * @throws IOException if the bytes cannot be read.
   * @throws ClassNotFoundException if the class of the object is unknown.
   */
  public Object load(String name) throws IOException, ClassNotFoundException {
    byte[] buf = loadByteArray(name);
    if (buf != null) {
      ByteArrayInputStream bis = new ByteArrayInputStream(buf);
      ObjectInputStream ois = new ObjectInputStream(bis);
      return ois.readObject();
    }
    return null;
  }

  /**
   * Loads the bytes stored under the given name.
   *
   * @see #loadByteArray(String)
   */
  public byte[] loadByteArray(String dirName, String name) throws IOException {
    return loadByteArray(fname(dirName, name));
  }

  /**
   * Loads the bytes stored under the given name. The calling thread's log
   * is searched first, so a pending save or delete is visible before it is
   * committed; otherwise the database is queried.
   *
   * @param name the name of the object.
   * @return the bytes (the array held by the log is returned as is, not
   *         copied), or <code>null</code> if the object is unknown or
   *         pending deletion.
   * @throws IOException if the database query fails.
   */
  public synchronized byte[] loadByteArray(String name) throws IOException {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, "DBTransaction, loadByteArray(" + name + ")");

    Hashtable log = ((Context) perThreadContext.get()).log;
    DBOperation op = (DBOperation) log.get(name);
    if (op != null) {
      if (op.type == DBOperation.SAVE) {
        return op.value;
      } else if (op.type == DBOperation.DELETE) {
        return null;
      }
    }

    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
      // Bound parameter instead of string concatenation: a quote in the
      // name can no longer alter the statement.
      ps = conn.prepareStatement("SELECT content FROM JoramDB WHERE name=?");
      ps.setString(1, name);
      rs = ps.executeQuery();

      if (!rs.next()) return null;

      return rs.getBytes(1);
    } catch (SQLException sqle) {
      throw toIOException(sqle);
    } finally {
      // Also runs on the early return above.
      closeQuietly(rs);
      closeQuietly(ps);
    }
  }

  /**
   * Records the deletion of the named object for the next commit.
   *
   * @see #delete(String)
   */
  public void delete(String dirName, String name) {
    delete(fname(dirName, name));
  }

  /**
   * Records the deletion of the named object in the calling thread's log,
   * replacing any operation already pending for the same name.
   *
   * @param name the name of the object.
   */
  public void delete(String name) {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 "DBTransaction, delete(" + name + ")");

    Hashtable log = ((Context) perThreadContext.get()).log;
    DBOperation op = DBOperation.alloc(DBOperation.DELETE, name);
    op = (DBOperation) log.put(name, op);
    if (op != null) op.free();
  }

  /**
   * Writes one SAVE operation to the database: updates the row of that name
   * and inserts it when no row was updated.
   */
  private void store(DBOperation op) throws SQLException {
    updateStmt.setBytes(1, op.value);
    updateStmt.setString(2, op.name);
    if (updateStmt.executeUpdate() == 0) {
      insertStmt.setString(1, op.name);
      insertStmt.setBytes(2, op.value);
      insertStmt.executeUpdate();
    }
  }

  /** Rolls the connection back; a failure is only logged. */
  private void rollbackQuietly() {
    try {
      conn.rollback();
    } catch (SQLException exc) {
      logmon.log(BasicLevel.ERROR, "DBTransaction, cannot rollback connection", exc);
    }
  }

  /**
   * Applies the operations pending in the calling thread's log to the
   * database and commits them, then moves to the <code>COMMIT</code> phase.
   * If the database rejects an operation the connection is rolled back, the
   * log is left untouched and the phase stays <code>RUN</code>.
   *
   * @throws IOException if a database operation fails.
   * @throws IllegalStateException if the phase is not <code>RUN</code>.
   */
  public final synchronized void commit() throws IOException {
    if (phase != RUN)
      throw new IllegalStateException ("Can not commit.");

    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, "DBTransaction, commit");

    Hashtable log = ((Context) perThreadContext.get()).log;
    if (! log.isEmpty()) {
      try {
        for (Enumeration e = log.elements(); e.hasMoreElements(); ) {
          DBOperation op = (DBOperation) e.nextElement();
          if (op.type == DBOperation.SAVE) {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG,
                         "DBTransaction, commit.save (" + op.name + ')');
            store(op);
          } else if (op.type == DBOperation.DELETE) {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG,
                         "DBTransaction, commit.delete (" + op.name + ')');
            deleteStmt.setString(1, op.name);
            deleteStmt.executeUpdate();
          }
        }
        conn.commit();
      } catch (SQLException sqle) {
        // Do not leave half of the log applied on the connection.
        rollbackQuietly();
        throw toIOException(sqle);
      }

      // The operations go back to the pool only once they are durable, so
      // a failure above never leaves recycled objects in the log.
      for (Enumeration e = log.elements(); e.hasMoreElements(); ) {
        ((DBOperation) e.nextElement()).free();
      }
      log.clear();
    }

    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, "DBTransaction, committed");

    setPhase(COMMIT);
  }

  /**
   * Drops the operations pending in the calling thread's log and moves to
   * the <code>ROLLBACK</code> phase. The database is not touched, as nothing
   * is written to it before {@link #commit()}.
   *
   * @throws IllegalStateException if the phase is not <code>RUN</code>.
   */
  public final synchronized void rollback() throws IOException {
    if (phase != RUN)
      throw new IllegalStateException ("Can not rollback.");

    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, "DBTransaction, rollback");

    setPhase(ROLLBACK);
    ((Context) perThreadContext.get()).log.clear();
  }

  /**
   * Ends the transaction: moves to the <code>FREE</code> phase and wakes up
   * one thread waiting for it.
   *
   * @throws IllegalStateException if the phase is not <code>RUN</code>,
   *         <code>COMMIT</code> or <code>ROLLBACK</code>.
   */
  public final synchronized void release() throws IOException {
    if ((phase != RUN) && (phase != COMMIT) && (phase != ROLLBACK))
      throw new IllegalStateException ("Can not release transaction.");

    setPhase(FREE);
    notify();
  }

  /**
   * Stops the transaction manager: waits for the <code>FREE</code> phase,
   * commits the connection and asks the database to shut down, then goes
   * back to <code>FREE</code>. Errors are logged, never thrown.
   */
  public synchronized void stop() {
    if (logmon.isLoggable(BasicLevel.INFO))
      logmon.log(BasicLevel.INFO, "DBTransaction, stops");

    while (phase != FREE) {
      try {
        wait();
      } catch (InterruptedException exc) {
        // Deliberately ignored: keep waiting until the phase is FREE.
      }
    }
    setPhase(FINALIZE);

    Statement s = null;
    try {
      s = conn.createStatement();
      conn.commit();
      logmon.log(BasicLevel.INFO, "DBTransaction, TBR stop#3");
      // NOTE(review): "SHUTDOWN COMPACT" looks like HSQLDB syntax while the
      // configured driver is Derby - confirm that this statement is accepted.
      s.executeUpdate("SHUTDOWN COMPACT");
      logmon.log(BasicLevel.INFO, "DBTransaction, TBR stop#4");
      s.close();
      logmon.log(BasicLevel.INFO, "DBTransaction, TBR stop#5");
    } catch (SQLException sqle) {
      logmon.log(BasicLevel.ERROR, "DBTransaction, stop#6", sqle);
    } catch (Throwable t) {
      logmon.log(BasicLevel.ERROR, "DBTransaction, stop#7", t);
    } finally {
      // Releases the statement when the shutdown request failed.
      closeQuietly(s);
      logmon.log(BasicLevel.INFO, "DBTransaction, stop#8");
    }
    logmon.log(BasicLevel.INFO, "DBTransaction, TBR stop#9");
    setPhase(FREE);

    if (logmon.isLoggable(BasicLevel.INFO)) {
      logmon.log(BasicLevel.INFO, "DBTransaction, stopped: ");
    }
  }

  /**
   * Closes the transaction manager: waits for the <code>FREE</code> phase,
   * asks the database to shut down, releases the JDBC resources and goes
   * back to <code>INIT</code>, so that {@link #init(String)} is needed
   * before any further use. Does nothing if the phase is already
   * <code>INIT</code>. Errors are logged, never thrown.
   */
  public synchronized void close() {
    if (logmon.isLoggable(BasicLevel.INFO))
      logmon.log(BasicLevel.INFO, "DBTransaction, close");

    if (phase == INIT) return;

    while (phase != FREE) {
      try {
        wait();
      } catch (InterruptedException exc) {
        // Deliberately ignored: keep waiting until the phase is FREE.
      }
    }

    setPhase(FINALIZE);
    Statement s = null;
    try {
      s = conn.createStatement();
      // NOTE(review): "SHUTDOWN COMPACT" looks like HSQLDB syntax while the
      // configured driver is Derby - confirm that this statement is accepted.
      s.execute("SHUTDOWN COMPACT");
    } catch (SQLException sqle) {
      logmon.log(BasicLevel.ERROR, "DBTransaction, close", sqle);
    } finally {
      closeQuietly(s);
    }

    // init() overwrites these references, so they must be released here.
    closeQuietly(insertStmt);
    closeQuietly(updateStmt);
    closeQuietly(deleteStmt);
    insertStmt = null;
    updateStmt = null;
    deleteStmt = null;
    try {
      // Ends any transaction left open by a query before closing.
      conn.rollback();
      conn.close();
    } catch (SQLException sqle) {
      if (logmon.isLoggable(BasicLevel.INFO))
        logmon.log(BasicLevel.INFO, "DBTransaction, close: cannot close connection", sqle);
    }
    conn = null;

    setPhase(INIT);

    if (logmon.isLoggable(BasicLevel.INFO)) {
      logmon.log(BasicLevel.INFO, "DBTransaction, closed: ");
    }
  }
}

/**
 * A pending operation (save or delete of a named byte array) recorded in
 * the log of a {@link DBTransaction}. Instances are obtained with
 * {@link #alloc(int, String, byte[])} and given back with {@link #free()}.
 */
final class DBOperation implements Serializable {
  static final int SAVE = 1;
  static final int DELETE = 2;
  static final int COMMIT = 3;
  static final int END = 127;

  /** Kind of operation: one of the constants above. */
  int type;
  /** Name of the object the operation applies to. */
  String name;
  /** Bytes to store, <code>null</code> for a deletion. */
  byte[] value;

  private DBOperation(int type, String name, byte[] value) {
    this.type = type;
    this.name = name;
    this.value = value;
  }

  /**
   * Returns a string representation of this operation, made of the default
   * <code>Object</code> representation followed by the type and the name.
   *
   * @return a string representation of this operation.
   */
  public String toString() {
    StringBuffer strbuf = new StringBuffer ();

    strbuf.append('(').append(super.toString());
    strbuf.append(",type=").append(type);
    strbuf.append(",name=").append(name);
    strbuf.append(')');

    return strbuf.toString();
  }

  /** Pool of recycled operations. */
  private static Pool pool = null;

  static {
    // The DBLogThresholdOperation system property overrides the default
    // defined by DBTransaction.LogThresholdOperation.
    pool = new Pool("DBTransaction$Operation",
                    Integer.getInteger("DBLogThresholdOperation",
                                       DBTransaction.LogThresholdOperation).intValue());
  }

  /**
   * Returns an operation of the given type and name, without value.
   *
   * @see #alloc(int, String, byte[])
   */
  static DBOperation alloc(int type, String name) {
    return alloc(type, name, null);
  }

  /**
   * Returns an operation initialized with the given fields. The object is
   * taken from the pool; a new one is created if the pool throws an
   * exception.
   *
   * @param type  the kind of operation.
   * @param name  the name of the object the operation applies to.
   * @param value the bytes to store, possibly <code>null</code>.
   * @return the initialized operation.
   */
  static DBOperation alloc(int type, String name, byte[] value) {
    DBOperation op = null;

    try {
      op = (DBOperation) pool.allocElement();
    } catch (Exception exc) {
      return new DBOperation(type, name, value);
    }
    op.type = type;
    op.name = name;
    op.value = value;

    return op;
  }

  /**
   * Clears the references held by this operation and gives it back to the
   * pool; the operation must not be used afterwards.
   */
  void free() {
    name = null;
    value = null;
    pool.freeElement(this);
  }
}