KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > Ostermiller > util > Parallelizer


1 /*
2  * Runs multiple jobs in parallel
3  * Copyright (C) 2004-2005 Matt Conway
4  * http://simplygenius.com/
5  * Copyright (C) 2005 Stephen Ostermiller
6  * http://ostermiller.org/contact.pl?regarding=Java+Utilities
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU General Public License for more details.
17  *
18  * See COPYING.TXT for details.
19  */

20
21 package com.Ostermiller.util;
22
23 import java.util.*;
24
/**
 * Runs multiple jobs in parallel, n threads at a time, and waits
 * until all threads are complete before continuing.
 * <p>
 * Typically, Parallelizer would be used to run each of the items
 * in a for loop at the same time. For example the following for
 * loop:
 * <pre>
 * for (int i=0; i&lt;10; i++){
 *     System.out.println("Hello World " + i);
 * }
 * System.out.println("done");
 * </pre>
 * To this:
 * <pre>
 * Parallelizer pll = new Parallelizer();
 * for (int i=0; i&lt;10; i++){
 *     final int j = i;
 *     pll.run(
 *         new Runnable(){
 *             public void run(){
 *                 System.out.println("Hello World " + j);
 *             }
 *         }
 *     );
 * }
 * pll.join();
 * System.out.println("done");
 * </pre>
 * <p>
 * All bookkeeping (the running set, the pending queue and the lists of
 * failures) is guarded by synchronizing on the set of running threads.
 * <p>
 * More information about this class is available from <a target="_top" HREF=
 * "http://ostermiller.org/utils/Parallelizer.html">ostermiller.org</a>.
 *
 * @author Matt Conway - http://simplygenius.com/
 * @author Stephen Ostermiller - http://ostermiller.org/contact.pl?regarding=Java+Utilities
 * @since ostermillerutils 1.05.00
 */
public class Parallelizer
{
    /**
     * Constant that may be passed as the concurrentThreadLimit argument
     * of the constructor indicating that no limit should be placed
     * on the number of threads that are allowed to run concurrently.
     *
     * @since ostermillerutils 1.05.00
     */
    public static final int INFINITE_THREAD_LIMIT = 0;

    /**
     * A number to assign to the next auto generated thread name.
     * Shared by all instances; only touched from the synchronized
     * {@link #getNextThreadName(String)}.
     */
    private static int threadNameCount = 0;

    /**
     * The number of threads that are allowed to be run concurrently.
     * (INFINITE_THREAD_LIMIT for no limit)
     */
    private final int concurrentThreadLimit;

    /**
     * A Set of threads that are currently running.
     * This set is also used as a lock to synchronize
     * anything that touches running threads, the queue of
     * pending jobs, or the lists of recorded failures.
     */
    private final HashSet<Thread> runningThreads = new HashSet<Thread>();

    /**
     * A queue of jobs (already wrapped in unstarted threads)
     * that have not yet been started.
     */
    private final LinkedList<Thread> toRunQueue = new LinkedList<Thread>();

    /**
     * A queue of runtime exceptions that running threads have thrown.
     */
    private final LinkedList<RuntimeException> exceptionList = new LinkedList<RuntimeException>();

    /**
     * A queue of errors that running threads have thrown.
     */
    private final LinkedList<Error> errorList = new LinkedList<Error>();

    /**
     * Create a new Parallelizer with no limit on the number
     * of threads that will be allowed to be run concurrently.
     *
     * @since ostermillerutils 1.05.00
     */
    public Parallelizer(){
        this(INFINITE_THREAD_LIMIT);
    }

