KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > workqueue > AutomaticWorkQueueTest


1 package org.objectweb.celtix.bus.workqueue;
2
3 import java.util.concurrent.RejectedExecutionException JavaDoc;
4 import java.util.concurrent.locks.Condition JavaDoc;
5 import java.util.concurrent.locks.Lock JavaDoc;
6 import java.util.concurrent.locks.ReentrantLock JavaDoc;
7
8 import junit.framework.TestCase;
9
10 public class AutomaticWorkQueueTest extends TestCase {
11
12     public static final int UNBOUNDED_MAX_QUEUE_SIZE = -1;
13     public static final int UNBOUNDED_HIGH_WATER_MARK = -1;
14     public static final int UNBOUNDED_LOW_WATER_MARK = -1;
15
16     public static final int INITIAL_SIZE = 2;
17     public static final int DEFAULT_MAX_QUEUE_SIZE = 10;
18     public static final int DEFAULT_HIGH_WATER_MARK = 10;
19     public static final int DEFAULT_LOW_WATER_MARK = 1;
20     public static final long DEFAULT_DEQUEUE_TIMEOUT = 2 * 60 * 1000L;
21
22     public static final int TIMEOUT = 100;
23
24     AutomaticWorkQueueImpl workqueue;
25     public void tearDown() throws Exception JavaDoc {
26         if (workqueue != null) {
27             workqueue.shutdown(true);
28             workqueue = null;
29         }
30     }
31     
32     public void testUnboundedConstructor() {
33         workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
34                                                UNBOUNDED_HIGH_WATER_MARK,
35                                                UNBOUNDED_LOW_WATER_MARK,
36                                                DEFAULT_DEQUEUE_TIMEOUT);
37         assertNotNull(workqueue);
38         assertEquals(AutomaticWorkQueueImpl.DEFAULT_MAX_QUEUE_SIZE, workqueue.getMaxSize());
39         assertEquals(UNBOUNDED_HIGH_WATER_MARK, workqueue.getHighWaterMark());
40         assertEquals(UNBOUNDED_LOW_WATER_MARK, workqueue.getLowWaterMark());
41     }
42
43     public void testConstructor() {
44         workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
45                                                DEFAULT_HIGH_WATER_MARK,
46                                                DEFAULT_LOW_WATER_MARK,
47                                                DEFAULT_DEQUEUE_TIMEOUT);
48         assertNotNull(workqueue);
49         assertEquals(DEFAULT_MAX_QUEUE_SIZE, workqueue.getMaxSize());
50         assertEquals(DEFAULT_HIGH_WATER_MARK, workqueue.getHighWaterMark());
51         assertEquals(DEFAULT_LOW_WATER_MARK, workqueue.getLowWaterMark());
52     }
53
54     public void testEnqueue() {
55         workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
56                                                DEFAULT_HIGH_WATER_MARK,
57                                                DEFAULT_LOW_WATER_MARK,
58                                                DEFAULT_DEQUEUE_TIMEOUT);
59
60         try {
61             Thread.sleep(100);
62         } catch (Exception JavaDoc e) {
63             // ignore
64
}
65
66         // We haven't enqueued anything yet, so should be zero
67
assertEquals(0, workqueue.getSize());
68         assertEquals(INITIAL_SIZE, workqueue.getPoolSize());
69
70         // Check that no threads are working yet, as we haven't enqueued
71
// anything yet.
72
assertEquals(0, workqueue.getActiveCount());
73
74         workqueue.execute(new TestWorkItem(), TIMEOUT);
75
76         // Give threads a chance to dequeue (5sec max)
77
int i = 0;
78         while (workqueue.getSize() != 0 && i++ < 50) {
79             try {
80                 Thread.sleep(100);
81             } catch (InterruptedException JavaDoc ie) {
82                 // ignore
83
}
84         }
85         assertEquals(0, workqueue.getSize());
86     }
87
88     public void testEnqueueImmediate() {
89         workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
90                                                DEFAULT_HIGH_WATER_MARK,
91                                                DEFAULT_LOW_WATER_MARK,
92                                                DEFAULT_DEQUEUE_TIMEOUT);
93
94         try {
95             Thread.sleep(100);
96         } catch (Exception JavaDoc e) {
97             // ignore
98
}
99
100         // We haven't enqueued anything yet, so should there shouldn't be
101
// any items on the queue, the thread pool should still be the
102
// initial size and no threads should be working
103
//
104
assertEquals(0, workqueue.getSize());
105         assertEquals(INITIAL_SIZE, workqueue.getPoolSize());
106         assertEquals(0, workqueue.getActiveCount());
107
108         BlockingWorkItem[] workItems = new BlockingWorkItem[DEFAULT_HIGH_WATER_MARK];
109         BlockingWorkItem[] fillers = new BlockingWorkItem[DEFAULT_MAX_QUEUE_SIZE];
110
111         try {
112             // fill up the queue, then exhaust the thread pool
113
//
114
for (int i = 0; i < DEFAULT_HIGH_WATER_MARK; i++) {
115                 workItems[i] = new BlockingWorkItem();
116                 try {
117                     workqueue.execute(workItems[i]);
118                 } catch (RejectedExecutionException JavaDoc ex) {
119                     fail("failed on item[" + i + "] with: " + ex);
120                 }
121             }
122
123             while (workqueue.getActiveCount() < INITIAL_SIZE) {
124                 try {
125                     Thread.sleep(250);
126                 } catch (InterruptedException JavaDoc ex) {
127                     // ignore
128
}
129             }
130
131             for (int i = 0; i < DEFAULT_MAX_QUEUE_SIZE; i++) {
132                 fillers[i] = new BlockingWorkItem();
133                 try {
134                     workqueue.execute(fillers[i]);
135                 } catch (RejectedExecutionException JavaDoc ex) {
136                     fail("failed on filler[" + i + "] with: " + ex);
137                 }
138             }
139
140             // give threads a chance to start executing the work items
141
try {
142                 Thread.sleep(250);
143             } catch (InterruptedException JavaDoc ex) {
144                 // ignore
145
}
146
147             assertTrue(workqueue.toString(), workqueue.isFull());
148             assertEquals(workqueue.toString(), DEFAULT_HIGH_WATER_MARK, workqueue.getPoolSize());
149             assertEquals(workqueue.toString(), DEFAULT_HIGH_WATER_MARK, workqueue.getActiveCount());
150
151             try {
152                 workqueue.execute(new BlockingWorkItem());
153                 fail("workitem should not have been accepted.");
154             } catch (RejectedExecutionException JavaDoc ex) {
155                 // ignore
156
}
157
158             // unblock one work item and allow thread to dequeue next item
159

160             workItems[0].unblock();
161             boolean accepted = false;
162             workItems[0] = new BlockingWorkItem();
163
164             for (int i = 0; i < 20 && !accepted; i++) {
165                 try {
166                     Thread.sleep(100);
167                 } catch (InterruptedException JavaDoc ex) {
168                     // ignore
169
}
170                 try {
171                     workqueue.execute(workItems[0]);
172                     accepted = true;
173                 } catch (RejectedExecutionException JavaDoc ex) {
174                     // ignore
175
}
176             }
177             assertTrue(accepted);
178         } finally {
179             for (int i = 0; i < DEFAULT_HIGH_WATER_MARK; i++) {
180                 if (workItems[i] != null) {
181                     workItems[i].unblock();
182                 }
183             }
184             for (int i = 0; i < DEFAULT_MAX_QUEUE_SIZE; i++) {
185                 if (fillers[i] != null) {
186                     fillers[i].unblock();
187                 }
188             }
189         }
190     }
191
192     public void testDeadLockEnqueueLoads() {
193         workqueue = new AutomaticWorkQueueImpl(500, 1, 2, 2,
194                                                DEFAULT_DEQUEUE_TIMEOUT);
195         DeadLockThread dead = new DeadLockThread(workqueue, 200,
196                                                  10L);
197
198         assertTrue(checkDeadLock(dead));
199     }
200
201     public void testNonDeadLockEnqueueLoads() {
202         workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE,
203                                                INITIAL_SIZE,
204                                                UNBOUNDED_HIGH_WATER_MARK,
205                                                UNBOUNDED_LOW_WATER_MARK,
206                                                DEFAULT_DEQUEUE_TIMEOUT);
207         DeadLockThread dead = new DeadLockThread(workqueue, 200);
208
209         assertTrue(checkDeadLock(dead));
210     }
211     
212     public void testSchedule() throws Exception JavaDoc {
213         workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
214                                                UNBOUNDED_HIGH_WATER_MARK,
215                                                UNBOUNDED_LOW_WATER_MARK,
216                                                DEFAULT_DEQUEUE_TIMEOUT);
217         final Lock JavaDoc runLock = new ReentrantLock JavaDoc();
218         final Condition JavaDoc runCondition = runLock.newCondition();
219         long start = System.currentTimeMillis();
220         Runnable JavaDoc doNothing = new Runnable JavaDoc() {
221             public void run() {
222                 runLock.lock();
223                 try {
224                     runCondition.signal();
225                 } finally {
226                     runLock.unlock();
227                 }
228             }
229         };
230         
231         workqueue.schedule(doNothing, 5000);
232         
233         runLock.lock();
234         try {
235             runCondition.await();
236         } finally {
237             runLock.unlock();
238         }
239         
240         assertTrue("expected delay",
241                    System.currentTimeMillis() - start >= 4950);
242     }
243
244     public void testThreadPoolShrink() {
245         workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, 20, 20, 10, 100L);
246
247         DeadLockThread dead = new DeadLockThread(workqueue, 1000, 5L);
248
249         assertTrue("Should be finished, probably deadlocked", checkDeadLock(dead));
250
251         // Give threads a chance to dequeue (5sec max)
252
int i = 0;
253         while (workqueue.getPoolSize() != 10 && i++ < 50) {
254             try {
255                 Thread.sleep(100);
256             } catch (InterruptedException JavaDoc ie) {
257                 // ignore
258
}
259         }
260         assertEquals(workqueue.getLowWaterMark(), workqueue.getPoolSize());
261     }
262
263     public void testThreadPoolShrinkUnbounded() {
264         workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
265                                                UNBOUNDED_HIGH_WATER_MARK,
266                                                DEFAULT_LOW_WATER_MARK, 100L);
267
268         DeadLockThread dead = new DeadLockThread(workqueue, 1000, 5L);
269         assertTrue("Should be finished, probably deadlocked", checkDeadLock(dead));
270
271         // Give threads a chance to dequeue (5sec max)
272
int i = 0;
273         int last = workqueue.getPoolSize();
274         while (workqueue.getPoolSize() != DEFAULT_LOW_WATER_MARK && i++ < 50) {
275             if (last != workqueue.getPoolSize()) {
276                 last = workqueue.getPoolSize();
277                 i = 0;
278             }
279             try {
280                 Thread.sleep(100);
281             } catch (InterruptedException JavaDoc ie) {
282                 // ignore
283
}
284         }
285         assertTrue("threads_total()", workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK);
286     }
287
288     public void testShutdown() {
289         workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
290                                                INITIAL_SIZE, INITIAL_SIZE, 250);
291
292         assertEquals(0, workqueue.getSize());
293         DeadLockThread dead = new DeadLockThread(workqueue, 100, 5L);
294         dead.start();
295         assertTrue(checkCompleted(dead));
296
297         workqueue.shutdown(true);
298
299         // Give threads a chance to shutdown (1 sec max)
300
for (int i = 0; i < 20 && (workqueue.getSize() > 0 || workqueue.getPoolSize() > 0); i++) {
301             try {
302                 Thread.sleep(250);
303             } catch (InterruptedException JavaDoc ie) {
304                 // ignore
305
}
306         }
307         assertEquals(0, workqueue.getSize());
308         assertEquals(0, workqueue.getPoolSize());
309         
310         //already shutdown
311
workqueue = null;
312     }
313
314     private boolean checkCompleted(DeadLockThread dead) {
315         int oldCompleted = 0;
316         int newCompleted = 0;
317         int noProgressCount = 0;
318         while (!dead.isFinished()) {
319             newCompleted = dead.getWorkItemCompletedCount();
320             if (newCompleted > oldCompleted) {
321                 oldCompleted = newCompleted;
322                 noProgressCount = 0;
323             } else {
324                 // No reduction in the completion count so it may be deadlocked,
325
// allow thread to make no progress for 5 time-slices before
326
// assuming a deadlock has occurred
327
//
328
if (oldCompleted != 0
329                     && ++noProgressCount > 5) {
330                     return false;
331                 }
332             }
333             try {
334                 Thread.sleep(250);
335             } catch (InterruptedException JavaDoc ie) {
336                 // ignore
337
}
338         }
339         return true;
340     }
341
342     private boolean checkDeadLock(DeadLockThread dead) {
343         dead.start();
344         return checkCompleted(dead);
345     }
346
347     public class TestWorkItem implements Runnable JavaDoc {
348         String JavaDoc name;
349         long worktime;
350         Callback callback;
351
352         public TestWorkItem() {
353             this("WI");
354         }
355
356         public TestWorkItem(String JavaDoc n) {
357             this(n, DeadLockThread.DEFAULT_WORK_TIME);
358         }
359
360         public TestWorkItem(String JavaDoc n, long wt) {
361             this(n, wt, null);
362         }
363
364         public TestWorkItem(String JavaDoc n, long wt, Callback c) {
365             name = n;
366             worktime = wt;
367             callback = c;
368         }
369
370         public void run() {
371             try {
372                 try {
373                     Thread.sleep(worktime);
374                 } catch (InterruptedException JavaDoc ie) {
375                     // ignore
376
return;
377                 }
378             } finally {
379                 if (callback != null) {
380                     callback.workItemCompleted(name);
381                 }
382             }
383         }
384
385         public String JavaDoc toString() {
386             return "[TestWorkItem:name=" + name + "]";
387         }
388     }
389
390     public class BlockingWorkItem implements Runnable JavaDoc {
391         private boolean unblocked;
392
393         public void run() {
394             synchronized (this) {
395                 while (!unblocked) {
396                     try {
397                         wait();
398                     } catch (InterruptedException JavaDoc ie) {
399                         // ignore
400
}
401                 }
402             }
403         }
404
405         void unblock() {
406             synchronized (this) {
407                 unblocked = true;
408                 notify();
409             }
410         }
411     }
412
413     public interface Callback {
414         void workItemCompleted(String JavaDoc name);
415     }
416
417     public class DeadLockThread extends Thread JavaDoc implements Callback {
418         public static final long DEFAULT_WORK_TIME = 10L;
419         public static final int DEFAULT_WORK_ITEMS = 200;
420
421         AutomaticWorkQueueImpl workqueue;
422         int nWorkItems;
423         int nWorkItemsCompleted;
424         long worktime;
425         long finishTime;
426         long startTime;
427
428         public DeadLockThread(AutomaticWorkQueueImpl wq) {
429             this(wq, DEFAULT_WORK_ITEMS, DEFAULT_WORK_TIME);
430         }
431
432         public DeadLockThread(AutomaticWorkQueueImpl wq, int nwi) {
433             this(wq, nwi, DEFAULT_WORK_TIME);
434         }
435
436         public DeadLockThread(AutomaticWorkQueueImpl wq, int nwi, long wt) {
437             workqueue = wq;
438             nWorkItems = nwi;
439             worktime = wt;
440         }
441
442         public synchronized boolean isFinished() {
443             return nWorkItemsCompleted == nWorkItems;
444         }
445
446         public synchronized void workItemCompleted(String JavaDoc name) {
447             nWorkItemsCompleted++;
448             if (isFinished()) {
449                 finishTime = System.currentTimeMillis();
450             }
451         }
452
453         public int getWorkItemCount() {
454             return nWorkItems;
455         }
456
457         public long worktime() {
458             return worktime;
459         }
460
461         public synchronized int getWorkItemCompletedCount() {
462             return nWorkItemsCompleted;
463         }
464
465         public long finishTime() {
466             return finishTime;
467         }
468
469         public long duration() {
470             return finishTime - startTime;
471         }
472
473         public void run() {
474             startTime = System.currentTimeMillis();
475
476             for (int i = 0; i < nWorkItems; i++) {
477                 try {
478                     workqueue.execute(new TestWorkItem(String.valueOf(i), worktime, this), TIMEOUT);
479                 } catch (RejectedExecutionException JavaDoc ex) {
480                     // ignore
481
}
482             }
483             while (!isFinished()) {
484                 try {
485                     Thread.sleep(worktime);
486                 } catch (InterruptedException JavaDoc ie) {
487                     // ignore
488
}
489             }
490         }
491     }
492
493 }
494
Popular Tags