KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > util > Queue


1 // $Id: Queue.java,v 1.22 2005/04/11 12:54:56 belaban Exp $
2

3 package org.jgroups.util;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.TimeoutException;
9
10 import java.util.LinkedList JavaDoc;
11
12
13
14
15 /**
16  * Elements are added at the tail and removed from the head. Class is thread-safe in that
17  * 1 producer and 1 consumer may add/remove elements concurrently. The class is not
18  * explicitely designed for multiple producers or consumers. Implemented as a linked
19  * list, so that removal of an element at the head does not cause a right-shift of the
20  * remaining elements (as in a Vector-based implementation).
21  * @author Bela Ban
22  */

23 public class Queue {
24
25     /*head and the tail of the list so that we can easily add and remove objects*/
26     private Element head=null, tail=null;
27
28     /*flag to determine the state of the queue*/
29     private boolean closed=false;
30
31     /*current size of the queue*/
32     private int size=0;
33
34     /* Lock object for synchronization. Is notified when element is added */
35     private final Object JavaDoc mutex=new Object JavaDoc();
36
37     /** Lock object for syncing on removes. It is notified when an object is removed */
38     // Object remove_mutex=new Object();
39

40     /*the number of end markers that have been added*/
41     private int num_markers=0;
42
43     /**
44      * if the queue closes during the runtime
45      * an endMarker object is added to the end of the queue to indicate that
46      * the queue will close automatically when the end marker is encountered
47      * This allows for a "soft" close.
48      * @see Queue#close
49      */

50     private static final Object JavaDoc endMarker=new Object JavaDoc();
51
52     protected static final Log log=LogFactory.getLog(Queue.class);
53
54     
55     /**
56      * the class Element indicates an object in the queue.
57      * This element allows for the linked list algorithm by always holding a
58      * reference to the next element in the list.
59      * if Element.next is null, then this element is the tail of the list.
60      */

61     static class Element {
62         /*the actual value stored in the queue*/
63         Object JavaDoc obj=null;
64         /*pointer to the next item in the (queue) linked list*/
65         Element next=null;
66
67         /**
68          * creates an Element object holding its value
69          * @param o - the object to be stored in the queue position
70          */

71         Element(Object JavaDoc o) {
72             obj=o;
73         }
74
75         /**
76          * prints out the value of the object
77          */

78         public String JavaDoc toString() {
79             return obj != null? obj.toString() : "null";
80         }
81     }
82
83
84     /**
85      * creates an empty queue
86      */

87     public Queue() {
88     }
89
90
91     /**
92      * Returns the first element. Returns null if no elements are available.
93      */

94     public Object JavaDoc getFirst() {
95         synchronized(mutex) {
96             return head != null? head.obj : null;
97         }
98     }
99
100     /**
101      * Returns the last element. Returns null if no elements are available.
102      */

103     public Object JavaDoc getLast() {
104         synchronized(mutex) {
105             return tail != null? tail.obj : null;
106         }
107     }
108
109
110     /**
111      * returns true if the Queue has been closed
112      * however, this method will return false if the queue has been closed
113      * using the close(true) method and the last element has yet not been received.
114      * @return true if the queue has been closed
115      */

116     public boolean closed() {
117         synchronized(mutex) {
118             return closed;
119         }
120     }
121
122     /**
123      * adds an object to the tail of this queue
124      * If the queue has been closed with close(true) no exception will be
125      * thrown if the queue has not been flushed yet.
126      * @param obj - the object to be added to the queue
127      * @exception QueueClosedException exception if closed() returns true
128      */

129     public void add(Object JavaDoc obj) throws QueueClosedException {
130         if(obj == null) {
131             if(log.isErrorEnabled()) log.error("argument must not be null");
132             return;
133         }
134
135         /*lock the queue from other threads*/
136         synchronized(mutex) {
137            if(closed)
138               throw new QueueClosedException();
139            if(this.num_markers > 0)
140               throw new QueueClosedException("Queue.add(): queue has been closed. You can not add more elements. " +
141                                              "Waiting for removal of remaining elements.");
142
143             /*create a new linked list element*/
144             Element el=new Element(obj);
145             /*check the first element*/
146             if(head == null) {
147                 /*the object added is the first element*/
148                 /*set the head to be this object*/
149                 head=el;
150                 /*set the tail to be this object*/
151                 tail=head;
152                 /*set the size to be one, since the queue was empty*/
153                 size=1;
154             }
155             else {
156                 /*add the object to the end of the linked list*/
157                 tail.next=el;
158                 /*set the tail to point to the last element*/
159                 tail=el;
160                 /*increase the size*/
161                 size++;
162             }
163             /*wake up all the threads that are waiting for the lock to be released*/
164             mutex.notifyAll();
165         }
166     }
167
168
169     /**
170      * Adds a new object to the head of the queue
171      * basically (obj.equals(queue.remove(queue.add(obj)))) returns true
172      * If the queue has been closed with close(true) no exception will be
173      * thrown if the queue has not been flushed yet.
174      * @param obj - the object to be added to the queue
175      * @exception QueueClosedException exception if closed() returns true
176      */

177     public void addAtHead(Object JavaDoc obj) throws QueueClosedException {
178         if(obj == null) {
179             if(log.isErrorEnabled()) log.error("argument must not be null");
180             return;
181         }
182
183         /*lock the queue from other threads*/
184         synchronized(mutex) {
185            if(closed)
186               throw new QueueClosedException();
187            if(this.num_markers > 0)
188               throw new QueueClosedException("Queue.addAtHead(): queue has been closed. You can not add more elements. " +
189                                              "Waiting for removal of remaining elements.");
190
191             Element el=new Element(obj);
192             /*check the head element in the list*/
193             if(head == null) {
194                 /*this is the first object, we could have done add(obj) here*/
195                 head=el;
196                 tail=head;
197                 size=1;
198             }
199             else {
200                 /*set the head element to be the child of this one*/
201                 el.next=head;
202                 /*set the head to point to the recently added object*/
203                 head=el;
204                 /*increase the size*/
205                 size++;
206             }
207             /*wake up all the threads that are waiting for the lock to be released*/
208             mutex.notifyAll();
209         }
210     }
211
212
213     /**
214      * Removes 1 element from head or <B>blocks</B>
215      * until next element has been added or until queue has been closed
216      * @return the first element to be taken of the queue
217      */

218     public Object JavaDoc remove() throws QueueClosedException {
219         Object JavaDoc retval;
220         synchronized(mutex) {
221             /*wait as long as the queue is empty. return when an element is present or queue is closed*/
222             while(size == 0) {
223                 if(closed)
224                     throw new QueueClosedException();
225                 try {
226                     mutex.wait();
227                 }
228                 catch(InterruptedException JavaDoc ex) {
229                 }
230             }
231
232             if(closed)
233                 throw new QueueClosedException();
234
235             /*remove the head from the queue, if we make it to this point, retval should not be null !*/
236             retval=removeInternal();
237             if(retval == null)
238                 if(log.isErrorEnabled()) log.error("element was null, should never be the case");
239         }
240
241         /*
242          * we ran into an Endmarker, which means that the queue was closed before
243          * through close(true)
244          */

245         if(retval == endMarker) {
246             close(false); // mark queue as closed
247
throw new QueueClosedException();
248         }
249         return retval;
250     }
251
252
253     /**
254      * Removes 1 element from the head.
255      * If the queue is empty the operation will wait for timeout ms.
256      * if no object is added during the timeout time, a Timout exception is thrown
257      * @param timeout - the number of milli seconds this operation will wait before it times out
258      * @return the first object in the queue
259      */

260     public Object JavaDoc remove(long timeout) throws QueueClosedException, TimeoutException {
261         Object JavaDoc retval;
262
263         synchronized(mutex) {
264             if(closed)
265                 throw new QueueClosedException();
266
267             /*if the queue size is zero, we want to wait until a new object is added*/
268             if(size == 0) {
269                 try {
270                     /*release the mutex lock and wait no more than timeout ms*/
271                     mutex.wait(timeout);
272                 }
273                 catch(InterruptedException JavaDoc ex) {
274                 }
275             }
276             /*we either timed out, or got notified by the mutex lock object*/
277
278             /*get the next value*/
279             retval=removeInternal();
280             /*null result means we timed out*/
281             if(retval == null) throw new TimeoutException("timeout=" + timeout + "ms");
282
283             /*if we reached an end marker we are going to close the queue*/
284             if(retval == endMarker) {
285                 close(false);
286                 throw new QueueClosedException();
287             }
288             /*at this point we actually did receive a value from the queue, return it*/
289             return retval;
290         }
291     }
292
293
294     /**
295      * removes a specific object from the queue.
296      * the object is matched up using the Object.equals method.
297      * @param obj the actual object to be removed from the queue
298      */

299     public void removeElement(Object JavaDoc obj) throws QueueClosedException {
300         Element el, tmp_el;
301
302         if(obj == null) {
303             if(log.isErrorEnabled()) log.error("argument must not be null");
304             return;
305         }
306
307         synchronized(mutex) {
308             if(closed) /*check to see if the queue is closed*/
309                 throw new QueueClosedException();
310
311             el=head;
312
313             /*the queue is empty*/
314             if(el == null) return;
315
316             /*check to see if the head element is the one to be removed*/
317             if(el.obj.equals(obj)) {
318                 /*the head element matched we will remove it*/
319                 head=el.next;
320                 el.next=null;
321                 el.obj=null;
322                 /*check if we only had one object left
323                  *at this time the queue becomes empty
324                  *this will set the tail=head=null
325                  */

326                 if(size == 1)
327                     tail=head; // null
328
decrementSize();
329                 return;
330             }
331
332             /*look through the other elements*/
333             while(el.next != null) {
334                 if(el.next.obj.equals(obj)) {
335                     tmp_el=el.next;
336                     if(tmp_el == tail) // if it is the last element, move tail one to the left (bela Sept 20 2002)
337
tail=el;
338                     el.next.obj=null;
339                     el.next=el.next.next; // point to the el past the next one. can be null.
340
tmp_el.next=null;
341                     tmp_el.obj=null;
342                     decrementSize();
343                     break;
344                 }
345                 el=el.next;
346             }
347         }
348     }
349
350
351     /**
352      * returns the first object on the queue, without removing it.
353      * If the queue is empty this object blocks until the first queue object has
354      * been added
355      * @return the first object on the queue
356      */

357     public Object JavaDoc peek() throws QueueClosedException {
358         Object JavaDoc retval;
359
360         synchronized(mutex) {
361             while(size == 0) {
362                 if(closed)
363                     throw new QueueClosedException();
364                 try {
365                     mutex.wait();
366                 }
367                 catch(InterruptedException JavaDoc ex) {
368                 }
369             }
370
371             if(closed)
372                 throw new QueueClosedException();
373
374             retval=(head != null)? head.obj : null;
375
376             // @remove:
377
if(retval == null) {
378                 // print some diagnostics
379
if(log.isErrorEnabled()) log.error("retval is null: head=" + head + ", tail=" + tail + ", size()=" + size() +
380                                             ", num_markers=" + num_markers + ", closed=" + closed);
381             }
382         }
383
384         if(retval == endMarker) {
385             close(false); // mark queue as closed
386
throw new QueueClosedException();
387         }
388
389         return retval;
390     }
391
392
393     /**
394      * returns the first object on the queue, without removing it.
395      * If the queue is empty this object blocks until the first queue object has
396      * been added or the operation times out
397      * @param timeout how long in milli seconds will this operation wait for an object to be added to the queue
398      * before it times out
399      * @return the first object on the queue
400      */

401     public Object JavaDoc peek(long timeout) throws QueueClosedException, TimeoutException {
402         Object JavaDoc retval;
403
404         synchronized(mutex) {
405             if(size == 0) {
406                 if(closed)
407                     throw new QueueClosedException();
408                 try {
409                     mutex.wait(timeout);
410                 }
411                 catch(InterruptedException JavaDoc ex) {
412                 }
413             }
414             if(closed)
415                 throw new QueueClosedException();
416
417             retval=head != null? head.obj : null;
418
419             if(retval == null) throw new TimeoutException("timeout=" + timeout + "ms");
420
421             if(retval == endMarker) {
422                 close(false);
423                 throw new QueueClosedException();
424             }
425             return retval;
426         }
427     }
428
429
430     /**
431      Marks the queues as closed. When an <code>add</code> or <code>remove</code> operation is
432      attempted on a closed queue, an exception is thrown.
433      @param flush_entries When true, a end-of-entries marker is added to the end of the queue.
434      Entries may be added and removed, but when the end-of-entries marker
435      is encountered, the queue is marked as closed. This allows to flush
436      pending messages before closing the queue.
437      */

438     public void close(boolean flush_entries) {
439         synchronized(mutex) {
440             if(flush_entries) {
441                 try {
442                     add(endMarker); // add an end-of-entries marker to the end of the queue
443
num_markers++;
444                 }
445                 catch(QueueClosedException closed_ex) {
446                 }
447                 return;
448             }
449             closed=true;
450             mutex.notifyAll();
451         }
452     }
453
454
455     /**
456      * resets the queue.
457      * This operation removes all the objects in the queue and marks the queue open
458      */

459     public void reset() {
460         synchronized(mutex) {
461            num_markers=0;
462            if(!closed)
463               close(false);
464             size=0;
465             head=null;
466             tail=null;
467             closed=false;
468             mutex.notifyAll();
469         }
470     }
471
472     /**
473      * Returns all the elements of the queue
474      * @return A copy of the queue
475      */

476     public LinkedList JavaDoc values() {
477         LinkedList JavaDoc retval=new LinkedList JavaDoc();
478         synchronized(mutex) {
479             Element el=head;
480             while(el != null) {
481                 retval.add(el.obj);
482                 el=el.next;
483             }
484         }
485         return retval;
486     }
487
488
489     /**
490      * returns the number of objects that are currently in the queue
491      */

492     public int size() {
493         synchronized(mutex) {
494             return size - num_markers;
495         }
496     }
497
498     /**
499      * prints the size of the queue
500      */

501     public String JavaDoc toString() {
502         return "Queue (" + size() + ") messages";
503     }
504
505
506
507
508     /* ------------------------------------- Private Methods ----------------------------------- */
509
510
511     /**
512      * Removes the first element. Returns null if no elements in queue.
513      * Always called with mutex locked (we don't have to lock mutex ourselves)
514      */

515     private Object JavaDoc removeInternal() {
516         Element retval;
517         Object JavaDoc obj;
518
519         /*if the head is null, the queue is empty*/
520         if(head == null)
521             return null;
522
523         retval=head; // head must be non-null now
524

525         head=head.next;
526         if(head == null)
527             tail=null;
528
529         decrementSize();
530         if(head != null && head.obj == endMarker) {
531             closed=true;
532         }
533
534         retval.next=null;
535         obj=retval.obj;
536         retval.obj=null;
537         return obj;
538     }
539
540
541     /** Doesn't need to be synchronized; is always called from synchronized methods */
542     void decrementSize() {
543         size--;
544         if(size < 0)
545             size=0;
546     }
547
548
549     /* ---------------------------------- End of Private Methods -------------------------------- */
550
551 }
552
Popular Tags