1 3 package org.jgroups.util; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.TimeoutException; 9 10 import java.util.LinkedList ; 11 12 13 14 15 23 public class Queue { 24 25 26 private Element head=null, tail=null; 27 28 29 private boolean closed=false; 30 31 32 private int size=0; 33 34 35 private final Object mutex=new Object (); 36 37 38 40 41 private int num_markers=0; 42 43 50 private static final Object endMarker=new Object (); 51 52 protected static final Log log=LogFactory.getLog(Queue.class); 53 54 55 61 static class Element { 62 63 Object obj=null; 64 65 Element next=null; 66 67 71 Element(Object o) { 72 obj=o; 73 } 74 75 78 public String toString() { 79 return obj != null? obj.toString() : "null"; 80 } 81 } 82 83 84 87 public Queue() { 88 } 89 90 91 94 public Object getFirst() { 95 synchronized(mutex) { 96 return head != null? head.obj : null; 97 } 98 } 99 100 103 public Object getLast() { 104 synchronized(mutex) { 105 return tail != null? tail.obj : null; 106 } 107 } 108 109 110 116 public boolean closed() { 117 synchronized(mutex) { 118 return closed; 119 } 120 } 121 122 129 public void add(Object obj) throws QueueClosedException { 130 if(obj == null) { 131 if(log.isErrorEnabled()) log.error("argument must not be null"); 132 return; 133 } 134 135 136 synchronized(mutex) { 137 if(closed) 138 throw new QueueClosedException(); 139 if(this.num_markers > 0) 140 throw new QueueClosedException("Queue.add(): queue has been closed. You can not add more elements. " + 141 "Waiting for removal of remaining elements."); 142 143 144 Element el=new Element(obj); 145 146 if(head == null) { 147 148 149 head=el; 150 151 tail=head; 152 153 size=1; 154 } 155 else { 156 157 tail.next=el; 158 159 tail=el; 160 161 size++; 162 } 163 164 mutex.notifyAll(); 165 } 166 } 167 168 169 177 public void addAtHead(Object obj) throws QueueClosedException { 178 if(obj == null) { 179 if(log.isErrorEnabled()) log.error("argument must not be null"); 180 return; 181 } 182 183 184 synchronized(mutex) { 185 if(closed) 186 throw new QueueClosedException(); 187 if(this.num_markers > 0) 188 throw new QueueClosedException("Queue.addAtHead(): queue has been closed. You can not add more elements. " + 189 "Waiting for removal of remaining elements."); 190 191 Element el=new Element(obj); 192 193 if(head == null) { 194 195 head=el; 196 tail=head; 197 size=1; 198 } 199 else { 200 201 el.next=head; 202 203 head=el; 204 205 size++; 206 } 207 208 mutex.notifyAll(); 209 } 210 } 211 212 213 218 public Object remove() throws QueueClosedException { 219 Object retval; 220 synchronized(mutex) { 221 222 while(size == 0) { 223 if(closed) 224 throw new QueueClosedException(); 225 try { 226 mutex.wait(); 227 } 228 catch(InterruptedException ex) { 229 } 230 } 231 232 if(closed) 233 throw new QueueClosedException(); 234 235 236 retval=removeInternal(); 237 if(retval == null) 238 if(log.isErrorEnabled()) log.error("element was null, should never be the case"); 239 } 240 241 245 if(retval == endMarker) { 246 close(false); throw new QueueClosedException(); 248 } 249 return retval; 250 } 251 252 253 260 public Object remove(long timeout) throws QueueClosedException, TimeoutException { 261 Object retval; 262 263 synchronized(mutex) { 264 if(closed) 265 throw new QueueClosedException(); 266 267 268 if(size == 0) { 269 try { 270 271 mutex.wait(timeout); 272 } 273 catch(InterruptedException ex) { 274 } 275 } 276 277 278 279 retval=removeInternal(); 280 281 if(retval == null) throw new TimeoutException("timeout=" + timeout + "ms"); 282 283 284 if(retval == endMarker) { 285 close(false); 286 throw new QueueClosedException(); 287 } 288 289 return retval; 290 } 291 } 292 293 294 299 public void removeElement(Object obj) throws QueueClosedException { 300 Element el, tmp_el; 301 302 if(obj == null) { 303 if(log.isErrorEnabled()) log.error("argument must not be null"); 304 return; 305 } 306 307 synchronized(mutex) { 308 if(closed) 309 throw new QueueClosedException(); 310 311 el=head; 312 313 314 if(el == null) return; 315 316 317 if(el.obj.equals(obj)) { 318 319 head=el.next; 320 el.next=null; 321 el.obj=null; 322 326 if(size == 1) 327 tail=head; decrementSize(); 329 return; 330 } 331 332 333 while(el.next != null) { 334 if(el.next.obj.equals(obj)) { 335 tmp_el=el.next; 336 if(tmp_el == tail) tail=el; 338 el.next.obj=null; 339 el.next=el.next.next; tmp_el.next=null; 341 tmp_el.obj=null; 342 decrementSize(); 343 break; 344 } 345 el=el.next; 346 } 347 } 348 } 349 350 351 357 public Object peek() throws QueueClosedException { 358 Object retval; 359 360 synchronized(mutex) { 361 while(size == 0) { 362 if(closed) 363 throw new QueueClosedException(); 364 try { 365 mutex.wait(); 366 } 367 catch(InterruptedException ex) { 368 } 369 } 370 371 if(closed) 372 throw new QueueClosedException(); 373 374 retval=(head != null)? head.obj : null; 375 376 if(retval == null) { 378 if(log.isErrorEnabled()) log.error("retval is null: head=" + head + ", tail=" + tail + ", size()=" + size() + 380 ", num_markers=" + num_markers + ", closed=" + closed); 381 } 382 } 383 384 if(retval == endMarker) { 385 close(false); throw new QueueClosedException(); 387 } 388 389 return retval; 390 } 391 392 393 401 public Object peek(long timeout) throws QueueClosedException, TimeoutException { 402 Object retval; 403 404 synchronized(mutex) { 405 if(size == 0) { 406 if(closed) 407 throw new QueueClosedException(); 408 try { 409 mutex.wait(timeout); 410 } 411 catch(InterruptedException ex) { 412 } 413 } 414 if(closed) 415 throw new QueueClosedException(); 416 417 retval=head != null? head.obj : null; 418 419 if(retval == null) throw new TimeoutException("timeout=" + timeout + "ms"); 420 421 if(retval == endMarker) { 422 close(false); 423 throw new QueueClosedException(); 424 } 425 return retval; 426 } 427 } 428 429 430 438 public void close(boolean flush_entries) { 439 synchronized(mutex) { 440 if(flush_entries) { 441 try { 442 add(endMarker); num_markers++; 444 } 445 catch(QueueClosedException closed_ex) { 446 } 447 return; 448 } 449 closed=true; 450 mutex.notifyAll(); 451 } 452 } 453 454 455 459 public void reset() { 460 synchronized(mutex) { 461 num_markers=0; 462 if(!closed) 463 close(false); 464 size=0; 465 head=null; 466 tail=null; 467 closed=false; 468 mutex.notifyAll(); 469 } 470 } 471 472 476 public LinkedList values() { 477 LinkedList retval=new LinkedList (); 478 synchronized(mutex) { 479 Element el=head; 480 while(el != null) { 481 retval.add(el.obj); 482 el=el.next; 483 } 484 } 485 return retval; 486 } 487 488 489 492 public int size() { 493 synchronized(mutex) { 494 return size - num_markers; 495 } 496 } 497 498 501 public String toString() { 502 return "Queue (" + size() + ") messages"; 503 } 504 505 506 507 508 509 510 511 515 private Object removeInternal() { 516 Element retval; 517 Object obj; 518 519 520 if(head == null) 521 return null; 522 523 retval=head; 525 head=head.next; 526 if(head == null) 527 tail=null; 528 529 decrementSize(); 530 if(head != null && head.obj == endMarker) { 531 closed=true; 532 } 533 534 retval.next=null; 535 obj=retval.obj; 536 retval.obj=null; 537 return obj; 538 } 539 540 541 542 void decrementSize() { 543 size--; 544 if(size < 0) 545 size=0; 546 } 547 548 549 550 551 } 552 | Popular Tags |