KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > util > LinkedListQueue


1 // $Id: LinkedListQueue.java,v 1.6 2004/09/23 16:29:56 belaban Exp $
2

3 package org.jgroups.util;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.TimeoutException;
9
10 import java.util.Iterator JavaDoc;
11 import java.util.LinkedList JavaDoc;
12 import java.util.NoSuchElementException JavaDoc;
13 import java.util.Vector JavaDoc;
14
15
16 /**
17  * LinkedListQueue implementation based on java.util.Queue. Can be renamed to Queue.java and compiled if someone wants to
18  * use this implementation rather than the original Queue. However, a simple insertion and removal of 1 million
19  * objects into this queue shoed that it was ca. 15-20% slower than the original queue. We just include it in the
20  * JGroups distribution to maybe use it at a later point when it has become faster.
21  *
22  * @author Bela Ban
23  */

24 public class LinkedListQueue {
25
26     final LinkedList JavaDoc l=new LinkedList JavaDoc();
27
28     /*flag to determine the state of the Queue*/
29     boolean closed=false;
30
31     /*lock object for synchronization*/
32     final Object JavaDoc mutex=new Object JavaDoc();
33
34     /*the number of end markers that have been added*/
35     int num_markers=0;
36
37
38     /**
39      * if the queue closes during the runtime
40      * an endMarker object is added to the end of the queue to indicate that
41      * the queue will close automatically when the end marker is encountered
42      * This allows for a "soft" close.
43      *
44      * @see LinkedListQueue#close
45      */

46     private static final Object JavaDoc endMarker=new Object JavaDoc();
47
48     protected static final Log log=LogFactory.getLog(LinkedListQueue.class);
49
50
51     /**
52      * creates an empty queue
53      */

54     public LinkedListQueue() {
55     }
56
57
58     /**
59      * returns true if the Queue has been closed
60      * however, this method will return false if the queue has been closed
61      * using the close(true) method and the last element has yet not been received.
62      *
63      * @return true if the queue has been closed
64      */

65     public boolean closed() {
66         return closed;
67     }
68
69
70     /**
71      * adds an object to the tail of this queue
72      * If the queue has been closed with close(true) no exception will be
73      * thrown if the queue has not been flushed yet.
74      *
75      * @param obj - the object to be added to the queue
76      * @throws QueueClosed exception if closed() returns true
77      */

78     public void add(Object JavaDoc obj) throws QueueClosedException {
79         if(closed)
80             throw new QueueClosedException();
81         if(this.num_markers > 0)
82             throw new QueueClosedException("LinkedListQueue.add(): queue has been closed. You can not add more elements. " +
83                                            "Waiting for removal of remaining elements.");
84
85         /*lock the queue from other threads*/
86         synchronized(mutex) {
87             l.add(obj);
88
89             /*wake up all the threads that are waiting for the lock to be released*/
90             mutex.notifyAll();
91         }
92     }
93
94
95     /**
96      * Adds a new object to the head of the queue
97      * basically (obj.equals(LinkedListQueue.remove(LinkedListQueue.add(obj)))) returns true
98      * If the queue has been closed with close(true) no exception will be
99      * thrown if the queue has not been flushed yet.
100      *
101      * @param obj - the object to be added to the queue
102      * @throws QueueClosed exception if closed() returns true
103      */

104     public void addAtHead(Object JavaDoc obj) throws QueueClosedException {
105         if(closed)
106             throw new QueueClosedException();
107         if(this.num_markers > 0)
108             throw new QueueClosedException("LinkedListQueue.addAtHead(): queue has been closed. You can not add more elements. " +
109                                            "Waiting for removal of remaining elements.");
110
111         /*lock the queue from other threads*/
112         synchronized(mutex) {
113             l.addFirst(obj);
114
115             /*wake up all the threads that are waiting for the lock to be released*/
116             mutex.notifyAll();
117         }
118     }
119
120
121     /**
122      * Removes 1 element from head or <B>blocks</B>
123      * until next element has been added
124      *
125      * @return the first element to be taken of the queue
126      */

127     public Object JavaDoc remove() throws QueueClosedException {
128         Object JavaDoc retval=null;
129
130         /*lock the queue*/
131         synchronized(mutex) {
132             /*wait as long as the queue is empty*/
133             while(l.size() == 0) {
134                 if(closed)
135                     throw new QueueClosedException();
136                 try {
137                     mutex.wait();
138                 }
139                 catch(InterruptedException JavaDoc ex) {
140                 }
141             }
142
143             if(closed)
144                 throw new QueueClosedException();
145
146             /*remove the head from the queue*/
147             try {
148                 retval=l.removeFirst();
149                 if(l.size() == 1 && l.getFirst().equals(endMarker))
150                     closed=true;
151             }
152             catch(NoSuchElementException JavaDoc ex) {
153                 if(log.isErrorEnabled()) log.error("retval == null, size()=" + l.size());
154                 return null;
155             }
156
157             // we ran into an Endmarker, which means that the queue was closed before
158
// through close(true)
159
if(retval == endMarker) {
160                 close(false); // mark queue as closed
161
throw new QueueClosedException();
162             }
163         }
164
165         /*return the object, should be never null*/
166         return retval;
167     }
168
169
170     /**
171      * Removes 1 element from the head.
172      * If the queue is empty the operation will wait for timeout ms.
173      * if no object is added during the timeout time, a Timout exception is thrown
174      *
175      * @param timeout - the number of milli seconds this operation will wait before it times out
176      * @return the first object in the queue
177      */

178     public Object JavaDoc remove(long timeout) throws QueueClosedException, TimeoutException {
179         Object JavaDoc retval=null;
180
181         /*lock the queue*/
182         synchronized(mutex) {
183             /*if the queue size is zero, we want to wait until a new object is added*/
184             if(l.size() == 0) {
185                 if(closed)
186                     throw new QueueClosedException();
187                 try {
188                     /*release the mutex lock and wait no more than timeout ms*/
189                     mutex.wait(timeout);
190                 }
191                 catch(InterruptedException JavaDoc ex) {
192                 }
193             }
194             /*we either timed out, or got notified by the mutex lock object*/
195
196             /*check to see if the object closed*/
197             if(closed)
198                 throw new QueueClosedException();
199
200             /*get the next value*/
201             try {
202                 retval=l.removeFirst();
203                 if(l.size() == 1 && l.getFirst().equals(endMarker))
204                     closed=true;
205             }
206             catch(NoSuchElementException JavaDoc ex) {
207                 /*null result means we timed out*/
208                 throw new TimeoutException();
209             }
210         
211             /*if we reached an end marker we are going to close the queue*/
212             if(retval == endMarker) {
213                 close(false);
214                 throw new QueueClosedException();
215             }
216             /*at this point we actually did receive a value from the queue, return it*/
217             return retval;
218         }
219     }
220
221
222     /**
223      * removes a specific object from the queue.
224      * the object is matched up using the Object.equals method.
225      *
226      * @param obj the actual object to be removed from the queue
227      */

228     public void removeElement(Object JavaDoc obj) throws QueueClosedException {
229         boolean removed;
230
231         if(obj == null) return;
232     
233         /*lock the queue*/
234         synchronized(mutex) {
235             removed=l.remove(obj);
236             if(!removed)
237                 if(log.isWarnEnabled()) log.warn("element " + obj + " was not found in the queue");
238         }
239     }
240
241
242     /**
243      * returns the first object on the queue, without removing it.
244      * If the queue is empty this object blocks until the first queue object has
245      * been added
246      *
247      * @return the first object on the queue
248      */

249     public Object JavaDoc peek() throws QueueClosedException {
250         Object JavaDoc retval=null;
251
252         synchronized(mutex) {
253             while(l.size() == 0) {
254                 if(closed)
255                     throw new QueueClosedException();
256                 try {
257                     mutex.wait();
258                 }
259                 catch(InterruptedException JavaDoc ex) {
260                 }
261             }
262
263             if(closed)
264                 throw new QueueClosedException();
265
266             try {
267                 retval=l.getFirst();
268             }
269             catch(NoSuchElementException JavaDoc ex) {
270                 if(log.isErrorEnabled()) log.error("retval == null, size()=" + l.size());
271                 return null;
272             }
273         }
274
275         if(retval == endMarker) {
276             close(false); // mark queue as closed
277
throw new QueueClosedException();
278         }
279
280         return retval;
281     }
282
283
284     /**
285      * returns the first object on the queue, without removing it.
286      * If the queue is empty this object blocks until the first queue object has
287      * been added or the operation times out
288      *
289      * @param timeout how long in milli seconds will this operation wait for an object to be added to the queue
290      * before it times out
291      * @return the first object on the queue
292      */

293
294     public Object JavaDoc peek(long timeout) throws QueueClosedException, TimeoutException {
295         Object JavaDoc retval=null;
296
297         synchronized(mutex) {
298             if(l.size() == 0) {
299                 if(closed)
300                     throw new QueueClosedException();
301                 try {
302                     mutex.wait(timeout);
303                 }
304                 catch(InterruptedException JavaDoc ex) {
305                 }
306             }
307             if(closed)
308                 throw new QueueClosedException();
309
310
311             try {
312                 retval=l.getFirst();
313             }
314             catch(NoSuchElementException JavaDoc ex) {
315                 /*null result means we timed out*/
316                 throw new TimeoutException();
317             }
318
319             if(retval == endMarker) {
320                 close(false);
321                 throw new QueueClosedException();
322             }
323             return retval;
324         }
325     }
326
327
328     /**
329      * Marks the queues as closed. When an <code>add</code> or <code>remove</code> operation is
330      * attempted on a closed queue, an exception is thrown.
331      *
332      * @param flush_entries When true, a end-of-entries marker is added to the end of the queue.
333      * Entries may be added and removed, but when the end-of-entries marker
334      * is encountered, the queue is marked as closed. This allows to flush
335      * pending messages before closing the queue.
336      */

337     public void close(boolean flush_entries) {
338         if(flush_entries) {
339             try {
340                 add(endMarker); // add an end-of-entries marker to the end of the queue
341
num_markers++;
342             }
343             catch(QueueClosedException closed) {
344             }
345             return;
346         }
347
348         synchronized(mutex) {
349             closed=true;
350             try {
351                 mutex.notifyAll();
352             }
353             catch(Exception JavaDoc e) {
354                 if(log.isErrorEnabled()) log.error("exception=" + e);
355             }
356         }
357     }
358
359
360     /**
361      * resets the queue.
362      * This operation removes all the objects in the queue and marks the queue open
363      */

364     public void reset() {
365         num_markers=0;
366         if(!closed)
367             close(false);
368
369         synchronized(mutex) {
370             l.clear();
371             closed=false;
372         }
373     }
374
375
376     /**
377      * returns the number of objects that are currently in the queue
378      */

379     public int size() {
380         return l.size() - num_markers;
381     }
382
383     /**
384      * prints the size of the queue
385      */

386     public String JavaDoc toString() {
387         return "LinkedListQueue (" + size() + ") messages [closed=" + closed + ']';
388     }
389
390
391     /**
392      * returns a vector with all the objects currently in the queue
393      */

394     public Vector JavaDoc getContents() {
395         Vector JavaDoc retval=new Vector JavaDoc();
396
397         synchronized(mutex) {
398             for(Iterator JavaDoc it=l.iterator(); it.hasNext();) {
399                 retval.addElement(it.next());
400             }
401         }
402         return retval;
403     }
404
405
406 }
407
Popular Tags