    /**
     * Create a new Parallelizer with the specified limit on the number
     * of threads that will be allowed to be run concurrently.
     * <p>
     * When the concurrent thread limit is reached and the parallelizer
     * gets a new thread to run, the new thread will be queued until
     * a thread finishes.
     *
     * @param concurrentThreadLimit number of threads that will be allowed
     * to run simultaneously or INFINITE_THREAD_LIMIT for no limit.
     * @throws IllegalArgumentException if concurrentThreadLimit is negative.
     *
     * @since ostermillerutils 1.05.00
     */
    public Parallelizer(int concurrentThreadLimit){
        if (concurrentThreadLimit < INFINITE_THREAD_LIMIT){
            throw new IllegalArgumentException("Bad concurrent thread limit: " + concurrentThreadLimit);
        }
        this.concurrentThreadLimit = concurrentThreadLimit;
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the error.
     *
     * @param job job which is to be run in parallel with other jobs.
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(Runnable job){
        run(null, job, null, 0);
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the error.
     *
     * @param job job which is to be run in parallel with other jobs.
     * @param threadName name for the thread that will be created to run the job (null for auto generated thread name)
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(Runnable job, String threadName){
        run(null, job, threadName, 0);
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the error.
     *
     * @param threadGroup group in which this job should be run (null for default group).
     * @param job job which is to be run in parallel with other jobs.
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(ThreadGroup threadGroup, Runnable job){
        run(threadGroup, job, null, 0);
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the error.
     *
     * @param threadGroup group in which this job should be run (null for default group).
     * @param job job which is to be run in parallel with other jobs.
     * @param threadName name for the thread that will be created to run the job (null for auto generated thread name)
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(ThreadGroup threadGroup, Runnable job, String threadName){
        run(threadGroup, job, threadName, 0);
    }

    /**
     * Run the given job. The given job is either run
     * immediately or if the max number of concurrent jobs are already
     * running, it is queued to be run when some job is finished.
     * <p>
     * Runtime exceptions and errors thrown by the job are caught on the
     * worker thread and recorded; they are reported later by the methods
     * of this class that document throwing them.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the error.
     *
     * @param threadGroup group in which this job should be run (null for default group).
     * @param job job which is to be run in parallel with other jobs.
     * @param threadName name for the thread that will be created to run the job (null for auto generated thread name)
     * @param stackSize system dependent stack size suggestion for thread creation (0 for default stack size).
     * @throws Error if any thread that is already running has thrown an Error.
     * @throws NullPointerException if job is null.
     *
     * @since ostermillerutils 1.05.00
     */
    public void run(ThreadGroup threadGroup, final Runnable job, String threadName, long stackSize){
        // Fail fast in the caller's thread, as documented, rather than
        // letting the worker thread trip over the null later.
        if (job == null){
            throw new NullPointerException("job");
        }
        throwFirstError();

        Runnable jobWrapper = new Runnable(){
            public void run(){
                try {
                    job.run();
                } catch (RuntimeException runtimeException){
                    // Put exceptions in the exception queue
                    synchronized(runningThreads){
                        exceptionList.add(runtimeException);
                    }
                } catch (Error error){
                    // Put errors in the error queue
                    synchronized(runningThreads){
                        errorList.add(error);
                    }
                } finally {
                    synchronized(runningThreads){
                        // When done remove ourselves from the set
                        // of running threads. The failure (if any) has
                        // already been recorded above, so anybody who
                        // observes the removal also observes the failure.
                        runningThreads.remove(Thread.currentThread());
                        // Wake up anybody blocked in join().
                        runningThreads.notifyAll();
                    }
                    // If there are jobs queued up to be run, now would
                    // be a good time to run them.
                    startAJobIfNeeded();
                }
            }
        };

        // Ensure the thread name is not null, and auto generate a name if it is
        threadName = getNextThreadName(threadName);

        // Every job goes through the queue; whether it starts right away
        // is decided by startAJobIfNeeded().
        synchronized(runningThreads){
            toRunQueue.add(
                new Thread(
                    threadGroup,
                    jobWrapper,
                    threadName,
                    stackSize
                )
            );
        }

        // Now that the job is in the queue of jobs to run,
        // check the queue and see if the job should be started
        startAJobIfNeeded();
    }

    /**
     * Ensure the given thread name is not null. If not null, return it,
     * if it is null, then generate a name.
     * <p>
     * Synchronized (on the class) because the name counter is static and
     * therefore shared by every Parallelizer instance.
     *
     * @param threadName existing thread name to check
     * @return the given thread name or a generated thread name if the specified name was null.
     */
    private static synchronized String getNextThreadName(String threadName){
        if (threadName != null){
            return threadName;
        }
        return "Parallelizer-"+(threadNameCount++);
    }

    /**
     * Remove the first exception from the exception list and throw it.
     *
     * @throws RuntimeException if a running thread has thrown an exception not yet thrown by this method.
     */
    private void throwFirstException(){
        synchronized(runningThreads){
            if (!exceptionList.isEmpty()){
                throw exceptionList.removeFirst();
            }
        }
    }

    /**
     * Remove the first error from the error list and throw it.
     *
     * @throws Error if a running thread has thrown an error not yet thrown by this method.
     */
    private void throwFirstError() throws Error {
        synchronized(runningThreads){
            if (!errorList.isEmpty()){
                throw errorList.removeFirst();
            }
        }
    }

    /**
     * Remove a job's thread from the toRunQueue, put it in the set
     * of running threads, and start it.
     * But do all this only if there are jobs queued up to be run
     * and we are not already running the max number of concurrent
     * jobs at once.
     */
    private void startAJobIfNeeded(){
        synchronized(runningThreads){
            // If we are already running the max number of jobs, just return
            if (concurrentThreadLimit != INFINITE_THREAD_LIMIT
                    && runningThreads.size() >= concurrentThreadLimit){
                return;
            }

            // If there are no more jobs to run, return
            if (toRunQueue.isEmpty()){
                return;
            }

            // Get a job out of the queue
            Thread thread = toRunQueue.removeFirst();

            // Register the thread as running before starting it so that
            // the worker's own removal can never precede the insertion.
            runningThreads.add(thread);
            thread.start();
        }
    }

    /**
     * Return true iff all jobs that have been requested to run
     * in this Parallelizer have completed.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the error.
     *
     * @return Whether all jobs are done or not.
     * @throws Error if any of the running threads has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public boolean done(){
        throwFirstError();
        synchronized(runningThreads){
            return toRunQueue.isEmpty() && runningThreads.isEmpty();
        }
    }

    /**
     * All currently running threads will be interrupted.
     * The interrupted threads may die, causing
     * jobs that were queued but not yet started, to start.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the error.
     *
     * @throws Error if any of the running threads has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public void interrupt(){
        throwFirstError();
        synchronized(runningThreads){
            for (Thread thread : runningThreads){
                thread.interrupt();
                throwFirstError();
            }
        }
    }

    /**
     * Dump the stack of each running thread to the standard error stream.
     * <p>
     * Each running thread's own stack trace is printed, preceded by a
     * line identifying the thread.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the error.
     *
     * @throws Error if any of the running threads has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public void dumpStack(){
        throwFirstError();
        synchronized(runningThreads){
            for (Thread thread : runningThreads){
                // Thread.dumpStack() is static and would print the stack of
                // the caller, so ask the worker thread for its trace instead.
                StringBuilder dump = new StringBuilder();
                dump.append(thread.toString());
                for (StackTraceElement frame : thread.getStackTrace()){
                    dump.append(System.getProperty("line.separator"));
                    dump.append("\tat ").append(frame);
                }
                // Print each thread's dump in one call to limit interleaving.
                System.err.println(dump.toString());
                throwFirstError();
            }
        }
    }

    /**
     * Gets a list of all running threads. There may be jobs that
     * are queued and do not yet have running threads. These jobs are not
     * returned.
     * <p>
     * If this method throws an error, that
     * error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the error.
     *
     * @throws Error if any of the running threads has thrown an Error.
     * @return an array of all currently running threads.
     *
     * @since ostermillerutils 1.05.00
     */
    public Thread[] getRunningThreads(){
        throwFirstError();
        synchronized(runningThreads){
            return runningThreads.toArray(new Thread[runningThreads.size()]);
        }
    }

    /**
     * Block until all the jobs in this Parallelizer have run
     * and then return.
     * <p>
     * Failures recorded by jobs that finished before this method was
     * called are reported as well, one per call.
     * <p>
     * If this method throws an exception or an error, that
     * exception or error may be handled and this method
     * may be called again as it will not rethrow the same
     * instance of the exception or error.
     *
     * @throws InterruptedException if interrupted while waiting.
     * @throws RuntimeException any running thread throws or has thrown a runtime exception.
     * @throws Error if any of the running threads throws or has thrown an Error.
     *
     * @since ostermillerutils 1.05.00
     */
    public void join() throws InterruptedException {
        // The completion check and the wait happen under one continuous
        // hold of the lock, so a worker cannot finish and notify in
        // between (which would leave this method waiting forever).
        synchronized(runningThreads){
            while (true){
                // Report failures first, even if everything is already done.
                throwFirstError();
                throwFirstException();
                if (toRunQueue.isEmpty() && runningThreads.isEmpty()){
                    return;
                }
                runningThreads.wait();
            }
        }
    }
}
467
Popular Tags