1 22 package org.jboss.tm.recovery; 23 24 import java.io.ByteArrayInputStream ; 25 import java.io.DataInputStream ; 26 import java.io.File ; 27 import java.io.FileInputStream ; 28 import java.io.FileNotFoundException ; 29 import java.io.IOException ; 30 import java.io.ObjectInputStream ; 31 import java.io.RandomAccessFile ; 32 import java.nio.ByteBuffer ; 33 import java.nio.channels.FileChannel ; 34 import java.util.HashMap ; 35 import java.util.Iterator ; 36 import java.util.List ; 37 import java.util.Map ; 38 39 import org.jboss.logging.Logger; 40 import org.jboss.tm.XidFactoryBase; 41 42 54 class BatchRecoveryLogReader 55 implements RecoveryLogReader 56 57 { 58 59 private static Logger errorLog = 60 Logger.getLogger(BatchRecoveryLogReader.class); 61 62 63 private File logFile; 64 65 66 private XidFactoryBase xidFactory; 67 68 75 BatchRecoveryLogReader(File logFile, XidFactoryBase xidFactory) 76 { 77 this.logFile = logFile; 78 this.xidFactory = xidFactory; 79 } 80 81 91 private Object readHeaderObject(DataInputStream dis) 92 throws IOException , 93 ClassNotFoundException 94 { 95 int num = dis.readInt(); 96 byte[] bytes = new byte[num]; 97 dis.read(bytes); 98 ByteArrayInputStream bais = new ByteArrayInputStream (bytes); 99 ObjectInputStream ois = new ObjectInputStream (bais); 100 return ois.readObject(); 101 } 102 103 105 110 public String getLogFileName() 111 { 112 return logFile.toString(); 113 } 114 115 120 public String getBranchQualifier() 121 { 122 FileInputStream fis; 123 DataInputStream dis; 124 125 try 126 { 127 fis = new FileInputStream (logFile); 128 dis = new DataInputStream (fis); 129 try 130 { 131 return (String ) readHeaderObject(dis); 132 } 133 finally 134 { 135 fis.close(); 136 } 137 } 138 catch (Exception e) 139 { 140 throw new RuntimeException (e); 141 } 142 } 143 144 167 public void recover(List committedSingleTmTransactions, 168 List committedMultiTmTransactions, 169 List inDoubtTransactions, 170 List inDoubtJcaTransactions) 171 { 172 Map activeMultiTmTransactions = new HashMap (); 173 FileInputStream fis; 174 DataInputStream dis; 175 CorruptedLogRecordException corruptedLogRecordException = null; 176 177 try 178 { 179 fis = new FileInputStream (logFile); 180 dis = new DataInputStream (fis); 181 182 } 183 catch (IOException e) 184 { 185 throw new RuntimeException (e); 186 } 187 188 try 189 { 190 readHeaderObject(dis); 192 if (fis.available() < LogRecord.FULL_HEADER_LEN) 193 return; 194 195 FileChannel channel = fis.getChannel(); 196 ByteBuffer buf = ByteBuffer.allocate(LogRecord.FULL_HEADER_LEN); 197 channel.read(buf); 198 199 LogRecord.Data data; 200 int len = LogRecord.getNextRecordLength(buf, 0); 201 202 while (len > 0) 203 { 204 buf = ByteBuffer.allocate(len + LogRecord.FULL_HEADER_LEN); 205 if (channel.read(buf) < len) 206 { 207 errorLog.info("Unexpected end of file in transaction log file " + 208 logFile.getName()); 209 break; 210 } 211 buf.flip(); 212 data = new LogRecord.Data(); 213 try 214 { 215 LogRecord.getData(buf, len, data); 216 } 217 catch (CorruptedLogRecordException e) 218 { 219 if (corruptedLogRecordException == null) 221 corruptedLogRecordException = e; 222 long corruptedRecordPos = 223 channel.position() - buf.limit() - LogRecord.FULL_HEADER_LEN; 224 long nextPos = scanForward(corruptedRecordPos + 1); 225 if (nextPos == 0) 226 { 227 errorLog.info("LOG CORRUPTION AT THE END OF LOG FILE " + 228 logFile.getName()); 229 break; 230 } 231 else 232 { 233 errorLog.info("LOG CORRUPTION IN THE MIDDLE OF LOG FILE " + 234 logFile.getName() + ". Skipping " + 235 (nextPos - corruptedRecordPos) + " bytes" + 236 ". Disabling presumed rollback."); 237 channel.position(nextPos); 238 buf = ByteBuffer.allocate(LogRecord.FULL_HEADER_LEN); 239 channel.read(buf); 240 len = LogRecord.getNextRecordLength(buf, 0); 241 corruptedLogRecordException.disablePresumedRollback = true; 242 continue; 243 } 244 } 245 switch (data.recordType) 246 { 247 case LogRecord.TX_COMMITTED: 248 data.globalTransactionId = 249 xidFactory.localIdToGlobalId(data.localTransactionId); 250 committedSingleTmTransactions.add(data); 251 break; 252 case LogRecord.MULTI_TM_TX_COMMITTED: 253 data.globalTransactionId = 254 xidFactory.localIdToGlobalId(data.localTransactionId); 255 case LogRecord.TX_PREPARED: 257 case LogRecord.JCA_TX_PREPARED: 258 activeMultiTmTransactions.put(new Long (data.localTransactionId), 259 data); 260 break; 261 case LogRecord.TX_END: 262 activeMultiTmTransactions.remove( 263 new Long (data.localTransactionId)); 264 break; 265 default: 266 errorLog.warn("INVALID TYPE IN LOG RECORD."); 267 break; 268 } 269 try 270 { 271 len = LogRecord.getNextRecordLength(buf, len); 272 } 273 catch (CorruptedLogRecordException e) 274 { 275 if (corruptedLogRecordException == null) 277 corruptedLogRecordException = e; 278 long corruptedRecordPos = 279 channel.position() - buf.limit() - LogRecord.FULL_HEADER_LEN; 280 long nextPos = scanForward(corruptedRecordPos + 1); 281 if (nextPos == 0) 282 { 283 errorLog.info("LOG CORRUPTION AT THE END OF LOG FILE " + 284 logFile.getName()); 285 len = 0; 286 } 287 else 288 { 289 errorLog.info("LOG CORRUPTION IN THE MIDDLE OF LOG FILE " + 290 logFile.getName() + ". Skipping " + 291 (nextPos - corruptedRecordPos) + " bytes" + 292 ". Disabling presumed rollback."); 293 channel.position(nextPos); 294 buf = ByteBuffer.allocate(LogRecord.FULL_HEADER_LEN); 295 channel.read(buf); 296 len = LogRecord.getNextRecordLength(buf, 0); 297 corruptedLogRecordException.disablePresumedRollback = true; 298 } 299 } 300 } 301 302 Iterator iter = activeMultiTmTransactions.values().iterator(); 303 while (iter.hasNext()) 304 { 305 data = (LogRecord.Data) iter.next(); 306 307 switch (data.recordType) 308 { 309 case LogRecord.MULTI_TM_TX_COMMITTED: 310 committedMultiTmTransactions.add(data); 311 break; 312 case LogRecord.TX_PREPARED: 313 inDoubtTransactions.add(data); 314 break; 315 case LogRecord.JCA_TX_PREPARED: 316 inDoubtJcaTransactions.add(data); 317 break; 318 default: 319 errorLog.warn("INCONSISTENT STATE."); 320 break; 321 } 322 } 323 324 if (corruptedLogRecordException != null) 325 throw corruptedLogRecordException; 326 } 327 catch (IOException e) 328 { 329 errorLog.warn("Unexpected exception in recover:", e); 330 } 331 catch (ClassNotFoundException e) 332 { 333 errorLog.warn("Unexpected exception in recover:", e); 334 } 335 try 336 { 337 fis.close(); 338 } 339 catch (IOException e) 340 { 341 throw new RuntimeException (e); 342 } 343 } 344 345 348 public void finishRecovery() 349 { 350 logFile.delete(); 351 } 352 353 361 private long scanForward(long pos) 362 { 363 errorLog.trace("entering scanForward"); 364 RandomAccessFile file = null; 365 366 try 367 { 368 file = new RandomAccessFile (logFile, "r"); 369 370 while (pos + LogRecord.FULL_HEADER_LEN < logFile.length()) 371 { 372 if (match(file, pos, LogRecord.HEADER)) 373 { 374 errorLog.trace("scanForward: match at pos=" + pos); 375 file.seek(pos + LogRecord.HEADER_LEN); 376 short recLen = file.readShort(); 377 errorLog.trace("scanForward: recLen=" + recLen); 378 if (pos + LogRecord.FULL_HEADER_LEN + recLen < logFile.length()) 379 { 380 byte[] buf = new byte[recLen]; 381 file.seek(pos + LogRecord.FULL_HEADER_LEN); 382 file.read(buf, 0, recLen); 383 if (LogRecord.hasValidChecksum(buf)) 384 { 385 errorLog.trace("scanForward: returning " + pos); 386 return pos; 387 } 388 else 389 { 390 errorLog.trace("scanForward: " + 391 "bad checksum in record at pos=" + pos); 392 pos += LogRecord.HEADER_LEN; 393 } 394 } 395 else 396 pos =+ LogRecord.HEADER_LEN; 397 } 398 else 399 pos++; 400 } 401 errorLog.trace("scanForward: returning 0"); 402 return 0; 403 } 404 catch (FileNotFoundException e) 405 { 406 errorLog.warn("Unexpected exception in scanForward:", e); 407 return 0; 408 } 409 catch (IOException e) 410 { 411 errorLog.warn("Unexpected exception in scanForward:", e); 412 return 0; 413 } 414 finally 415 { 416 if (file != null) 417 { 418 try 419 { 420 file.close(); 421 } 422 catch (IOException e) 423 { 424 errorLog.warn("Unexpected exception in scanForward:", e); 425 } 426 } 427 } 428 } 429 430 442 private static boolean match(RandomAccessFile file, long pos, byte[] pattern) 443 throws IOException 444 { 445 for (int i = 0; i < pattern.length; i++) 446 { 447 file.seek(pos + i); 448 if (file.readByte() != pattern[i]) 449 return false; 450 } 451 return true; 452 } 453 454 } 455 | Popular Tags |