KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > de > nava > informa > impl > hibernate > TestHibernateStressTest


1 // Informa -- RSS Library for Java
2
// Copyright (c) 2002, 2003 by Niko Schmuck
3
//
4
// Niko Schmuck
5
// http://sourceforge.net/projects/informa
6
// mailto:niko_schmuck@users.sourceforge.net
7
//
8
// This library is free software.
9
//
10
// You may redistribute it and/or modify it under the terms of the GNU
11
// Lesser General Public License as published by the Free Software Foundation.
12
//
13
// Version 2.1 of the license should be included with this distribution in
14
// the file LICENSE. If the license is not included with this distribution,
15
// you may find a copy at the FSF web site at 'www.gnu.org' or 'www.fsf.org',
16
// or you may write to the Free Software Foundation, 675 Mass Ave, Cambridge,
17
// MA 02139 USA.
18
//
19
// This library is distributed in the hope that it will be useful,
20
// but WITHOUT ANY WARRANTY; without even the implied waranty of
21
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22
// Lesser General Public License for more details.
23
//
24

25 // $Id: TestHibernateStressTest.java,v 1.10 2004/09/02 09:19:15 spyromus Exp $
26

27 package de.nava.informa.impl.hibernate;
28
29 import java.io.FileInputStream JavaDoc;
30 import java.io.FileNotFoundException JavaDoc;
31 import java.io.FileOutputStream JavaDoc;
32 import java.io.IOException JavaDoc;
33 import java.io.InputStream JavaDoc;
34 import java.net.URL JavaDoc;
35 import java.util.ArrayList JavaDoc;
36 import java.util.Iterator JavaDoc;
37 import java.util.List JavaDoc;
38 import java.util.Properties JavaDoc;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42
43 import de.nava.informa.core.ChannelIF;
44 import de.nava.informa.core.ItemIF;
45 import de.nava.informa.utils.InformaTestCase;
46 import de.nava.informa.utils.PersistChanGrpMgr;
47 import de.nava.informa.utils.PersistChanGrpMgrObserverIF;
48 import de.nava.informa.utils.RssUrlTestData;
49
50 /**
51  * Stress test of Informa's hibernate backend.
52  *
53  * @author Pito Salas
54  */

