package org.objectweb.celtix.bus.workqueue;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import junit.framework.TestCase;

/**
 * JUnit 3 tests for {@link AutomaticWorkQueueImpl}: construction, enqueueing,
 * rejection when the queue is full, delayed scheduling, thread-pool shrinking
 * and shutdown.
 *
 * <p>Most tests poll the queue's counters with short sleeps because the work is
 * executed asynchronously on the queue's own threads.
 */
public class AutomaticWorkQueueTest extends TestCase {

    /** Constructor argument used by these tests to request an unbounded queue. */
    public static final int UNBOUNDED_MAX_QUEUE_SIZE = -1;
    /** Constructor argument used by these tests to request no high water mark. */
    public static final int UNBOUNDED_HIGH_WATER_MARK = -1;
    /** Constructor argument used by these tests to request no low water mark. */
    public static final int UNBOUNDED_LOW_WATER_MARK = -1;

    /** Initial number of pool threads requested from the work queue. */
    public static final int INITIAL_SIZE = 2;
    public static final int DEFAULT_MAX_QUEUE_SIZE = 10;
    public static final int DEFAULT_HIGH_WATER_MARK = 10;
    public static final int DEFAULT_LOW_WATER_MARK = 1;
    /** Dequeue timeout in milliseconds (two minutes). */
    public static final long DEFAULT_DEQUEUE_TIMEOUT = 2 * 60 * 1000L;

    /** Timeout passed to {@code execute(Runnable, long)} when enqueueing work items. */
    public static final int TIMEOUT = 100;

    /** Queue under test; created by each test and shut down in {@link #tearDown()}. */
    AutomaticWorkQueueImpl workqueue;

    /**
     * Shuts down the queue created by the test, if any, so that its threads do
     * not outlive the test.
     */
    public void tearDown() throws Exception {
        if (workqueue != null) {
            workqueue.shutdown(true);
            workqueue = null;
        }
    }

