1 25 27 package de.nava.informa.impl.hibernate; 28 29 import java.io.FileInputStream ; 30 import java.io.FileNotFoundException ; 31 import java.io.FileOutputStream ; 32 import java.io.IOException ; 33 import java.io.InputStream ; 34 import java.net.URL ; 35 import java.util.ArrayList ; 36 import java.util.Iterator ; 37 import java.util.List ; 38 import java.util.Properties ; 39 40 import org.apache.commons.logging.Log; 41 import org.apache.commons.logging.LogFactory; 42 43 import de.nava.informa.core.ChannelIF; 44 import de.nava.informa.core.ItemIF; 45 import de.nava.informa.utils.InformaTestCase; 46 import de.nava.informa.utils.PersistChanGrpMgr; 47 import de.nava.informa.utils.PersistChanGrpMgrObserverIF; 48 import de.nava.informa.utils.RssUrlTestData; 49 50 55 public final class TestHibernateStressTest extends InformaTestCase { 56 57 static Log logger = LogFactory.getLog(TestHibernateStressTest.class); 58 59 private SessionHandler scaleSessHandler; 60 protected PersistChanGrpMgr managers[]; 61 int nManagers; 62 private int nChans; 63 private int itemMax; 64 boolean activeSemaphore; 65 protected List itemLog; 66 private List channelLog; 67 private ItemDeleter itemDeleterThreads[]; 68 69 private String scaleDbPath; 70 71 76 public TestHibernateStressTest(String testname) { 77 super("TestHibernateStressTest", testname); 78 System.setProperty("sun.net.client.defaultReadTimeout", "10000"); 79 System.setProperty("sun.net.client.defaultConnectTimeout", "10000"); 80 } 81 82 88 public void testgetNVerify() throws Exception { 89 informaGetNVerify(10, 5, 600); 90 } 91 92 97 public void testAddDelete() throws Exception { 98 informaAddDelete(5, 5, 100, 3, 50); 99 } 100 101 111 private void informaGetNVerify(int mancount, int chanCount, int itemCount) throws Exception { 112 this.nManagers = mancount; 113 this.nChans = chanCount; 114 this.itemMax = itemCount; 115 scaleDbPath = "test/scaletest"; 116 117 initLoggers(); 118 openScaleDatabase(true); createScaleChannelGroups(); 120 runUntilNitems(itemCount); 121 closeScaleDatabase(); 122 123 clearChannelGroups(); 125 openScaleDatabase(false); restoreScaleChannelGroups(); 127 verifyChannelLogEntryValidity(); 128 verifyItemLogValidity(); 129 } 130 131 141 private void informaAddDelete(int mancount, int chanCount, int phase1itemCount, 142 int deleterThreadsCount, int phase2itemCount) throws Exception { 143 144 this.nManagers = mancount; 145 this.nChans = chanCount; 146 scaleDbPath = "test/scaletest"; 147 148 initLoggers(); 149 150 logger.debug("Start Phase 1"); 152 this.itemMax = phase1itemCount; 153 openScaleDatabase(true); createScaleChannelGroups(); runUntilNitems(phase1itemCount); 157 logger.debug("Start Phase 2"); 159 createDeleterThreads(deleterThreadsCount); runUntilNitems(phase2itemCount); 162 logger.debug("Start Phase 3"); 164 waitForDeleterThreadsToBeDone(); 165 166 logger.debug("Test complete."); 167 } 168 169 174 private void createDeleterThreads(int count) { 175 itemDeleterThreads = new ItemDeleter[count]; 176 for (int i = 0; i < count; i++) { 177 itemDeleterThreads[i] = new ItemDeleter(); 178 itemDeleterThreads[i].start(); 179 itemDeleterThreads[i].setName("Deleter: " + i); 180 } 181 } 182 183 189 private void waitForDeleterThreadsToBeDone() { 190 for (int i = 0; i < itemDeleterThreads.length; i++) { 207 itemDeleterThreads[i].waitForFinish(); 208 } 209 } 210 211 215 private void verifyChannelLogEntryValidity() { 216 synchronized (channelLog) { 217 Iterator chanLogIter = channelLog.iterator(); 218 while (chanLogIter.hasNext()) { 219 ChannelLogEntry next = (ChannelLogEntry) chanLogIter.next(); 220 assertTrue("we got channel from disk which we didn't see from the net:" + next.theKey, 224 !(next.recordCounter == 0 && next.verifyCounter > 0)); 225 226 assertTrue("we got a channel from the net, which we then didn't see from disk:" 230 + next.theKey, !(next.recordCounter > 0 && next.verifyCounter == 0)); 231 232 assertTrue("we got a channel from disk more often than from memory:" + next.theKey, 236 !(next.recordCounter < next.verifyCounter)); 237 238 assertTrue("we got a channel from disk more than once:" + next.theKey, 240 !(next.verifyCounter > 1)); 241 242 } 243 } 244 } 245 246 253 private void verifyItemLogValidity() { 254 int counter = 0; 255 synchronized (itemLog) { 256 Iterator itemLogIter = itemLog.iterator(); 257 while (itemLogIter.hasNext() && counter < itemMax) { 258 counter++; 259 ItemLogEntry next = (ItemLogEntry) itemLogIter.next(); 260 String msgHdr = next.theItem.getChannel() + ":" + next.theKey; 261 262 final boolean diskNotNet = next.recordCounter == 0 && next.verifyCounter > 0; 264 if (diskNotNet) { 265 assertTrue("we saw an item from disk which we didn't see from the net: " + msgHdr, 266 !diskNotNet); 267 268 } 269 270 final boolean netNotDisk = next.recordCounter > 0 && next.verifyCounter == 0; 271 if (netNotDisk) { 272 assertTrue("we saw an item from the Net which we didn't see from disk: " + msgHdr, 273 !netNotDisk); 274 } 275 276 } 277 } 278 } 279 280 283 private void clearChannelGroups() { 284 deActivateProcesses(); 285 managers = null; 286 } 287 288 291 private void createScaleChannelGroups() { 292 managers = new PersistChanGrpMgr[nManagers]; 294 int channelIndex = 0; 295 296 for (int i = 0; i < nManagers; i++) { 297 managers[i] = new PersistChanGrpMgr(scaleSessHandler, true); 298 assertNotNull(managers[i]); 299 managers[i].createGroup(generateChanGrpName(i)); 300 managers[i].setGlobalObserver(new ChannelLogRecordObserver()); 301 for (int chans = 0; chans < nChans; chans++) { 302 Channel nextChan = managers[i].addChannel(RssUrlTestData.get(channelIndex++)); 303 managers[i].notifyItems(nextChan); 304 } 305 managers[i].notifyChannels(); 306 } 307 } 308 309 317 private void restoreScaleChannelGroups() throws Exception { 318 managers = new PersistChanGrpMgr[nManagers]; 319 int channelIndex = 0; 320 321 for (int i = 0; i < nManagers; i++) { 322 managers[i] = new PersistChanGrpMgr(scaleSessHandler, true); 323 assertNotNull(managers[i]); 324 managers[i].createGroup(generateChanGrpName(i)); 325 managers[i].setGlobalObserver(new informaLogObserver()); 326 for (int chans = 0; chans < nChans; chans++) { 327 Channel nextChan = managers[i].addChannel(RssUrlTestData.get(channelIndex++)); 328 managers[i].notifyItems(nextChan); 329 } 330 managers[i].notifyChannels(); 331 } 332 } 333 334 341 private void runUntilNitems(int N) throws Exception { 342 setActiveSemaphor(true); 343 activateProcesses(); 344 while (getActiveSemaphor()) { 345 Thread.sleep(1000); 346 setActiveSemaphor(itemLog.size() < N); 347 } 348 deActivateProcesses(); 349 } 350 351 357 synchronized boolean getActiveSemaphor() { 358 return activeSemaphore; 359 } 360 361 367 synchronized private boolean setActiveSemaphor(boolean newval) { 368 boolean oldval = activeSemaphore; 369 activeSemaphore = newval; 370 return oldval; 371 } 372 373 376 private void activateProcesses() { 377 for (int i = 0; i < nManagers; i++) { 378 managers[i].activate(); 379 } 380 } 381 382 385 private void deActivateProcesses() { 386 for (int i = 0; i < nManagers; i++) { 387 managers[i].deActivate(); 388 } 389 } 390 391 398 private boolean getVirginDb() throws FileNotFoundException { 399 boolean fileOneIsOK, fileTwoIsOK; 400 fileTwoIsOK = copyFiles("informa.script", scaleDbPath + ".script"); 401 fileOneIsOK = copyFiles("informa.properties", scaleDbPath + ".properties"); 402 return fileOneIsOK && fileTwoIsOK; 403 } 404 405 413 private boolean copyFiles(String src, String dest) throws FileNotFoundException { 414 boolean result = false; 415 InputStream srcStream = new FileInputStream (src); 416 if (srcStream != null) { 417 try { 418 FileOutputStream dstStream = new FileOutputStream (dest); 420 int ch; while ((ch = srcStream.read()) != -1) { 422 dstStream.write(ch); 423 } 424 srcStream.close(); 425 dstStream.close(); 426 result = true; 427 } catch (IOException e) { 428 result = false; 429 } 430 } 431 if (!result) fail("Failed to copy File " + src + " to " + dest); 432 return result; 433 } 434 435 441 void openScaleDatabase(boolean virgin) throws Exception { 442 if (virgin) getVirginDb(); 443 444 Properties hibernateProps = new Properties (); 445 hibernateProps.setProperty("hibernate.connection.url", "jdbc:hsqldb:" + scaleDbPath); 446 scaleSessHandler = SessionHandler.getInstance(hibernateProps); 447 assertNotNull(scaleSessHandler); 448 } 449 450 455 private void closeScaleDatabase() throws Exception { 456 scaleSessHandler.getSession().flush(); 457 scaleSessHandler.getSession().close(); 458 } 459 460 466 private String generateChanGrpName(int i) { 467 return "Channel Group" + i; 468 } 469 470 487 void chanLogUpdate(ChannelIF channel, boolean recordMode) { 488 assertNotNull("ChannelRetrieved returned Null", channel); 489 URL locU = channel.getLocation(); 490 String key = locU == null ? "<not yet>" : locU.toString(); 491 492 Iterator channelIt = channelLog.iterator(); 493 ChannelLogEntry found = null; 494 495 while (channelIt.hasNext()) { 498 ChannelLogEntry tmp = (ChannelLogEntry) channelIt.next(); 499 if (key.equals(tmp.theKey)) { 500 found = tmp; 501 break; 502 } 503 } 504 if (found == null) { 505 found = new ChannelLogEntry(channel, key); 506 synchronized (channelLog) { 507 channelLog.add(found); 508 } 509 } 510 511 if (recordMode) { 514 found.recordCounter++; 515 } else { 516 found.verifyCounter++; 517 } 518 } 519 520 526 void itemLogUpdate(Item item, boolean recordMode) { 527 assertNotNull("itemRetrieved returned Null", item); 528 String atitle = item.getTitle(); 529 530 Iterator itemIt = itemLog.iterator(); 531 ItemLogEntry found = null; 532 533 while (itemIt.hasNext()) { 535 ItemLogEntry tmp = (ItemLogEntry) itemIt.next(); 536 if (atitle.equals(tmp.theKey)) { 537 found = tmp; 538 break; 539 } 540 } 541 if (found == null) { 542 found = new ItemLogEntry(item, atitle); 543 synchronized (itemLog) { 544 itemLog.add(found); 545 } 546 } 547 548 if (recordMode) { 551 found.recordCounter++; 552 } else { 553 found.verifyCounter++; 554 } 555 } 556 557 560 private void initLoggers() { 561 itemLog = new ArrayList (); 562 channelLog = new ArrayList (); 563 } 564 565 569 573 class ChannelLogRecordObserver implements PersistChanGrpMgrObserverIF { 574 575 580 public void channelRetrieved(ChannelIF chan) { 581 chanLogUpdate(chan, true); 582 } 583 584 589 public void itemAdded(ItemIF newItem) { 590 itemLogUpdate((Item) newItem, true); 591 } 592 593 600 public void pollingNow(String name, int count, boolean now) { 601 } 603 } 604 605 609 class informaLogObserver implements PersistChanGrpMgrObserverIF { 610 611 617 public void channelRetrieved(ChannelIF chan) { 618 chanLogUpdate(chan, false); 619 } 620 621 626 public void itemAdded(ItemIF newItem) { 627 itemLogUpdate((Item) newItem, false); 628 } 629 630 637 public void pollingNow(String name, int count, boolean now) { 638 } 640 641 } 642 643 646 class ChannelLogEntry { 647 648 int recordCounter; 649 650 int verifyCounter; 651 652 String theKey; 653 654 ChannelIF theChan; 655 656 ChannelLogEntry(ChannelIF achan, String akey) { 657 theChan = achan; 658 theKey = akey; 659 recordCounter = 0; 660 verifyCounter = 0; 661 } 662 663 668 public String toString() { 669 return "rec/ver: " + recordCounter + "/" + verifyCounter + ":" + theKey; 670 } 671 } 672 673 676 public class ItemLogEntry { 677 678 Item theItem; 679 String theKey; 680 String channelName; 681 int recordCounter; 682 int verifyCounter; 683 boolean deleted; 684 boolean unread; 685 686 ItemLogEntry(Item anitem, String akey) { 687 theItem = anitem; 688 theKey = akey; 689 channelName = anitem.getChannel().getTitle(); 690 recordCounter = 0; 691 verifyCounter = 0; 692 deleted = false; 693 unread = true; 694 } 695 696 int getPersistChanGrpMgrIdx() { 697 int result = -1; 698 for (int i = 0; i < managers.length; i++) { 699 if (managers[i].hasChannel((Channel) theItem.getChannel())) { 700 assertTrue("Same channel in two groups", result == -1); 701 result = i; 702 } 703 } 704 assertTrue("Channel not found in any group", result != -1); 705 return result; 706 } 707 708 713 public String toString() { 714 return "rec/ver: " + recordCounter + "/" + verifyCounter + ":" + theKey + "(" + channelName 715 + ")"; 716 } 717 718 } 719 720 724 class ItemDeleter extends Thread { 725 726 boolean keepGoing; 727 728 731 ItemDeleter() { 732 keepGoing = true; 733 } 734 735 738 public synchronized void waitForFinish() { 739 try { 740 while (keepGoing) { 741 wait(); 742 } 743 } catch (InterruptedException e) { 744 } 747 } 748 749 756 public void run() { 757 758 while (keepGoing) { 760 761 763 boolean foundOne = false; 764 ItemLogEntry[] copyItemLog = (ItemLogEntry[]) itemLog.toArray(new ItemLogEntry[0]); 765 766 for (int i = 0; i < copyItemLog.length; i++) { 767 ItemLogEntry entry = copyItemLog[i]; 768 synchronized (entry) { 769 770 if (!entry.deleted) { 771 PersistChanGrpMgr theGrp = managers[entry.getPersistChanGrpMgrIdx()]; 773 774 Channel theChan = (Channel) entry.theItem.getChannel(); 775 theGrp.deActivate(); 776 int before = theGrp.getItemCount(theChan); 777 int after = theGrp.deleteItemFromChannel(theChan, entry.theItem); 778 779 if (getActiveSemaphor()) theGrp.activate(); 780 logger.debug("Deleted. Count before =" + before + " /after: " + after); 781 assertEquals("Not the rigth number of items", before, after + 1); 782 783 entry.deleted = true; 784 foundOne = true; 785 } 786 } 787 try { 788 Thread.sleep(50); 789 } catch (InterruptedException e) { 790 return; 791 } 792 } 793 if (!foundOne) keepGoing = false; 795 796 } 797 synchronized (this) { 800 notifyAll(); 801 } 802 } 803 } 804 } | Popular Tags |