1 22 package org.jboss.tm.recovery; 23 24 import java.io.ByteArrayOutputStream ; 25 import java.io.File ; 26 import java.io.IOException ; 27 import java.io.ObjectOutputStream ; 28 import java.io.RandomAccessFile ; 29 import java.nio.ByteBuffer ; 30 import java.nio.channels.FileChannel ; 31 32 import org.jboss.logging.Logger; 33 34 56 class BatchLog 57 implements TxCompletionHandler 58 { 59 62 private static Logger errorLog = Logger.getLogger(BatchLog.class); 63 64 67 private static final int CLEAN_LENGTH = 16 * 1024; 68 69 72 private static byte[] nulls = new byte[CLEAN_LENGTH]; 73 74 77 private File logFile; 78 79 82 private RandomAccessFile os; 83 84 87 private FileChannel channel; 88 89 96 private volatile int numLoggedTransactions; 97 98 104 private volatile int numEndRecords; 105 106 111 private int numLocalTransactionsCompleted; 112 113 121 private boolean markedForRestart; 122 123 128 private Object header; 129 130 134 private long topFp; 135 136 139 private BatchWriter writer; 140 141 145 private ByteBuffer cleanBuffer = ByteBuffer.wrap(nulls); 146 147 157 BatchLog(BatchWriter writer, Object header, File dir, int fileSize) 158 throws IOException 159 { 160 this.writer = writer; 161 this.header = header; 162 logFile = File.createTempFile("TX_RECOVERY_LOG", ".log", dir); 163 os = new RandomAccessFile (logFile, "rw"); 164 165 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 166 ObjectOutputStream oos = new ObjectOutputStream (baos); 167 oos.writeObject(header); 168 byte[] bytes = baos.toByteArray(); 169 170 os.setLength(fileSize); 171 os.writeInt(bytes.length); 172 os.write(bytes); 173 174 channel = os.getChannel(); 175 channel.force(true); 177 topFp = channel.position(); 178 179 cleanUpLogFile(); 180 } 181 182 188 BatchWriter getBatchWriter() 189 { 190 return writer; 191 } 192 193 201 int getPosition() 202 throws IOException 203 { 204 return (int) channel.position(); 205 } 206 207 212 String getFilename() 213 { 214 return logFile.getName(); 215 } 216 217 224 void write(ByteBuffer record, boolean isEndRecord) 225 throws IOException 226 { 227 channel.write(record); 228 if (isEndRecord) 229 { 230 numEndRecords++; 231 232 synchronized (this) 233 { 234 if (markedForRestart == true && 235 numLoggedTransactions == 236 numLocalTransactionsCompleted + numEndRecords) 237 { 238 writer.restartBatchLog(this); 239 } 240 } 241 } 242 else 243 { 244 channel.force(false); 245 numLoggedTransactions++; 246 } 247 } 248 249 267 void write(ByteBuffer [] records, 268 int offset, 269 int length, 270 int numTransactionRecords) 271 throws IOException 272 { 273 channel.write(records, offset, length); 274 275 if (numTransactionRecords > 0) 276 { 277 channel.force(false); 278 numLoggedTransactions += numTransactionRecords; 279 } 280 281 if (numTransactionRecords < length) 282 { 283 numEndRecords += (length - numTransactionRecords); 284 285 synchronized (this) 286 { 287 if (markedForRestart == true && 288 numLoggedTransactions == 289 numLocalTransactionsCompleted + numEndRecords) 290 { 291 writer.restartBatchLog(this); 292 } 293 } 294 } 295 296 } 297 298 306 public void handleTxCompletion(long localTransactionId) 307 { 308 synchronized (this) 309 { 310 numLocalTransactionsCompleted++; 311 if (markedForRestart == true && 312 numLoggedTransactions == 313 numLocalTransactionsCompleted + numEndRecords) 314 { 315 writer.restartBatchLog(this); 316 } 317 } 318 } 319 320 326 void markForRestart() 327 { 328 synchronized (this) 329 { 330 markedForRestart = true; 331 if (numLoggedTransactions == 332 numLocalTransactionsCompleted + numEndRecords) 333 { 334 writer.restartBatchLog(this); 335 } 336 } 337 } 338 339 349 void restart() 350 throws IOException 351 { 352 channel.position(topFp); 355 cleanUpLogFile(); 356 writer.getNextLogs().add(this); 357 } 358 359 363 void cleanUpLogFile() 364 throws IOException 365 { 366 numLoggedTransactions = 0; 367 numLocalTransactionsCompleted = 0; 368 numEndRecords = 0; 369 markedForRestart = false; 370 371 cleanBuffer.limit(cleanBuffer.capacity()); 373 while (channel.position() <= channel.size() - cleanBuffer.limit()) 374 { 375 cleanBuffer.rewind(); 376 channel.write(cleanBuffer); 377 } 378 cleanBuffer.limit((int) (channel.size() - channel.position())); 379 cleanBuffer.rewind(); 380 channel.write(cleanBuffer); 381 channel.force(false); 382 383 channel.position(topFp); 384 } 385 386 390 void close() 391 { 392 errorLog.info("Closing transaction log " + getFilename() + 393 ", numLoggedTransactions=" + numLoggedTransactions + 394 ", numLocalTransactionsCompleted=" + 395 numLocalTransactionsCompleted + 396 ", numEndRecords=" + numEndRecords); 397 try 398 { 399 if (numLoggedTransactions == 400 numLocalTransactionsCompleted + numEndRecords) 401 { 402 channel.position(topFp); 403 channel.truncate(topFp); 404 } 405 os.close(); 406 } 407 catch (IOException e) 408 { 409 errorLog.error("Error closing transaction log " + getFilename(), e); 410 } 411 } 412 413 } 414 | Popular Tags |