KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > BufferTasks


1 import EDU.oswego.cs.dl.util.concurrent.*;
2 import java.util.*;
3
4 public class BufferTasks extends FJTask {
5
6   static int niters = 1024 * 64;
7   static int[] pairs = {1, 2, 4, 8, 16, 32, 64};
8   static int[] sizes = { 1024, 64, 1 };
9
10   public static void main(String JavaDoc[] args) {
11     try {
12       int procs;
13       try {
14         procs = Integer.parseInt(args[0]);
15       }
16       catch (Exception JavaDoc e) {
17         System.out.println("Usage: java BufferTasks <threads>");
18         return;
19       }
20
21       System.out.print("pairs:");
22       for (int p = 0; p < pairs.length; ++p)
23         System.out.print("\t" + pairs[p]);
24       System.out.print("\n");
25
26
27       FJTaskRunnerGroup g = new FJTaskRunnerGroup(procs);
28       g.invoke(new BufferTasks());
29     }
30     catch (InterruptedException JavaDoc ex) {}
31   }
32
33   public void run() {
34     for (int s = 0; s < sizes.length; ++s) {
35       System.out.println("cap: " + sizes[s]);
36
37       for (int p = 0; p < pairs.length; ++p) {
38
39         buffer = new Buffer(sizes[s]);
40         int npairs = pairs[p];
41         int iters = niters / npairs;
42
43         long startTime = System.currentTimeMillis();
44         setCallbackCount(npairs * 2);
45
46         for (int k = 0; k < npairs; ++k) {
47           new Producer(iters).fork();
48           new Consumer(iters).fork();
49         }
50         
51         while (!checkDone()) yield();
52
53         long now = System.currentTimeMillis();
54         long time = now - startTime;
55         long tpi = (time * 1000) / (npairs * niters);
56         System.out.print("\t" + tpi);
57
58       }
59
60       System.out.print("\n");
61
62       getFJTaskRunnerGroup().stats();
63     }
64   }
65
66
67   /**
68    * Keep track of callbacks so that test driver knows when
69    * to terminate
70    **/

71   int callbackCount;
72   synchronized void notifyDone() { --callbackCount; }
73   synchronized void setCallbackCount(int c) { callbackCount = c; }
74   synchronized boolean checkDone() { return callbackCount == 0; }
75
76   /** The shared buffer **/
77   Buffer buffer;
78
79   class Producer extends FJTask {
80     final int iters;
81     Producer(int n) { iters = n; }
82     
83     public void run() {
84       for (int n = iters; n > 0; --n) {
85         // If cannot continue, create a new task to
86
// take our place, and start it.
87
if (!buffer.offer(new Integer JavaDoc(n))) { // Doesn't matter what's put in
88
yield();
89           new Producer(n).start();
90           return;
91         }
92       }
93       notifyDone();
94     }
95   }
96
97
98   class Consumer extends FJTask {
99     final int iters;
100
101     Consumer(int n) { iters = n; }
102
103     public void run() {
104       for (int n = iters; n > 0; --n) {
105         // If cannot continue, create a new task to
106
// take our place, and start it.
107
if (buffer.poll() == null) {
108           yield();
109           new Consumer(n).start();
110           return;
111         }
112       }
113       notifyDone();
114     }
115   }
116
117
118   static class Buffer {
119
120     protected Object JavaDoc[] array_; // the elements
121
protected int putPtr_ = 0; // circular indices
122
protected int takePtr_ = 0;
123     
124     final NonBlockingSemaphore putPermits;
125     final NonBlockingSemaphore takePermits;
126     
127     public Buffer(int capacity){
128       putPermits = new NonBlockingSemaphore(capacity);
129       takePermits = new NonBlockingSemaphore(0);
130       array_ = new Object JavaDoc[capacity];
131     }
132     
133     public boolean offer(Object JavaDoc x){
134       if (!putPermits.attempt()) return false;
135       synchronized(this) {
136         array_[putPtr_] = x;
137         if (++putPtr_ == array_.length) putPtr_ = 0;
138       }
139       takePermits.release();
140       return true;
141     }
142     
143     public Object JavaDoc poll() {
144       if (!takePermits.attempt()) return null;
145       Object JavaDoc x;
146       synchronized(this) {
147         x = array_[takePtr_];
148         array_[takePtr_] = null;
149         if (++takePtr_ == array_.length) takePtr_ = 0;
150       }
151       putPermits.release();
152       return x;
153     }
154     
155   }
156
157
158   static class NonBlockingSemaphore {
159     private long permits_;
160     
161     public NonBlockingSemaphore(long initialPermits) {
162       permits_ = initialPermits;
163     }
164     
165     public synchronized boolean attempt() {
166       if (permits_ > 0) {
167         --permits_;
168         return true;
169       }
170       else
171         return false;
172     }
173     
174     public synchronized void release() {
175       ++permits_;
176     }
177   }
178 }
179
180
181
182
Popular Tags