KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > de > nava > informa > utils > toolkit > WorkersManager


1 //
2
// Informa -- RSS Library for Java
3
// Copyright (c) 2002 by Niko Schmuck
4
//
5
// Niko Schmuck
6
// http://sourceforge.net/projects/informa
7
// mailto:niko_schmuck@users.sourceforge.net
8
//
9
// This library is free software.
10
//
11
// You may redistribute it and/or modify it under the terms of the GNU
12
// Lesser General Public License as published by the Free Software Foundation.
13
//
14
// Version 2.1 of the license should be included with this distribution in
15
// the file LICENSE. If the license is not included with this distribution,
16
// you may find a copy at the FSF web site at 'www.gnu.org' or 'www.fsf.org',
17
// or you may write to the Free Software Foundation, 675 Mass Ave, Cambridge,
18
// MA 02139 USA.
19
//
20
// This library is distributed in the hope that it will be useful,
21
// but WITHOUT ANY WARRANTY; without even the implied warranty of
22
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23
// Lesser General Public License for more details.
24
//
25
// $Id: WorkersManager.java,v 1.3 2004/09/16 14:06:00 spyromus Exp $
26
//
27

28 package de.nava.informa.utils.toolkit;
29
30 import de.nava.informa.utils.poller.PriorityComparator;
31 import org.apache.commons.collections.BinaryHeap;
32 import org.apache.commons.collections.PriorityQueue;
33 import org.apache.commons.collections.SynchronizedPriorityQueue;
34
35 import java.util.ArrayList JavaDoc;
36 import java.util.Iterator JavaDoc;
37 import java.util.List JavaDoc;
38
39 /**
40  * This class manages worker threads. Its main responsibility is to create / remove worker threads
41  * and distribute tasks among them. This is an effort to hide complexity of threading logic from
42  * the rest of toolkit. Worker manager receives processing requests from external tools and assigns
43  * them to free worker threads. If there's no free worker thread left manager puts item in queue.
44  * When worker thread finishes its job it asks for next job using <code>JobSource</code> callback
45  * interface. If there is at least one job in queue manager assigns it to worker. If not then
46  * worker goes to rest until manager has something to assign.
47  * <p>
48  * Worker manager uses instance of <code>WorkerThreadFactoryIF</code> specified on creation
49  * to create new worker threads. Client application should use it to create and initialize
50  * task-specific workers, which will be started as independent threads.</p>
51  * <p>
52  * It's possible to tune number of threads working at a given time using
53  * <code>setWorkerThreads(int)</code> method call. Right after the number of threads will change
54  * manager will stop unnecessary threads (only after running task completion) or create
55  * new worker threads.</p>
56  *
57  * @author Aleksey Gureev (spyromus@noizeramp.com)
58  */

