1 22 package org.jboss.util.timeout; 23 24 import org.jboss.util.JBossStringBuilder; 25 26 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 27 28 155 public class HashedTimeoutPriorityQueueImpl implements TimeoutPriorityQueue 156 { 157 180 181 private Object topLock = new Object (); 182 183 184 private TimeoutExtImpl top; 185 186 187 private InternalPriorityQueue[] queues; 188 189 private SynchronizedBoolean cancelled = new SynchronizedBoolean(false); 190 191 194 public HashedTimeoutPriorityQueueImpl() 195 { 196 queues = new InternalPriorityQueue[40]; 197 for (int i = 0; i < queues.length; ++ i) 198 queues[i] = new InternalPriorityQueue(); 199 } 200 201 public TimeoutExt offer(long time, TimeoutTarget target) 202 { 203 if (cancelled.get()) 204 throw new IllegalStateException ("TimeoutPriorityQueue has been cancelled"); 205 if (time < 0) 206 throw new IllegalArgumentException ("Negative time"); 207 if (target == null) 208 throw new IllegalArgumentException ("Null timeout target"); 209 210 TimeoutExtImpl timeout = new TimeoutExtImpl(); 211 timeout.time = time; 212 timeout.target = target; 213 int index = timeout.hashCode() % queues.length; 214 return queues[index].offer(timeout); 215 } 216 217 public TimeoutExt take() 218 { 219 return poll(-1); 220 } 221 222 public TimeoutExt poll() 223 { 224 return poll(1); 225 } 226 227 public TimeoutExt poll(long wait) 228 { 229 long endWait = -1; 230 if (wait > 0) 231 endWait = System.currentTimeMillis() + wait; 232 synchronized (topLock) 234 { 235 while (cancelled.get() == false && (wait >= 0 || endWait == -1)) 236 { 237 if (top == null) 238 { 239 try 240 { 241 if (endWait == -1) 242 topLock.wait(); 243 else 244 topLock.wait(wait); 245 } 246 catch (InterruptedException ex) 247 { 248 } 249 } 250 else 251 { 252 long now = System.currentTimeMillis(); 253 if (top.time > now) 254 { 255 long waitForFirst = top.time - now; 256 if (endWait != -1 && waitForFirst > wait) 257 waitForFirst = wait; 258 try 259 { 260 topLock.wait(waitForFirst); 261 } 262 catch (InterruptedException ex) 263 { 264 } 265 } 266 if (cancelled.get() == false && top != null && top.time <= System.currentTimeMillis()) 267 { 268 TimeoutExtImpl result = top; 269 result.queue = null; 270 result.index = TimeoutExtImpl.TIMEOUT; 271 top = null; 272 recalculateTop(false); 273 return result; 274 } 275 } 276 if (endWait != -1) 277 wait = endWait - System.currentTimeMillis(); 278 } 279 } 280 return null; 281 } 282 283 public TimeoutExt peek() 284 { 285 synchronized (topLock) 286 { 287 return top; 288 } 289 } 290 291 public boolean remove(TimeoutExt timeout) 292 { 293 TimeoutExtImpl timeoutImpl = (TimeoutExtImpl) timeout; 294 295 InternalPriorityQueue queue = timeoutImpl.queue; 297 if (queue != null && queue.remove(timeoutImpl)) 298 return true; 299 300 synchronized (topLock) 301 { 302 if (top == timeout) 304 { 305 top.done(); 306 top = null; 307 recalculateTop(true); 308 return true; 309 } 310 311 queue = timeoutImpl.queue; 314 if (queue != null) 315 return queue.remove(timeoutImpl); 316 } 317 return false; 318 } 319 320 public void clear() 321 { 322 synchronized (topLock) 323 { 324 if (cancelled.get()) 325 return; 326 327 for (int i = 1; i < queues.length; ++i) 329 queues[i].clear(); 330 331 top = cleanupTimeoutExtImpl(top); 333 } 334 } 335 336 public void cancel() 337 { 338 synchronized (topLock) 339 { 340 if (cancelled.get()) 341 return; 342 343 clear(); 344 topLock.notifyAll(); 345 } 346 } 347 348 public int size() 349 { 350 int size = 0; 351 if (top != null) 352 size =1; 353 for (int i = 0; i < queues.length; ++i) 354 size += queues[i].size(); 355 return size; 356 } 357 358 363 public boolean isCancelled() 364 { 365 return cancelled.get(); 366 } 367 368 private void recalculateTop(boolean notify) 369 { 370 for (int i = 0; i < queues.length; ++i) 371 queues[i].compareAndSwapWithTop(notify); 372 } 373 374 379 private TimeoutExtImpl cleanupTimeoutExtImpl(TimeoutExtImpl timeout) 380 { 381 if (timeout != null) 382 timeout.target = null; 383 return null; 384 } 385 386 389 private void assertExpr(boolean expr) 390 { 391 if (!expr) 392 throw new IllegalStateException ("***** assert failed *****"); 393 } 394 395 398 private class InternalPriorityQueue 399 { 400 401 private Object lock = new Object (); 402 403 404 private int size; 405 406 407 private TimeoutExtImpl[] queue; 408 409 412 InternalPriorityQueue() 413 { 414 queue = new TimeoutExtImpl[16]; 415 size = 0; 416 } 417 418 TimeoutExt offer(TimeoutExtImpl timeout) 419 { 420 boolean checkTop = false; 421 synchronized (lock) 422 { 423 if (++size == queue.length) 426 { 427 TimeoutExtImpl[] newQ = new TimeoutExtImpl[2 * queue.length]; 428 System.arraycopy(queue, 0, newQ, 0, queue.length); 429 queue = newQ; 430 } 431 queue[size] = timeout; 434 timeout.queue = this; 435 timeout.index = size; 436 normalizeUp(size); 437 if (timeout.index == 1) 438 checkTop = true; 439 } 441 if (checkTop) 442 { 443 synchronized (topLock) 444 { 445 compareAndSwapWithTop(true); 446 } 447 } 448 return timeout; 449 } 450 451 boolean compareAndSwapWithTop(boolean notify) 452 { 453 synchronized (lock) 454 { 455 if (size == 0) 456 return false; 457 458 if (top == null) 459 { 460 top = removeNode(1); 461 top.queue = null; 462 top.index = TimeoutExtImpl.TOP; 463 if (notify) 464 topLock.notify(); 465 return top != null; 466 } 467 468 if (top.time > queue[1].time) 469 { 470 TimeoutExtImpl temp = top; 471 top = queue[1]; 472 top.queue = null; 473 top.index = TimeoutExtImpl.TOP; 474 queue[1] = temp; 475 temp.queue = this; 476 temp.index = 1; 477 if (size > 1) 478 normalizeDown(1); 479 if (notify) 480 topLock.notify(); 481 } 482 } 483 return false; 484 } 485 486 boolean remove(TimeoutExt timeout) 487 { 488 synchronized (lock) 489 { 490 TimeoutExtImpl timeoutImpl = (TimeoutExtImpl) timeout; 491 if (timeoutImpl.queue == this && timeoutImpl.index > 0) 492 { 493 removeNode(timeoutImpl.index); 497 timeoutImpl.queue = null; 499 timeoutImpl.index = TimeoutExtImpl.DONE; 500 501 return true; 503 } 504 else 505 { 506 return false; 509 } 510 } 511 } 512 513 public void clear() 514 { 515 synchronized (lock) 516 { 517 if (cancelled.get()) 518 return; 519 520 for (int i = 1; i <= size; ++i) 522 queue[i] = cleanupTimeoutExtImpl(queue[i]); 523 } 524 } 525 526 public void cancel() 527 { 528 synchronized (lock) 529 { 530 if (cancelled.get()) 531 return; 532 clear(); 533 } 534 } 535 536 public int size() 537 { 538 return size; 539 } 540 541 547 private boolean normalizeUp(int index) 548 { 549 if (index == 1) 553 return false; boolean ret = false; 555 long t = queue[index].time; 556 int p = index >> 1; 557 while (queue[p].time > t) 558 { 559 swap(p, index); 561 ret = true; 562 if (p == 1) 563 break; index = p; 565 p >>= 1; 566 } 567 return ret; 568 } 569 570 void normalizeDown(int index) 571 { 572 long t = queue[index].time; 573 int c = index << 1; 574 while (c <= size) 575 { 576 TimeoutExtImpl l = queue[c]; 578 if (c + 1 <= size) 581 { 582 TimeoutExtImpl r = queue[c + 1]; 584 if (l.time <= r.time) 587 { 588 if (t <= l.time) 589 break; swap(index, c); 591 index = c; 592 } 593 else 594 { 595 if (t <= r.time) 596 break; swap(index, c + 1); 598 index = c + 1; 599 } 600 } 601 else 602 { if (t <= l.time) 604 break; swap(index, c); 606 index = c; 607 } 608 c = index << 1; 609 } 610 } 611 612 618 private void swap(int a, int b) 619 { 620 TimeoutExtImpl temp = queue[a]; 629 queue[a] = queue[b]; 630 queue[a].index = a; 631 queue[b] = temp; 632 queue[b].index = b; 633 } 634 635 641 private TimeoutExtImpl removeNode(int index) 642 { 643 TimeoutExtImpl res = queue[index]; 646 if (index == size) 649 { 650 --size; 651 queue[index] = null; 652 return res; 653 } 654 swap(index, size); --size; 656 queue[res.index] = null; 658 if (normalizeUp(index)) 659 return res; normalizeDown(index); 661 return res; 662 } 663 664 667 void checkTree() 668 { 669 assertExpr(size >= 0); 670 assertExpr(size < queue.length); 671 assertExpr(queue[0] == null); 672 if (size > 0) 673 { 674 assertExpr(queue[1] != null); 675 assertExpr(queue[1].index == 1); 676 assertExpr(queue[1].queue == this); 677 for (int i = 2; i <= size; ++i) 678 { 679 assertExpr(queue[i] != null); 680 assertExpr(queue[i].index == i); 681 assertExpr(queue[i].queue == this); 682 assertExpr(queue[i >> 1].time <= queue[i].time); } 684 for (int i = size + 1; i < queue.length; ++i) 685 assertExpr(queue[i] == null); 686 } 687 } 688 689 } 690 691 694 private class TimeoutExtImpl implements TimeoutExt 695 { 696 697 static final int TOP = 0; 698 699 700 static final int DONE = -1; 701 702 703 static final int TIMEOUT = -2; 704 705 706 InternalPriorityQueue queue; 707 708 709 int index; 710 711 712 long time; 713 714 715 TimeoutTarget target; 716 717 public long getTime() 718 { 719 return time; 720 } 721 722 public TimeoutTarget getTimeoutTarget() 723 { 724 return target; 725 } 726 727 public void done() 728 { 729 queue = null; 730 index = DONE; 731 } 732 733 public boolean cancel() 734 { 735 return remove(this); 736 } 737 } 738 739 public String dump() 740 { 741 JBossStringBuilder buffer = new JBossStringBuilder(); 742 buffer.append("TOP="); 743 if (top == null) 744 buffer.append("null"); 745 else 746 buffer.append(top.time); 747 buffer.append(" size=").append(size()).append('\n'); 748 for (int i = 0; i < queues.length; ++i) 749 { 750 buffer.append(i).append("="); 751 for (int j = 1; j <= queues[i].size; ++j) 752 buffer.append(queues[i].queue[j].time).append(','); 753 buffer.append('\n'); 754 } 755 return buffer.toString(); 756 } 757 } 758 | Popular Tags |