1 import EDU.oswego.cs.dl.util.concurrent.*; 2 import java.util.*; 3 4 public class BufferTasks extends FJTask { 5 6 static int niters = 1024 * 64; 7 static int[] pairs = {1, 2, 4, 8, 16, 32, 64}; 8 static int[] sizes = { 1024, 64, 1 }; 9 10 public static void main(String [] args) { 11 try { 12 int procs; 13 try { 14 procs = Integer.parseInt(args[0]); 15 } 16 catch (Exception e) { 17 System.out.println("Usage: java BufferTasks <threads>"); 18 return; 19 } 20 21 System.out.print("pairs:"); 22 for (int p = 0; p < pairs.length; ++p) 23 System.out.print("\t" + pairs[p]); 24 System.out.print("\n"); 25 26 27 FJTaskRunnerGroup g = new FJTaskRunnerGroup(procs); 28 g.invoke(new BufferTasks()); 29 } 30 catch (InterruptedException ex) {} 31 } 32 33 public void run() { 34 for (int s = 0; s < sizes.length; ++s) { 35 System.out.println("cap: " + sizes[s]); 36 37 for (int p = 0; p < pairs.length; ++p) { 38 39 buffer = new Buffer(sizes[s]); 40 int npairs = pairs[p]; 41 int iters = niters / npairs; 42 43 long startTime = System.currentTimeMillis(); 44 setCallbackCount(npairs * 2); 45 46 for (int k = 0; k < npairs; ++k) { 47 new Producer(iters).fork(); 48 new Consumer(iters).fork(); 49 } 50 51 while (!checkDone()) yield(); 52 53 long now = System.currentTimeMillis(); 54 long time = now - startTime; 55 long tpi = (time * 1000) / (npairs * niters); 56 System.out.print("\t" + tpi); 57 58 } 59 60 System.out.print("\n"); 61 62 getFJTaskRunnerGroup().stats(); 63 } 64 } 65 66 67 71 int callbackCount; 72 synchronized void notifyDone() { --callbackCount; } 73 synchronized void setCallbackCount(int c) { callbackCount = c; } 74 synchronized boolean checkDone() { return callbackCount == 0; } 75 76 77 Buffer buffer; 78 79 class Producer extends FJTask { 80 final int iters; 81 Producer(int n) { iters = n; } 82 83 public void run() { 84 for (int n = iters; n > 0; --n) { 85 if (!buffer.offer(new Integer (n))) { yield(); 89 new Producer(n).start(); 90 return; 91 } 92 } 93 notifyDone(); 94 } 95 } 96 97 98 class Consumer extends FJTask { 99 final int iters; 100 101 Consumer(int n) { iters = n; } 102 103 public void run() { 104 for (int n = iters; n > 0; --n) { 105 if (buffer.poll() == null) { 108 yield(); 109 new Consumer(n).start(); 110 return; 111 } 112 } 113 notifyDone(); 114 } 115 } 116 117 118 static class Buffer { 119 120 protected Object [] array_; protected int putPtr_ = 0; protected int takePtr_ = 0; 123 124 final NonBlockingSemaphore putPermits; 125 final NonBlockingSemaphore takePermits; 126 127 public Buffer(int capacity){ 128 putPermits = new NonBlockingSemaphore(capacity); 129 takePermits = new NonBlockingSemaphore(0); 130 array_ = new Object [capacity]; 131 } 132 133 public boolean offer(Object x){ 134 if (!putPermits.attempt()) return false; 135 synchronized(this) { 136 array_[putPtr_] = x; 137 if (++putPtr_ == array_.length) putPtr_ = 0; 138 } 139 takePermits.release(); 140 return true; 141 } 142 143 public Object poll() { 144 if (!takePermits.attempt()) return null; 145 Object x; 146 synchronized(this) { 147 x = array_[takePtr_]; 148 array_[takePtr_] = null; 149 if (++takePtr_ == array_.length) takePtr_ = 0; 150 } 151 putPermits.release(); 152 return x; 153 } 154 155 } 156 157 158 static class NonBlockingSemaphore { 159 private long permits_; 160 161 public NonBlockingSemaphore(long initialPermits) { 162 permits_ = initialPermits; 163 } 164 165 public synchronized boolean attempt() { 166 if (permits_ > 0) { 167 --permits_; 168 return true; 169 } 170 else 171 return false; 172 } 173 174 public synchronized void release() { 175 ++permits_; 176 } 177 } 178 } 179 180 181 182 | Popular Tags |