1 22 package org.jboss.tm.recovery; 23 24 import EDU.oswego.cs.dl.util.concurrent.Latch; 25 26 import java.io.File ; 27 import java.io.IOException ; 28 import java.nio.ByteBuffer ; 29 import java.util.ArrayList ; 30 import java.util.Collections ; 31 import java.util.List ; 32 33 import org.jboss.logging.Logger; 34 35 52 class BatchWriter implements Runnable 53 { 54 55 56 private static Logger errorLog = Logger.getLogger(BatchWriter.class); 57 58 59 private static boolean traceEnabled = errorLog.isTraceEnabled(); 60 61 62 private final File dir; 63 64 65 private final int initialCapacity; 66 67 68 private BatchLog log; 69 70 71 private final int fileSize; 72 73 74 private Object header; 75 76 77 private Exception abort; 78 79 80 private boolean running = true; 81 82 83 private final List nextLogs = Collections.synchronizedList(new ArrayList ()); 84 85 86 private Object batchLock = new Object (); 87 88 89 private ArrayList currentQueue; 90 91 92 private Latch currentLatch; 93 94 95 private LogRestarter logRestarter; 96 97 112 BatchWriter(Object header, int initialCapacity, 113 File dir, int fileSize, LogRestarter logRestarter) 114 throws IOException 115 { 116 this.header = header; 117 this.fileSize = fileSize; 118 this.initialCapacity = initialCapacity; 119 this.currentQueue = new ArrayList (initialCapacity); 120 this.currentLatch = new Latch(); 121 this.dir = dir; 122 this.logRestarter = logRestarter; 123 log = new BatchLog(this, header, dir, fileSize); 124 nextLogs.add(new BatchLog(this, header, dir, fileSize)); 125 } 126 127 135 void restartBatchLog(BatchLog logToRestart) 136 { 137 logRestarter.add(logToRestart); 138 } 139 140 150 List getNextLogs() 151 { 152 return nextLogs; 153 } 154 155 void clearAbort() 157 { 158 abort = null; 159 } 160 161 164 void stop() 165 { 166 synchronized (batchLock) 167 { 168 running = false; 169 batchLock.notifyAll(); 170 } 171 } 172 173 186 TxCompletionHandler addBatch(ByteBuffer buffer, boolean expectEndRecord) 187 { 188 PendingWriteRequest request = null; 189 190 if (traceEnabled) 191 { 192 errorLog.trace("Transaction log record:" + 193 HexDump.fromBuffer(buffer.array())); 194 errorLog.trace(LogRecord.toString(buffer)); 195 } 196 197 synchronized (batchLock) 198 { 199 request = new PendingWriteRequest(buffer, 200 currentLatch, 201 expectEndRecord); 202 currentQueue.add(request); 203 batchLock.notify(); 204 } 205 206 TxCompletionHandler completionHandler = request.waitTilDone(); 207 208 if (!expectEndRecord) 209 return completionHandler; 210 else 211 return new TransactionCompletionLogger((BatchLog) completionHandler); 212 } 213 214 229 void addBatch(ByteBuffer buffer, BatchLog destinationLog) 230 { 231 PendingWriteRequest receipt = null; 232 233 if (traceEnabled) 234 { 235 errorLog.trace("Transaction Log record:" + 236 HexDump.fromBuffer(buffer.array())); 237 errorLog.trace(LogRecord.toString(buffer)); 238 } 239 240 synchronized (batchLock) 241 { 242 receipt = new PendingWriteRequest(buffer, 243 currentLatch, 244 destinationLog); 245 currentQueue.add(receipt); 246 batchLock.notify(); 247 } 248 } 249 250 253 public void run() 254 { 255 ArrayList work = null; 256 Latch workLatch = null; 257 258 while (running) 259 { 260 synchronized (batchLock) 261 { 262 if (currentQueue.size() > 0) 263 { 264 if (work == null) 265 work = new ArrayList (initialCapacity); 266 267 270 ArrayList tmp = work; 271 work = currentQueue; 272 currentQueue = tmp; 273 workLatch = currentLatch; 274 currentLatch = new Latch(); 275 } 276 else 277 { 278 try 280 { 281 batchLock.wait(); 282 } 283 catch (InterruptedException ignored) 284 { 285 if (!running) break; 286 Thread.interrupted(); } 288 continue; 289 } 290 } 291 292 try 293 { 294 doWork(work); 296 } 297 catch (Exception e) 298 { 299 break; 300 } 301 finally 302 { 303 workLatch.release(); 305 work.clear(); 306 } 307 } 308 cleanup(); 309 } 310 311 317 private void doWork(ArrayList work) 318 { 319 if (abort != null) 320 { 321 for (int i = 0; i < work.size(); i++) 322 { 323 PendingWriteRequest request = (PendingWriteRequest) work.get(i); 324 request.setFailure(abort); 325 } 326 return; 327 } 328 329 ByteBuffer [] records = new ByteBuffer [work.size()]; 330 int offset = 0; 331 try 332 { 333 int length = records.length; 334 int usedSize = log.getPosition(); 335 int numTransactions; 336 boolean done = false; 337 338 while (!done) 339 { 340 int j; 341 numTransactions = 0; 342 343 for (int i = j = offset; i < length; i++) 344 { 345 PendingWriteRequest request = (PendingWriteRequest) work.get(i); 346 int type = request.getType(); 347 records[j] = request.getBuffer(); 348 if (type != PendingWriteRequest.TYPE_END) 349 { 350 usedSize += records[j].remaining(); 351 if (type == PendingWriteRequest.TYPE_TX_MULTI_TM) 352 usedSize += LogRecord.TX_END_LEN; 353 354 if (usedSize > fileSize) 355 { 356 length = i; 358 } 359 else 360 { 361 numTransactions++; 362 j++; 363 } 364 } 365 else 366 { 367 BatchLog requestedLog = request.getLogger(); 369 370 if (requestedLog != log) 371 { 372 requestedLog.write(records[j], true); 374 } 376 else 377 { 378 j++; 380 } 381 } 382 383 } 384 done = (length == records.length); 385 length = length - offset; 386 log.write(records, offset, j - offset, numTransactions); 387 setCompletionHandler(offset, length, work); 388 if (!done) 389 { 390 restart(); 391 offset = offset + length; 392 length = records.length; 393 usedSize = log.getPosition(); 394 } 395 } 396 } 397 catch (IOException failure) 398 { 399 for (int i = offset; i < records.length - offset; i++) 400 { 401 PendingWriteRequest request = (PendingWriteRequest) work.get(i); 402 request.setFailure(failure); 403 } 404 if (abort == null) 405 restart(); 406 } 407 } 408 409 419 private void setCompletionHandler(int offset, int length, ArrayList work) 420 { 421 for (int i = offset; i < length; i++) 422 { 423 PendingWriteRequest request = (PendingWriteRequest) work.get(i); 424 if (request.getType() != PendingWriteRequest.TYPE_END) 425 request.setCompletionHandler(log); 426 } 427 } 428 429 432 private void cleanup() 433 { 434 synchronized (nextLogs) 435 { 436 437 for (int i = 0; i < nextLogs.size(); i++) 438 { 439 BatchLog nextLog = (BatchLog) nextLogs.get(i); 440 nextLog.close(); 441 } 442 } 443 log.close(); 444 } 445 446 449 private void restart() 450 { 451 log.markForRestart(); 452 453 if (nextLogs.size() > 0) 454 log = (BatchLog) nextLogs.remove(0); 455 else 456 { 457 try 458 { 459 log = new BatchLog(this, header, dir, fileSize); 460 } 461 catch (IOException e) 462 { 463 abort = new Exception ("FAILED TO RESTART RECOVERY LOG " + 464 "AFTER BEING FULL", e); 465 } 466 } 467 } 468 469 } 470 | Popular Tags |