1 21 22 package org.apache.derby.impl.store.raw.log; 23 24 import org.apache.derby.iapi.reference.SQLState; 25 import org.apache.derby.iapi.reference.MessageId; 26 27 import org.apache.derby.impl.store.raw.log.LogCounter; 28 import org.apache.derby.impl.store.raw.log.LogRecord; 29 import org.apache.derby.impl.store.raw.log.StreamLogScan; 30 31 import org.apache.derby.iapi.services.sanity.SanityManager; 32 import org.apache.derby.iapi.error.StandardException; 33 import org.apache.derby.iapi.services.i18n.MessageService; 34 import org.apache.derby.iapi.store.raw.log.LogInstant; 35 import org.apache.derby.iapi.store.raw.log.LogFactory; 36 import org.apache.derby.iapi.store.raw.xact.TransactionId; 37 import org.apache.derby.iapi.services.io.ArrayInputStream; 38 39 import org.apache.derby.io.StorageRandomAccessFile; 40 41 import java.io.IOException ; 42 43 57 public class FlushedScan implements StreamLogScan { 58 59 private StorageRandomAccessFile scan; LogToFile logFactory; 63 boolean open; 65 long currentLogFileNumber; 67 long currentLogFileFirstUnflushedPosition; 68 73 long currentInstant; 77 long firstUnflushed = -1; long firstUnflushedFileNumber; 80 long firstUnflushedFilePosition; 81 82 static final int LOG_REC_LEN_BYTE_LENGTH = 4; 84 85 public FlushedScan(LogToFile logFactory, long startAt) 86 throws StandardException 87 { 88 if (SanityManager.DEBUG) 89 { 90 SanityManager.ASSERT(startAt != LogCounter.INVALID_LOG_INSTANT, 91 "cannot start scan on an invalid log instant"); 92 } 93 94 try 95 { 96 currentLogFileNumber = LogCounter.getLogFileNumber(startAt); 97 this.logFactory = logFactory; 98 scan = logFactory.getLogFileAtPosition(startAt); 99 setFirstUnflushed(); 100 open = true; 101 currentInstant = LogCounter.INVALID_LOG_INSTANT; } 103 104 catch (IOException ioe) 105 { 106 throw logFactory.markCorrupt( 107 StandardException.newException(SQLState.LOG_IO_ERROR, ioe)); 108 } 109 } 110 111 114 115 124 public LogRecord getNextRecord(ArrayInputStream input, 125 TransactionId tranId, 126 int groupmask) 127 throws StandardException 128 { 129 try 130 { 131 boolean candidate; 132 int peekAmount = LogRecord.formatOverhead() + LogRecord.maxGroupStoredSize(); 133 if (tranId != null) 134 peekAmount += LogRecord.maxTransactionIdStoredSize(tranId); 135 int readAmount; 137 LogRecord lr; 138 139 do 140 { 141 if (!open || !positionToNextRecord()) 142 return null; 143 144 int checkLength; 145 146 lr = null; 148 candidate = true; 149 readAmount = -1; 150 151 currentInstant = scan.readLong(); 152 byte[] data = input.getData(); 153 if (data.length < nextRecordLength) 154 { 155 data = new byte[nextRecordLength]; 158 input.setData(data); 159 } 160 161 if (logFactory.databaseEncrypted()) 162 { 163 scan.readFully(data, 0, nextRecordLength); 164 int len = logFactory.decrypt(data, 0, nextRecordLength, data, 0); 165 if (SanityManager.DEBUG) 166 SanityManager.ASSERT(len == nextRecordLength); 167 input.setLimit(0, len); 168 169 } 170 else { 172 if (groupmask == 0 && tranId == null) 173 { 174 scan.readFully(data, 0, nextRecordLength); 176 input.setLimit(0, nextRecordLength); 177 } 178 else 179 { 180 readAmount = (nextRecordLength > peekAmount) ? 185 peekAmount : nextRecordLength; 186 187 scan.readFully(data, 0, readAmount); 189 input.setLimit(0, readAmount); 190 191 } 192 } 193 194 lr = (LogRecord) input.readObject(); 195 196 if (groupmask != 0 || tranId != null) 197 { 198 if (groupmask != 0 && (groupmask & lr.group()) == 0) 199 candidate = false; 201 if (candidate && tranId != null) 202 { 203 TransactionId tid = lr.getTransactionId(); 204 if (!tid.equals(tranId)) candidate = false; } 207 208 214 if (candidate && !logFactory.databaseEncrypted()) 215 { 216 if (SanityManager.DEBUG) 218 SanityManager.ASSERT(readAmount > 0); 219 220 if (readAmount < nextRecordLength) 221 { 222 int inputPosition = input.getPosition(); 227 228 scan.readFully(data, readAmount, 229 nextRecordLength-readAmount); 230 231 input.setLimit(0, nextRecordLength); 232 input.setPosition(inputPosition); 233 } 234 } 235 } 236 237 if (candidate || logFactory.databaseEncrypted()) 238 { 239 checkLength = scan.readInt(); 240 241 if (SanityManager.DEBUG) 242 { 243 SanityManager.ASSERT(checkLength == nextRecordLength, "log currupted"); 244 } 245 } 246 else { 248 long nextRecordStartPosition = 252 LogCounter.getLogFilePosition(currentInstant) + 253 nextRecordLength + LogToFile.LOG_RECORD_OVERHEAD; 254 255 scan.seek(nextRecordStartPosition); 256 } 257 258 } while (candidate == false); 259 260 return lr; 261 } 262 catch (ClassNotFoundException cnfe) 263 { 264 throw logFactory.markCorrupt( 265 StandardException.newException(SQLState.LOG_CORRUPTED, cnfe)); 266 } 267 catch (IOException ioe) 268 { 269 throw logFactory.markCorrupt( 270 StandardException.newException(SQLState.LOG_IO_ERROR, ioe)); 271 } 272 } 273 274 280 public void resetPosition(LogInstant instant) throws IOException 281 { 282 if (SanityManager.DEBUG) 283 { 284 SanityManager.THROWASSERT("Unsupported feature"); 285 } 286 } 287 288 293 public long getLogRecordEnd() 294 { 295 if (SanityManager.DEBUG) 296 { 297 SanityManager.THROWASSERT("Unsupported feature"); 298 } 299 return LogCounter.INVALID_LOG_INSTANT; 300 } 301 302 303 308 public boolean isLogEndFuzzy() 309 { 310 if (SanityManager.DEBUG) 311 { 312 SanityManager.THROWASSERT("Unsupported feature"); 313 } 314 return false; 315 } 316 317 321 public long getInstant() 322 { 323 return currentInstant; 324 } 325 326 330 public LogInstant getLogInstant() 331 { 332 if (currentInstant == LogCounter.INVALID_LOG_INSTANT) 333 return null; 334 else 335 return new LogCounter(currentInstant); 336 } 337 338 341 public void close() 342 { 343 if (scan != null) 344 { 345 try 346 { 347 scan.close(); 348 } 349 catch (IOException ioe) 350 {} 351 352 scan = null; 353 } 354 currentInstant = LogCounter.INVALID_LOG_INSTANT; 355 open = false; 356 } 357 358 361 private void setFirstUnflushed() 362 throws StandardException, IOException 363 { 364 LogInstant firstUnflushedInstant = 365 logFactory.getFirstUnflushedInstant(); 366 firstUnflushed = ((LogCounter)firstUnflushedInstant).getValueAsLong(); 367 firstUnflushedFileNumber = LogCounter.getLogFileNumber(firstUnflushed); 368 firstUnflushedFilePosition = LogCounter.getLogFilePosition(firstUnflushed); 369 370 setCurrentLogFileFirstUnflushedPosition(); 371 } 372 373 private void setCurrentLogFileFirstUnflushedPosition() 374 throws IOException 375 { 376 388 if (currentLogFileNumber == firstUnflushedFileNumber) 389 currentLogFileFirstUnflushedPosition = firstUnflushedFilePosition; 390 else if (currentLogFileNumber < firstUnflushedFileNumber) 391 currentLogFileFirstUnflushedPosition = scan.length(); 392 else 393 { 394 throw new IOException ( 396 MessageService.getTextMessage(MessageId.LOG_BAD_START_INSTANT)); 397 } 398 } 399 400 private void switchLogFile() 401 throws StandardException 402 { 403 try 404 { 405 readNextRecordLength = false; 406 scan.close(); 407 scan = null; 408 scan = logFactory.getLogFileAtBeginning(++currentLogFileNumber); 409 setCurrentLogFileFirstUnflushedPosition(); 410 } 411 412 catch (IOException ioe) 413 { 414 throw logFactory.markCorrupt( 415 StandardException.newException(SQLState.LOG_IO_ERROR, ioe)); 416 } 417 } 418 419 425 int nextRecordLength; 426 427 441 boolean readNextRecordLength; 442 443 private boolean currentLogFileHasUnflushedRecord() 444 throws IOException 445 { 446 if (SanityManager.DEBUG) 447 SanityManager.ASSERT(scan != null, "scan is null"); 448 long curPos = scan.getFilePointer(); 449 450 if (!readNextRecordLength) 451 { 452 if (curPos + LOG_REC_LEN_BYTE_LENGTH > 453 currentLogFileFirstUnflushedPosition) 454 return false; 455 456 nextRecordLength = scan.readInt(); 457 curPos+=4; 458 readNextRecordLength = true; 459 } 460 461 if (nextRecordLength==0) return false; 462 463 int bytesNeeded = 464 nextRecordLength + LOG_REC_LEN_BYTE_LENGTH; 465 466 if (curPos + bytesNeeded > currentLogFileFirstUnflushedPosition) 467 { 468 return false; 469 } 470 else 471 { 472 readNextRecordLength = false; 473 return true; 474 } 475 } 476 477 private boolean positionToNextRecord() 478 throws StandardException, IOException 479 { 480 if (currentLogFileHasUnflushedRecord()) return true; 483 484 setFirstUnflushed(); 486 487 if (currentLogFileHasUnflushedRecord()) return true; 490 491 while(currentLogFileNumber < firstUnflushedFileNumber) 495 { 496 switchLogFile(); 497 if (currentLogFileHasUnflushedRecord()) return true; 498 } 499 500 currentInstant = LogCounter.INVALID_LOG_INSTANT; 502 return false; 503 } 504 } 505 | Popular Tags |