package com.tctest;

import com.tc.object.config.ConfigVisitor;
import com.tc.object.config.DSOClientConfigHelper;
import com.tc.object.config.TransparencyClassSpec;
import com.tc.simulator.app.ApplicationConfig;
import com.tc.simulator.listener.ListenerProvider;
import com.tctest.util.AbstractTransparentAppMultiplexer;
import com.tctest.util.DSOConfigUtil;
import com.tctest.util.TestUtil;
import com.tctest.util.Timer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Work-queue throughput test application.
 *
 * <p>The participant invoked with index 0 pushes items into an input queue that is handed to a
 * {@link QueueMultiplexer}; every other participant drains an output queue obtained from that
 * multiplexer until it sees the poison object. The producer prints participant count, item
 * count, elapsed time and throughput when it finishes.
 */
public class WorkQueuesTestApp extends AbstractTransparentAppMultiplexer {
  /** Number of items pushed during warm-up; also the per-intensity unit of the timed run. */
  private static final int NUM_ITEMS = 100;

  /** Length in bytes of each item's payload array. */
  private static final int SIZE_ITEMS = 10;

  // NOTE: the names of the next three fields are passed as strings to DSOConfigUtil.addRoot in
  // visitL1DSOConfig, so they must not be renamed independently of that method.
  private final QueueMultiplexer multiplexer;
  private final CyclicBarrier readBarrier;
  private final Object poison;

  private ItemGenerator itemGenerator = new ItemGenerator();

  /** Unit of work placed on the queue: a holder for a byte array. */
  public static class Item {
    public byte[] data;
  }

  /** Produces {@link Item}s whose payload is a new array of {@code SIZE_ITEMS} bytes. */
  public static class ItemGenerator {
    /**
     * Creates the next item.
     *
     * @return a new item carrying a freshly allocated payload array
     */
    public Item next() {
      Item fresh = new Item();
      fresh.data = new byte[SIZE_ITEMS];
      return fresh;
    }
  }

  /**
   * Creates the application and its multiplexer, read barrier and poison object.
   *
   * @param appId passed through to the superclass constructor
   * @param cfg passed through to the superclass constructor
   * @param listenerProvider passed through to the superclass constructor
   */
  public WorkQueuesTestApp(String appId, ApplicationConfig cfg, ListenerProvider listenerProvider) {
    super(appId, cfg, listenerProvider);
    multiplexer = new QueueMultiplexer();
    // The barrier never has more than two parties, whatever the participant count.
    int parties = Math.min(getParticipantCount(), 2);
    readBarrier = new CyclicBarrier(parties);
    poison = new Object();
  }

  /**
   * Entry point for one participant: index 0 produces, every other index consumes.
   *
   * @param barrier not used by this implementation
   * @param index position of this participant; 0 selects the producer role
   * @throws Throwable whatever the producer or consumer routine throws
   */
  public void run(CyclicBarrier barrier, int index) throws Throwable {
    if (index != 0) {
      doReads();
    } else {
      doPuts();
    }
  }

  /**
   * Producer routine: a warm-up batch, then a timed batch, each followed by a barrier handoff,
   * then the poison object is sent through the multiplexer and the statistics are printed.
   *
   * @throws Exception if a queue or barrier operation fails or is interrupted
   */
  private void doPuts() throws Exception {
    BlockingQueue<Object> input = new LinkedBlockingQueue<Object>(500);
    Timer stopwatch = new Timer();

    multiplexer.start(input);

    System.out.println("Warming up...");
    enqueueItems(input, NUM_ITEMS);
    rendezvous(input);

    System.out.println("Putting items...");
    int total = NUM_ITEMS * getIntensity();
    stopwatch.start();
    enqueueItems(input, total);
    // The timed section ends only after the barrier queued behind the items has tripped.
    rendezvous(input);
    stopwatch.stop();

    // NOTE(review): the extra unit presumably accounts for the barrier object that was queued
    // inside the timed section - confirm against the original intent.
    total++;

    multiplexer.putAll(poison);

    TestUtil.printStats("" + getParticipantCount(), "nodes");
    TestUtil.printStats("" + total, "transactions");
    TestUtil.printStats("" + stopwatch.elapsed(), "milliseconds");
    TestUtil.printStats("" + stopwatch.tps(total), "tps");
  }

  /** Puts {@code count} newly generated items on {@code target}, blocking while it is full. */
  private void enqueueItems(BlockingQueue<Object> target, int count) throws Exception {
    for (int sent = 0; sent < count; sent++) {
      target.put(itemGenerator.next());
    }
  }

  /** Queues the read barrier behind the pending items, then waits on that same barrier. */
  private void rendezvous(BlockingQueue<Object> target) throws Exception {
    target.put(readBarrier);
    readBarrier.await();
  }

  /**
   * Consumer routine: takes objects from a new multiplexer output queue until the poison object
   * arrives. A {@link CyclicBarrier} taken from the queue is awaited; anything else is discarded.
   *
   * @throws Exception if a queue or barrier operation fails or is interrupted
   */
  private void doReads() throws Exception {
    BlockingQueue<Object> output = multiplexer.getNewOutputQueue();

    System.out.println("Getting items...");

    for (;;) {
      Object taken = output.take();
      if (taken instanceof CyclicBarrier) {
        ((CyclicBarrier) taken).await();
      } else if (taken == poison) {
        // Identity comparison is intentional: only the poison instance itself ends the loop.
        return;
      }
    }
  }

  /**
   * Configuration hook: applies the superclass configuration, sets up auto-locking and
   * instrumentation for this class and {@link QueueMultiplexer}, and registers the
   * {@code multiplexer}, {@code readBarrier} and {@code poison} fields as roots.
   *
   * @param visitor forwarded to the superclass configuration hook
   * @param config client configuration to update
   */
  public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config) {
    TransparencyClassSpec spec = config.getOrCreateSpec(WorkQueuesTestApp.class.getName());

    AbstractTransparentAppMultiplexer.visitL1DSOConfig(visitor, config);

    DSOConfigUtil.autoLockAndInstrumentClass(config, WorkQueuesTestApp.class);
    DSOConfigUtil.autoLockAndInstrumentClass(config, QueueMultiplexer.class, true);

    // Root names must match the field declarations of this class exactly.
    DSOConfigUtil.addRoot(spec, "multiplexer");
    DSOConfigUtil.addRoot(spec, "readBarrier");
    DSOConfigUtil.addRoot(spec, "poison");
  }
}