package com.quadcap.sql.file;

import java.io.InputStream;
import java.io.IOException;
import java.io.OutputStream;

import java.util.Properties;

import com.quadcap.util.Debug;
import com.quadcap.util.Util;

/**
 * An append-only log region layered on top of a {@link RandomAccess} store.
 *
 * <p>The first {@link #headerSize} bytes of the store hold four ints, written
 * by {@link #sync()}: the begin position (offset 0), the end position
 * (offset 4), the checkpoint position (offset 8) and the maximum size
 * (offset 12).  Log data follows the header; every log position used by this
 * class is translated to a store offset by adding {@code headerSize}.
 */
public class LogBuffer {
    /** Underlying store holding the header followed by the log data. */
    RandomAccess ra;

    /** Begin position of the log. */
    int bX;

    /** End position of the log; the next write lands here. */
    int eX;

    /** Checkpoint position; set to the end position by {@link #checkpoint()}. */
    int cX;

    /** Upper bound on the end position; writes that would exceed it fail. */
    int maxSize = 128 * 1024 * 1024;

    /** Number of bytes reserved at the start of the store for the header. */
    static final int headerSize = 16;

    /**
     * Creates an uninitialized buffer; call one of the {@code init} methods
     * before use.
     */
    public LogBuffer() {}

    /**
     * Attaches to an existing store, loading the begin, end and checkpoint
     * positions from its header.
     *
     * <p>The maximum size is taken from the {@code maxLogSize} property,
     * falling back to the current value of {@code maxSize} when the property
     * is absent.
     * NOTE(review): the maxSize that sync() stores at header offset 12 is not
     * read back here -- confirm that is intended.
     *
     * @param ra    the store to attach to
     * @param props configuration properties
     * @throws IOException if the header cannot be read
     * @throws NumberFormatException if {@code maxLogSize} is not an integer
     */
    public void init(RandomAccess ra, Properties props) throws IOException {
        this.ra = ra;
        this.bX = ra.readInt(0);
        this.eX = ra.readInt(4);
        this.cX = ra.readInt(8);
        maxSize = Integer.parseInt(props.getProperty("maxLogSize", "" + maxSize));
    }

    /**
     * Attaches to a store as a brand new, empty log and writes the header.
     *
     * @param ra the store to attach to
     * @param m  the maximum size of the log
     * @throws IOException if the header cannot be written
     */
    public void init(RandomAccess ra, int m) throws IOException {
        this.ra = ra;
        this.bX = 0;
        this.eX = 0;
        this.cX = 0;
        this.maxSize = m;
        sync();
    }

    /**
     * Returns a stream that reads from {@code pos} up to the end position
     * current at the time of this call.
     *
     * @param pos the log position to start reading from
     * @return a new input stream over the log
     */
    public InputStream getInputStream(int pos) {
        return new LogInputStream(this, pos, this.eX);
    }

    /**
     * Returns a stream whose writes are appended to the end of the log.
     *
     * @return a new output stream over the log
     */
    public OutputStream getOutputStream() {
        return new LogOutputStream(this);
    }

    /**
     * @return the end position of the log
     */
    public int getEnd() { return eX; }

    /**
     * @return the begin position of the log
     */
    public int getBegin() { return bX; }

    /**
     * @return the checkpoint position of the log
     */
    public int getCheckpoint() { return cX; }

    /**
     * Records the current end position as the checkpoint position.
     */
    public void checkpoint() { this.cX = this.eX; }

    /**
     * Sets the begin position of the log.
     *
     * @param b the new begin position
     */
    public void setBegin(int b) {
        if (Trace.bit(13)) {
            Debug.println(toString() + ".setBegin(" + b + ")");
        }
        bX = b;
    }

    /**
     * Resets the begin, end and checkpoint positions to zero.  The header in
     * the store is not rewritten until {@link #sync()} is called.
     */
    public void reset() {
        if (Trace.bit(13)) {
            Debug.println(toString() + ".reset() -----------------------");
        }
        bX = 0;
        eX = 0;
        cX = 0;
    }

    /**
     * Resizes the underlying store to {@code size() + headerSize} bytes.
     *
     * NOTE(review): read and write address data at absolute offsets
     * (position + headerSize), so when bX is greater than zero this length is
     * smaller than eX + headerSize -- confirm callers only truncate when that
     * is intended.
     *
     * @throws IOException if the store cannot be resized
     */
    public void truncate() throws IOException {
        ra.resize(size() + headerSize);
    }

    /**
     * @return the distance between the begin and end positions
     */
    public int size() {
        return eX - bX;
    }

