KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > columba > core > command > CommandProcessor


//The contents of this file are subject to the Mozilla Public License Version 1.1
//(the "License"); you may not use this file except in compliance with the
//License. You may obtain a copy of the License at http://www.mozilla.org/MPL/
//
//Software distributed under the License is distributed on an "AS IS" basis,
//WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
//for the specific language governing rights and
//limitations under the License.
//
//The Original Code is "The Columba Project"
//
//The Initial Developers of the Original Code are Frederik Dietz and Timo Stich.
//Portions created by Frederik Dietz and Timo Stich are Copyright (C) 2003.
//
//All Rights Reserved.
package org.columba.core.command;
17
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.columba.api.command.ICommand;
import org.columba.core.base.Mutex;
import org.columba.core.gui.exception.ExceptionHandler;
25
26 /**
27  * Scheduler for background threads
28  * <p>
29  * DefaultProcessor keeps a pool of {@link Worker}instances, which are assigned
30  * to {@link Command}, when executed.
31  *
32  * @author tstich
33  */

34 public class CommandProcessor implements Runnable JavaDoc {
35     /** JDK 1.4+ logging framework logger, used for logging. */
36     private static final Logger JavaDoc LOG = Logger
37         .getLogger("org.columba.api.command"); //$NON-NLS-1$
38

39     public final static int MAX_WORKERS = 5;
40
41     List JavaDoc<OperationItem> operationQueue;
42
43     List JavaDoc<Worker> worker;
44
45     private Mutex oneMutex;
46
47     private int timeStamp;
48
49     private boolean stopped = false;
50
51     private static CommandProcessor instance = new CommandProcessor();
52
53     public CommandProcessor() {
54     this(true);
55     }
56
57     public static CommandProcessor getInstance() {
58     return instance;
59     }
60
61     /**
62          * Constructs a DefaultProcessor.
63          */

64     public CommandProcessor(boolean start) {
65     operationQueue = new ArrayList JavaDoc<OperationItem>(10);
66
67     worker = new ArrayList JavaDoc<Worker>(MAX_WORKERS);
68
69     // Create the workers
70
for (int i = 0; i < MAX_WORKERS; i++) {
71         Worker w = new Worker(this);
72         w.addExceptionListener(new ExceptionHandler());
73         worker.add(w);
74
75     }
76
77     oneMutex = new Mutex();
78
79     timeStamp = 0;
80
81     if (start)
82         new Thread JavaDoc(this).start();
83     }
84
85     /**
86          * Add a Command to the Queue. Calls {@link #addOp(Command, int)}with
87          * Command.FIRST_EXECUTION.
88          *
89          * @param op
90          * the command to add
91          */

92     public void addOp(final Command op) {
93     addOp(op, Command.FIRST_EXECUTION);
94     }
95
96     /**
97          * Adds a Command to the queue.
98          *
99          * @param op
100          * the command
101          * @param operationMode
102          * the mode in wich the command should be processed
103          */

104     public void addOp(final Command op, final int operationMode) {
105     try {
106         oneMutex.lock();
107
108         LOG.finest("Command " + op.toString() + " added"); //$NON-NLS-1$ //$NON-NLS-2$
109

110         int p = operationQueue.size() - 1;
111         OperationItem nextOp;
112
113         // Sort in with respect to priority and synchronize:
114
// Commands with higher priority will be processed
115
// before commands with lower priority.
116
// If there is a command that is of type synchronize
117
// don't put this command in front.
118
while (p != -1) {
119         nextOp = operationQueue.get(p);
120
121         if ((nextOp.getOperation().getPriority() < op.getPriority())
122             && !nextOp.getOperation().isSynchronize()) {
123             p--;
124         } else {
125             break;
126         }
127         }
128
129         operationQueue.add(p + 1, new OperationItem(op, operationMode));
130     } finally {
131         oneMutex.release();
132     }
133
134     wakeUp();
135     }
136
137     /**
138          * Checks if the command can be processed. This is true if all
139          * references are not blocked.
140          *
141          * @param opItem
142          * the internal command structure
143          * @return true if the operation will not be blocked
144          */

145     private boolean canBeProcessed(final OperationItem opItem) {
146     return opItem.getOperation().canBeProcessed();
147     }
148
149     /**
150          * Get the next Operation from the queue.
151          *
152          * @return the next non-blocking operation or null if none found.
153          */

154     private OperationItem nextOpItem() {
155     OperationItem nextOp = null;
156
157     for (int i = 0; i < operationQueue.size() && nextOp == null; i++) {
158         nextOp = operationQueue.get(i);
159
160         if ((i != 0) && (nextOp.getOperation().isSynchronize())) {
161         nextOp = null;
162
163         // We have to process this command first
164
// -> break here!
165
break;
166         }
167
168         try {
169         if (!canBeProcessed(nextOp)) {
170             nextOp = null;
171         }
172         } catch (RuntimeException JavaDoc e) {
173         // Remove bogus Operation
174
operationQueue.remove(nextOp);
175         nextOp = null;
176
177         LOG.warning("Operation failed: " + e.getMessage()); //$NON-NLS-1$
178
}
179
180     }
181
182     return nextOp;
183     }
184
185     /**
186          * Called by the worker to signal that his operation has finished.
187          *
188          * @param op
189          * the command the worker has processed
190          * @param w
191          * the worker himself
192          */

193     public void operationFinished(final ICommand op, final Worker w) {
194
195     try {
196         oneMutex.lock();
197
198         worker.add(w);
199     } finally {
200         oneMutex.release();
201     }
202
203     // notify that a new worker is available
204
wakeUp();
205     }
206
207     /**
208          * Get an available Worker from the workerpool. Reserve one worker for
209          * Real-Time Priority tasks
210          *
211          * @param priority
212          * @return an available worker or null if none available.
213          */

214     Worker getWorker(int priority) {
215     Worker result = null;
216     if (worker.size() > 1) {
217         result = worker.remove(0);
218     } else if (worker.size() > 0 && priority >= Command.REALTIME_PRIORITY) {
219         result = worker.remove(0);
220     }
221
222     return result;
223     }
224
225     /**
226          * Wait until a worker is available or a new command is added.
227          */

228     private synchronized void waitForNotify() {
229     try {
230         wait();
231     } catch (InterruptedException JavaDoc e) {
232         e.printStackTrace();
233     }
234     }
235
236     private synchronized void wakeUp() {
237     notifyAll();
238     }
239
240     /**
241          * @see java.lang.Runnable#run()
242          */

243     public void run() {
244     boolean sleep;
245
246     while (true && !stopped) {
247         sleep = startOperation();
248
249         if (sleep) {
250         waitForNotify();
251         sleep = false;
252         }
253     }
254
255     }
256
257     /**
258          * @param sleep
259          * @return
260          */

261     boolean startOperation() {
262     boolean sleep = false;
263     try {
264         oneMutex.lock();
265         OperationItem _opItem;
266         Worker _worker;
267         _opItem = nextOpItem();
268         if (_opItem != null && !stopped) {
269         _worker = getWorker(_opItem.getOperation().getPriority());
270         if (_worker != null && !stopped) {
271             operationQueue.remove(_opItem);
272
273             _worker.process(_opItem.getOperation(), _opItem
274                 .getOperationMode(), timeStamp++);
275
276             _worker.start();
277         } else {
278             sleep = true;
279         }
280         } else {
281         sleep = true;
282         }
283     } finally {
284         oneMutex.release();
285     }
286     return sleep;
287     }
288
289     public synchronized void stop() {
290     stopped = true;
291     notify();
292     }
293
294 }
295
296 /**
297  * Intern represenation of the Commands.
298  *
299  * @author Timo Stich <tstich@users.sourceforge.net>
300  */

301
302 class OperationItem {
303     private Command operation;
304
305     private int operationMode;
306
307     public OperationItem(Command op, int opMode) {
308     operation = op;
309     operationMode = opMode;
310     }
311
312     public Command getOperation() {
313     return operation;
314     }
315
316     public int getOperationMode() {
317     return operationMode;
318     }
319 }
Popular Tags