KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > sourceforge > groboutils > util > thread > v1 > QueueThread


1 /*
2  * @(#)QueueThread.java 0.9.0 06/04/2000 - 13:31:52
3  *
4  * Copyright (C) 2000,,2003 2002 Matt Albrecht
5  * groboclown@users.sourceforge.net
6  * http://groboutils.sourceforge.net
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a
9  * copy of this software and associated documentation files (the "Software"),
10  * to deal in the Software without restriction, including without limitation
11  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
12  * and/or sell copies of the Software, and to permit persons to whom the
13  * Software is furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
21  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24  * DEALINGS IN THE SOFTWARE.
25  */

26
27 package net.sourceforge.groboutils.util.thread.v1;
28
29
30 import net.sourceforge.groboutils.util.datastruct.v1.SynchQueue;
31
32
33 /**
34  * For threads which endlessly process events from a SynchQueue. This
35  * is a common technique for thread pooling.
36  * <P>
37  * Users must make a implementation of <tt>IObjectListener</tt>.
38  * If the user does not give a <tt>SynchQueue</tt>, it is created for them.
39  * Once the QueueThread is created, the queue and the listener cannot
40  * be changed.
41  * <P>
42  * By default, the underlying {@link LoopThread} does not sleep between events,
43  * it is a daemon thread, runs on the lowest thread priority, and has
44  * not started yet. Of course, this can all be changed.
45  * <P>
46  * It is advisable not to use the methods {@link
47  * LoopThread#setSleepTime( int )} or
48  * {@link LoopThread#setSleepTimeMillis( long )}, since these will cause the
49  * <tt>QueueThread</tt> to not respond to incoming requests during this sleep
50  * time. However, there may be occations when this is necessary, so this is
51  * left available to the user.
52  * <P>
53  * Since the {@link SynchQueue#dequeue()} method can wait indefinitely, you may
54  * opt to set the dequeue's timeout to something other than 0. Without setting
55  * this, the thread may wait indefinitely for an incoming element to the queue
56  * without ever checking for a {@link LoopThread#stop()} or
57  * {@link LoopThread#suspend()} signal. Thanks to
58  * <a HREF="mailto:d.gallot@atoseuronext.be">Dominique Gallot</a> for pointing
59  * this out.
60  * <P>
61  * After a {@link LoopThread#stop()} is invoked, you may allow the
62  * object listener to finish processing the remaining elements in the
63  * inner queue by calling {@link #processRemaining()}.
64  *
65  * @author Matt Albrecht <a HREF="mailto:groboclown@users.sourceforge.net">groboclown@users.sourceforge.net</a>
66  * @since June 4, 2000
67  * @version $Date: 2003/02/10 22:52:48 $
68  */

