KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > excalibur > event > impl > DefaultQueue


/*
 * Copyright 1999-2004 The Apache Software Foundation
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.
 *
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package org.apache.excalibur.event.impl;
18
19 import org.apache.commons.collections.Buffer;
20 import org.apache.commons.collections.UnboundedFifoBuffer;
21 import org.apache.excalibur.event.EnqueuePredicate;
22 import org.apache.excalibur.event.PreparedEnqueue;
23 import org.apache.excalibur.event.SinkException;
24 import org.apache.excalibur.event.SinkFullException;
25
26 import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
27
28 /**
29  * The default queue implementation is a variable size queue. This queue is
30  * thread safe, however the overhead in synchronization costs a few extra
31  * milliseconds.
32  *
33  * @author <a HREF="mailto:dev@avalon.apache.org">Avalon Development Team</a>
34  */

35 public final class DefaultQueue extends AbstractQueue
36 {
37     private final Buffer m_elements;
38     private final ReentrantLock m_mutex;
39     protected int m_reserve;
40     private final int m_maxSize;
41
42     /**
43      * Construct a new DefaultQueue with the specified number of elements.
44      * if the number of elements is greater than zero, then the
45      * <code>Queue</code> is bounded by that number. Otherwise, the
46      * <code>Queue</code> is not bounded at all.
47      *
48      * @param size The maximum number of elements in the <code>Queue</code>.
49      * Any number less than 1 means there is no limit.
50      */

51     public DefaultQueue( int size )
52     {
53         this( new ThresholdEnqueuePredicate( size ) );
54     }
55
56     public DefaultQueue( EnqueuePredicate predicate )
57     {
58         setEnqueuePredicate( predicate );
59
60         m_mutex = new ReentrantLock();
61         m_elements = new UnboundedFifoBuffer();
62         m_reserve = 0;
63         m_maxSize = -1;
64     }
65
66     /**
67      * Create an unbounded DefaultQueue.
68      */

69     public DefaultQueue()
70     {
71         this( new NullEnqueuePredicate() );
72     }
73
74     /**
75      * Return the number of elements currently in the <code>Queue</code>.
76      *
77      * @return <code>int</code> representing the number of elements (including the reserved ones).
78      */

79     public int size()
80     {
81         return m_elements.size() + m_reserve;
82     }
83
84     /**
85      * Return the maximum number of elements that will fit in the
86      * <code>Queue</code>. A number below 1 indecates an unbounded
87      * <code>Queue</code>, which means there is no limit.
88      *
89      * @return <code>int</code> representing the maximum number of elements
90      */

91     public int maxSize()
92     {
93         return m_maxSize;
94     }
95
96     public PreparedEnqueue prepareEnqueue( final Object JavaDoc[] elements )
97         throws SinkException
98     {
99         PreparedEnqueue enqueue = null;
100
101         try
102         {
103             m_mutex.acquire();
104             try
105             {
106                 if( getEnqueuePredicate().accept(elements, this) )
107                 {
108                     enqueue = new DefaultPreparedEnqueue( this, elements );
109                 }
110                 else
111                 {
112                     throw new SinkFullException( "Not enough room to enqueue these elements." );
113                 }
114             }
115             finally
116             {
117                 m_mutex.release();
118             }
119         }
120         catch( InterruptedException JavaDoc ie )
121         {
122             if ( null == enqueue )
123             {
124                 throw new SinkException("The mutex was interrupted before it could be released");
125             }
126         }
127
128         return enqueue;
129     }
130
131     public boolean tryEnqueue( final Object JavaDoc element )
132     {
133         boolean success = false;
134
135         try
136         {
137             m_mutex.acquire();
138             try
139             {
140                 success = getEnqueuePredicate().accept( element, this );
141
142                 if ( success )
143                 {
144                     m_elements.add( element );
145                 }
146             }
147             finally
148             {
149                 m_mutex.release();
150             }
151         }
152         catch( InterruptedException JavaDoc ie )
153         {
154         }
155
156         return success;
157     }
158
159     public void enqueue( final Object JavaDoc[] elements )
160         throws SinkException
161     {
162         final int len = elements.length;
163
164         try
165         {
166             m_mutex.acquire();
167             try
168             {
169                 if( ! getEnqueuePredicate().accept( elements, this ) )
170                 {
171                     throw new SinkFullException( "Not enough room to enqueue these elements." );
172                 }
173
174                 for( int i = 0; i < len; i++ )
175                 {
176                     m_elements.add( elements[ i ] );
177                 }
178             }
179             finally
180             {
181                 m_mutex.release();
182             }
183         }
184         catch( InterruptedException JavaDoc ie )
185         {
186         }
187     }
188
189     public void enqueue( final Object JavaDoc element )
190         throws SinkException
191     {
192         try
193         {
194             m_mutex.acquire();
195             try
196             {
197                 if( ! getEnqueuePredicate().accept(element, this) )
198                 {
199                     throw new SinkFullException( "Not enough room to enqueue these elements." );
200                 }
201
202                 m_elements.add( element );
203             }
204             finally
205             {
206                 m_mutex.release();
207             }
208         }
209         catch( InterruptedException JavaDoc ie )
210         {
211         }
212     }
213
214     public Object JavaDoc[] dequeue( final int numElements )
215     {
216         getDequeueInterceptor().before(this);
217         Object JavaDoc[] elements = EMPTY_ARRAY;
218
219         try
220         {
221             if( m_mutex.attempt( m_timeout ) )
222             {
223                 try
224                 {
225                     elements = retrieveElements( m_elements,
226                                                  Math.min( size(),
227                                                            numElements ) );
228                 }
229                 finally
230                 {
231                     m_mutex.release();
232                 }
233             }
234         }
235         catch( InterruptedException JavaDoc ie )
236         {
237             //TODO: exception handling
238
}
239
240         getDequeueInterceptor().after(this);
241         return elements;
242     }
243
244     public Object JavaDoc[] dequeueAll()
245     {
246         getDequeueInterceptor().before(this);
247         Object JavaDoc[] elements = EMPTY_ARRAY;
248
249         try
250         {
251             if( m_mutex.attempt( m_timeout ) )
252             {
253                 try
254                 {
255                     elements = retrieveElements( m_elements, size() );
256                 }
257                 finally
258                 {
259                     m_mutex.release();
260                 }
261             }
262         }
263         catch( InterruptedException JavaDoc ie )
264         {
265             // TODO: exception hanlding
266
}
267
268         getDequeueInterceptor().after(this);
269         return elements;
270     }
271
272     /**
273      * Removes the given number of elements from the given <code>buf</code>
274      * and returns them in an array. Trusts the caller to pass in a buffer
275      * full of <code>Object</code>s and with at least
276      * <code>count</code> elements available.
277      * <p>
278      * @param buf to remove elements from, the caller is responsible
279      * for synchronizing access
280      * @param count number of elements to remove/return
281      * @return requested number of elements
282      */

283     private static Object JavaDoc[] retrieveElements( Buffer buf, int count )
284     {
285         Object JavaDoc[] elements = new Object JavaDoc[ count ];
286
287         for( int i = 0; i < count; i++ )
288         {
289             elements[ i ] = buf.remove();
290         }
291
292         return elements;
293     }
294
295     public Object JavaDoc dequeue()
296     {
297         getDequeueInterceptor().before(this);
298         Object JavaDoc element = null;
299
300         try
301         {
302             if( m_mutex.attempt( m_timeout ) )
303             {
304                 try
305                 {
306                     if( size() > 0 )
307                     {
308                         element = m_elements.remove();
309                     }
310                 }
311                 finally
312                 {
313                     m_mutex.release();
314                 }
315             }
316         }
317         catch( InterruptedException JavaDoc ie )
318         {
319             // TODO: exception handling
320
}
321
322         getDequeueInterceptor().after(this);
323         return element;
324     }
325
326     private static final class DefaultPreparedEnqueue implements PreparedEnqueue
327     {
328         private final DefaultQueue m_parent;
329         private Object JavaDoc[] m_elements;
330
331         private DefaultPreparedEnqueue( DefaultQueue parent, Object JavaDoc[] elements )
332         {
333             m_parent = parent;
334             m_elements = elements;
335             m_parent.m_reserve += elements.length;
336         }
337
338         public void commit()
339         {
340             if( null == m_elements )
341             {
342                 throw new IllegalStateException JavaDoc( "This PreparedEnqueue has already been processed!" );
343             }
344
345             try
346             {
347                 m_parent.m_reserve -= m_elements.length;
348                 m_parent.enqueue( m_elements );
349                 m_elements = null;
350             }
351             catch( Exception JavaDoc e )
352             {
353                 throw new IllegalStateException JavaDoc( "Default enqueue did not happen--should be impossible" );
354                 // will never happen
355
}
356         }
357
358         public void abort()
359         {
360             if( null == m_elements )
361             {
362                 throw new IllegalStateException JavaDoc( "This PreparedEnqueue has already been processed!" );
363             }
364
365             m_parent.m_reserve -= m_elements.length;
366             m_elements = null;
367         }
368     }
369 }
370
Popular Tags