1 7 8 package java.util.concurrent; 9 import java.util.concurrent.locks.*; 10 import java.util.*; 11 12 35 36 public class DelayQueue<E extends Delayed > extends AbstractQueue<E> 37 implements BlockingQueue <E> { 38 39 private transient final ReentrantLock lock = new ReentrantLock(); 40 private transient final Condition available = lock.newCondition(); 41 private final PriorityQueue<E> q = new PriorityQueue<E>(); 42 43 46 public DelayQueue() {} 47 48 57 public DelayQueue(Collection<? extends E> c) { 58 this.addAll(c); 59 } 60 61 68 public boolean offer(E o) { 69 final ReentrantLock lock = this.lock; 70 lock.lock(); 71 try { 72 E first = q.peek(); 73 q.offer(o); 74 if (first == null || o.compareTo(first) < 0) 75 available.signalAll(); 76 return true; 77 } finally { 78 lock.unlock(); 79 } 80 } 81 82 83 89 public void put(E o) { 90 offer(o); 91 } 92 93 102 public boolean offer(E o, long timeout, TimeUnit unit) { 103 return offer(o); 104 } 105 106 114 public boolean add(E o) { 115 return offer(o); 116 } 117 118 124 public E take() throws InterruptedException { 125 final ReentrantLock lock = this.lock; 126 lock.lockInterruptibly(); 127 try { 128 for (;;) { 129 E first = q.peek(); 130 if (first == null) { 131 available.await(); 132 } else { 133 long delay = first.getDelay(TimeUnit.NANOSECONDS); 134 if (delay > 0) { 135 long tl = available.awaitNanos(delay); 136 } else { 137 E x = q.poll(); 138 assert x != null; 139 if (q.size() != 0) 140 available.signalAll(); return x; 142 143 } 144 } 145 } 146 } finally { 147 lock.unlock(); 148 } 149 } 150 151 165 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 166 final ReentrantLock lock = this.lock; 167 lock.lockInterruptibly(); 168 long nanos = unit.toNanos(timeout); 169 try { 170 for (;;) { 171 E first = q.peek(); 172 if (first == null) { 173 if (nanos <= 0) 174 return null; 175 else 176 nanos = available.awaitNanos(nanos); 177 } else { 178 long delay = first.getDelay(TimeUnit.NANOSECONDS); 179 if (delay > 0) { 180 if (nanos <= 0) 181 return null; 182 if (delay > nanos) 183 delay = nanos; 184 long timeLeft = available.awaitNanos(delay); 185 nanos -= delay - timeLeft; 186 } else { 187 E x = q.poll(); 188 assert x != null; 189 if (q.size() != 0) 190 available.signalAll(); 191 return x; 192 } 193 } 194 } 195 } finally { 196 lock.unlock(); 197 } 198 } 199 200 201 208 public E poll() { 209 final ReentrantLock lock = this.lock; 210 lock.lock(); 211 try { 212 E first = q.peek(); 213 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 214 return null; 215 else { 216 E x = q.poll(); 217 assert x != null; 218 if (q.size() != 0) 219 available.signalAll(); 220 return x; 221 } 222 } finally { 223 lock.unlock(); 224 } 225 } 226 227 235 public E peek() { 236 final ReentrantLock lock = this.lock; 237 lock.lock(); 238 try { 239 return q.peek(); 240 } finally { 241 lock.unlock(); 242 } 243 } 244 245 public int size() { 246 final ReentrantLock lock = this.lock; 247 lock.lock(); 248 try { 249 return q.size(); 250 } finally { 251 lock.unlock(); 252 } 253 } 254 255 public int drainTo(Collection<? super E> c) { 256 if (c == null) 257 throw new NullPointerException (); 258 if (c == this) 259 throw new IllegalArgumentException (); 260 final ReentrantLock lock = this.lock; 261 lock.lock(); 262 try { 263 int n = 0; 264 for (;;) { 265 E first = q.peek(); 266 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 267 break; 268 c.add(q.poll()); 269 ++n; 270 } 271 if (n > 0) 272 available.signalAll(); 273 return n; 274 } finally { 275 lock.unlock(); 276 } 277 } 278 279 public int drainTo(Collection<? super E> c, int maxElements) { 280 if (c == null) 281 throw new NullPointerException (); 282 if (c == this) 283 throw new IllegalArgumentException (); 284 if (maxElements <= 0) 285 return 0; 286 final ReentrantLock lock = this.lock; 287 lock.lock(); 288 try { 289 int n = 0; 290 while (n < maxElements) { 291 E first = q.peek(); 292 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 293 break; 294 c.add(q.poll()); 295 ++n; 296 } 297 if (n > 0) 298 available.signalAll(); 299 return n; 300 } finally { 301 lock.unlock(); 302 } 303 } 304 305 309 public void clear() { 310 final ReentrantLock lock = this.lock; 311 lock.lock(); 312 try { 313 q.clear(); 314 } finally { 315 lock.unlock(); 316 } 317 } 318 319 324 public int remainingCapacity() { 325 return Integer.MAX_VALUE; 326 } 327 328 public Object [] toArray() { 329 final ReentrantLock lock = this.lock; 330 lock.lock(); 331 try { 332 return q.toArray(); 333 } finally { 334 lock.unlock(); 335 } 336 } 337 338 public <T> T[] toArray(T[] array) { 339 final ReentrantLock lock = this.lock; 340 lock.lock(); 341 try { 342 return q.toArray(array); 343 } finally { 344 lock.unlock(); 345 } 346 } 347 348 352 public boolean remove(Object o) { 353 final ReentrantLock lock = this.lock; 354 lock.lock(); 355 try { 356 return q.remove(o); 357 } finally { 358 lock.unlock(); 359 } 360 } 361 362 371 public Iterator<E> iterator() { 372 final ReentrantLock lock = this.lock; 373 lock.lock(); 374 try { 375 return new Itr(q.iterator()); 376 } finally { 377 lock.unlock(); 378 } 379 } 380 381 private class Itr<E> implements Iterator<E> { 382 private final Iterator<E> iter; 383 Itr(Iterator<E> i) { 384 iter = i; 385 } 386 387 public boolean hasNext() { 388 return iter.hasNext(); 389 } 390 391 public E next() { 392 final ReentrantLock lock = DelayQueue.this.lock; 393 lock.lock(); 394 try { 395 return iter.next(); 396 } finally { 397 lock.unlock(); 398 } 399 } 400 401 public void remove() { 402 final ReentrantLock lock = DelayQueue.this.lock; 403 lock.lock(); 404 try { 405 iter.remove(); 406 } finally { 407 lock.unlock(); 408 } 409 } 410 } 411 412 } 413 | Popular Tags |