| ||||
|
Code - Class EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue1 /* 2 File: BoundedLinkedQueue.java 3 4 Originally written by Doug Lea and released into the public domain. 5 This may be used for any purposes whatsoever without acknowledgment. 6 Thanks for the assistance and support of Sun Microsystems Labs, 7 and everyone contributing, testing, and using this code. 8 9 History: 10 Date Who What 11 11Jun1998 dl Create public version 12 17Jul1998 dl Simplified by eliminating wait counts 13 25aug1998 dl added peek 14 10oct1999 dl lock on node object to ensure visibility 15 27jan2000 dl setCapacity forces immediate permit reconcile 16 */ 17 18 package EDU.oswego.cs.dl.util.concurrent; 19 20 /** 21 * A bounded variant of 22 * LinkedQueue 23 * class. This class may be 24 * preferable to 25 * BoundedBuffer 26 * because it allows a bit more 27 * concurency among puts and takes, because it does not 28 * pre-allocate fixed storage for elements, and allows 29 * capacity to be dynamically reset. 30 * On the other hand, since it allocates a node object 31 * on each put, it can be slow on systems with slow 32 * allocation and GC. 33 * Also, it may be 34 * preferable to 35 * LinkedQueue 36 * when you need to limit 37 * the capacity to prevent resource exhaustion. This protection 38 * normally does not hurt much performance-wise: When the 39 * queue is not empty or full, most puts and 40 * takes are still usually able to execute concurrently. 41 * @see LinkedQueue 42 * @see BoundedBuffer 43 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p> 44 **/ 45 46 public class BoundedLinkedQueue implements BoundedChannel { 47 48 /* 49 * It might be a bit nicer if this were declared as 50 * a subclass of LinkedQueue, or a sibling class of 51 * a common abstract class. It shares much of the 52 * basic design and bookkeeping fields. But too 53 * many details differ to make this worth doing. 54 */ 55 56 57 58 /** 59 * Dummy header node of list. The first actual node, if it exists, is always 60 * at head_.next. After each take, the old first node becomes the head. 61 **/ 62 protected LinkedNode head_; 63 64 /** 65 * The last node of list. Put() appends to list, so modifies last_ 66 **/ 67 protected LinkedNode last_; 68 69 70 /** 71 * Helper monitor. Ensures that only one put at a time executes. 72 **/ 73 74 protected final Object putGuard_ = new Object(); 75 76 /** 77 * Helper monitor. Protects and provides wait queue for takes 78 **/ 79 80 protected final Object takeGuard_ = new Object(); 81 82 83 /** Number of elements allowed **/ 84 protected int capacity_; 85 86 87 /** 88 * One side of a split permit count. 89 * The counts represent permits to do a put. (The queue is full when zero). 90 * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length. 91 * (The length is never separately recorded, so this cannot be 92 * checked explicitly.) 93 * To minimize contention between puts and takes, the 94 * put side uses up all of its permits before transfering them from 95 * the take side. The take side just increments the count upon each take. 96 * Thus, most puts and take can run independently of each other unless 97 * the queue is empty or full. 98 * Initial value is queue capacity. 99 **/ 100 101 protected int putSidePutPermits_; 102 103 /** Number of takes since last reconcile **/ 104 protected int takeSidePutPermits_ = 0; 105 106 107 /** 108 * Create a queue with the given capacity 109 * @exception IllegalArgumentException if capacity less or equal to zero 110 **/ 111 public BoundedLinkedQueue(int capacity) { 112 if (capacity <= 0) throw new IllegalArgumentException(); 113 capacity_ = capacity; 114 putSidePutPermits_ = capacity; 115 head_ = new LinkedNode(null); 116 last_ = head_; 117 } 118 119 /** 120 * Create a queue with the current default capacity 121 **/ 122 123 public BoundedLinkedQueue() { 124 this(DefaultChannelCapacity.get()); 125 } 126 127 /** 128 * Move put permits from take side to put side; 129 * return the number of put side permits that are available. 130 * Call only under synch on puGuard_ AND this. 131 **/ 132 protected final int reconcilePutPermits() { 133 putSidePutPermits_ += takeSidePutPermits_; 134 takeSidePutPermits_ = 0; 135 return putSidePutPermits_; 136 } 137 138 139 /** Return the current capacity of this queue **/ 140 public synchronized int capacity() { return capacity_; } 141 142 143 /** 144 * Return the number of elements in the queue. 145 * This is only a snapshot value, that may be in the midst 146 * of changing. The returned value will be unreliable in the presence of 147 * active puts and takes, and should only be used as a heuristic 148 * estimate, for example for resource monitoring purposes. 149 **/ 150 public synchronized int size() { 151 /* 152 This should ideally synch on putGuard_, but 153 doing so would cause it to block waiting for an in-progress 154 put, which might be stuck. So we instead use whatever 155 value of putSidePutPermits_ that we happen to read. 156 */ 157 return capacity_ - (takeSidePutPermits_ + putSidePutPermits_); 158 } 159 160 161 /** 162 * Reset the capacity of this queue. 163 * If the new capacity is less than the old capacity, 164 * existing elements are NOT removed, but 165 * incoming puts will not proceed until the number of elements 166 * is less than the new capacity. 167 * @exception IllegalArgumentException if capacity less or equal to zero 168 **/ 169 170 public void setCapacity(int newCapacity) { 171 if (newCapacity <= 0) throw new IllegalArgumentException(); 172 synchronized (putGuard_) { 173 synchronized(this) { 174 takeSidePutPermits_ += (newCapacity - capacity_); 175 capacity_ = newCapacity; 176 177 // Force immediate reconcilation. 178 reconcilePutPermits(); 179 notifyAll(); 180 } 181 } 182 } 183 184 185 /** Main mechanics for take/poll **/ 186 protected synchronized Object extract() { 187 synchronized(head_) { 188 Object x = null; 189 LinkedNode first = head_.next; 190 if (first != null) { 191 x = first.value; 192 first.value = null; 193 head_ = first; 194 ++takeSidePutPermits_; 195 notify(); 196 } 197 return x; 198 } 199 } 200 201 public Object peek() { 202 synchronized(head_) { 203 LinkedNode first = head_.next; 204 if (first != null) 205 return first.value; 206 else 207 return null; 208 } 209 } 210 211 public Object take() throws InterruptedException { 212 if (Thread.interrupted()) throw new InterruptedException(); 213 Object x = extract(); 214 if (x != null) 215 return x; 216 else { 217 synchronized(takeGuard_) { 218 try { 219 for (;;) { 220 x = extract(); 221 if (x != null) { 222 return x; 223 } 224 else { 225 takeGuard_.wait(); 226 } 227 } 228 } 229 catch(InterruptedException ex) { 230 takeGuard_.notify(); 231 throw ex; 232 } 233 } 234 } 235 } 236 237 public Object poll(long msecs) throws InterruptedException { 238 if (Thread.interrupted()) throw new InterruptedException(); 239 Object x = extract(); 240 if (x != null) 241 return x; 242 else { 243 synchronized(takeGuard_) { 244 try { 245 long waitTime = msecs; 246 long start = (msecs <= 0)? 0: System.currentTimeMillis(); 247 for (;;) { 248 x = extract(); 249 if (x != null || waitTime <= 0) { 250 return x; 251 } 252 else { 253 takeGuard_.wait(waitTime); 254 waitTime = msecs - (System.currentTimeMillis() - start); 255 } 256 } 257 } 258 catch(InterruptedException ex) { 259 takeGuard_.notify(); 260 throw ex; 261 } 262 } 263 } 264 } 265 266 /** Notify a waiting take if needed **/ 267 protected final void allowTake() { 268 synchronized(takeGuard_) { 269 takeGuard_.notify(); 270 } 271 } 272 273 274 /** 275 * Create and insert a node. 276 * Call only under synch on putGuard_ 277 **/ 278 protected void insert(Object x) { 279 --putSidePutPermits_; 280 LinkedNode p = new LinkedNode(x); 281 synchronized(last_) { 282 last_.next = p; 283 last_ = p; 284 } 285 } 286 287 288 /* 289 put and offer(ms) differ only in policy before insert/allowTake 290 */ 291 292 public void put(Object x) throws InterruptedException { 293 if (x == null) throw new IllegalArgumentException(); 294 if (Thread.interrupted()) throw new InterruptedException(); 295 296 synchronized(putGuard_) { 297 298 if (putSidePutPermits_ <= 0) { // wait for permit. 299 synchronized(this) { 300 if (reconcilePutPermits() <= 0) { 301 try { 302 for(;;) { 303 wait(); 304 if (reconcilePutPermits() > 0) { 305 break; 306 } 307 } 308 } 309 catch (InterruptedException ex) { 310 notify(); 311 throw ex; 312 } 313 } 314 } 315 } 316 insert(x); 317 } 318 // call outside of lock to loosen put/take coupling 319 allowTake(); 320 } 321 322 public boolean offer(Object x, long msecs) throws InterruptedException { 323 if (x == null) throw new IllegalArgumentException(); 324 if (Thread.interrupted()) throw new InterruptedException(); 325 326 synchronized(putGuard_) { 327 328 if (putSidePutPermits_ <= 0) { 329 synchronized(this) { 330 if (reconcilePutPermits() <= 0) { 331 if (msecs <= 0) 332 return false; 333 else { 334 try { 335 long waitTime = msecs; 336 long start = System.currentTimeMillis(); 337 338 for(;;) { 339 wait(waitTime); 340 if (reconcilePutPermits() > 0) { 341 break; 342 } 343 else { 344 waitTime = msecs - (System.currentTimeMillis() - start); 345 if (waitTime <= 0) { 346 return false; 347 } 348 } 349 } 350 } 351 catch (InterruptedException ex) { 352 notify(); 353 throw ex; 354 } 355 } 356 } 357 } 358 } 359 360 insert(x); 361 } 362 363 allowTake(); 364 return true; 365 } 366 367 public boolean isEmpty() { 368 synchronized(head_) { 369 return head_.next == null; 370 } 371 } 372 373 } 374 |
|||
Java API By Example, From Geeks To Geeks. |
Conditions of Use |
About Us
© 2002 - 2005, KickJava.com, or its affiliates
|