KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > excalibur > event > command > CommandManager


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12  * implied.
13  *
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.excalibur.event.command;
18
19 import java.util.ArrayList JavaDoc;
20 import java.util.Collections JavaDoc;
21 import java.util.HashMap JavaDoc;
22 import java.util.Iterator JavaDoc;
23 import java.util.Map JavaDoc;
24
25 import org.apache.avalon.framework.activity.Disposable;
26 import org.apache.avalon.framework.logger.AbstractLogEnabled;
27 import org.apache.avalon.framework.logger.NullLogger;
28 import org.apache.commons.collections.Buffer;
29 import org.apache.commons.collections.UnboundedFifoBuffer;
30 import org.apache.excalibur.event.*;
31 import org.apache.excalibur.event.impl.DefaultQueue;
32
33 import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
34
35 /**
36  * The CommandManager handles asynchronous commands from the rest of the
37  * system. The only exposed piece is the Queue that other components use to
38  * give Commands to this system. You <strong>must</strong> register this
39  * with a ThreadManager for it to work.
40  *
41  * <p><strong>Source Example</strong></p>
42  * <pre>
43
44  //
45  // Set up the ThreadManager that the CommandManager will use
46  //
47
48  ThreadManager threadManager = new TPCThreadManager();
49  threadManager.enableLogging( getLogger().getChildLogger("threadmanager") );
50  Parameters params = new Parameters();
51  params.setParameter( "threads-per-processor", "2" );
52  params.setParameter( "sleep-time", "1000" );
53  params.setParameter( "block-timeout", "250" );
54  threadManager.parameterize( params );
55  threadManager.initialize();
56
57  //
58  // Set up the CommandManager
59  //
60
61  CommandManager commandManager = new CommandManager();
62  commandManager.enableLogging( getLogger().getChildLogger("commandmanager") );
63  threadManager.register( commandManager );
64  * </pre>
65  *
66  * @author <a HREF="mailto:dev@avalon.apache.org">Avalon Development Team</a>
67  */