55 public final class TestHibernateStressTest extends InformaTestCase {
56
57   static Log logger = LogFactory.getLog(TestHibernateStressTest.class);
58
59   private SessionHandler scaleSessHandler;
60   protected PersistChanGrpMgr managers[];
61   int nManagers;
62   private int nChans;
63   private int itemMax;
64   boolean activeSemaphore;
65   protected List JavaDoc itemLog;
66   private List JavaDoc channelLog;
67   private ItemDeleter itemDeleterThreads[];
68
69   private String JavaDoc scaleDbPath;
70
71   /**
72    * Constructor of this test case
73    *
74    * @param testname
75    */

76   public TestHibernateStressTest(String JavaDoc testname) {
77     super("TestHibernateStressTest", testname);
78     System.setProperty("sun.net.client.defaultReadTimeout", "10000");
79     System.setProperty("sun.net.client.defaultConnectTimeout", "10000");
80   }
81
82   /**
83    * Basic Stress test. Simply Run a bunch of PersistentChannelGroups in parallel for a while and
84    * then verify that the disk info matches what we found the first time through.
85    *
86    * @throws Exception
87    */

88   public void testgetNVerify() throws Exception JavaDoc {
89     informaGetNVerify(10, 5, 600);
90   }
91
92   /**
93    * Add/Delete Stress Test.
94    *
95    * @throws Exception
96    */

97   public void testAddDelete() throws Exception JavaDoc {
98     informaAddDelete(5, 5, 100, 3, 50);
99   }
100
101   /**
102    * Baseline test: Get a bunch of feeds from the internet, persist them, and close the database,
103    * reopen it, and see if all the information is as expected.
104    *
105    * @param mancount - Number of PersistentChannelGroupManagers to create for the test
106    * @param chanCount - Number of channels in each one
107    * @param itemCount - Number of items total to fetch from internet before moving to verification
108    * phase
109    * @throws Exception
110    */

111   private void informaGetNVerify(int mancount, int chanCount, int itemCount) throws Exception JavaDoc {
112     this.nManagers = mancount;
113     this.nChans = chanCount;
114     this.itemMax = itemCount;
115     scaleDbPath = "test/scaletest";
116
117     initLoggers();
118     openScaleDatabase(true); // Create new virgin database
119
createScaleChannelGroups();
120     runUntilNitems(itemCount);
121     closeScaleDatabase();
122
123     // now, let's check that we have exactly the right stuff when we re-open
124
clearChannelGroups();
125     openScaleDatabase(false); // Use database that was created above
126
restoreScaleChannelGroups();
127     verifyChannelLogEntryValidity();
128     verifyItemLogValidity();
129   }
130
131   /**
132    * More sophisticated stress test which simultanously adds and deletes Articles and Channels.
133    *
134    * @param mancount Number of PersistChanGrpMgrs involved
135    * @param chanCount Number of Informa Channels in each one
136    * @param phase1itemCount Number of items logged before moving to Phase 2
137    * @param deleterThreadsCount Number of deleter Threads
138    * @param phase2itemCount Number of items logged before moving to Phase 3
139    * @throws Exception
140    */

141   private void informaAddDelete(int mancount, int chanCount, int phase1itemCount,
142       int deleterThreadsCount, int phase2itemCount) throws Exception JavaDoc {
143
144     this.nManagers = mancount;
145     this.nChans = chanCount;
146     scaleDbPath = "test/scaletest";
147
148     initLoggers();
149
150     // Phase 1: Add Channels and articles
151
logger.debug("Start Phase 1");
152     this.itemMax = phase1itemCount;
153     openScaleDatabase(true); // Create new virgin Database
154
createScaleChannelGroups(); // Add the requisite ChannelGroups and the Channels
155
runUntilNitems(phase1itemCount); // Run until we have the right number of items
156

157     // Phase 2: Continue running but start deleting articles
158
logger.debug("Start Phase 2");
159     createDeleterThreads(deleterThreadsCount); // Create and start threads to delete items
160
runUntilNitems(phase2itemCount); // Run until we have the requested number of items
161

162     // Phase 3: Just run the deleter threads until they are all done.
163
logger.debug("Start Phase 3");
164     waitForDeleterThreadsToBeDone();
165
166     logger.debug("Test complete.");
167   }
168
169   /**
170    * Create and start the indicated number of ArticleDeleterThreads.
171    *
172    * @param count
173    */

174   private void createDeleterThreads(int count) {
175     itemDeleterThreads = new ItemDeleter[count];
176     for (int i = 0; i < count; i++) {
177       itemDeleterThreads[i] = new ItemDeleter();
178       itemDeleterThreads[i].start();
179       itemDeleterThreads[i].setName("Deleter: " + i);
180     }
181   }
182
183   /**
184    * The DeleterThreads automatically set keepGoing to false when there are no remaining items to
185    * delete. This method loops (with sleeps in between) waiting for all ArticleDeleterThreads to be
186    * done.
187    *
188    */

189   private void waitForDeleterThreadsToBeDone() {
190 // int timeoutCounter = 0;
191
// boolean stillRunning = true;
192
// while (stillRunning) {
193
// assertTrue("Timed out waiting for Deleter threads to be done", timeoutCounter < 1000);
194
// stillRunning = false;
195
// for (int i = 0; i < itemDeleterThreads.length; i++) {
196
// if (itemDeleterThreads[i].keepGoing) stillRunning = true;
197
// }
198
// timeoutCounter++;
199
// try {
200
// Thread.sleep(100);
201
// } catch (InterruptedException e) {
202
// return;
203
// }
204
//
205
// }
206
for (int i = 0; i < itemDeleterThreads.length; i++) {
207       itemDeleterThreads[i].waitForFinish();
208     }
209   }
210
211   /**
212    * Look through the ChannelLog, comparing what we saw during record and verify mode. There are
213    * various cases that indicate errors.
214    */

215   private void verifyChannelLogEntryValidity() {
216     synchronized (channelLog) {
217       Iterator JavaDoc chanLogIter = channelLog.iterator();
218       while (chanLogIter.hasNext()) {
219         ChannelLogEntry next = (ChannelLogEntry) chanLogIter.next();
220         // if recordCounter == 0 and verifyCounter != 0, we got a channel from
221
// disk which we
222
// didn't see from the net
223
assertTrue("we got channel from disk which we didn't see from the net:" + next.theKey,
224             !(next.recordCounter == 0 && next.verifyCounter > 0));
225
226         // if recordCounter > 0 and verifyCounter == 0, we got a channel from
227
// the net, which
228
// we then didn't see from disk
229
assertTrue("we got a channel from the net, which we then didn't see from disk:"
230             + next.theKey, !(next.recordCounter > 0 && next.verifyCounter == 0));
231
232         // If recordCounter < verifyCounter, we got a channel from disk more
233
// than from
234
// memory
235
assertTrue("we got a channel from disk more often than from memory:" + next.theKey,
236             !(next.recordCounter < next.verifyCounter));
237
238         // If verifyCounter > 1, we got a channel from disk more than once
239
assertTrue("we got a channel from disk more than once:" + next.theKey,
240             !(next.verifyCounter > 1));
241
242       }
243     }
244   }
245
246   /**
247    * Look through the ItemLog to see if things are consistent. N.B. We stop checking after the
248    * number of items that were requested. The reason is that at that point the PersistChanGrps are
249    * deactivated. However they are not killed explicitly and so they may run on a bit beyond leaving
250    * the itemLog in an inconsistent state.
251    *
252    */

253   private void verifyItemLogValidity() {
254     int counter = 0;
255     synchronized (itemLog) {
256       Iterator JavaDoc itemLogIter = itemLog.iterator();
257       while (itemLogIter.hasNext() && counter < itemMax) {
258         counter++;
259         ItemLogEntry next = (ItemLogEntry) itemLogIter.next();
260         String JavaDoc msgHdr = next.theItem.getChannel() + ":" + next.theKey;
261
262         // This is code is structured this way to allow me to put a breakpoint
263
final boolean diskNotNet = next.recordCounter == 0 && next.verifyCounter > 0;
264         if (diskNotNet) {
265           assertTrue("we saw an item from disk which we didn't see from the net: " + msgHdr,
266               !diskNotNet);
267
268         }
269
270         final boolean netNotDisk = next.recordCounter > 0 && next.verifyCounter == 0;
271         if (netNotDisk) {
272           assertTrue("we saw an item from the Net which we didn't see from disk: " + msgHdr,
273               !netNotDisk);
274         }
275
276       }
277     }
278   }
279
280   /**
281    * Forget all the informa state
282    */

283   private void clearChannelGroups() {
284     deActivateProcesses();
285     managers = null;
286   }
287
288   /**
289    * Create nManager test Channel Groups of nChannels Channels each.
290    */

291   private void createScaleChannelGroups() {
292     // Create an array of ChannelGroupManagers
293
managers = new PersistChanGrpMgr[nManagers];
294     int channelIndex = 0;
295
296     for (int i = 0; i < nManagers; i++) {
297       managers[i] = new PersistChanGrpMgr(scaleSessHandler, true);
298       assertNotNull(managers[i]);
299       managers[i].createGroup(generateChanGrpName(i));
300       managers[i].setGlobalObserver(new ChannelLogRecordObserver());
301       for (int chans = 0; chans < nChans; chans++) {
302         Channel nextChan = managers[i].addChannel(RssUrlTestData.get(channelIndex++));
303         managers[i].notifyItems(nextChan);
304       }
305       managers[i].notifyChannels();
306     }
307   }
308
309   /**
310    * Create the PersistentChannelGroups and restore their state from the database. Assumes that the
311    * database is already open. Assume that the managers array is null again. After reading in each
312    * channel from disk, we ask Informa to notify for all the items and channels. As the channels are
313    * not activated, Informa will not start accessing the internet for new channels.
314    *
315    * @throws Exception (handled by JUnit)
316    */

317   private void restoreScaleChannelGroups() throws Exception JavaDoc {
318     managers = new PersistChanGrpMgr[nManagers];
319     int channelIndex = 0;
320
321     for (int i = 0; i < nManagers; i++) {
322       managers[i] = new PersistChanGrpMgr(scaleSessHandler, true);
323       assertNotNull(managers[i]);
324       managers[i].createGroup(generateChanGrpName(i));
325       managers[i].setGlobalObserver(new informaLogObserver());
326       for (int chans = 0; chans < nChans; chans++) {
327         Channel nextChan = managers[i].addChannel(RssUrlTestData.get(channelIndex++));
328         managers[i].notifyItems(nextChan);
329       }
330       managers[i].notifyChannels();
331     }
332   }
333
334   /**
335    * Sleep until the indicated "N" items have been logged.
336    *
337    * @param N
338    *
339    * @throws Exception (handled by JUnit)
340    */

341   private void runUntilNitems(int N) throws Exception JavaDoc {
342     setActiveSemaphor(true);
343     activateProcesses();
344     while (getActiveSemaphor()) {
345       Thread.sleep(1000);
346       setActiveSemaphor(itemLog.size() < N);
347     }
348     deActivateProcesses();
349   }
350
351   /**
352    * Return current value of activeSemaphor. True means background RSS poller in Informa is
353    * happening False means it is not
354    *
355    * @return current value of activeSemaphor
356    */

357   synchronized boolean getActiveSemaphor() {
358     return activeSemaphore;
359   }
360
361   /**
362    * Set activeSemaphor to new value, and return old value
363    *
364    * @param newval - true means background poller is active
365    * @return what the status was before
366    */

367   synchronized private boolean setActiveSemaphor(boolean newval) {
368     boolean oldval = activeSemaphore;
369     activeSemaphore = newval;
370     return oldval;
371   }
372
373   /**
374    * Start the background processing of channels
375    */

376   private void activateProcesses() {
377     for (int i = 0; i < nManagers; i++) {
378       managers[i].activate();
379     }
380   }
381
382   /**
383    * Stop the background processing of channels
384    */

385   private void deActivateProcesses() {
386     for (int i = 0; i < nManagers; i++) {
387       managers[i].deActivate();
388     }
389   }
390
391   /**
392    * Regenerate the empty datanase
393    *
394    * @return @throws FileNotFoundException
395    *
396    * @throws FileNotFoundException
397    */

398   private boolean getVirginDb() throws FileNotFoundException JavaDoc {
399     boolean fileOneIsOK, fileTwoIsOK;
400     fileTwoIsOK = copyFiles("informa.script", scaleDbPath + ".script");
401     fileOneIsOK = copyFiles("informa.properties", scaleDbPath + ".properties");
402     return fileOneIsOK && fileTwoIsOK;
403   }
404
405   /**
406    * Make a simple copy of one file to another
407    *
408    * @param src
409    * @param dest
410    * @return true if copy worked.
411    * @throws FileNotFoundException
412    */

413   private boolean copyFiles(String JavaDoc src, String JavaDoc dest) throws FileNotFoundException JavaDoc {
414     boolean result = false;
415     InputStream JavaDoc srcStream = new FileInputStream JavaDoc(src);
416     if (srcStream != null) {
417       try {
418         // Create channel on the destination
419
FileOutputStream JavaDoc dstStream = new FileOutputStream JavaDoc(dest);
420         int ch; // the buffer
421
while ((ch = srcStream.read()) != -1) {
422           dstStream.write(ch);
423         }
424         srcStream.close();
425         dstStream.close();
426         result = true;
427       } catch (IOException JavaDoc e) {
428         result = false;
429       }
430     }
431     if (!result) fail("Failed to copy File " + src + " to " + dest);
432     return result;
433   }
434
435   /**
436    * Open (or re-open) the database. Optionally begin from a blank database (virgin)
437    *
438    * @param virgin true means start from a fresh database
439    * @throws Exception
440    */

441   void openScaleDatabase(boolean virgin) throws Exception JavaDoc {
442     if (virgin) getVirginDb();
443
444     Properties JavaDoc hibernateProps = new Properties JavaDoc();
445     hibernateProps.setProperty("hibernate.connection.url", "jdbc:hsqldb:" + scaleDbPath);
446     scaleSessHandler = SessionHandler.getInstance(hibernateProps);
447     assertNotNull(scaleSessHandler);
448   }
449
450   /**
451    * Close the database fully
452    *
453    * @throws Exception
454    */

455   private void closeScaleDatabase() throws Exception JavaDoc {
456     scaleSessHandler.getSession().flush();
457     scaleSessHandler.getSession().close();
458   }
459
460   /**
461    * Generate a fake name for a generated channel
462    *
463    * @param i Index of channel involved
464    * @return fake name
465    */

466   private String JavaDoc generateChanGrpName(int i) {
467     return "Channel Group" + i;
468   }
469
470   /**
471    * Called when Informa retrieves a Channel. We use it in two modes: when channels are first
472    * retrieved from RSS over the network (recordmode = true), and then again when channels are
473    * retrieved when the persistent hibernate database is opened, from disk (verify mode --
474    * recordmode = false).
475    *
476    * There are two counters in the ChannelLogEntry, one that counts how often the particular Channel
477    * was seen during record mode, and the other during verify mode. Depending on the circumstance we
478    * can detect failure conditions.
479    *
480    * if recordCounter == 0 and verifyCounter != 0, we got a channel from disk which we didn't see
481    * from the net if recordCounter > 0 and verifyCounter == 0, we got a channel from the net, which
482    * we then didn't see from disk
483    *
484    * @param channel - Relevant Channel.
485    * @param recordMode - are we recording (true) or verifying (false)
486    */

487   void chanLogUpdate(ChannelIF channel, boolean recordMode) {
488     assertNotNull("ChannelRetrieved returned Null", channel);
489     URL JavaDoc locU = channel.getLocation();
490     String JavaDoc key = locU == null ? "<not yet>" : locU.toString();
491
492     Iterator JavaDoc channelIt = channelLog.iterator();
493     ChannelLogEntry found = null;
494
495     // Locate a ChannelLogEntry for specified Channel's getLocation URL; create
496
// one if none is there yet
497
while (channelIt.hasNext()) {
498       ChannelLogEntry tmp = (ChannelLogEntry) channelIt.next();
499       if (key.equals(tmp.theKey)) {
500         found = tmp;
501         break;
502       }
503     }
504     if (found == null) {
505       found = new ChannelLogEntry(channel, key);
506       synchronized (channelLog) {
507         channelLog.add(found);
508       }
509     }
510
511     // Increment the appropriate counter depending on whether we are recording
512
// or verifying.
513
if (recordMode) {
514       found.recordCounter++;
515     } else {
516       found.verifyCounter++;
517     }
518   }
519
520   /**
521    * Called when Informa retrieves an Item.
522    *
523    * @param item - Item to be recorded in log
524    * @param recordMode - are we recording (true) or verifying (false)
525    */

526   void itemLogUpdate(Item item, boolean recordMode) {
527     assertNotNull("itemRetrieved returned Null", item);
528     String JavaDoc atitle = item.getTitle();
529
530     Iterator JavaDoc itemIt = itemLog.iterator();
531     ItemLogEntry found = null;
532
533     // Locate an ItemLog for specified item
534
while (itemIt.hasNext()) {
535       ItemLogEntry tmp = (ItemLogEntry) itemIt.next();
536       if (atitle.equals(tmp.theKey)) {
537         found = tmp;
538         break;
539       }
540     }
541     if (found == null) {
542       found = new ItemLogEntry(item, atitle);
543       synchronized (itemLog) {
544         itemLog.add(found);
545       }
546     }
547
548     // Increment the appropriate counter depending on whether we are recording
549
// or verifying.
550
if (recordMode) {
551       found.recordCounter++;
552     } else {
553       found.verifyCounter++;
554     }
555   }
556
557   /**
558    * Initialize logging lists back to zero.
559    */

560   private void initLoggers() {
561     itemLog = new ArrayList JavaDoc();
562     channelLog = new ArrayList JavaDoc();
563   }
564
565   // -------------------------------------------------------------------
566
// Internal classes
567
// -------------------------------------------------------------------
568

569   /**
570    * Separate Global Observers for when we are fetching Informa/RSS information from the Net or from
571    * the Hiberate/informa persistent state.
572    */

573   class ChannelLogRecordObserver implements PersistChanGrpMgrObserverIF {
574
575     /**
576      * Called when a Channel is retrieved from the internet
577      *
578      * @param chan -
579      */

580     public void channelRetrieved(ChannelIF chan) {
581       chanLogUpdate(chan, true);
582     }
583
584     /**
585      * Called when an Item is retrieved from the internet
586      *
587      * @param newItem -
588      */

589     public void itemAdded(ItemIF newItem) {
590       itemLogUpdate((Item) newItem, true);
591     }
592
593     /**
594      * Called to indicate start and end of actual poller of feeds by Informa
595      *
596      * @param name name of group being polled
597      * @param count count of how many times this group has been polled
598      * @param now true to start, false at end
599      */

600     public void pollingNow(String JavaDoc name, int count, boolean now) {
601       // No testing ramifications.
602
}
603   }
604
605   /**
606    * Called by Informa when interesting things happen
607    *
608    */

609   class informaLogObserver implements PersistChanGrpMgrObserverIF {
610
611     /**
612      *
613      * Called when a Channel is retrieved from persistent Informa state
614      *
615      * @param chan -
616      */

617     public void channelRetrieved(ChannelIF chan) {
618       chanLogUpdate(chan, false);
619     }
620
621     /**
622      * Called when an Item is retrieved from persistent Informa state
623      *
624      * @param newItem -
625      */

626     public void itemAdded(ItemIF newItem) {
627       itemLogUpdate((Item) newItem, false);
628     }
629
630     /**
631      * Called to indicate start and end of actual poller of feeds by Informa
632      *
633      * @param name name of group being polled
634      * @param count count of how many times this group has been polled
635      * @param now true to start, false at end
636      */

637     public void pollingNow(String JavaDoc name, int count, boolean now) {
638       // No testing ramifications.
639
}
640
641   }
642
643   /**
644    * What an entry in the ChannelLog list looks like
645    */

646   class ChannelLogEntry {
647
648     int recordCounter;
649
650     int verifyCounter;
651
652     String JavaDoc theKey;
653
654     ChannelIF theChan;
655
656     ChannelLogEntry(ChannelIF achan, String JavaDoc akey) {
657       theChan = achan;
658       theKey = akey;
659       recordCounter = 0;
660       verifyCounter = 0;
661     }
662
663     /**
664      * Convert to a string, helpful for logging
665      *
666      * @return - string rendition
667      */

668     public String JavaDoc toString() {
669       return "rec/ver: " + recordCounter + "/" + verifyCounter + ":" + theKey;
670     }
671   }
672
673   /**
674    * What an entry in the itemLog List looks like
675    */

676   public class ItemLogEntry {
677
678     Item theItem;
679     String JavaDoc theKey;
680     String JavaDoc channelName;
681     int recordCounter;
682     int verifyCounter;
683     boolean deleted;
684     boolean unread;
685
686     ItemLogEntry(Item anitem, String JavaDoc akey) {
687       theItem = anitem;
688       theKey = akey;
689       channelName = anitem.getChannel().getTitle();
690       recordCounter = 0;
691       verifyCounter = 0;
692       deleted = false;
693       unread = true;
694     }
695
696     int getPersistChanGrpMgrIdx() {
697       int result = -1;
698       for (int i = 0; i < managers.length; i++) {
699         if (managers[i].hasChannel((Channel) theItem.getChannel())) {
700           assertTrue("Same channel in two groups", result == -1);
701           result = i;
702         }
703       }
704       assertTrue("Channel not found in any group", result != -1);
705       return result;
706     }
707
708     /**
709      * Convert to a string, helpful for logging
710      *
711      * @return - string rendition
712      */

713     public String JavaDoc toString() {
714       return "rec/ver: " + recordCounter + "/" + verifyCounter + ":" + theKey + "(" + channelName
715           + ")";
716     }
717
718   }
719
720   /**
721    * Thread to delete Items recorded in the ItemLog. Each thread will run through the list and
722    * delete articles that were recorded in the itemLog.
723    */

724   class ItemDeleter extends Thread JavaDoc {
725
726     boolean keepGoing;
727
728     /**
729      * Construct the thread. Start with flag to keep running.
730      */

731     ItemDeleter() {
732       keepGoing = true;
733     }
734
735     /**
736      * Wait for this thread to finish processing.
737      */

738     public synchronized void waitForFinish() {
739       try {
740         while (keepGoing) {
741           wait();
742         }
743       } catch (InterruptedException JavaDoc e) {
744         // Nothing to be done here. Simply pass control further if someone wishes us to
745
// abort waiting.
746
}
747     }
748
749     /**
750      * Thread Run method. Here's the algorithm: As long as "keepGoing", look through the entries in
751      * the itemLog. Locate one that has not been deleted and delete it. Mark the deleted one as
752      * deleted. Sleep for a little bit of time and repeat.
753      *
754      * This method has fairly subtle threading and synchronization issues.
755      */

756     public void run() {
757
758       // As long as parent wants this to keep on going
759
while (keepGoing) {
760
761         // Scan through the itemLog from start to end and delete all undeleted items
762

763         boolean foundOne = false;
764         ItemLogEntry[] copyItemLog = (ItemLogEntry[]) itemLog.toArray(new ItemLogEntry[0]);
765
766         for (int i = 0; i < copyItemLog.length; i++) {
767           ItemLogEntry entry = copyItemLog[i];
768           synchronized (entry) {
769
770             if (!entry.deleted) {
771               // Delete it and mark it deleted, and note that we deleted one.
772
PersistChanGrpMgr theGrp = managers[entry.getPersistChanGrpMgrIdx()];
773
774               Channel theChan = (Channel) entry.theItem.getChannel();
775               theGrp.deActivate();
776               int before = theGrp.getItemCount(theChan);
777               int after = theGrp.deleteItemFromChannel(theChan, entry.theItem);
778
779               if (getActiveSemaphor()) theGrp.activate();
780               logger.debug("Deleted. Count before =" + before + " /after: " + after);
781               assertEquals("Not the rigth number of items", before, after + 1);
782
783               entry.deleted = true;
784               foundOne = true;
785             }
786           }
787           try {
788             Thread.sleep(50);
789           } catch (InterruptedException JavaDoc e) {
790             return;
791           }
792         }
793         // If we didn't delete at least one, then we will scan the whole list again after sleeping.
794
if (!foundOne) keepGoing = false;
795
796       }
797       // Notify everyone processing is finished. Generally this works for
798
// the case if someone waits by calling waitForFinish() method.
799
synchronized (this) {
800         notifyAll();
801       }
802     }
803   }
804 }
Popular Tags