1 16 17 package EDU.oswego.cs.dl.util.concurrent; 18 19 30 31 public class LinkedQueue implements Channel { 32 33 34 38 protected LinkedNode head_; 39 40 43 protected final Object putLock_ = new Object (); 44 45 48 protected LinkedNode last_; 49 50 57 protected int waitingForTake_ = 0; 58 59 public LinkedQueue() { 60 head_ = new LinkedNode(null); 61 last_ = head_; 62 } 63 64 65 protected void insert(Object x) { 66 synchronized(putLock_) { 67 LinkedNode p = new LinkedNode(x); 68 synchronized(last_) { 69 last_.next = p; 70 last_ = p; 71 } 72 if (waitingForTake_ > 0) 73 putLock_.notify(); 74 } 75 } 76 77 78 protected synchronized Object extract() { 79 synchronized(head_) { 80 Object x = null; 81 LinkedNode first = head_.next; 82 if (first != null) { 83 x = first.value; 84 first.value = null; 85 head_ = first; 86 } 87 return x; 88 } 89 } 90 91 92 public void put(Object x) throws InterruptedException { 93 if (x == null) throw new IllegalArgumentException (); 94 if (Thread.interrupted()) throw new InterruptedException (); 95 insert(x); 96 } 97 98 public boolean offer(Object x, long msecs) throws InterruptedException { 99 if (x == null) throw new IllegalArgumentException (); 100 if (Thread.interrupted()) throw new InterruptedException (); 101 insert(x); 102 return true; 103 } 104 105 public Object take() throws InterruptedException { 106 if (Thread.interrupted()) throw new InterruptedException (); 107 Object x = extract(); 109 if (x != null) 110 return x; 111 else { 112 synchronized(putLock_) { 113 try { 114 ++waitingForTake_; 115 for (;;) { 116 x = extract(); 117 if (x != null) { 118 --waitingForTake_; 119 return x; 120 } 121 else { 122 putLock_.wait(); 123 } 124 } 125 } 126 catch(InterruptedException ex) { 127 --waitingForTake_; 128 putLock_.notify(); 129 throw ex; 130 } 131 } 132 } 133 } 134 135 public Object peek() { 136 synchronized(head_) { 137 LinkedNode first = head_.next; 138 if (first != null) 139 return first.value; 140 else 141 return null; 142 } 143 } 144 145 146 public boolean isEmpty() { 147 synchronized(head_) { 148 return head_.next == null; 149 } 150 } 151 152 public Object poll(long msecs) throws InterruptedException { 153 if (Thread.interrupted()) throw new InterruptedException (); 154 Object x = extract(); 155 if (x != null) 156 return x; 157 else { 158 synchronized(putLock_) { 159 try { 160 long waitTime = msecs; 161 long start = (msecs <= 0)? 0 : System.currentTimeMillis(); 162 ++waitingForTake_; 163 for (;;) { 164 x = extract(); 165 if (x != null || waitTime <= 0) { 166 --waitingForTake_; 167 return x; 168 } 169 else { 170 putLock_.wait(waitTime); 171 waitTime = msecs - (System.currentTimeMillis() - start); 172 } 173 } 174 } 175 catch(InterruptedException ex) { 176 --waitingForTake_; 177 putLock_.notify(); 178 throw ex; 179 } 180 } 181 } 182 } 183 } 184 185 186 | Popular Tags |