| ||||
|
Code - Class EDU.oswego.cs.dl.util.concurrent.BoundedBuffer1 /* 2 File: BoundedBuffer.java 3 4 Originally written by Doug Lea and released into the public domain. 5 This may be used for any purposes whatsoever without acknowledgment. 6 Thanks for the assistance and support of Sun Microsystems Labs, 7 and everyone contributing, testing, and using this code. 8 9 History: 10 Date Who What 11 11Jun1998 dl Create public version 12 17Jul1998 dl Simplified by eliminating wait counts 13 25aug1998 dl added peek 14 5May1999 dl replace % with conditional (slightly faster) 15 */ 16 17 package EDU.oswego.cs.dl.util.concurrent; 18 19 /** 20 * Efficient array-based bounded buffer class. 21 * Adapted from CPJ, chapter 8, which describes design. 22 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p> 23 **/ 24 25 public class BoundedBuffer implements BoundedChannel { 26 27 protected final Object[] array_; // the elements 28 29 protected int takePtr_ = 0; // circular indices 30 protected int putPtr_ = 0; 31 32 protected int usedSlots_ = 0; // length 33 protected int emptySlots_; // capacity - length 34 35 /** 36 * Helper monitor to handle puts. 37 **/ 38 protected final Object putMonitor_ = new Object(); 39 40 /** 41 * Create a BoundedBuffer with the given capacity. 42 * @exception IllegalArgumentException if capacity less or equal to zero 43 **/ 44 public BoundedBuffer(int capacity) throws IllegalArgumentException { 45 if (capacity <= 0) throw new IllegalArgumentException(); 46 array_ = new Object[capacity]; 47 emptySlots_ = capacity; 48 } 49 50 /** 51 * Create a buffer with the current default capacity 52 **/ 53 54 public BoundedBuffer() { 55 this(DefaultChannelCapacity.get()); 56 } 57 58 /** 59 * Return the number of elements in the buffer. 60 * This is only a snapshot value, that may change 61 * immediately after returning. 62 **/ 63 public synchronized int size() { return usedSlots_; } 64 65 public int capacity() { return array_.length; } 66 67 protected void incEmptySlots() { 68 synchronized(putMonitor_) { 69 ++emptySlots_; 70 putMonitor_.notify(); 71 } 72 } 73 74 protected synchronized void incUsedSlots() { 75 ++usedSlots_; 76 notify(); 77 } 78 79 protected final void insert(Object x) { // mechanics of put 80 --emptySlots_; 81 array_[putPtr_] = x; 82 if (++putPtr_ >= array_.length) putPtr_ = 0; 83 } 84 85 protected final Object extract() { // mechanics of take 86 --usedSlots_; 87 Object old = array_[takePtr_]; 88 array_[takePtr_] = null; 89 if (++takePtr_ >= array_.length) takePtr_ = 0; 90 return old; 91 } 92 93 public Object peek() { 94 synchronized(this) { 95 if (usedSlots_ > 0) 96 return array_[takePtr_]; 97 else 98 return null; 99 } 100 } 101 102 103 public void put(Object x) throws InterruptedException { 104 if (x == null) throw new IllegalArgumentException(); 105 if (Thread.interrupted()) throw new InterruptedException(); 106 107 synchronized(putMonitor_) { 108 while (emptySlots_ <= 0) { 109 try { putMonitor_.wait(); } 110 catch (InterruptedException ex) { 111 putMonitor_.notify(); 112 throw ex; 113 } 114 } 115 insert(x); 116 } 117 incUsedSlots(); 118 } 119 120 public boolean offer(Object x, long msecs) throws InterruptedException { 121 if (x == null) throw new IllegalArgumentException(); 122 if (Thread.interrupted()) throw new InterruptedException(); 123 124 synchronized(putMonitor_) { 125 long start = (msecs <= 0)? 0 : System.currentTimeMillis(); 126 long waitTime = msecs; 127 while (emptySlots_ <= 0) { 128 if (waitTime <= 0) return false; 129 try { putMonitor_.wait(waitTime); } 130 catch (InterruptedException ex) { 131 putMonitor_.notify(); 132 throw ex; 133 } 134 waitTime = msecs - (System.currentTimeMillis() - start); 135 } 136 insert(x); 137 } 138 incUsedSlots(); 139 return true; 140 } 141 142 143 144 public Object take() throws InterruptedException { 145 if (Thread.interrupted()) throw new InterruptedException(); 146 Object old = null; 147 synchronized(this) { 148 while (usedSlots_ <= 0) { 149 try { wait(); } 150 catch (InterruptedException ex) { 151 notify(); 152 throw ex; 153 } 154 } 155 old = extract(); 156 } 157 incEmptySlots(); 158 return old; 159 } 160 161 public Object poll(long msecs) throws InterruptedException { 162 if (Thread.interrupted()) throw new InterruptedException(); 163 Object old = null; 164 synchronized(this) { 165 long start = (msecs <= 0)? 0 : System.currentTimeMillis(); 166 long waitTime = msecs; 167 168 while (usedSlots_ <= 0) { 169 if (waitTime <= 0) return null; 170 try { wait(waitTime); } 171 catch (InterruptedException ex) { 172 notify(); 173 throw ex; 174 } 175 waitTime = msecs - (System.currentTimeMillis() - start); 176 177 } 178 old = extract(); 179 } 180 incEmptySlots(); 181 return old; 182 } 183 184 } 185 186 187 |
|||
Java API By Example, From Geeks To Geeks. |
Conditions of Use |
About Us
© 2002 - 2005, KickJava.com, or its affiliates
|