KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > util > Queue2


1 // $Id: Queue2.java,v 1.5 2004/12/31 14:10:40 belaban Exp $
2

3 package org.jgroups.util;
4
5
6 import EDU.oswego.cs.dl.util.concurrent.CondVar;
7 import EDU.oswego.cs.dl.util.concurrent.Mutex;
8 import EDU.oswego.cs.dl.util.concurrent.Sync;
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.jgroups.TimeoutException;
12
13 import java.util.Vector JavaDoc;
14
15
16
17
18 /**
19  * Elements are added at the tail and removed from the head. Class is thread-safe in that
20  * 1 producer and 1 consumer may add/remove elements concurrently. The class is not
21  * explicitely designed for multiple producers or consumers. Implemented as a linked
22  * list, so that removal of an element at the head does not cause a right-shift of the
23  * remaining elements (as in a Vector-based implementation).
24  * <p>Implementation is based on util.concurrent.* classes
25  * @author Bela Ban
26  * @author Filip Hanik
27  */

28 public class Queue2 {
29     /*head and the tail of the list so that we can easily add and remove objects*/
30     Element head=null, tail=null;
31
32     /*flag to determine the state of the queue*/
33     boolean closed=false;
34
35     /*current size of the queue*/
36     int size=0;
37
38     /* Lock object for synchronization. Is notified when element is added */
39     final Sync mutex=new Mutex();
40
41     /** Signals to listeners when an element has been added */
42     final CondVar add_condvar=new CondVar(mutex);
43
44     /** Signals to listeners when an element has been removed */
45     final CondVar remove_condvar=new CondVar(mutex);
46
47     /*the number of end markers that have been added*/
48     int num_markers=0;
49
50     protected static final Log log=LogFactory.getLog(Queue2.class);
51
52
53     /**
54      * if the queue closes during the runtime
55      * an endMarker object is added to the end of the queue to indicate that
56      * the queue will close automatically when the end marker is encountered
57      * This allows for a "soft" close.
58      * @see Queue#close
59      */

60     private static final Object JavaDoc endMarker=new Object JavaDoc();
61
62     /**
63      * the class Element indicates an object in the queue.
64      * This element allows for the linked list algorithm by always holding a
65      * reference to the next element in the list.
66      * if Element.next is null, then this element is the tail of the list.
67      */

68     class Element {
69         /*the actual value stored in the queue*/
70         Object JavaDoc obj=null;
71         /*pointer to the next item in the (queue) linked list*/
72         Element next=null;
73
74         /**
75          * creates an Element object holding its value
76          * @param o - the object to be stored in the queue position
77          */

78         Element(Object JavaDoc o) {
79             obj=o;
80         }
81
82         /**
83          * prints out the value of the object
84          */

85         public String JavaDoc toString() {
86             return obj != null? obj.toString() : "null";
87         }
88     }
89
90
91     /**
92      * creates an empty queue
93      */

94     public Queue2() {
95     }
96
97
98     /**
99      * Returns the first element. Returns null if no elements are available.
100      */

101     public Object JavaDoc getFirst() {
102         return head != null? head.obj : null;
103     }
104
105     /**
106      * Returns the last element. Returns null if no elements are available.
107      */

108     public Object JavaDoc getLast() {
109         return tail != null? tail.obj : null;
110     }
111
112
113     /**
114      * returns true if the Queue has been closed
115      * however, this method will return false if the queue has been closed
116      * using the close(true) method and the last element has yet not been received.
117      * @return true if the queue has been closed
118      */

119     public boolean closed() {
120         return closed;
121     }
122
123     /**
124      * adds an object to the tail of this queue
125      * If the queue has been closed with close(true) no exception will be
126      * thrown if the queue has not been flushed yet.
127      * @param obj - the object to be added to the queue
128      * @exception QueueClosedException exception if closed() returns true
129      */

130     public void add(Object JavaDoc obj) throws QueueClosedException {
131         if(obj == null) {
132             if(log.isErrorEnabled()) log.error("argument must not be null");
133             return;
134         }
135         if(closed)
136             throw new QueueClosedException();
137         if(this.num_markers > 0)
138             throw new QueueClosedException("Queue2.add(): queue has been closed. You can not add more elements. " +
139                                            "Waiting for removal of remaining elements.");
140
141         try {
142             mutex.acquire();
143
144             /*create a new linked list element*/
145             Element el=new Element(obj);
146             /*check the first element*/
147             if(head == null) {
148                 /*the object added is the first element*/
149                 /*set the head to be this object*/
150                 head=el;
151                 /*set the tail to be this object*/
152                 tail=head;
153                 /*set the size to be one, since the queue was empty*/
154                 size=1;
155             }
156             else {
157                 /*add the object to the end of the linked list*/
158                 tail.next=el;
159                 /*set the tail to point to the last element*/
160                 tail=el;
161                 /*increase the size*/
162                 size++;
163             }
164             /*wake up all the threads that are waiting for the lock to be released*/
165             add_condvar.broadcast(); // todo: maybe signal is all we need ?
166
}
167         catch(InterruptedException JavaDoc e) {
168         }
169         finally {
170             mutex.release();
171         }
172     }
173
174
175     /**
176      * Adds a new object to the head of the queue
177      * basically (obj.equals(queue.remove(queue.add(obj)))) returns true
178      * If the queue has been closed with close(true) no exception will be
179      * thrown if the queue has not been flushed yet.
180      * @param obj - the object to be added to the queue
181      * @exception QueueClosedException exception if closed() returns true
182      *
183      */

184     public void addAtHead(Object JavaDoc obj) throws QueueClosedException {
185         if(obj == null) {
186             if(log.isErrorEnabled()) log.error("argument must not be null");
187             return;
188         }
189         if(closed)
190             throw new QueueClosedException();
191         if(this.num_markers > 0)
192             throw new QueueClosedException("Queue2.addAtHead(): queue has been closed. You can not add more elements. " +
193                                            "Waiting for removal of remaining elements.");
194
195         try {
196             mutex.acquire();
197             Element el=new Element(obj);
198             /*check the head element in the list*/
199             if(head == null) {
200                 /*this is the first object, we could have done add(obj) here*/
201                 head=el;
202                 tail=head;
203                 size=1;
204             }
205             else {
206                 /*set the head element to be the child of this one*/
207                 el.next=head;
208                 /*set the head to point to the recently added object*/
209                 head=el;
210                 /*increase the size*/
211                 size++;
212             }
213             /*wake up all the threads that are waiting for the lock to be released*/
214             add_condvar.broadcast();
215         }
216         catch(InterruptedException JavaDoc e) {
217
218         }
219         finally {
220             mutex.release();
221         }
222     }
223
224
225     /**
226      * Removes 1 element from head or <B>blocks</B>
227      * until next element has been added or until queue has been closed
228      * @return the first element to be taken of the queue
229      */

230     public Object JavaDoc remove() throws QueueClosedException {
231         /*initialize the return value*/
232         Object JavaDoc retval=null;
233
234         try {
235             mutex.acquire();
236             /*wait as long as the queue is empty. return when an element is present or queue is closed*/
237             while(size == 0) {
238                 if(closed)
239                     throw new QueueClosedException();
240                 try {
241                     add_condvar.await();
242                 }
243                 catch(InterruptedException JavaDoc ex) {
244                 }
245             }
246
247             if(closed)
248                 throw new QueueClosedException();
249
250             /*remove the head from the queue, if we make it to this point, retval should not be null !*/
251             retval=removeInternal();
252             if(retval == null)
253                 if(log.isErrorEnabled()) log.error("element was null, should never be the case");
254         }
255         catch(InterruptedException JavaDoc e) {
256             ;
257         }
258         finally {
259             mutex.release();
260         }
261
262         /*
263          * we ran into an Endmarker, which means that the queue was closed before
264          * through close(true)
265          */

266         if(retval == endMarker) {
267             close(false); // mark queue as closed
268
throw new QueueClosedException();
269         }
270
271         /*return the object*/
272         return retval;
273     }
274
275
276     /**
277      * Removes 1 element from the head.
278      * If the queue is empty the operation will wait for timeout ms.
279      * if no object is added during the timeout time, a Timout exception is thrown
280      * @param timeout - the number of milli seconds this operation will wait before it times out
281      * @return the first object in the queue
282      */

283     public Object JavaDoc remove(long timeout) throws QueueClosedException, TimeoutException {
284         Object JavaDoc retval=null;
285
286         try {
287             mutex.acquire();
288                       /*if the queue size is zero, we want to wait until a new object is added*/
289             if(size == 0) {
290                 if(closed)
291                     throw new QueueClosedException();
292                 try {
293                     /*release the mutex lock and wait no more than timeout ms*/
294                     add_condvar.timedwait(timeout);
295                 }
296                 catch(InterruptedException JavaDoc ex) {
297                 }
298             }
299             /*we either timed out, or got notified by the mutex lock object*/
300
301             /*check to see if the object closed*/
302             if(closed)
303                 throw new QueueClosedException();
304
305             /*get the next value*/
306             retval=removeInternal();
307             /*null result means we timed out*/
308             if(retval == null) throw new TimeoutException();
309
310             /*if we reached an end marker we are going to close the queue*/
311             if(retval == endMarker) {
312                 close(false);
313                 throw new QueueClosedException();
314             }
315         }
316         catch(InterruptedException JavaDoc e) {
317
318         }
319         finally {
320             mutex.release();
321         }
322
323         /*at this point we actually did receive a value from the queue, return it*/
324         return retval;
325     }
326
327
328     /**
329      * removes a specific object from the queue.
330      * the object is matched up using the Object.equals method.
331      * @param obj the actual object to be removed from the queue
332      */

333     public void removeElement(Object JavaDoc obj) throws QueueClosedException {
334         Element el, tmp_el;
335         boolean removed=false;
336
337         if(obj == null) {
338             if(log.isErrorEnabled()) log.error("argument must not be null");
339             return;
340         }
341
342         try {
343             mutex.acquire();
344             el=head;
345
346             /*the queue is empty*/
347             if(el == null) return;
348
349             /*check to see if the head element is the one to be removed*/
350             if(el.obj.equals(obj)) {
351                 /*the head element matched we will remove it*/
352                 head=el.next;
353                 el.next=null;
354                 /*check if we only had one object left
355                  *at this time the queue becomes empty
356                  *this will set the tail=head=null
357                  */

358                 if(size == 1)
359                     tail=head; // null
360
decrementSize();
361                 removed=true;
362
363                 /*and end the operation, it was successful*/
364                 return;
365             }
366
367             /*look through the other elements*/
368             while(el.next != null) {
369                 if(el.next.obj.equals(obj)) {
370                     tmp_el=el.next;
371                     if(tmp_el == tail) // if it is the last element, move tail one to the left (bela Sept 20 2002)
372
tail=el;
373                     el.next=el.next.next; // point to the el past the next one. can be null.
374
tmp_el.next=null;
375                     decrementSize();
376                     removed=true;
377                     break;
378                 }
379                 el=el.next;
380             }
381         }
382         catch(InterruptedException JavaDoc e) {
383
384         }
385         finally {
386             if(removed)
387                 remove_condvar.broadcast();
388             mutex.release();
389         }
390     }
391
392
393     /**
394      * returns the first object on the queue, without removing it.
395      * If the queue is empty this object blocks until the first queue object has
396      * been added
397      * @return the first object on the queue
398      */

399     public Object JavaDoc peek() throws QueueClosedException {
400         Object JavaDoc retval=null;
401
402         try {
403             mutex.acquire();
404             while(size == 0) {
405                 if(closed)
406                     throw new QueueClosedException();
407                 try {
408                     add_condvar.await();
409                 }
410                 catch(InterruptedException JavaDoc ex) {
411                 }
412             }
413
414             if(closed)
415                 throw new QueueClosedException();
416
417             retval=(head != null)? head.obj : null;
418
419             // @remove:
420
if(retval == null) {
421                 // print some diagnostics
422
if(log.isErrorEnabled()) log.error("retval is null: head=" + head + ", tail=" + tail + ", size()=" + size() +
423                         ", num_markers=" + num_markers + ", closed()=" + closed());
424             }
425         }
426         catch(InterruptedException JavaDoc e) {
427
428         }
429         finally {
430             mutex.release();
431         }
432
433         if(retval == endMarker) {
434             close(false); // mark queue as closed
435
throw new QueueClosedException();
436         }
437
438         return retval;
439     }
440
441
442     /**
443      * returns the first object on the queue, without removing it.
444      * If the queue is empty this object blocks until the first queue object has
445      * been added or the operation times out
446      * @param timeout how long in milli seconds will this operation wait for an object to be added to the queue
447      * before it times out
448      * @return the first object on the queue
449      */

450
451     public Object JavaDoc peek(long timeout) throws QueueClosedException, TimeoutException {
452         Object JavaDoc retval=null;
453
454         try {
455             mutex.acquire();
456             if(size == 0) {
457                 if(closed)
458                     throw new QueueClosedException();
459                 try {
460                     mutex.wait(timeout);
461                 }
462                 catch(InterruptedException JavaDoc ex) {
463                 }
464             }
465             if(closed)
466                 throw new QueueClosedException();
467
468             retval=head != null? head.obj : null;
469
470             if(retval == null) throw new TimeoutException();
471
472             if(retval == endMarker) {
473                 close(false);
474                 throw new QueueClosedException();
475             }
476         }
477         catch(InterruptedException JavaDoc e) {
478
479         }
480         finally {
481             mutex.release();
482         }
483         return retval;
484     }
485
486
487     /**
488      Marks the queues as closed. When an <code>add</code> or <code>remove</code> operation is
489      attempted on a closed queue, an exception is thrown.
490      @param flush_entries When true, a end-of-entries marker is added to the end of the queue.
491      Entries may be added and removed, but when the end-of-entries marker
492      is encountered, the queue is marked as closed. This allows to flush
493      pending messages before closing the queue.
494      */

495     public void close(boolean flush_entries) {
496         if(flush_entries) {
497             try {
498                 add(endMarker); // add an end-of-entries marker to the end of the queue
499
num_markers++;
500             }
501             catch(QueueClosedException closed) {
502             }
503             return;
504         }
505
506         try {
507             mutex.acquire();
508             closed=true;
509             try {
510                 add_condvar.broadcast();
511                 remove_condvar.broadcast();
512             }
513             catch(Exception JavaDoc e) {
514                 if(log.isErrorEnabled()) log.error("exception=" + e);
515             }
516         }
517         catch(InterruptedException JavaDoc e) {
518
519         }
520         finally {
521             mutex.release();
522         }
523     }
524
525
526     /**
527      * resets the queue.
528      * This operation removes all the objects in the queue and marks the queue open
529      */

530     public void reset() {
531         num_markers=0;
532         if(!closed)
533             close(false);
534
535         try {
536             mutex.acquire();
537             size=0;
538             head=null;
539             tail=null;
540             closed=false;
541         }
542         catch(InterruptedException JavaDoc e) {
543
544         }
545         finally {
546             mutex.release();
547         }
548     }
549
550
551     /**
552      * returns the number of objects that are currently in the queue
553      */

554     public int size() {
555         return size - num_markers;
556     }
557
558     /**
559      * prints the size of the queue
560      */

561     public String JavaDoc toString() {
562         return "Queue2 (" + size() + ") messages";
563     }
564
565     /**
566      * Dumps internal state @remove
567      */

568     public String JavaDoc debug() {
569         return toString() + ", head=" + head + ", tail=" + tail + ", closed()=" + closed() + ", contents=" + getContents();
570     }
571
572
573     /**
574      * returns a vector with all the objects currently in the queue
575      */

576     public Vector JavaDoc getContents() {
577         Vector JavaDoc retval=new Vector JavaDoc();
578         Element el;
579
580         try {
581             mutex.acquire();
582             el=head;
583             while(el != null) {
584                 retval.addElement(el.obj);
585                 el=el.next;
586             }
587         }
588         catch(InterruptedException JavaDoc e) {
589
590         }
591         finally {
592             mutex.release();
593         }
594         return retval;
595     }
596
597
598     /**
599      * Blocks until the queue has no elements left. If the queue is empty, the call will return
600      * immediately
601      * @param timeout Call returns if timeout has elapsed (number of milliseconds). 0 means to wait forever
602      * @throws QueueClosedException Thrown if queue has been closed
603      * @throws TimeoutException Thrown if timeout has elapsed
604      */

605     public void waitUntilEmpty(long timeout) throws QueueClosedException, TimeoutException {
606         long time_to_wait=timeout >=0? timeout : 0; // eliminate negative values
607

608         try {
609             mutex.acquire();
610
611             if(timeout == 0) {
612                 while(size > 0 && closed == false) {
613                     remove_condvar.await();
614                 }
615             }
616             else {
617                 long start_time=System.currentTimeMillis();
618
619                 while(time_to_wait > 0 && size > 0 && closed == false) {
620                     try {
621                         remove_condvar.timedwait(time_to_wait);
622                     }
623                     catch(InterruptedException JavaDoc ex) {
624
625                     }
626                     time_to_wait=timeout - (System.currentTimeMillis() - start_time);
627                 }
628
629                 if(size > 0)
630                     throw new TimeoutException("queue has " + size + " elements");
631             }
632
633             if(closed)
634                 throw new QueueClosedException();
635         }
636         catch(InterruptedException JavaDoc e) {
637
638         }
639         finally {
640             mutex.release();
641         }
642     }
643
644
645     /* ------------------------------------- Private Methods ----------------------------------- */
646
647
648     /**
649      * Removes the first element. Returns null if no elements in queue.
650      * Always called with mutex locked (we don't have to lock mutex ourselves)
651      */

652     private Object JavaDoc removeInternal() {
653         Element retval;
654
655         /*if the head is null, the queue is empty*/
656         if(head == null)
657             return null;
658
659         retval=head; // head must be non-null now
660

661         head=head.next;
662         if(head == null)
663             tail=null;
664
665         decrementSize();
666
667         remove_condvar.broadcast(); // todo: correct ?
668

669         if(head != null && head.obj == endMarker) {
670             closed=true;
671         }
672
673         retval.next=null;
674         return retval.obj;
675     }
676
677
678     void decrementSize() {
679         size--;
680         if(size < 0)
681             size=0;
682     }
683
684
685     /* ---------------------------------- End of Private Methods -------------------------------- */
686
687 }
688
Popular Tags