    /**
     * Verifies the getters after construction with the "unbounded" arguments:
     * the max size must resolve to the implementation's default while the water
     * marks are reported unchanged.
     */
    public void testUnboundedConstructor() {
        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
                                               UNBOUNDED_HIGH_WATER_MARK,
                                               UNBOUNDED_LOW_WATER_MARK,
                                               DEFAULT_DEQUEUE_TIMEOUT);
        assertNotNull(workqueue);
        assertEquals(AutomaticWorkQueueImpl.DEFAULT_MAX_QUEUE_SIZE, workqueue.getMaxSize());
        assertEquals(UNBOUNDED_HIGH_WATER_MARK, workqueue.getHighWaterMark());
        assertEquals(UNBOUNDED_LOW_WATER_MARK, workqueue.getLowWaterMark());
    }

    /** Verifies that explicit constructor arguments are reported back by the getters. */
    public void testConstructor() {
        workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
                                               DEFAULT_HIGH_WATER_MARK,
                                               DEFAULT_LOW_WATER_MARK,
                                               DEFAULT_DEQUEUE_TIMEOUT);
        assertNotNull(workqueue);
        assertEquals(DEFAULT_MAX_QUEUE_SIZE, workqueue.getMaxSize());
        assertEquals(DEFAULT_HIGH_WATER_MARK, workqueue.getHighWaterMark());
        assertEquals(DEFAULT_LOW_WATER_MARK, workqueue.getLowWaterMark());
    }

    /**
     * Enqueues a single work item on an idle queue and waits (up to roughly five
     * seconds) for the queue to drain again.
     */
    public void testEnqueue() {
        workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
                                               DEFAULT_HIGH_WATER_MARK,
                                               DEFAULT_LOW_WATER_MARK,
                                               DEFAULT_DEQUEUE_TIMEOUT);

        // Give the queue a moment to settle before inspecting its counters.
        sleepQuietly(100);

        assertEquals(0, workqueue.getSize());
        assertEquals(INITIAL_SIZE, workqueue.getPoolSize());
        assertEquals(0, workqueue.getActiveCount());

        workqueue.execute(new TestWorkItem(), TIMEOUT);

        // Poll until the item has left the queue, giving up after 50 attempts.
        int i = 0;
        while (workqueue.getSize() != 0 && i++ < 50) {
            sleepQuietly(100);
        }
        assertEquals(0, workqueue.getSize());
    }

    /**
     * Fills the pool with blocking work items, then fills the queue behind them,
     * and checks that one more item is rejected until a running item is released.
     */
    public void testEnqueueImmediate() {
        workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
                                               DEFAULT_HIGH_WATER_MARK,
                                               DEFAULT_LOW_WATER_MARK,
                                               DEFAULT_DEQUEUE_TIMEOUT);

        // Give the queue a moment to settle before inspecting its counters.
        sleepQuietly(100);

        assertEquals(0, workqueue.getSize());
        assertEquals(INITIAL_SIZE, workqueue.getPoolSize());
        assertEquals(0, workqueue.getActiveCount());

        BlockingWorkItem[] workItems = new BlockingWorkItem[DEFAULT_HIGH_WATER_MARK];
        BlockingWorkItem[] fillers = new BlockingWorkItem[DEFAULT_MAX_QUEUE_SIZE];

        try {
            // One blocking item per thread the pool is allowed to grow to.
            for (int i = 0; i < DEFAULT_HIGH_WATER_MARK; i++) {
                workItems[i] = new BlockingWorkItem();
                try {
                    workqueue.execute(workItems[i]);
                } catch (RejectedExecutionException ex) {
                    fail("failed on item[" + i + "] with: " + ex);
                }
            }

            // Wait for the initial threads to pick up work. The wait is bounded so
            // that a queue which never starts its workers fails the test instead
            // of hanging the whole suite.
            for (int i = 0; i < 50 && workqueue.getActiveCount() < INITIAL_SIZE; i++) {
                sleepQuietly(250);
            }
            assertTrue("workers did not become active: " + workqueue,
                       workqueue.getActiveCount() >= INITIAL_SIZE);

            // These items have no free thread and must sit in the queue.
            for (int i = 0; i < DEFAULT_MAX_QUEUE_SIZE; i++) {
                fillers[i] = new BlockingWorkItem();
                try {
                    workqueue.execute(fillers[i]);
                } catch (RejectedExecutionException ex) {
                    fail("failed on filler[" + i + "] with: " + ex);
                }
            }

            // Let the pool finish growing before checking its counters.
            sleepQuietly(250);

            assertTrue(workqueue.toString(), workqueue.isFull());
            assertEquals(workqueue.toString(), DEFAULT_HIGH_WATER_MARK, workqueue.getPoolSize());
            assertEquals(workqueue.toString(), DEFAULT_HIGH_WATER_MARK, workqueue.getActiveCount());

            try {
                workqueue.execute(new BlockingWorkItem());
                fail("workitem should not have been accepted.");
            } catch (RejectedExecutionException ex) {
                // Expected: both the pool and the queue are full.
            }

            // Release one running item; a slot should free up shortly afterwards.
            workItems[0].unblock();
            boolean accepted = false;
            workItems[0] = new BlockingWorkItem();

            for (int i = 0; i < 20 && !accepted; i++) {
                sleepQuietly(100);
                try {
                    workqueue.execute(workItems[0]);
                    accepted = true;
                } catch (RejectedExecutionException ex) {
                    // Still full; retry on the next iteration.
                }
            }
            assertTrue(accepted);
        } finally {
            // Always release every blocked item so the queue's threads can finish.
            for (int i = 0; i < DEFAULT_HIGH_WATER_MARK; i++) {
                if (workItems[i] != null) {
                    workItems[i].unblock();
                }
            }
            for (int i = 0; i < DEFAULT_MAX_QUEUE_SIZE; i++) {
                if (fillers[i] != null) {
                    fillers[i].unblock();
                }
            }
        }
    }

    /**
     * Pushes 200 work items of 10 ms each through a bounded queue (max size 500,
     * one initial thread, water marks of 2) and expects all of them to complete.
     */
    public void testDeadLockEnqueueLoads() {
        workqueue = new AutomaticWorkQueueImpl(500, 1, 2, 2,
                                               DEFAULT_DEQUEUE_TIMEOUT);
        DeadLockThread dead = new DeadLockThread(workqueue, 200,
                                                 10L);

        assertTrue(checkDeadLock(dead));
    }

    /** Pushes 200 work items through an unbounded queue and expects all to complete. */
    public void testNonDeadLockEnqueueLoads() {
        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE,
                                               INITIAL_SIZE,
                                               UNBOUNDED_HIGH_WATER_MARK,
                                               UNBOUNDED_LOW_WATER_MARK,
                                               DEFAULT_DEQUEUE_TIMEOUT);
        DeadLockThread dead = new DeadLockThread(workqueue, 200);

        assertTrue(checkDeadLock(dead));
    }

    /**
     * Schedules a task with a 5000 ms delay and checks that it did not run
     * noticeably earlier than requested.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    public void testSchedule() throws Exception {
        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
                                               UNBOUNDED_HIGH_WATER_MARK,
                                               UNBOUNDED_LOW_WATER_MARK,
                                               DEFAULT_DEQUEUE_TIMEOUT);
        final Lock runLock = new ReentrantLock();
        final Condition runCondition = runLock.newCondition();
        // Guarded by runLock. Single-element array so the anonymous class can set it.
        final boolean[] hasRun = new boolean[1];
        long start = System.currentTimeMillis();
        Runnable doNothing = new Runnable() {
            public void run() {
                runLock.lock();
                try {
                    hasRun[0] = true;
                    runCondition.signal();
                } finally {
                    runLock.unlock();
                }
            }
        };

        workqueue.schedule(doNothing, 5000);

        runLock.lock();
        try {
            // Wait on the flag rather than on the bare condition: a spurious wakeup
            // must not end the wait early, and a signal sent before this thread
            // started waiting must not be lost.
            while (!hasRun[0]) {
                runCondition.await();
            }
        } finally {
            runLock.unlock();
        }

        assertTrue("expected delay",
                   System.currentTimeMillis() - start >= 4950);
    }

    /**
     * Runs a burst of 1000 short work items and then waits (up to roughly five
     * seconds) for the pool to shrink back to the low water mark.
     */
    public void testThreadPoolShrink() {
        final int lowWaterMark = 10;
        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, 20, 20, lowWaterMark, 100L);

        DeadLockThread dead = new DeadLockThread(workqueue, 1000, 5L);

        assertTrue("Should be finished, probably deadlocked", checkDeadLock(dead));

        int i = 0;
        while (workqueue.getPoolSize() != lowWaterMark && i++ < 50) {
            sleepQuietly(100);
        }
        assertEquals(workqueue.getLowWaterMark(), workqueue.getPoolSize());
    }

    /**
     * Same as {@link #testThreadPoolShrink()} but without a high water mark. The
     * five second budget restarts every time the pool size changes, so the test
     * only gives up once the pool has stopped shrinking.
     */
    public void testThreadPoolShrinkUnbounded() {
        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
                                               UNBOUNDED_HIGH_WATER_MARK,
                                               DEFAULT_LOW_WATER_MARK, 100L);

        DeadLockThread dead = new DeadLockThread(workqueue, 1000, 5L);
        assertTrue("Should be finished, probably deadlocked", checkDeadLock(dead));

        int i = 0;
        int last = workqueue.getPoolSize();
        while (workqueue.getPoolSize() != DEFAULT_LOW_WATER_MARK && i++ < 50) {
            if (last != workqueue.getPoolSize()) {
                // Progress was made: remember the new size and restart the countdown.
                last = workqueue.getPoolSize();
                i = 0;
            }
            sleepQuietly(100);
        }
        assertTrue("threads_total()", workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK);
    }

    /**
     * Runs 100 work items to completion, shuts the queue down and waits (up to
     * roughly five seconds) for both the queue and the pool to become empty.
     */
    public void testShutdown() {
        workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
                                               INITIAL_SIZE, INITIAL_SIZE, 250);

        assertEquals(0, workqueue.getSize());
        DeadLockThread dead = new DeadLockThread(workqueue, 100, 5L);
        dead.start();
        assertTrue(checkCompleted(dead));

        workqueue.shutdown(true);

        for (int i = 0; i < 20 && (workqueue.getSize() > 0 || workqueue.getPoolSize() > 0); i++) {
            sleepQuietly(250);
        }
        assertEquals(0, workqueue.getSize());
        assertEquals(0, workqueue.getPoolSize());

        // Already shut down here; keep tearDown() from shutting it down again.
        workqueue = null;
    }

    /**
     * Sleeps for the given time and deliberately ignores interruption.
     *
     * <p>The interrupt status is not restored: every caller sleeps inside a polling
     * loop, and a restored interrupt would make each following sleep return at once
     * and turn the loop into a busy spin.
     *
     * @param millis time to sleep in milliseconds
     */
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // Best effort only; the caller re-checks its loop condition.
        }
    }

    /**
     * Polls the given (already started) producer until all of its work items
     * have completed.
     *
     * <p>Stall detection only starts once at least one item has completed. After
     * that, more than five consecutive polls (250 ms apart) without a newly
     * completed item are treated as a deadlock.
     *
     * @param dead the producer thread to watch
     * @return {@code true} if every work item completed, {@code false} if progress stalled
     */
    private boolean checkCompleted(DeadLockThread dead) {
        int oldCompleted = 0;
        int newCompleted = 0;
        int noProgressCount = 0;
        while (!dead.isFinished()) {
            newCompleted = dead.getWorkItemCompletedCount();
            if (newCompleted > oldCompleted) {
                oldCompleted = newCompleted;
                noProgressCount = 0;
            } else if (oldCompleted != 0 && ++noProgressCount > 5) {
                // Nothing new completed for several polls in a row.
                return false;
            }
            sleepQuietly(250);
        }
        return true;
    }

    /**
     * Starts the producer thread and waits for it via
     * {@link #checkCompleted(DeadLockThread)}.
     *
     * @param dead the producer thread to start and watch
     * @return {@code true} if every work item completed, {@code false} if progress stalled
     */
    private boolean checkDeadLock(DeadLockThread dead) {
        dead.start();
        return checkCompleted(dead);
    }

    /**
     * Work item that sleeps for a fixed time and then reports its completion to
     * an optional {@link Callback}.
     */
    public class TestWorkItem implements Runnable {
        String name;
        long worktime;
        Callback callback;

        public TestWorkItem() {
            this("WI");
        }

        public TestWorkItem(String n) {
            this(n, DeadLockThread.DEFAULT_WORK_TIME);
        }

        public TestWorkItem(String n, long wt) {
            this(n, wt, null);
        }

        /**
         * @param n name reported to the callback
         * @param wt time to sleep in milliseconds
         * @param c callback notified on completion, or {@code null} for none
         */
        public TestWorkItem(String n, long wt, Callback c) {
            name = n;
            worktime = wt;
            callback = c;
        }

        /**
         * Sleeps for the configured work time. Completion is reported to the
         * callback even when the sleep is interrupted.
         */
        public void run() {
            try {
                Thread.sleep(worktime);
            } catch (InterruptedException ie) {
                // Interrupted: stop working early; completion is still reported below.
            } finally {
                if (callback != null) {
                    callback.workItemCompleted(name);
                }
            }
        }

        public String toString() {
            return "[TestWorkItem:name=" + name + "]";
        }
    }

    /** Work item that occupies its thread until {@link #unblock()} is called. */
    public class BlockingWorkItem implements Runnable {
        /** Guarded by this object's monitor. */
        private boolean unblocked;

        /** Waits until {@link #unblock()} has been called; interrupts do not end the wait. */
        public void run() {
            synchronized (this) {
                while (!unblocked) {
                    try {
                        wait();
                    } catch (InterruptedException ie) {
                        // Keep waiting: only unblock() may release this item.
                    }
                }
            }
        }

        /** Releases every thread currently blocked in {@link #run()}. */
        void unblock() {
            synchronized (this) {
                unblocked = true;
                // notifyAll rather than notify, so no waiter is left behind if the
                // same item is ever being run by more than one thread.
                notifyAll();
            }
        }
    }

    /** Receives a notification when a {@link TestWorkItem} has finished running. */
    public interface Callback {
        /**
         * @param name the name of the work item that completed
         */
        void workItemCompleted(String name);
    }

    /**
     * Producer thread that enqueues a number of {@link TestWorkItem}s and then
     * waits until every one of them has reported completion.
     */
    public class DeadLockThread extends Thread implements Callback {
        public static final long DEFAULT_WORK_TIME = 10L;
        public static final int DEFAULT_WORK_ITEMS = 200;

        AutomaticWorkQueueImpl workqueue;
        int nWorkItems;
        int nWorkItemsCompleted;
        long worktime;
        long finishTime;
        long startTime;

        public DeadLockThread(AutomaticWorkQueueImpl wq) {
            this(wq, DEFAULT_WORK_ITEMS, DEFAULT_WORK_TIME);
        }

        public DeadLockThread(AutomaticWorkQueueImpl wq, int nwi) {
            this(wq, nwi, DEFAULT_WORK_TIME);
        }

        /**
         * @param wq queue to enqueue the work items on
         * @param nwi number of work items to enqueue
         * @param wt time in milliseconds each work item sleeps
         */
        public DeadLockThread(AutomaticWorkQueueImpl wq, int nwi, long wt) {
            workqueue = wq;
            nWorkItems = nwi;
            worktime = wt;
        }

        /** @return {@code true} once as many items have completed as were requested */
        public synchronized boolean isFinished() {
            return nWorkItemsCompleted == nWorkItems;
        }

        /** Counts one completed item and records the finish time on the last one. */
        public synchronized void workItemCompleted(String name) {
            nWorkItemsCompleted++;
            if (isFinished()) {
                finishTime = System.currentTimeMillis();
            }
        }

        public int getWorkItemCount() {
            return nWorkItems;
        }

        public long worktime() {
            return worktime;
        }

        public synchronized int getWorkItemCompletedCount() {
            return nWorkItemsCompleted;
        }

        public long finishTime() {
            return finishTime;
        }

        /** @return milliseconds between the start of {@link #run()} and the last completion */
        public long duration() {
            return finishTime - startTime;
        }

        /**
         * Enqueues all work items, then polls until every item has completed.
         *
         * <p>NOTE(review): a rejected item is dropped and therefore never counted
         * as completed, so this thread would keep polling indefinitely after a
         * rejection.
         */
        public void run() {
            startTime = System.currentTimeMillis();

            for (int i = 0; i < nWorkItems; i++) {
                try {
                    workqueue.execute(new TestWorkItem(String.valueOf(i), worktime, this), TIMEOUT);
                } catch (RejectedExecutionException ex) {
                    // Rejections are tolerated here; see the note above.
                }
            }
            while (!isFinished()) {
                sleepQuietly(worktime);
            }
        }
    }

}