Code - Class EDU.oswego.cs.dl.util.concurrent.BoundedBuffer


1 /*
2   File: BoundedBuffer.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jun1998 dl Create public version
12   17Jul1998 dl Simplified by eliminating wait counts
13   25aug1998 dl added peek
14    5May1999 dl replace % with conditional (slightly faster)
15 */

16
17 package EDU.oswego.cs.dl.util.concurrent;
18
19 /**
20  * Efficient array-based bounded buffer class.
21  * Adapted from CPJ, chapter 8, which describes design.
22  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
23  **/

24
25 public class BoundedBuffer implements BoundedChannel {
26
27   protected final Object[] array_; // the elements
28

29   protected int takePtr_ = 0; // circular indices
30
protected int putPtr_ = 0;
31
32   protected int usedSlots_ = 0; // length
33
protected int emptySlots_; // capacity - length
34

35   /**
36    * Helper monitor to handle puts.
37    **/

38   protected final Object putMonitor_ = new Object();
39
40   /**
41    * Create a BoundedBuffer with the given capacity.
42    * @exception IllegalArgumentException if capacity less or equal to zero
43    **/

44   public BoundedBuffer(int capacity) throws IllegalArgumentException {
45     if (capacity <= 0) throw new IllegalArgumentException();
46     array_ = new Object[capacity];
47     emptySlots_ = capacity;
48   }
49
50   /**
51    * Create a buffer with the current default capacity
52    **/

53
54   public BoundedBuffer() {
55     this(DefaultChannelCapacity.get());
56   }
57
58   /**
59    * Return the number of elements in the buffer.
60    * This is only a snapshot value, that may change
61    * immediately after returning.
62    **/

63   public synchronized int size() { return usedSlots_; }
64
65   public int capacity() { return array_.length; }
66
67   protected void incEmptySlots() {
68     synchronized(putMonitor_) {
69       ++emptySlots_;
70       putMonitor_.notify();
71     }
72   }
73
74   protected synchronized void incUsedSlots() {
75     ++usedSlots_;
76     notify();
77   }
78
79   protected final void insert(Object x) { // mechanics of put
80
--emptySlots_;
81     array_[putPtr_] = x;
82     if (++putPtr_ >= array_.length) putPtr_ = 0;
83   }
84
85   protected final Object extract() { // mechanics of take
86
--usedSlots_;
87     Object old = array_[takePtr_];
88     array_[takePtr_] = null;
89     if (++takePtr_ >= array_.length) takePtr_ = 0;
90     return old;
91   }
92
93   public Object peek() {
94     synchronized(this) {
95       if (usedSlots_ > 0)
96         return array_[takePtr_];
97       else
98         return null;
99     }
100   }
101
102
103   public void put(Object x) throws InterruptedException {
104     if (x == null) throw new IllegalArgumentException();
105     if (Thread.interrupted()) throw new InterruptedException();
106
107     synchronized(putMonitor_) {
108       while (emptySlots_ <= 0) {
109     try { putMonitor_.wait(); }
110         catch (InterruptedException ex) {
111           putMonitor_.notify();
112           throw ex;
113         }
114       }
115       insert(x);
116     }
117     incUsedSlots();
118   }
119
120   public boolean offer(Object x, long msecs) throws InterruptedException {
121     if (x == null) throw new IllegalArgumentException();
122     if (Thread.interrupted()) throw new InterruptedException();
123
124     synchronized(putMonitor_) {
125       long start = (msecs <= 0)? 0 : System.currentTimeMillis();
126       long waitTime = msecs;
127       while (emptySlots_ <= 0) {
128         if (waitTime <= 0) return false;
129     try { putMonitor_.wait(waitTime); }
130         catch (InterruptedException ex) {
131           putMonitor_.notify();
132           throw ex;
133         }
134         waitTime = msecs - (System.currentTimeMillis() - start);
135       }
136       insert(x);
137     }
138     incUsedSlots();
139     return true;
140   }
141
142
143
144   public Object take() throws InterruptedException {
145     if (Thread.interrupted()) throw new InterruptedException();
146     Object old = null;
147     synchronized(this) {
148       while (usedSlots_ <= 0) {
149         try { wait(); }
150         catch (InterruptedException ex) {
151           notify();
152           throw ex;
153         }
154       }
155       old = extract();
156     }
157     incEmptySlots();
158     return old;
159   }
160
161   public Object poll(long msecs) throws InterruptedException {
162     if (Thread.interrupted()) throw new InterruptedException();
163     Object old = null;
164     synchronized(this) {
165       long start = (msecs <= 0)? 0 : System.currentTimeMillis();
166       long waitTime = msecs;
167       
168       while (usedSlots_ <= 0) {
169         if (waitTime <= 0) return null;
170         try { wait(waitTime); }
171         catch (InterruptedException ex) {
172           notify();
173           throw ex;
174         }
175         waitTime = msecs - (System.currentTimeMillis() - start);
176
177       }
178       old = extract();
179     }
180     incEmptySlots();
181     return old;
182   }
183
184 }
185
186
187

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates