1 20 package com.presumo.jms.persistence; 21 22 import com.presumo.jms.message.JmsMessage; 23 import com.presumo.jms.plugin.MessageQueue; 24 import com.presumo.jms.resources.Resources; 25 import com.presumo.util.log.Logger; 26 import com.presumo.util.log.LoggerFactory; 27 28 import java.io.BufferedInputStream ; 29 import java.io.BufferedOutputStream ; 30 import java.io.DataInputStream ; 31 import java.io.DataOutputStream ; 32 import java.io.EOFException ; 33 import java.io.IOException ; 34 import java.io.File ; 35 import java.io.FileInputStream ; 36 import java.io.FileOutputStream ; 37 38 import java.util.HashSet ; 39 import java.util.Iterator ; 40 import java.util.LinkedList ; 41 42 43 47 public class PersistentQueue implements MessageQueue 48 { 49 private File directory; 50 private String file_prefix; 51 private int max_log_file_size; 52 53 54 private File checkPointFile; 55 private File tempCheckPointFile; 56 private File logFile; 57 58 final static String CP_SUFFIX = "_CheckPoint.dat"; 60 final static String TEMP_CP_SUFFIX = "_TempCheckPoint.dat"; 61 final static String LOG_SUFFIX = "_LogFile.dat"; 62 63 private DataOutputStream log_file_dout; 65 private FileOutputStream log_file_fout; 66 private BufferedOutputStream log_file_bout; 67 68 private LinkedList mainQueue; 70 private LinkedList pendingDelete; 71 private HashSet persistentPendingDelete; 72 73 private int version = 0; 75 76 77 78 82 public PersistentQueue(File directory, 83 String file_prefix, 84 int max_log_file_size) 85 { 86 logger.entry("PersistentQueue", new Object [] 87 { directory, file_prefix, new Integer (max_log_file_size)}); 88 89 this.directory = directory; 90 this.file_prefix = file_prefix; 91 this.max_log_file_size = max_log_file_size; 92 93 this.checkPointFile = new File (directory,file_prefix+CP_SUFFIX); 94 this.tempCheckPointFile = new File (directory,file_prefix+TEMP_CP_SUFFIX); 95 this.logFile = new File (directory,file_prefix+LOG_SUFFIX); 96 97 logger.exit("PersistentQueue", this); 98 } 99 100 101 102 106 109 public boolean isPersistent() 110 { 111 return true; 112 } 113 114 118 public synchronized void open() throws IOException 119 { 120 try { 121 logger.entry("open"); 122 if (!directory.exists()) 123 directory.mkdirs(); 124 runToSteadyState(); 125 } 126 finally { 127 logger.exit("open"); 128 } 129 } 130 131 134 public synchronized void close() throws IOException 135 { 136 try { 137 logger.entry("close"); 138 139 if (pendingDelete != null && 141 mainQueue != null && 142 log_file_dout != null && 143 getNumPersistent(mainQueue) == 0 && 144 getNumPersistent(pendingDelete) == 0) { 145 146 if (persistentPendingDelete != null && 147 persistentPendingDelete.size() != 0) { 148 logger.warn("PJMSW6002"); 149 } 150 151 runToSteadyState(); 153 154 closeLogFileIfOpen(); 156 157 if (!logFile.delete()) { 159 throw new IOException ("Couldn't delete file: "+logFile); 160 } 161 162 if (!checkPointFile.delete()) { 164 throw new IOException ("Couldn't delete file: "+checkPointFile); 165 } 166 167 mainQueue = null; 168 pendingDelete = null; 169 } 170 else { 172 closeLogFileIfOpen(); 173 mainQueue = null; 174 pendingDelete = null; 175 } 176 } 177 finally { 178 logger.exit("close"); 179 } 180 } 182 183 188 public synchronized void forceCheckPoint() throws IOException 189 { 190 assertOpen(); 191 runToSteadyState(); 192 } 193 194 198 public synchronized void setMaxLogFileSize(int size) throws IOException 199 { 200 max_log_file_size = size; 201 if (log_file_dout != null) { 202 checkPointIfNeeded(); 203 } 204 } 205 206 211 public synchronized void push(JmsMessage [] msgs) throws IOException 212 { 213 assertOpen(); 214 processOp(new LogFileEntryInsert(msgs)); 215 checkPointIfNeeded(); 216 } 217 218 219 223 public synchronized int getNumPendingDelete() 224 { 225 assertOpen(); 226 return pendingDelete.size(); 227 } 228 229 232 public synchronized int getNumNotPendingDelete() 233 { 234 assertOpen(); 235 return mainQueue.size(); 236 } 237 238 241 public int size() 242 { 243 return getNumNotPendingDelete(); 244 } 245 246 247 253 public synchronized void delete(String [] keys) throws IOException 254 { 255 assertOpen(); 256 processOp(new LogFileEntryDelete(keys)); 257 checkPointIfNeeded(); 258 } 259 260 263 public synchronized JmsMessage [] getNext(int max_num) throws IOException 264 { 265 assertOpen(); 266 int rv_size = mainQueue.size(); 267 if (rv_size > max_num) { 268 rv_size = max_num; 269 } 270 271 JmsMessage [] rv = new JmsMessage[rv_size]; 272 Iterator it = mainQueue.iterator(); 273 for (int i = 0; i < rv.length; i++) { 274 rv[i] = (JmsMessage)it.next(); 275 } 276 277 processOp(new LogFileEntryGetNext(max_num)); 278 checkPointIfNeeded(); 279 return rv; 280 } 281 282 283 public synchronized JmsMessage [] getPendingDelete() throws IOException 284 { 285 assertOpen(); 286 JmsMessage [] temp = new JmsMessage[pendingDelete.size()]; 287 Iterator it = pendingDelete.iterator(); 288 int pos = 0; 289 while(it.hasNext()) { 290 temp[pos++] = (JmsMessage)it.next(); 291 } 292 return temp; 293 } 294 295 298 public void push(JmsMessage msg) throws IOException 299 { 300 push(new JmsMessage[]{msg}); 301 } 302 303 306 public void delete(String key) throws IOException 307 { 308 delete(new String []{key}); 309 } 310 311 312 315 public JmsMessage getNext() throws IOException 316 { 317 JmsMessage [] temp = getNext(1); 318 if (temp.length == 0) { 319 return null; 320 } 321 else { 322 return temp[0]; 323 } 324 } 325 326 327 328 333 public File getDirectory() 334 { 335 return directory; 336 } 337 338 339 343 public String getFilePrefix() 344 { 345 return file_prefix; 346 } 347 348 349 353 public int getMaxLogFileSize() 354 { 355 return max_log_file_size; 356 } 357 358 359 360 364 373 private int getState() 374 { 375 int state = 0; 376 if (checkPointFile.exists()) { 377 state |= 0x4; 378 } 379 if (logFile.exists()) { 380 state |= 0x2; 381 } 382 if (tempCheckPointFile.exists()) { 383 state |= 0x1; 384 } 385 return state; 386 } 387 388 private static final int FRESH_INIT = 0x0; private static final int CREATING_CP = 0x4; private static final int READY_CP = 0x6; private static final int CREATING_TEMP = 0x7; private static final int TEMP_CORRECT_1 = 0x3; private static final int TEMP_CORRECT_2 = 0x1; private static final int COPYING_TO_CP = 0x5; 396 397 private void runToSteadyState() throws IOException 398 { 399 boolean done = false; 400 try { 401 logger.entry("runToSteadyState"); 402 int init_state = getState(); 403 boolean ready_cp_done = init_state != READY_CP; 404 405 while (!done) { 406 int state = getState(); 407 logger.info("PJMSI6002", new Integer (state)); 408 409 switch(state) { 410 case FRESH_INIT: 411 handle_FRESH_INIT(); 412 break; 413 case CREATING_CP: 414 handle_CREATING_CP(); 415 break; 416 case READY_CP: 417 if (!ready_cp_done) { 418 handle_READY_CP(); 419 ready_cp_done=true; 420 } 421 else { 422 done = true; 423 } 424 break; 425 case CREATING_TEMP: 426 handle_CREATING_TEMP(); 427 if (getState() == READY_CP) { 428 ready_cp_done = false; 429 } 430 break; 431 case TEMP_CORRECT_1: 432 handle_TEMP_CORRECT_1(); 433 break; 434 case TEMP_CORRECT_2: 435 handle_TEMP_CORRECT_2(); 436 break; 437 case COPYING_TO_CP: 438 handle_COPYING_TO_CP(); 439 break; 440 default: 441 RuntimeException re = new RuntimeException ("Invalid state:"+state); 442 logger.exception(re); 443 throw re; 444 } 445 } 446 } 447 finally { 448 logger.exit("runToSteadyState"); 449 } 450 } 451 452 private void handle_FRESH_INIT() throws IOException 456 { 457 mainQueue = new LinkedList (); 460 pendingDelete = new LinkedList (); 461 persistentPendingDelete = new HashSet (); 462 writeCheckPoint(checkPointFile); 463 } 464 465 private void handle_CREATING_CP() throws IOException 466 { 467 try { 468 loadFromCheckPointFileIfNeeded(checkPointFile); 471 openLogFile(); 472 } 473 catch (EOFException e) { 474 logger.exception(e); 476 if (!checkPointFile.delete()) { 477 IOException ioe = new IOException ("Couldn't delete "+checkPointFile); 478 logger.exception(ioe); 479 throw ioe; 480 } 481 } 482 } 483 484 private void handle_READY_CP() throws IOException 485 { 486 closeLogFileIfOpen(); 487 loadFromLogFileIfNeeded(); 488 writeCheckPoint(tempCheckPointFile); 489 } 490 491 private void handle_CREATING_TEMP() throws IOException 492 { 493 try { 494 loadFromCheckPointFileIfNeeded(tempCheckPointFile); 495 if (!checkPointFile.delete()) { 497 throw new IOException ("Couldn't delete file: "+checkPointFile); 498 } 499 } 500 catch (EOFException e) { 501 logger.exception(e); 502 if (!tempCheckPointFile.delete()) { 504 throw new IOException ("Couldn't delete file: "+tempCheckPointFile); 505 } 506 } 507 } 508 509 private void handle_TEMP_CORRECT_1() throws IOException 510 { 511 loadFromCheckPointFileIfNeeded(tempCheckPointFile); 512 if (!logFile.delete()) { 513 throw new IOException ("Couldn't delete file: "+logFile); 514 } 515 } 516 517 private void handle_TEMP_CORRECT_2() throws IOException 518 { 519 loadFromCheckPointFileIfNeeded(tempCheckPointFile); 520 writeCheckPoint(checkPointFile); 521 } 522 523 private void handle_COPYING_TO_CP() throws IOException 524 { 525 try { 526 loadFromCheckPointFileIfNeeded(checkPointFile); 527 if (!tempCheckPointFile.delete()) { 529 throw new IOException ("Couldn't delete file: "+tempCheckPointFile); 530 } 531 } 532 catch (EOFException e) { 533 logger.exception(e); 534 if (!checkPointFile.delete()) { 536 throw new IOException ("Couldn't delete file: "+checkPointFile); 537 } 538 } 539 } 540 541 545 546 private void writeCheckPoint(File cpf) throws IOException 547 { 548 CheckPointFile.write(cpf, 549 mainQueue, 550 pendingDelete); 551 552 } 553 554 555 private void loadFromCheckPointFileIfNeeded(File cpf) throws IOException 556 { 557 if (this.mainQueue == null || 558 this.pendingDelete == null || 559 this.persistentPendingDelete == null) { 560 LinkedList tempMain = new LinkedList (); 561 LinkedList tempPend = new LinkedList (); 562 HashSet tempPer = new HashSet (); 563 CheckPointFile.read(cpf, 564 tempMain, 565 tempPend, 566 tempPer); 567 568 this.mainQueue = tempMain; 569 this.pendingDelete = tempPend; 570 this.persistentPendingDelete = tempPer; 571 } 572 } 573 574 575 private void loadFromLogFileIfNeeded() throws IOException 576 { 577 if (mainQueue == null || 578 pendingDelete == null || 579 persistentPendingDelete == null) { 580 581 LinkedList tempMain = new LinkedList (); 582 LinkedList tempPend = new LinkedList (); 583 HashSet tempPer = new HashSet (); 584 585 CheckPointFile.read(checkPointFile, 586 tempMain, 587 tempPend, 588 tempPer); 589 590 boolean done = false; 591 FileInputStream fin = null; 592 BufferedInputStream bin = null; 593 DataInputStream din = null; 594 595 try { 596 fin = new FileInputStream (logFile); 597 bin = new BufferedInputStream (fin,1024); 598 din = new DataInputStream (bin); 599 600 this.version = din.readInt(); 601 602 while(!done) { 603 try { 604 LogFileEntry ent; 605 ent = LogFileEntry.deserialize(din); 606 ent.restore(tempMain,tempPend,tempPer); 607 } 608 catch (EOFException e) { 609 done = true; 610 } 611 } 612 } 613 finally { 614 if (din != null) { 615 din.close(); 616 } 617 if (bin != null) { 618 bin.close(); 619 } 620 if (fin != null) { 621 fin.close(); 622 } 623 } 624 mainQueue = tempMain; 625 pendingDelete = tempPend; 626 persistentPendingDelete = tempPer; 627 628 } 629 } 630 631 632 private void checkPointIfNeeded() throws IOException 633 { 634 if (logFile.length() > max_log_file_size) { 635 runToSteadyState(); 636 } 637 } 638 639 private void processOp(LogFileEntry ent) throws IOException 640 { 641 boolean written = ent.writeAndProcess(mainQueue, 642 pendingDelete, 643 persistentPendingDelete, 644 log_file_dout); 645 if (written) { 646 logger.debug("sync"); 647 flushLogFile(); 648 } 649 } 650 651 652 private void assertOpen() 653 { 654 if (log_file_dout == null) { 655 throw new IllegalArgumentException ("Cannot use queue which isn't open"); 656 } 657 } 658 659 660 private void openLogFile() throws IOException 661 { 662 try { 663 log_file_fout = new FileOutputStream (logFile); 664 log_file_bout = new BufferedOutputStream (log_file_fout,1024); 665 log_file_dout = new DataOutputStream (log_file_bout); 666 log_file_dout.writeInt(this.version); flushLogFile(); 668 } 669 catch (IOException io) { 670 closeLogFileIfOpen(); 671 throw io; 672 } 673 catch (RuntimeException r) { 674 closeLogFileIfOpen(); 675 throw r; 676 } 677 678 } 679 680 private void closeLogFileIfOpen() throws IOException 681 { 682 if (log_file_dout != null) { 683 log_file_dout.close(); 684 log_file_dout = null; 685 } 686 if (log_file_bout != null) { 687 log_file_bout.close(); 688 log_file_bout = null; 689 } 690 if (log_file_fout != null) { 691 log_file_fout.close(); 692 log_file_fout = null; 693 } 694 } 695 696 private void flushLogFile() throws IOException 697 { 698 log_file_dout.flush(); 699 log_file_fout.getFD().sync(); 700 } 701 702 703 706 static int getNumPersistent(JmsMessage [] list) 707 { 708 int num = 0; 709 for (int i = 0; i < list.length; i++) { 710 if (isMessagePersistent(list[i])) { 711 num++; 712 } 713 } 714 return num; 715 } 716 717 720 static int getNumPersistent(LinkedList list) 721 { 722 Iterator it = list.iterator(); 723 int num = 0; 724 while (it.hasNext()) { 725 JmsMessage msg = (JmsMessage)it.next(); 726 if (isMessagePersistent(msg)) 727 num++; 728 } 729 return num; 730 } 731 732 735 static boolean isMessagePersistent(JmsMessage msg) 736 { 737 return msg.getJMSDeliveryMode() == javax.jms.DeliveryMode.PERSISTENT; 738 } 739 740 private static Logger logger = 742 LoggerFactory.getLogger(PersistentQueue.class, Resources.getBundle()); 743 745 } 746 | Popular Tags |