1 18 package org.apache.activemq.thread; 19 20 import org.apache.activemq.thread.Task; 21 import org.apache.activemq.thread.TaskRunner; 22 import org.apache.activemq.thread.TaskRunnerFactory; 23 24 import junit.framework.TestCase; 25 import java.util.concurrent.BrokenBarrierException ; 26 import java.util.concurrent.CyclicBarrier ; 27 import java.util.concurrent.CountDownLatch ; 28 import java.util.concurrent.TimeUnit ; 29 import java.util.concurrent.atomic.AtomicInteger ; 30 31 public class TaskRunnerTest extends TestCase { 32 33 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 34 .getLog(TaskRunnerTest.class); 35 36 37 public void testWakeupPooled() throws InterruptedException , BrokenBarrierException { 38 System.setProperty("org.apache.activemq.UseDedicatedTaskRunner", "false"); 39 doTestWakeup(); 40 } 41 42 public void testWakeupDedicated() throws InterruptedException , BrokenBarrierException { 43 System.setProperty("org.apache.activemq.UseDedicatedTaskRunner", "true"); 44 doTestWakeup(); 45 } 46 47 55 public void doTestWakeup() throws InterruptedException , BrokenBarrierException { 56 57 final AtomicInteger iterations = new AtomicInteger (0); 58 final AtomicInteger counter = new AtomicInteger (0); 59 final AtomicInteger queue = new AtomicInteger (0); 60 final CountDownLatch doneCountDownLatch = new CountDownLatch (1); 61 final int ENQUEUE_COUNT = 100000; 62 63 TaskRunnerFactory factory = new TaskRunnerFactory(); 64 final TaskRunner runner = factory.createTaskRunner(new Task() { 65 public boolean iterate() { 66 if( queue.get()==0 ) { 67 return false; 68 } else { 69 while(queue.get()>0) { 70 queue.decrementAndGet(); 71 counter.incrementAndGet(); 72 } 73 iterations.incrementAndGet(); 74 if (counter.get()==ENQUEUE_COUNT) 75 doneCountDownLatch.countDown(); 76 return true; 77 } 78 } 79 }, "Thread Name"); 80 81 long start = System.currentTimeMillis(); 82 final int WORKER_COUNT=5; 83 final CyclicBarrier barrier = new CyclicBarrier (WORKER_COUNT+1); 84 for( int i=0; i< WORKER_COUNT; i++ ) { 85 new Thread () { 86 public void run() { 87 try { 88 barrier.await(); 89 for( int i=0; i < ENQUEUE_COUNT/WORKER_COUNT; i++ ) { 90 queue.incrementAndGet(); 91 runner.wakeup(); 92 yield(); 93 } 94 } 95 catch (BrokenBarrierException e) { 96 } 97 catch (InterruptedException e) { 98 } 99 } 100 }.start(); 101 } 102 barrier.await(); 103 104 boolean b = doneCountDownLatch.await(30, TimeUnit.SECONDS); 105 long end = System.currentTimeMillis(); 106 log.info("Iterations: "+iterations.get()); 107 log.info("counter: "+counter.get()); 108 log.info("Dequeues/s: "+(1000.0*ENQUEUE_COUNT/(end-start))); 109 log.info("duration: "+((end-start)/1000.0)); 110 assertTrue(b); 111 112 runner.shutdown(); 113 } 114 115 116 117 public static void main(String [] args) { 118 junit.textui.TestRunner.run(TaskRunnerTest.class); 119 } 120 121 } 122 | Popular Tags |