1 package com.quadcap.sql.file; 2 3 40 41 import java.io.BufferedInputStream ; 42 import java.io.EOFException ; 43 import java.io.File ; 44 import java.io.IOException ; 45 import java.io.InputStream ; 46 import java.io.RandomAccessFile ; 47 48 import java.util.Properties ; 49 50 import com.quadcap.sql.io.ObjectOutputStream; 51 import com.quadcap.sql.io.ObjectInputStream; 52 53 import com.quadcap.util.collections.LongMap; 54 55 import com.quadcap.util.Debug; 56 import com.quadcap.util.Util; 57 58 63 public class Logger1 implements Logger { 64 LogBuffer cb; 65 InputStream cbIn = null; 66 ObjectOutputStream oos = null; 67 ByteArrayRandomAccess bra = new ByteArrayRandomAccess(); 68 RandomAccessInputStream ris = new RandomAccessInputStream(bra); 69 ObjectInputStream ois = new ObjectInputStream(ris); 70 byte[] tmp = new byte[8]; 71 72 80 TransMap[] trans = new TransMap[16]; 81 82 87 int numTrans; 88 89 93 int liveTrans = 0; 94 95 98 long prevOp; 99 100 int maxSize = 128 * 1024 * 1024; 101 int minSize = 128 * 1024; 102 103 Log myLog; 104 Datafile db; 105 106 Logger1() {} 107 108 public void init(Log log, boolean create, Properties props) 109 throws IOException 110 { 111 this.myLog = log; 112 this.db = log.getDatafile(); 113 this.cb = new LogBuffer(); 114 125 maxSize = Integer.parseInt(props.getProperty("maxLogSize", "" + maxSize)); 126 127 137 minSize = Integer.parseInt(props.getProperty("minLogSize", "" + minSize)); 138 139 File logfile = new File (db.getDbRootDir(), "logfile"); 140 RandomAccessFile raf = new RandomAccessFile (logfile, "rw"); 141 FileRandomAccess fra = new FileRandomAccess(raf, maxSize); 142 RandomAccess ra = fra; 143 if (create) { 144 cb.init(ra, maxSize); 145 } else { 146 lastSize = ra.size(); 147 cb.init(ra, props); 148 } 149 } 150 151 public void init(File file) throws IOException { 152 RandomAccessFile raf = new RandomAccessFile (file, "rw"); 153 FileRandomAccess fra = new FileRandomAccess(raf, raf.length()); 154 RandomAccess ra = fra; 155 this.cb = new LogBuffer(); 156 lastSize = ra.size(); 157 cb.init(ra, new Properties ()); 158 } 159 160 161 164 class TransMap { 165 long transId; 166 170 int bufStart; 171 175 int bufEnd; 176 boolean complete; 177 178 public void init(long t, int p) { 179 this.transId = t; 180 this.bufStart = p; 181 this.bufEnd = p; 182 this.complete = false; 183 } 184 185 public void init(TransMap t) { 186 this.transId = t.transId; 187 this.bufStart = -1; 188 this.bufEnd = -1; 189 this.complete = t.complete; 190 } 191 192 public void copy(TransMap t) { 193 this.transId = t.transId; 194 this.bufStart = t.bufStart; 195 this.bufEnd = t.bufEnd; 196 this.complete = t.complete; 197 } 198 199 public String toString() { 200 return "TransMap[" + transId + ": " + bufStart + "-" + bufEnd + 201 (complete ? " (COMPLETE)" : "") + "]"; 202 } 203 } 204 205 public String toString() { 206 StringBuffer sb = new StringBuffer ("Logger1 {" + cb.getCheckpoint() + 207 ", " + cb.getEnd() + "} "); 208 for (int i = 0; i < trans.length; i++) { 209 TransMap tm = trans[i]; 210 if (tm != null && !tm.complete) { 211 sb.append(tm.toString()); 212 } 213 } 214 return sb.toString(); 215 } 216 217 public int getCheckpoint() { 218 return cb.getCheckpoint(); 219 } 220 221 public int getEnd() { 222 return cb.getEnd(); 223 } 224 225 public int rput(LogEntry op) throws IOException { 226 int pos = cb.getEnd(); 227 op.setPosition(pos); 228 229 long id = op.getTransactionId(); 230 int tx = findTransaction(id); 231 if (tx >= 0) { 232 TransMap tm = trans[tx]; 233 op.setPrev(tm.bufEnd); 234 tm.bufEnd = pos; 235 } 236 237 try { 238 if (oos == null) { 239 oos = new ObjectOutputStream(cb.getOutputStream()); 240 } 241 oos.writeObject(op); 242 oos.flush(); 243 } catch (IOException ex) { 244 if (tx >= 0) { 245 trans[tx].bufEnd = op.getPrev(); 246 } 247 throw ex; 248 } 249 return pos; 250 } 251 252 public void put(LogEntry op) throws IOException { 253 int pos = rput(op); 254 switch (op.getCode()) { 255 case LogEntry.BEGIN_TRANSACTION: 256 beginTransaction(op.getTransactionId(), pos); 257 break; 258 case LogEntry.COMMIT: 259 endTransaction(op.getTransactionId()); 260 break; 261 } 262 } 265 266 public void setRedoState(LogEntry op, int state) throws IOException { 267 int pos = op.getPosition(); 268 cb.writeByte(pos+1, (byte)state); 270 op.redoState = state; 271 } 272 273 public LogEntry getLastOp(long transId) throws IOException { 274 LogEntry ret = null; 275 int tx = findTransaction(transId); 276 if (tx >= 0) { 277 TransMap tm = trans[tx]; 278 ret = readEntry(tm.bufEnd); 279 } 280 if (ret == null) { 282 Debug.println(Util.stackTrace()); 283 Debug.println("getLastOp(" + transId + ") = null, trans:"); 284 for (int i = 0; i < numTrans; i++) { 285 Debug.println("trans[" + i + "] = " + trans[i]); 286 } 287 } 288 return ret; 290 } 291 292 public LogEntry getPrevOp(LogEntry op) throws IOException { 293 int pos = op.getPrev(); 294 int len = op.getPosition() - pos; 295 LogEntry ret = readEntry(pos, len); 296 return ret; 297 } 298 299 public LogEntry getFirstOp() throws IOException { 300 LogEntry ret = readEntry(cb.getBegin()); 301 return ret; 302 } 303 304 public LogEntry getNextOp() throws IOException { 305 return readEntry(); 306 } 307 308 public void sync() throws IOException { 309 cb.sync(); 310 } 311 312 public void close() throws IOException { 314 cb.close(); 315 } 316 317 long lastSize = 0; 318 public void reset() throws IOException { 319 cb.reset(); 320 if (lastSize - cb.size() > 100 * 1000) { 321 cb.truncate(); 322 } 323 lastSize = cb.size(); 324 } 325 326 public long getOldestTransaction() { 327 if (numTrans > 0) { 328 return trans[0].transId; 329 } else { 330 return -1; 331 } 332 } 333 334 public int getActiveTransactionCount() { 335 return liveTrans; 336 } 337 338 public LongMap getActiveTransactions() { 339 LongMap map = new LongMap(16); 340 for (int i = 0; i < numTrans; i++) { 341 TransMap t = trans[i]; 342 if (!t.complete) { 343 map.put(t.transId, ""); 344 } 345 } 346 return map; 347 } 348 349 public void checkpoint() throws IOException { 350 int offset = 0; 351 for (int i = 0; i < numTrans; i++) { 352 TransMap t = trans[i]; 353 if (t.complete) { 354 offset++; 355 } else if (offset > 0) { 356 trans[i-offset].init(t); 357 } 358 } 359 if (Trace.bit(22)) { 361 Debug.println("BEGIN checkpoint [" + cb.getBegin() + "-" + 362 cb.getEnd() + " (" + numTrans + " transactions)]"); 363 } 364 numTrans -= offset; 366 LogEntry entry = readEntry(0); 368 cb.reset(); 369 if (numTrans > 0) { 370 while (entry != null) { 371 boolean keep = false; 372 long id = entry.getTransactionId(); 373 int tx = findTransaction(id); 374 if (tx >= 0) { 375 TransMap tm = trans[tx]; 376 keep = !tm.complete; 377 if (keep) { 378 if (tm.bufStart < 0) tm.bufStart = cb.getEnd(); 379 } 380 } 381 if (keep) { 382 if (Trace.bit(23)) { 384 Debug.println(" [" + cb.getEnd() + "] -> " + entry); 385 } 386 rput(entry); 388 } else { 389 if (Trace.bit(23)) { 391 Debug.println(" [" + cb.getEnd() + "] -> DISCARD: " + entry); 392 } 393 entry.discard(db); 395 } 396 entry = readEntry(); 397 } 398 } 399 cb.checkpoint(); 400 if (Trace.bit(22)) { 402 Debug.println("END checkpoint [" + cb.getBegin() + "-" + cb.getEnd() + 403 " (" + numTrans + " transactions)]"); 404 } 405 } 407 408 410 private final void beginTransaction(long transId, int pos) { 411 if (numTrans >= trans.length) { 412 int newcap = numTrans + (numTrans/2) + 2; 413 trans = (TransMap[])Util.checkCapacity(trans, newcap); 414 } 415 TransMap t = trans[numTrans]; 416 if (t == null) { 417 t = new TransMap(); 418 trans[numTrans] = t; 419 } 420 421 for (int i = numTrans-1; i >= 0; i--) { 426 if (trans[i].transId > transId) { 427 trans[i+1].copy(trans[i]); 428 t = trans[i]; 429 } else { 430 break; 431 } 432 } 433 numTrans++; 434 liveTrans++; 436 t.init(transId, pos); 437 } 439 440 private final void endTransaction(final long transId) throws IOException { 441 int pos = findTransaction(transId); 443 if (pos >= 0) { 444 trans[pos].complete = true; 449 liveTrans--; 450 if (pos == 0 && cb.size() > minSize) { 451 checkpoint(); 452 } 453 } 454 } 455 456 459 int findTransaction(long transId) { 460 int lo = 0; 461 int hi = numTrans-1; 462 while (hi >= lo) { 463 int mid = (hi+lo) / 2; 464 TransMap t = trans[mid]; 465 if (t.transId == transId) { 466 return mid; 467 } 468 if (t.transId < transId) { 469 lo = mid+1; 470 } else { 471 hi = mid-1; 472 } 473 } 474 return -1; 475 } 476 477 private final LogEntry readEntry(int pos, int len) throws IOException { 478 if (pos < 0) return null; 479 bra.resize(len); 480 byte[] buf = bra.getBytes(); 481 cb.read(pos, buf, 0, len); 482 ris.setPosition(0); 483 ois.setInputStream(ris); 484 ois.setPosition(pos); 485 486 LogEntry ret = readEntry(); 487 return ret; 488 } 489 490 private final LogEntry readEntry(int pos) throws IOException { 491 if (pos < 0) return null; 492 cbIn = new BufferedInputStream (cb.getInputStream(pos)); 493 ois.setInputStream(cbIn); 494 ois.setPosition(pos); 495 496 LogEntry ret = readEntry(); 497 return ret; 498 } 499 500 private final LogEntry readEntry() throws IOException { 501 Object obj = null; 502 try { 503 int pos = (int)ois.getPosition(); 504 LogEntry ret = (LogEntry)(obj = ois.readObject()); 505 if (ret != null) { 506 ret.setPosition(pos); 507 } 508 return ret; 509 } catch (EOFException ex) { 510 return null; 511 } catch (ClassCastException ez) { 512 throw new DatafileException(ez); 513 } catch (ClassNotFoundException ex) { 514 throw new DatafileException(ex); 515 } 516 } 517 } 518 519 | Popular Tags |