1 3 package org.jgroups.util; 4 5 6 import EDU.oswego.cs.dl.util.concurrent.CondVar; 7 import EDU.oswego.cs.dl.util.concurrent.Mutex; 8 import EDU.oswego.cs.dl.util.concurrent.Sync; 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 import org.jgroups.TimeoutException; 12 13 import java.util.Vector ; 14 15 16 17 18 28 public class Queue2 { 29 30 Element head=null, tail=null; 31 32 33 boolean closed=false; 34 35 36 int size=0; 37 38 39 final Sync mutex=new Mutex(); 40 41 42 final CondVar add_condvar=new CondVar(mutex); 43 44 45 final CondVar remove_condvar=new CondVar(mutex); 46 47 48 int num_markers=0; 49 50 protected static final Log log=LogFactory.getLog(Queue2.class); 51 52 53 60 private static final Object endMarker=new Object (); 61 62 68 class Element { 69 70 Object obj=null; 71 72 Element next=null; 73 74 78 Element(Object o) { 79 obj=o; 80 } 81 82 85 public String toString() { 86 return obj != null? obj.toString() : "null"; 87 } 88 } 89 90 91 94 public Queue2() { 95 } 96 97 98 101 public Object getFirst() { 102 return head != null? head.obj : null; 103 } 104 105 108 public Object getLast() { 109 return tail != null? tail.obj : null; 110 } 111 112 113 119 public boolean closed() { 120 return closed; 121 } 122 123 130 public void add(Object obj) throws QueueClosedException { 131 if(obj == null) { 132 if(log.isErrorEnabled()) log.error("argument must not be null"); 133 return; 134 } 135 if(closed) 136 throw new QueueClosedException(); 137 if(this.num_markers > 0) 138 throw new QueueClosedException("Queue2.add(): queue has been closed. You can not add more elements. " + 139 "Waiting for removal of remaining elements."); 140 141 try { 142 mutex.acquire(); 143 144 145 Element el=new Element(obj); 146 147 if(head == null) { 148 149 150 head=el; 151 152 tail=head; 153 154 size=1; 155 } 156 else { 157 158 tail.next=el; 159 160 tail=el; 161 162 size++; 163 } 164 165 add_condvar.broadcast(); } 167 catch(InterruptedException e) { 168 } 169 finally { 170 mutex.release(); 171 } 172 } 173 174 175 184 public void addAtHead(Object obj) throws QueueClosedException { 185 if(obj == null) { 186 if(log.isErrorEnabled()) log.error("argument must not be null"); 187 return; 188 } 189 if(closed) 190 throw new QueueClosedException(); 191 if(this.num_markers > 0) 192 throw new QueueClosedException("Queue2.addAtHead(): queue has been closed. You can not add more elements. " + 193 "Waiting for removal of remaining elements."); 194 195 try { 196 mutex.acquire(); 197 Element el=new Element(obj); 198 199 if(head == null) { 200 201 head=el; 202 tail=head; 203 size=1; 204 } 205 else { 206 207 el.next=head; 208 209 head=el; 210 211 size++; 212 } 213 214 add_condvar.broadcast(); 215 } 216 catch(InterruptedException e) { 217 218 } 219 finally { 220 mutex.release(); 221 } 222 } 223 224 225 230 public Object remove() throws QueueClosedException { 231 232 Object retval=null; 233 234 try { 235 mutex.acquire(); 236 237 while(size == 0) { 238 if(closed) 239 throw new QueueClosedException(); 240 try { 241 add_condvar.await(); 242 } 243 catch(InterruptedException ex) { 244 } 245 } 246 247 if(closed) 248 throw new QueueClosedException(); 249 250 251 retval=removeInternal(); 252 if(retval == null) 253 if(log.isErrorEnabled()) log.error("element was null, should never be the case"); 254 } 255 catch(InterruptedException e) { 256 ; 257 } 258 finally { 259 mutex.release(); 260 } 261 262 266 if(retval == endMarker) { 267 close(false); throw new QueueClosedException(); 269 } 270 271 272 return retval; 273 } 274 275 276 283 public Object remove(long timeout) throws QueueClosedException, TimeoutException { 284 Object retval=null; 285 286 try { 287 mutex.acquire(); 288 289 if(size == 0) { 290 if(closed) 291 throw new QueueClosedException(); 292 try { 293 294 add_condvar.timedwait(timeout); 295 } 296 catch(InterruptedException ex) { 297 } 298 } 299 300 301 302 if(closed) 303 throw new QueueClosedException(); 304 305 306 retval=removeInternal(); 307 308 if(retval == null) throw new TimeoutException(); 309 310 311 if(retval == endMarker) { 312 close(false); 313 throw new QueueClosedException(); 314 } 315 } 316 catch(InterruptedException e) { 317 318 } 319 finally { 320 mutex.release(); 321 } 322 323 324 return retval; 325 } 326 327 328 333 public void removeElement(Object obj) throws QueueClosedException { 334 Element el, tmp_el; 335 boolean removed=false; 336 337 if(obj == null) { 338 if(log.isErrorEnabled()) log.error("argument must not be null"); 339 return; 340 } 341 342 try { 343 mutex.acquire(); 344 el=head; 345 346 347 if(el == null) return; 348 349 350 if(el.obj.equals(obj)) { 351 352 head=el.next; 353 el.next=null; 354 358 if(size == 1) 359 tail=head; decrementSize(); 361 removed=true; 362 363 364 return; 365 } 366 367 368 while(el.next != null) { 369 if(el.next.obj.equals(obj)) { 370 tmp_el=el.next; 371 if(tmp_el == tail) tail=el; 373 el.next=el.next.next; tmp_el.next=null; 375 decrementSize(); 376 removed=true; 377 break; 378 } 379 el=el.next; 380 } 381 } 382 catch(InterruptedException e) { 383 384 } 385 finally { 386 if(removed) 387 remove_condvar.broadcast(); 388 mutex.release(); 389 } 390 } 391 392 393 399 public Object peek() throws QueueClosedException { 400 Object retval=null; 401 402 try { 403 mutex.acquire(); 404 while(size == 0) { 405 if(closed) 406 throw new QueueClosedException(); 407 try { 408 add_condvar.await(); 409 } 410 catch(InterruptedException ex) { 411 } 412 } 413 414 if(closed) 415 throw new QueueClosedException(); 416 417 retval=(head != null)? head.obj : null; 418 419 if(retval == null) { 421 if(log.isErrorEnabled()) log.error("retval is null: head=" + head + ", tail=" + tail + ", size()=" + size() + 423 ", num_markers=" + num_markers + ", closed()=" + closed()); 424 } 425 } 426 catch(InterruptedException e) { 427 428 } 429 finally { 430 mutex.release(); 431 } 432 433 if(retval == endMarker) { 434 close(false); throw new QueueClosedException(); 436 } 437 438 return retval; 439 } 440 441 442 450 451 public Object peek(long timeout) throws QueueClosedException, TimeoutException { 452 Object retval=null; 453 454 try { 455 mutex.acquire(); 456 if(size == 0) { 457 if(closed) 458 throw new QueueClosedException(); 459 try { 460 mutex.wait(timeout); 461 } 462 catch(InterruptedException ex) { 463 } 464 } 465 if(closed) 466 throw new QueueClosedException(); 467 468 retval=head != null? head.obj : null; 469 470 if(retval == null) throw new TimeoutException(); 471 472 if(retval == endMarker) { 473 close(false); 474 throw new QueueClosedException(); 475 } 476 } 477 catch(InterruptedException e) { 478 479 } 480 finally { 481 mutex.release(); 482 } 483 return retval; 484 } 485 486 487 495 public void close(boolean flush_entries) { 496 if(flush_entries) { 497 try { 498 add(endMarker); num_markers++; 500 } 501 catch(QueueClosedException closed) { 502 } 503 return; 504 } 505 506 try { 507 mutex.acquire(); 508 closed=true; 509 try { 510 add_condvar.broadcast(); 511 remove_condvar.broadcast(); 512 } 513 catch(Exception e) { 514 if(log.isErrorEnabled()) log.error("exception=" + e); 515 } 516 } 517 catch(InterruptedException e) { 518 519 } 520 finally { 521 mutex.release(); 522 } 523 } 524 525 526 530 public void reset() { 531 num_markers=0; 532 if(!closed) 533 close(false); 534 535 try { 536 mutex.acquire(); 537 size=0; 538 head=null; 539 tail=null; 540 closed=false; 541 } 542 catch(InterruptedException e) { 543 544 } 545 finally { 546 mutex.release(); 547 } 548 } 549 550 551 554 public int size() { 555 return size - num_markers; 556 } 557 558 561 public String toString() { 562 return "Queue2 (" + size() + ") messages"; 563 } 564 565 568 public String debug() { 569 return toString() + ", head=" + head + ", tail=" + tail + ", closed()=" + closed() + ", contents=" + getContents(); 570 } 571 572 573 576 public Vector getContents() { 577 Vector retval=new Vector (); 578 Element el; 579 580 try { 581 mutex.acquire(); 582 el=head; 583 while(el != null) { 584 retval.addElement(el.obj); 585 el=el.next; 586 } 587 } 588 catch(InterruptedException e) { 589 590 } 591 finally { 592 mutex.release(); 593 } 594 return retval; 595 } 596 597 598 605 public void waitUntilEmpty(long timeout) throws QueueClosedException, TimeoutException { 606 long time_to_wait=timeout >=0? timeout : 0; 608 try { 609 mutex.acquire(); 610 611 if(timeout == 0) { 612 while(size > 0 && closed == false) { 613 remove_condvar.await(); 614 } 615 } 616 else { 617 long start_time=System.currentTimeMillis(); 618 619 while(time_to_wait > 0 && size > 0 && closed == false) { 620 try { 621 remove_condvar.timedwait(time_to_wait); 622 } 623 catch(InterruptedException ex) { 624 625 } 626 time_to_wait=timeout - (System.currentTimeMillis() - start_time); 627 } 628 629 if(size > 0) 630 throw new TimeoutException("queue has " + size + " elements"); 631 } 632 633 if(closed) 634 throw new QueueClosedException(); 635 } 636 catch(InterruptedException e) { 637 638 } 639 finally { 640 mutex.release(); 641 } 642 } 643 644 645 646 647 648 652 private Object removeInternal() { 653 Element retval; 654 655 656 if(head == null) 657 return null; 658 659 retval=head; 661 head=head.next; 662 if(head == null) 663 tail=null; 664 665 decrementSize(); 666 667 remove_condvar.broadcast(); 669 if(head != null && head.obj == endMarker) { 670 closed=true; 671 } 672 673 retval.next=null; 674 return retval.obj; 675 } 676 677 678 void decrementSize() { 679 size--; 680 if(size < 0) 681 size=0; 682 } 683 684 685 686 687 } 688 | Popular Tags |