1 7 8 package java.util.concurrent; 9 10 import java.util.concurrent.locks.*; 11 import java.util.*; 12 13 39 public class PriorityBlockingQueue<E> extends AbstractQueue<E> 40 implements BlockingQueue <E>, java.io.Serializable { 41 private static final long serialVersionUID = 5595510919245408276L; 42 43 private final PriorityQueue<E> q; 44 private final ReentrantLock lock = new ReentrantLock(true); 45 private final Condition notEmpty = lock.newCondition(); 46 47 53 public PriorityBlockingQueue() { 54 q = new PriorityQueue<E>(); 55 } 56 57 67 public PriorityBlockingQueue(int initialCapacity) { 68 q = new PriorityQueue<E>(initialCapacity, null); 69 } 70 71 83 public PriorityBlockingQueue(int initialCapacity, 84 Comparator<? super E> comparator) { 85 q = new PriorityQueue<E>(initialCapacity, comparator); 86 } 87 88 107 public PriorityBlockingQueue(Collection<? extends E> c) { 108 q = new PriorityQueue<E>(c); 109 } 110 111 112 114 125 public boolean add(E o) { 126 return super.add(o); 127 } 128 129 137 public Comparator<? super E> comparator() { 138 return q.comparator(); 139 } 140 141 151 public boolean offer(E o) { 152 if (o == null) throw new NullPointerException (); 153 final ReentrantLock lock = this.lock; 154 lock.lock(); 155 try { 156 boolean ok = q.offer(o); 157 assert ok; 158 notEmpty.signal(); 159 return true; 160 } finally { 161 lock.unlock(); 162 } 163 } 164 165 174 public void put(E o) { 175 offer(o); } 177 178 190 public boolean offer(E o, long timeout, TimeUnit unit) { 191 return offer(o); } 193 194 public E take() throws InterruptedException { 195 final ReentrantLock lock = this.lock; 196 lock.lockInterruptibly(); 197 try { 198 try { 199 while (q.size() == 0) 200 notEmpty.await(); 201 } catch (InterruptedException ie) { 202 notEmpty.signal(); throw ie; 204 } 205 E x = q.poll(); 206 assert x != null; 207 return x; 208 } finally { 209 lock.unlock(); 210 } 211 } 212 213 214 public E poll() { 215 final ReentrantLock lock = this.lock; 216 lock.lock(); 217 try { 218 return q.poll(); 219 } finally { 220 lock.unlock(); 221 } 222 } 223 224 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 225 long nanos = unit.toNanos(timeout); 226 final ReentrantLock lock = this.lock; 227 lock.lockInterruptibly(); 228 try { 229 for (;;) { 230 E x = q.poll(); 231 if (x != null) 232 return x; 233 if (nanos <= 0) 234 return null; 235 try { 236 nanos = notEmpty.awaitNanos(nanos); 237 } catch (InterruptedException ie) { 238 notEmpty.signal(); throw ie; 240 } 241 } 242 } finally { 243 lock.unlock(); 244 } 245 } 246 247 public E peek() { 248 final ReentrantLock lock = this.lock; 249 lock.lock(); 250 try { 251 return q.peek(); 252 } finally { 253 lock.unlock(); 254 } 255 } 256 257 public int size() { 258 final ReentrantLock lock = this.lock; 259 lock.lock(); 260 try { 261 return q.size(); 262 } finally { 263 lock.unlock(); 264 } 265 } 266 267 272 public int remainingCapacity() { 273 return Integer.MAX_VALUE; 274 } 275 276 280 public boolean remove(Object o) { 281 final ReentrantLock lock = this.lock; 282 lock.lock(); 283 try { 284 return q.remove(o); 285 } finally { 286 lock.unlock(); 287 } 288 } 289 290 public boolean contains(Object o) { 291 final ReentrantLock lock = this.lock; 292 lock.lock(); 293 try { 294 return q.contains(o); 295 } finally { 296 lock.unlock(); 297 } 298 } 299 300 public Object [] toArray() { 301 final ReentrantLock lock = this.lock; 302 lock.lock(); 303 try { 304 return q.toArray(); 305 } finally { 306 lock.unlock(); 307 } 308 } 309 310 311 public String toString() { 312 final ReentrantLock lock = this.lock; 313 lock.lock(); 314 try { 315 return q.toString(); 316 } finally { 317 lock.unlock(); 318 } 319 } 320 321 public int drainTo(Collection<? super E> c) { 322 if (c == null) 323 throw new NullPointerException (); 324 if (c == this) 325 throw new IllegalArgumentException (); 326 final ReentrantLock lock = this.lock; 327 lock.lock(); 328 try { 329 int n = 0; 330 E e; 331 while ( (e = q.poll()) != null) { 332 c.add(e); 333 ++n; 334 } 335 return n; 336 } finally { 337 lock.unlock(); 338 } 339 } 340 341 public int drainTo(Collection<? super E> c, int maxElements) { 342 if (c == null) 343 throw new NullPointerException (); 344 if (c == this) 345 throw new IllegalArgumentException (); 346 if (maxElements <= 0) 347 return 0; 348 final ReentrantLock lock = this.lock; 349 lock.lock(); 350 try { 351 int n = 0; 352 E e; 353 while (n < maxElements && (e = q.poll()) != null) { 354 c.add(e); 355 ++n; 356 } 357 return n; 358 } finally { 359 lock.unlock(); 360 } 361 } 362 363 367 public void clear() { 368 final ReentrantLock lock = this.lock; 369 lock.lock(); 370 try { 371 q.clear(); 372 } finally { 373 lock.unlock(); 374 } 375 } 376 377 public <T> T[] toArray(T[] a) { 378 final ReentrantLock lock = this.lock; 379 lock.lock(); 380 try { 381 return q.toArray(a); 382 } finally { 383 lock.unlock(); 384 } 385 } 386 387 397 public Iterator<E> iterator() { 398 final ReentrantLock lock = this.lock; 399 lock.lock(); 400 try { 401 return new Itr(q.iterator()); 402 } finally { 403 lock.unlock(); 404 } 405 } 406 407 private class Itr<E> implements Iterator<E> { 408 private final Iterator<E> iter; 409 Itr(Iterator<E> i) { 410 iter = i; 411 } 412 413 public boolean hasNext() { 414 420 return iter.hasNext(); 421 } 422 423 public E next() { 424 ReentrantLock lock = PriorityBlockingQueue.this.lock; 425 lock.lock(); 426 try { 427 return iter.next(); 428 } finally { 429 lock.unlock(); 430 } 431 } 432 433 public void remove() { 434 ReentrantLock lock = PriorityBlockingQueue.this.lock; 435 lock.lock(); 436 try { 437 iter.remove(); 438 } finally { 439 lock.unlock(); 440 } 441 } 442 } 443 444 451 private void writeObject(java.io.ObjectOutputStream s) 452 throws java.io.IOException { 453 lock.lock(); 454 try { 455 s.defaultWriteObject(); 456 } finally { 457 lock.unlock(); 458 } 459 } 460 461 } 462 | Popular Tags |