| ||||
|
Code - Class EDU.oswego.cs.dl.util.concurrent.LinkedQueue1 /* 2 File: LinkedQueue.java 3 4 Originally written by Doug Lea and released into the public domain. 5 This may be used for any purposes whatsoever without acknowledgment. 6 Thanks for the assistance and support of Sun Microsystems Labs, 7 and everyone contributing, testing, and using this code. 8 9 History: 10 Date Who What 11 11Jun1998 dl Create public version 12 25aug1998 dl added peek 13 10dec1998 dl added isEmpty 14 10oct1999 dl lock on node object to ensure visibility 15 */ 16 17 package EDU.oswego.cs.dl.util.concurrent; 18 19 /** 20 * A linked list based channel implementation. 21 * The algorithm avoids contention between puts 22 * and takes when the queue is not empty. 23 * Normally a put and a take can proceed simultaneously. 24 * (Although it does not allow multiple concurrent puts or takes.) 25 * This class tends to perform more efficently than 26 * other Channel implementations in producer/consumer 27 * applications. 28 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] 29 **/ 30 31 public class LinkedQueue implements Channel { 32 33 34 /** 35 * Dummy header node of list. The first actual node, if it exists, is always 36 * at head_.next. After each take, the old first node becomes the head. 37 **/ 38 protected LinkedNode head_; 39 40 /** 41 * Helper monitor for managing access to last node. 42 **/ 43 protected final Object putLock_ = new Object(); 44 45 /** 46 * The last node of list. Put() appends to list, so modifies last_ 47 **/ 48 protected LinkedNode last_; 49 50 /** 51 * The number of threads waiting for a take. 52 * Notifications are provided in put only if greater than zero. 53 * The bookkeeping is worth it here since in reasonably balanced 54 * usages, the notifications will hardly ever be necessary, so 55 * the call overhead to notify can be eliminated. 56 **/ 57 protected int waitingForTake_ = 0; 58 59 public LinkedQueue() { 60 head_ = new LinkedNode(null); 61 last_ = head_; 62 } 63 64 /** Main mechanics for put/offer **/ 65 protected void insert(Object x) { 66 synchronized(putLock_) { 67 LinkedNode p = new LinkedNode(x); 68 synchronized(last_) { 69 last_.next = p; 70 last_ = p; 71 } 72 if (waitingForTake_ > 0) 73 putLock_.notify(); 74 } 75 } 76 77 /** Main mechanics for take/poll **/ 78 protected synchronized Object extract() { 79 synchronized(head_) { 80 Object x = null; 81 LinkedNode first = head_.next; 82 if (first != null) { 83 x = first.value; 84 first.value = null; 85 head_ = first; 86 } 87 return x; 88 } 89 } 90 91 92 public void put(Object x) throws InterruptedException { 93 if (x == null) throw new IllegalArgumentException(); 94 if (Thread.interrupted()) throw new InterruptedException(); 95 insert(x); 96 } 97 98 public boolean offer(Object x, long msecs) throws InterruptedException { 99 if (x == null) throw new IllegalArgumentException(); 100 if (Thread.interrupted()) throw new InterruptedException(); 101 insert(x); 102 return true; 103 } 104 105 public Object take() throws InterruptedException { 106 if (Thread.interrupted()) throw new InterruptedException(); 107 // try to extract. If fail, then enter wait-based retry loop 108 Object x = extract(); 109 if (x != null) 110 return x; 111 else { 112 synchronized(putLock_) { 113 try { 114 ++waitingForTake_; 115 for (;;) { 116 x = extract(); 117 if (x != null) { 118 --waitingForTake_; 119 return x; 120 } 121 else { 122 putLock_.wait(); 123 } 124 } 125 } 126 catch(InterruptedException ex) { 127 --waitingForTake_; 128 putLock_.notify(); 129 throw ex; 130 } 131 } 132 } 133 } 134 135 public Object peek() { 136 synchronized(head_) { 137 LinkedNode first = head_.next; 138 if (first != null) 139 return first.value; 140 else 141 return null; 142 } 143 } 144 145 146 public boolean isEmpty() { 147 synchronized(head_) { 148 return head_.next == null; 149 } 150 } 151 152 public Object poll(long msecs) throws InterruptedException { 153 if (Thread.interrupted()) throw new InterruptedException(); 154 Object x = extract(); 155 if (x != null) 156 return x; 157 else { 158 synchronized(putLock_) { 159 try { 160 long waitTime = msecs; 161 long start = (msecs <= 0)? 0 : System.currentTimeMillis(); 162 ++waitingForTake_; 163 for (;;) { 164 x = extract(); 165 if (x != null || waitTime <= 0) { 166 --waitingForTake_; 167 return x; 168 } 169 else { 170 putLock_.wait(waitTime); 171 waitTime = msecs - (System.currentTimeMillis() - start); 172 } 173 } 174 } 175 catch(InterruptedException ex) { 176 --waitingForTake_; 177 putLock_.notify(); 178 throw ex; 179 } 180 } 181 } 182 } 183 } 184 185 186 |
|||
Java API By Example, From Geeks To Geeks. |
Conditions of Use |
About Us
© 2002 - 2005, KickJava.com, or its affiliates
|