1 5 package org.h2.store; 6 7 import java.lang.ref.WeakReference ; 8 import java.sql.SQLException ; 9 10 import org.h2.engine.Constants; 11 import org.h2.engine.Database; 12 import org.h2.engine.DbObject; 13 import org.h2.index.BtreeIndex; 14 import org.h2.message.Trace; 15 import org.h2.message.TraceSystem; 16 import org.h2.table.Table; 17 import org.h2.util.ObjectArray; 18 import org.h2.util.TempFileDeleter; 19 20 public class WriterThread extends Thread { 21 private volatile WeakReference databaseRef; 28 private int writeDelay; 29 private long lastIndexFlush; 30 private volatile boolean stop; 31 32 private WriterThread(Database database, int writeDelay) { 33 this.databaseRef = new WeakReference (database); 34 this.writeDelay = writeDelay; 35 } 36 37 public void setWriteDelay(int writeDelay) { 38 LogSystem log = getLog(); 39 this.writeDelay = writeDelay; 40 if(writeDelay < Constants.MIN_WRITE_DELAY) { 42 log.setFlushOnEachCommit(true); 43 } else { 44 log.setFlushOnEachCommit(false); 45 } 46 } 47 48 public static WriterThread create(Database database, int writeDelay) { 49 WriterThread thread = new WriterThread(database, writeDelay); 50 thread.setName("H2 Log Writer " + database.getShortName()); 51 thread.setDaemon(true); 52 thread.start(); 53 return thread; 54 } 55 56 private LogSystem getLog() { 57 Database database = (Database)databaseRef.get(); 58 if(database == null) { 59 return null; 60 } 61 LogSystem log = database.getLog(); 62 return log; 63 } 64 65 private void flushIndexes(Database database) { 66 long time = System.currentTimeMillis(); 67 if(lastIndexFlush + Constants.FLUSH_INDEX_DELAY > time) { 68 return; 69 } 70 synchronized(database) { 71 ObjectArray array = database.getAllSchemaObjects(DbObject.INDEX); 72 for(int i=0; i<array.size(); i++) { 73 DbObject obj = (DbObject) array.get(i); 74 if(obj instanceof BtreeIndex) { 75 BtreeIndex idx = (BtreeIndex) obj; 76 if(idx.getLastChange() == 0) { 77 continue; 78 } 79 Table tab = idx.getTable(); 80 if(tab.isLockedExclusively()) { 81 continue; 82 } 83 if(idx.getLastChange() + Constants.FLUSH_INDEX_DELAY > time) { 84 continue; 85 } 86 try { 87 idx.flush(database.getSystemSession()); 88 } catch(SQLException e) { 89 database.getTrace(Trace.DATABASE).error("flush index " +idx.getName(), e); 90 } 91 } 92 } 93 } 94 lastIndexFlush = time; 95 } 96 97 public void run() { 98 while(!stop) { 99 TempFileDeleter.deleteUnused(); 100 Database database = (Database)databaseRef.get(); 101 if(database == null) { 102 break; 103 } 104 if(Constants.FLUSH_INDEX_DELAY != 0) { 105 flushIndexes(database); 106 } 107 LogSystem log = database.getLog(); 108 if(log == null) { 109 break; 110 } 111 try { 112 log.flush(); 113 } catch(SQLException e) { 114 TraceSystem traceSystem = database.getTraceSystem(); 115 if(traceSystem != null) { 116 traceSystem.getTrace(Trace.LOG).error("flush", e); 117 } 118 } 119 int wait = writeDelay > 0 ? writeDelay : 1; 122 try { 123 Thread.sleep(wait); 124 } catch (InterruptedException e) { 125 } 127 } 128 databaseRef = null; 129 } 130 131 public void stopThread() { 132 stop = true; 133 } 134 135 } 136 | Popular Tags |