68 public class CommandManager extends AbstractLogEnabled
69         implements EventPipeline, Disposable, EnqueuePredicate {
70     private final Queue m_queue;
71     private final HashMap JavaDoc m_signalHandlers;
72     private final ReentrantLock m_mutex;
73     private final EventHandler m_eventHandler;
74     private final Source[] m_sources;
75     private CommandFailureHandler m_failureHandler;
76     private boolean m_isAccepting;
77
78     /**
79      * Create the CommandManager
80      */

81     public CommandManager() {
82         m_queue = new DefaultQueue();
83         m_signalHandlers = new HashMap JavaDoc();
84         m_mutex = new ReentrantLock();
85         m_eventHandler = new CommandEventHandler(Collections.unmodifiableMap(m_signalHandlers));
86         m_sources = new Source[]{m_queue};
87         m_failureHandler = NullCommandFailureHandler.SHARED_INSTANCE;
88         m_queue.setEnqueuePredicate(this);
89         m_isAccepting = true;
90
91         // if no logger is set ensure a valid one is there.
92
enableLogging(new NullLogger());
93     }
94
95     /**
96      * Set the failure handler that the application can use to override what happens when a command
97      * failure happens.
98      *
99      * @param handler the new Handler
100      * @throws NullPointerException if "handler" is null.
101      */

102     public void setCommandFailureHandler(final CommandFailureHandler handler) {
103         if (null == handler) throw new NullPointerException JavaDoc("handler");
104         m_failureHandler = handler;
105     }
106
107     /**
108      * Get the failure handler so that CommandManager can use it when a problem happens.
109      *
110      * @return the failure handler.
111      */

112     protected CommandFailureHandler getCommandFailureHandler() {
113         return m_failureHandler;
114     }
115
116     /**
117      * Get the Command Sink so that you can enqueue new commands.
118      *
119      * @return the Sink that feeds the CommandManager
120      */

121     public final Sink getCommandSink() {
122         return m_queue;
123     }
124
125     /**
126      * Register a Signal with an EventHandler. The Signal is a special object
127      * that implements the {@link Signal} interface. When CommandManager recieves
128      * events that match the Signal, it will send a copy of it to all the
129      * {@link EventHandler}s attached to it.
130      *
131      * @param signal The signal we are listening for.
132      * @param handler The handler that wants to be notified
133      */

134     public final void registerSignalHandler(Signal signal, EventHandler handler) {
135         try {
136             m_mutex.acquire();
137             ArrayList JavaDoc handlers = (ArrayList JavaDoc) m_signalHandlers.get(signal.getClass());
138
139             if (null == handlers) {
140                 handlers = new ArrayList JavaDoc();
141             }
142
143             if (!handlers.contains(handler)) {
144                 handlers.add(handler);
145
146                 m_signalHandlers.put(signal.getClass(), handlers);
147             }
148         } catch (InterruptedException JavaDoc ie) {
149             // ignore for now
150
} finally {
151             m_mutex.release();
152         }
153     }
154
155     /**
156      * Deregister a Signal with an EventHandler. Stop notifying the particular
157      * EventHandler that is passed in about the associated Signal.
158      *
159      * @param signal The signal we are listening for.
160      * @param handler The handler that wants to be notified
161      */

162     public final void deregisterSignalHandler(Signal signal, EventHandler handler) {
163         try {
164             m_mutex.acquire();
165             ArrayList JavaDoc handlers = (ArrayList JavaDoc) m_signalHandlers.get(signal.getClass());
166
167             if (null != handlers) {
168                 if (handlers.remove(handler)) {
169                     m_signalHandlers.put(signal.getClass(), handlers);
170                 }
171
172                 if (0 == handlers.size()) {
173                     m_signalHandlers.remove(signal.getClass());
174                 }
175             }
176         } catch (InterruptedException JavaDoc ie) {
177             // ignore for now
178
} finally {
179             m_mutex.release();
180         }
181     }
182
183     /**
184      * When you are done with CommandManager, call this and it will
185      * clean up all its resources.
186      */

187     public void dispose() {
188         m_isAccepting = false;
189         Object JavaDoc[] remainingElements = m_queue.dequeueAll();
190         for (int i = 0; i < remainingElements.length; i++) {
191             getEventHandler().handleEvent(remainingElements[i]);
192         }
193     }
194
195     /**
196      * Used by the Threadmanager to get the sources that are feeding
197      * the CommandManager.
198      *
199      * @return the Array of one Source
200      */

201     public final Source[] getSources() {
202         return m_sources;
203     }
204
205     /**
206      * Used by the ThreadManager to get the EventHandler for the
207      * CommandManager.
208      *
209      * @return the EventHandler
210      */

211     public final EventHandler getEventHandler() {
212         return m_eventHandler;
213     }
214
215     private final class CommandEventHandler implements EventHandler {
216         private final Map JavaDoc m_signalHandlers;
217         private final Buffer m_delayedCommands = new UnboundedFifoBuffer();
218
219         protected CommandEventHandler(Map JavaDoc signalHandlers) {
220             m_signalHandlers = signalHandlers;
221         }
222
223         public final void handleEvents(Object JavaDoc[] elements) {
224             for (int i = 0; i < elements.length; i++) {
225                 handleEvent(elements[i]);
226             }
227
228             int size = m_delayedCommands.size();
229             for (int i = 0; i < size; i++) {
230                 DelayedCommandInfo command = (DelayedCommandInfo) m_delayedCommands.remove();
231
232                 if (System.currentTimeMillis() >= command.m_nextRunTime) {
233                     try {
234                         command.m_command.execute();
235                     } catch (Exception JavaDoc e) {
236                         if (getLogger().isWarnEnabled()) {
237                             getLogger().warn("Exception during Command.execute()", e);
238                         }
239                     }
240
241                     command.m_numExecutions++;
242
243                     if (command.m_repeatable) {
244                         RepeatedCommand cmd = (RepeatedCommand) command.m_command;
245                         int numRepeats = cmd.getNumberOfRepeats();
246
247                         if ((numRepeats < 1) || (command.m_numExecutions < numRepeats)) {
248                             command.m_nextRunTime = System.currentTimeMillis() +
249                                     cmd.getRepeatInterval();
250                             m_delayedCommands.add(command);
251                         }
252                     }
253                 } else {
254                     // not yet executed, wait some more
255
m_delayedCommands.add(command);
256                 }
257             }
258         }
259
260         public final void handleEvent(Object JavaDoc element) {
261             if (!(element instanceof Signal)) {
262                 return;
263             }
264
265             if (!(element instanceof Command)) {
266                 ArrayList JavaDoc handlers = (ArrayList JavaDoc) m_signalHandlers.get(element.getClass());
267
268                 if (null != handlers) {
269                     Iterator JavaDoc i = handlers.iterator();
270
271                     while (i.hasNext()) {
272                         EventHandler handler = (EventHandler) i.next();
273                         handler.handleEvent(element);
274                     }
275                 }
276
277                 return;
278             }
279
280             if (element instanceof DelayedCommand) {
281                 DelayedCommandInfo commandInfo = new DelayedCommandInfo();
282                 commandInfo.m_command = (DelayedCommand) element;
283                 commandInfo.m_nextRunTime = System.currentTimeMillis() +
284                         commandInfo.m_command.getDelayInterval();
285                 commandInfo.m_numExecutions = 0;
286                 commandInfo.m_repeatable = element instanceof RepeatedCommand;
287
288                 m_delayedCommands.add(commandInfo);
289                 return;
290             }
291
292             try {
293                 ((Command) element).execute();
294             } catch (Exception JavaDoc e) {
295                 boolean stopProcessing =
296                         getCommandFailureHandler().handleCommandFailure((Command) element, e);
297
298                 /* If we are no longer processing, then we clear out the Queue and refuse to accept
299                  * any more commands. Essentially the CommandManager is closed.
300                  */

301                 if (stopProcessing) {
302                     m_isAccepting = false;
303                     m_queue.dequeueAll();
304                 }
305             }
306         }
307     }
308
309     private static final class DelayedCommandInfo {
310         protected DelayedCommand m_command;
311         protected long m_nextRunTime;
312         protected int m_numExecutions;
313         protected boolean m_repeatable;
314     }
315
316     /**
317      * Tests the given element for acceptance onto the m_sink.
318      * @since Feb 10, 2003
319      *
320      * @param element The element to enqueue
321      * @param modifyingSink The sink that is used for this predicate
322      * @return
323      * <code>true</code> if the sink accepts the element;
324      * <code>false</code> otherwise.
325      */

326     public boolean accept(Object JavaDoc element, Sink modifyingSink) {
327         return m_isAccepting;
328     }
329
330     /**
331      * Tests the given element for acceptance onto the m_sink.
332      * @since Feb 10, 2003
333      *
334      * @param elements The array of elements to enqueue
335      * @param modifyingSink The sink that is used for this predicate
336      * @return
337      * <code>true</code> if the sink accepts all the elements;
338      * <code>false</code> otherwise.
339      */

340     public boolean accept(Object JavaDoc elements[], Sink modifyingSink) {
341         return m_isAccepting;
342     }
343 }
344
Popular Tags