    /**
     * Reads {@code cnt} bytes starting at log position {@code pos}.
     *
     * @param pos the log position to read from
     * @param buf the destination buffer
     * @param off the offset in {@code buf} of the first byte stored
     * @param cnt the number of bytes to read
     * @return {@code cnt} if it is positive, otherwise zero
     * @throws IOException if the store cannot be read
     */
    public int read(int pos, byte[] buf, int off, int cnt)
        throws IOException
    {
        int ret = 0;
        if (cnt > 0) {
            ra.read(pos + headerSize, buf, off, cnt);
            ret = cnt;
        }
        if (Trace.bit(13)) {
            Debug.println(toString() + ".read(" + pos + ", " + cnt + ") = " +
                          ret + ", " +
                          Util.hexBytes(buf, off, ret));
        }
        return ret;
    }

    /**
     * Reads the single byte at log position {@code pos}.
     *
     * @param pos the log position to read from
     * @return whatever the store's {@code readByte} returns for that offset
     * @throws IOException if the store cannot be read
     */
    public int readByte(int pos) throws IOException {
        int ret = ra.readByte(pos + headerSize);
        if (Trace.bit(13)) {
            Debug.println(toString() + ".read(" + pos + ") = " + ret);
        }
        return ret;
    }

    /**
     * Appends {@code cnt} bytes to the end of the log and advances the end
     * position.
     *
     * @param buf the source buffer
     * @param off the offset in {@code buf} of the first byte to write
     * @param cnt the number of bytes to write
     * @throws IOException if the write would move the end position past
     *         {@code maxSize}, or if the store cannot be written
     */
    public void write(byte[] buf, int off, int cnt)
        throws IOException
    {
        if (Trace.bit(13)) {
            Debug.println(toString() + ".write(" +
                          Util.hexBytes(buf, off, cnt) + ")");
        }
        // Compare against the remaining room rather than computing eX + cnt,
        // which could overflow int and let an oversized write slip through.
        if (cnt > maxSize - eX) {
            Debug.println("eX: " + eX + ", write(" + off + ": " +
                          Util.strBytes(buf, off, cnt));
            throw new IOException ("LogBuffer full: " + eX + " + " + cnt + " bytes");
        }
        if (cnt > 0) {
            ra.write(eX + headerSize, buf, off, cnt);
            eX += cnt;
        }
    }

    /**
     * Overwrites the single byte at log position {@code pos}.  The end
     * position is not changed.
     *
     * @param pos the log position to write to
     * @param b   the byte value to store
     * @throws IOException if the store cannot be written
     */
    public void writeByte(int pos, int b) throws IOException {
        if (Trace.bit(13)) {
            Debug.println(toString() + ".writeByte(" +
                          pos + ", " + b + ")");
        }
        ra.writeByte(pos + headerSize, b);
    }

    /**
     * Writes the begin, end, checkpoint and maximum-size values to the header
     * and flushes the store.
     *
     * @throws IOException if the store cannot be written or flushed
     */
    public void sync() throws IOException {
        if (Trace.bit(13) || Trace.bit(24)) {
            Debug.println(toString() + ".sync()");
        }
        ra.writeInt(0, bX);
        ra.writeInt(4, eX);
        ra.writeInt(8, cX);
        ra.writeInt(12, maxSize);
        ra.flush();
    }

    /**
     * Writes the header via {@link #sync()} and closes the store.
     *
     * @throws IOException if the store cannot be synced or closed
     */
    public void close() throws IOException {
        if (Trace.bit(13)) {
            Debug.println(toString() + ".close()");
        }
        sync();
        ra.close();
    }

    /**
     * Advances a log position.
     *
     * @param pos the starting position
     * @param amt the amount to advance by
     * @return {@code pos + amt}
     */
    public int addPos(int pos, int amt) {
        int ret = pos + amt;
        return ret;
    }

    /**
     * Input stream over the log positions {@code [pos, lim)} of a buffer.
     */
    class LogInputStream extends InputStream {
        /** The buffer being read. */
        LogBuffer b;
        /** The next log position to read. */
        int pos;
        /** The log position at which this stream ends (exclusive). */
        int lim;

        LogInputStream(LogBuffer b, int pos, int lim) {
            if (Trace.bit(13)) {
                Debug.println("CIS(" + pos + "-" + lim + ")");
            }
            this.b = b;
            this.pos = pos;
            this.lim = lim;
        }