59 public class WorkersManager {
60   private static final int DEFAULT_WORKER_THREADS = 5;
61   private static final int DEFAULT_QUEUE_LIMIT = 25;
62
63   private List JavaDoc workers = new ArrayList JavaDoc();
64
65   private JobSource jobSource = new JobSource();
66
67   // This code intentionally uses deprecated classes! Old commons library was 150 Kb and
68
// current is 550 Kb. Many applications (including BlogBridge) will be enforced to replace
69
// small lib with giantic new one if we start using new classes instead these.
70
// When support of these will be finished, please replace:
71
// PriorityQueue -> Buffer
72
// SynchronizedPriorityQueue -> SynchronizedBuffer
73
// BinaryHeap -> PriorityBuffer
74
private PriorityQueue queue =
75     new SynchronizedPriorityQueue(new BinaryHeap(DEFAULT_QUEUE_LIMIT, new PriorityComparator()));
76
77   private WorkerThreadFactoryIF workerThreadsFactory;
78
79   /**
80    * Creates worker manager with default number of worker threads.
81    *
82    * @param factory worker threads factory.
83    */

84   public WorkersManager(WorkerThreadFactoryIF factory) {
85     this(factory, DEFAULT_WORKER_THREADS);
86   }
87
88   /**
89    * Creates worker manager.
90    *
91    * @param factory worker threads factory.
92    * @param workerThreads number of worker threads.
93    */

94   public WorkersManager(WorkerThreadFactoryIF factory, int workerThreads) {
95     this.workerThreadsFactory = factory;
96
97     // Protect ourselves from incorrect parameters.
98
if (workerThreads <= 0) {
99       workerThreads = DEFAULT_WORKER_THREADS;
100     }
101
102     setWorkerThreads(workerThreads);
103   }
104
105   /**
106    * Changes number of worker threads.
107    *
108    * @param count new number of worker threads.
109    */

110   public final void setWorkerThreads(int count) {
111     synchronized (workers) {
112       // If we have more than specified number of working threads then terminate unwanted.
113
int curWorkerThreads = workers.size();
114       for (int i = curWorkerThreads - 1; i >= count; i--) {
115         final WorkerThread worker = (WorkerThread) workers.get(i);
116         worker.terminate();
117         workers.remove(worker);
118       }
119
120       // If we have less than specified number of thread then add some.
121
curWorkerThreads = workers.size();
122       for (int i = curWorkerThreads; i < count; i++) {
123         // Create new worker using custom factory.
124
final WorkerThread worker = workerThreadsFactory.create();
125         worker.setJobSource(jobSource);
126
127         // Add worker to the list and start.
128
workers.add(worker);
129         worker.start();
130       }
131     }
132   }
133
134   /**
135    * Terminates all worker threads.
136    */

137   public final void terminateAll() {
138     synchronized (workers) {
139       int count = workers.size();
140       for (int i = count - 1; i >= 0; i--) {
141         ((WorkerThread) workers.get(i)).terminate();
142         workers.remove(i);
143       }
144     }
145   }
146
147   /**
148    * Put the record in processing.
149    *
150    * @param record record to process.
151    */

152   public final void process(ChannelRecord record) {
153     if (!isInProcess(record)) {
154       if (!loadFreeWorker(record)) {
155         // No free worker available.
156
putRecordInQueue(record);
157       }
158     }
159   }
160
161   /**
162    * Checks if the channel is not currently in processing.
163    *
164    * @param record channel record.
165    * @return TRUE if is in processing.
166    */

167   private boolean isInProcess(ChannelRecord record) {
168     boolean found = false;
169
170     synchronized (workers) {
171       Iterator JavaDoc i = workers.iterator();
172       while (!found && i.hasNext()) {
173         WorkerThread worker = (WorkerThread) i.next();
174         found = worker.getChannelInProcess() == record;
175       }
176     }
177
178     return found;
179   }
180
181   /**
182    * Walk through all workers, find not busy one and load it with job.
183    *
184    * @param record record to process.
185    * @return TRUE if loaded someone.
186    */

187   private boolean loadFreeWorker(ChannelRecord record) {
188     boolean loaded = false;
189
190     synchronized (workers) {
191       Iterator JavaDoc i = workers.iterator();
192       while (!loaded && i.hasNext()) {
193         WorkerThread worker = (WorkerThread) i.next();
194         if (!worker.isBusy()) {
195           loaded = worker.startJob(record);
196         }
197       }
198     }
199
200     return loaded;
201   }
202
203   /**
204    * Put task in processing queue. During putting the task algorythm analyzes the priority of
205    * insertion and evaluates index of new item in queue.
206    *
207    * @param record record to put in queue.
208    */

209   private void putRecordInQueue(ChannelRecord record) {
210     queue.insert(record);
211   }
212
213   /**
214    * Takes top-most task from the queue.
215    *
216    * @return Task record or NULL if queue is empty.
217    */

218   private synchronized ChannelRecord getRecordFromQueue() {
219     ChannelRecord record = null;
220     while (record == null && !queue.isEmpty()) {
221       record = (ChannelRecord) queue.pop();
222       if (record.isCanceled()) record = null;
223     }
224
225     return record;
226   }
227
228   /**
229    * Listener for all job events from worker threads.
230    */

231   private class JobSource implements JobSourceIF {
232     /**
233      * Invoked by thread when job processing has finished.
234      *
235      * @return next job from queue or NULL if no jobs left.
236      */

237     public ChannelRecord getNextJob() {
238       return getRecordFromQueue();
239     }
240   }
241 }
242
Popular Tags