69 public class QueueThread extends LoopThread
70 {
71     private IObjectListener m_objListener = null;
72     
73     private SynchQueue m_queue = null;
74     
75     private boolean m_isProcessingObject = false;
76     
77     private long m_timeout = 0;
78     private int m_nanos = 0;
79     
80
81     /**
82      * The runnable class - keep it private so no one
83      * can directly access it in a thread but us.
84      */

85     private class QueueRunnable implements Runnable JavaDoc
86     {
87         public void run()
88         {
89             if (m_queue.isEmpty())
90             {
91                 m_isProcessingObject = false;
92             }
93
94             try
95             {
96                 Object JavaDoc o = m_queue.dequeue( m_timeout, m_nanos );
97                 m_isProcessingObject = true;
98                 m_objListener.processObject( o );
99     
100                 if (m_queue.isEmpty())
101                 {
102                     m_isProcessingObject = false;
103                 }
104             }
105             catch (InterruptedException JavaDoc ie)
106             {
107                 // whoops - how do we handle this?
108
}
109         }
110     }
111     
112     
113     
114     
115     //--------------------------------------------------------------
116
// Constructors
117

118     
119     /**
120      *
121      */

122     public QueueThread( IObjectListener ol )
123     {
124         this( ol, new SynchQueue() );
125     }
126     
127     /**
128      *
129      */

130     public QueueThread( IObjectListener ol, SynchQueue sq )
131     {
132         super();
133         
134         initialize( ol, sq );
135     }
136     
137     /**
138      *
139      */

140     public QueueThread( IObjectListener ol, ThreadGroup JavaDoc tg )
141     {
142         this( ol, new SynchQueue(), tg );
143     }
144     
145     
146     /**
147      *
148      */

149     public QueueThread( IObjectListener ol, SynchQueue sq, ThreadGroup JavaDoc tg )
150     {
151         super( null, tg );
152         
153         initialize( ol, sq );
154     }
155     
156     
157     /**
158      *
159      */

160     public QueueThread( IObjectListener ol, String JavaDoc threadName )
161     {
162         this( ol, new SynchQueue(), threadName );
163     }
164     
165     
166     /**
167      *
168      */

169     public QueueThread( IObjectListener ol, SynchQueue sq, String JavaDoc threadName )
170     {
171         super( null, threadName );
172         
173         initialize( ol, sq );
174     }
175     
176     
177     /**
178      *
179      */

180     public QueueThread( IObjectListener ol, ThreadGroup JavaDoc tg, String JavaDoc threadName )
181     {
182         this( ol, new SynchQueue(), tg, threadName );
183     }
184     
185     
186     /**
187      *
188      */

189     public QueueThread( IObjectListener ol, SynchQueue sq, ThreadGroup JavaDoc tg,
190             String JavaDoc threadName )
191     {
192         super( null, tg, threadName );
193         
194         initialize( ol, sq );
195     }
196     
197     
198     
199     
200     //--------------------------------------------------------------
201
// Public Methods
202

203     
204     /**
205      * Retrieves the internal listened queue.
206      */

207     public SynchQueue getQueue()
208     {
209         return this.m_queue;
210     }
211     
212     
213     /**
214      * @return <tt>false</tt> if the thread is waiting for an object
215      * to be placed in the queue and be processed, otherwise
216      * <tt>true</tt>.
217      */

218     public boolean isProcessingObjects()
219     {
220         return this.m_isProcessingObject;
221     }
222     
223     
224     /**
225      * Set the maximum time (in milliseconds and nanoseconds) to wait for
226      * an incoming element on the inner queue before checking for
227      * {@link LoopThread#stop()} and {@link LoopThread#suspend()}
228      * signals.
229      *
230      * @param timeout the maximum time to wait in milliseconds.
231      * @param nanos additional time, in nanoseconds range 0-999999.
232      * @see SynchQueue#dequeue( long, int )
233      */

234     public void setTimeout( long timeout, int nanos )
235     {
236         this.m_timeout = timeout;
237         this.m_nanos = nanos;
238     }
239     
240     
241     /**
242      * Set the maximum time (in milliseconds) to wait for
243      * an incoming element on the inner queue before checking for
244      * {@link LoopThread#stop()} and {@link LoopThread#suspend()}
245      * signals.
246      *
247      * @param timeout the maximum time to wait in milliseconds.
248      * @param nanos additional time, in nanoseconds range 0-999999.
249      * @see SynchQueue#dequeue( long, int )
250      * @see #setTimeout( long, int )
251      */

252     public void setTimeout( long timeout )
253     {
254         setTimeout( timeout, 0 );
255     }
256     
257     
258     /**
259      * Retrieve the millisecond part of the maximum timeout to wait for an
260      * incoming element on the inner queue before checking for thread
261      * event signals.
262      *
263      * @see #setTimeout( long, int )
264      */

265     public long getTimeoutMilliseconds()
266     {
267         return this.m_timeout;
268     }
269     
270     
271     /**
272      * Retrieve the nanosecond part of the maximum timeout to wait for an
273      * incoming element on the inner queue before checking for thread
274      * event signals.
275      *
276      * @see #setTimeout( long, int )
277      */

278     public int getTimeoutNanoseconds()
279     {
280         return this.m_nanos;
281     }
282     
283     
284     /**
285      * Process all elements in the queue until the queue is empty.
286      * This may only be called while the thread is not running.
287      * <P>
288      * This should be invoked with care, as it can cause an infinite
289      * loop if another thread is pushing in data after this processing
290      * thread has finished (but, that could also lead to an out-of-memory
291      * error if this method is never invoked).
292      */

293     public void processRemaining()
294             throws InterruptedException JavaDoc
295     {
296         if (isRunning())
297         {
298             throw new IllegalStateException JavaDoc(
299                 "cannot call processRemaining() while the underlying thread "+
300                 "is still running." );
301         }
302         if (!this.m_queue.isEmpty())
303         {
304             this.m_isProcessingObject = true;
305             do
306             {
307                 Object JavaDoc o = this.m_queue.dequeue();
308                 this.m_objListener.processObject( o );
309             }
310             while (!this.m_queue.isEmpty());
311             this.m_isProcessingObject = false;
312         }
313     }
314     
315     
316     //--------------------------------------------------------------
317
// Protected Methods
318

319     
320     protected void initialize( IObjectListener ol, SynchQueue sq )
321     {
322         setRunnable( new QueueRunnable() );
323         
324         this.m_objListener = ol;
325         this.m_queue = sq;
326         
327         initializeDefaults();
328     }
329     
330     
331     protected void initializeDefaults()
332     {
333         setSleepTime( 0 );
334         setDaemon( true );
335         setPriority( Thread.MIN_PRIORITY );
336     }
337     
338     
339 }
340
Popular Tags