        /**
         * Reads up to {@code cnt} bytes, never going past the stream limit.
         *
         * @return the number of bytes read, 0 if {@code cnt} is zero, or -1
         *         if no bytes remain before the limit
         */
        public int read(byte[] buf, int off, int cnt)
            throws IOException
        {
            final int start = pos;
            int ret;
            if (cnt == 0) {
                // InputStream contract: a zero-length request yields 0,
                // not end-of-stream.
                ret = 0;
            } else {
                // Clamp to the bytes left before lim; computed as a
                // difference so that pos + cnt cannot overflow.
                final int n = Math.min(cnt, lim - pos);
                ret = -1;
                if (n > 0) {
                    ret = b.read(pos, buf, off, n);
                    if (ret > 0) {
                        pos = b.addPos(pos, ret);
                    } else {
                        ret = -1;
                    }
                }
            }
            if (Trace.bit(14)) {
                // Never hand a negative length to hexBytes at end of stream.
                Debug.println("CIS[" + start + "-" + lim +
                              "].read() = " + ret + ": " +
                              Util.hexBytes(buf, off, Math.max(ret, 0)));
            }
            return ret;
        }

        /**
         * Reads one byte, or returns -1 once the stream limit is reached.
         *
         * NOTE(review): the value is passed through from
         * RandomAccess.readByte unchanged; whether that is already in the
         * range 0..255 cannot be told from this file -- confirm.
         */
        public int read() throws IOException {
            final int start = pos;
            int ret = -1;
            // Honor the limit, as the bulk read does, instead of reading
            // whatever lies beyond it.
            if (pos < lim) {
                ret = b.readByte(pos);
                if (ret >= 0) {
                    pos = b.addPos(pos, 1);
                }
            }
            if (Trace.bit(14)) {
                Debug.println("CIS[" + start + "].read() = " + ret);
            }
            return ret;
        }
    }

    /**
     * Output stream that appends everything written to the end of a buffer.
     */
    class LogOutputStream extends OutputStream {
        /** The buffer being appended to. */
        LogBuffer b;

        LogOutputStream(LogBuffer b) {
            this.b = b;
        }

        /** Appends {@code cnt} bytes of {@code buf} to the log. */
        public void write(byte[] buf, int off, int cnt) throws IOException {
            if (Trace.bit(14)) {
                Debug.println("COS[" + b.eX + "].write(" +
                              Util.hexBytes(buf, off, cnt) + ")");
            }
            b.write(buf, off, cnt);
        }

        /** Appends the low-order byte of {@code x} to the log. */
        public void write(int x) throws IOException {
            if (Trace.bit(14)) {
                Debug.println("COS[" + b.eX + "].write(" + x + ")");
            }
            // Route through the bulk write so the capacity check and the
            // end-position update live in one place.
            byte[] one = new byte[1];
            one[0] = (byte)x;
            write(one, 0, 1);
        }
    }

    /**
     * @return a description of the form
     *         {@code LogBuffer(maxSize,begin,end,checkpoint)}
     */
    public String toString() {
        return "LogBuffer(" + maxSize + "," + bX + "," + eX + "," + cX + ")";
    }

    /**
     * Runs the built-in self test, printing any failure via {@code Debug}.
     */
    public static void main(String [] args) {
        try {
            test1();
        } catch (Throwable t) {
            Debug.print(t);
        }
    }

    /**
     * Self test: appends twelve records of growing size to an in-memory log
     * and, after each append, re-reads every earlier record and compares it
     * with the pattern that was written.
     */
    static void test1() throws Exception {
        int size = 100000;
        // Leave room for the full header in front of the data area.
        ByteArrayRandomAccess bra = new ByteArrayRandomAccess(size + headerSize);
        BufferedRandomAccess bbra = new BufferedRandomAccess(bra);
        LogBuffer cb = new LogBuffer();
        cb.init(bbra, size);

        int xs = 12;
        int[] poss = new int[xs];
        int[] sizz = new int[xs];
        byte[] buf = new byte[4096];
        byte[] rbuf = new byte[4096];
        OutputStream os = cb.getOutputStream();
        int wsize = 1;
        for (int i = 0; i < xs; i++) {
            poss[i] = cb.getEnd();
            sizz[i] = wsize+1;
            wsize += wsize;
            makeBuf(buf, i, sizz[i]);
            os.write(buf, 0, sizz[i]);
            for (int j = 0; j < i; j++) {
                makeBuf(buf, j, sizz[j]);
                InputStream is = cb.getInputStream(poss[j]);
                is.read(rbuf, 0, sizz[i]);
                if (Util.compareBytes(buf, 0, sizz[j], rbuf, 0, sizz[j]) != 0){
                    throw new Exception ("R " + i + ": " +
                                         Util.hexBytes(buf, 0, sizz[i]) +
                                         " vs " + j + ": " +
                                         Util.hexBytes(rbuf, 0, sizz[i]));
                }
            }
        }

    }

    /**
     * Fills the first {@code siz} bytes of {@code buf} with consecutive byte
     * values starting at {@code x * 13}.
     */
    static void makeBuf(byte[] buf, int x, int siz) {
        x *= 13;
        for (int i = 0; i < siz; i++) {
            buf[i] = (byte)(x++);
        }
    }

}