1 17 package org.apache.excalibur.event.command; 18 19 import java.util.ArrayList ; 20 import java.util.Collections ; 21 import java.util.HashMap ; 22 import java.util.Iterator ; 23 import java.util.Map ; 24 25 import org.apache.avalon.framework.activity.Disposable; 26 import org.apache.avalon.framework.logger.AbstractLogEnabled; 27 import org.apache.avalon.framework.logger.NullLogger; 28 import org.apache.commons.collections.Buffer; 29 import org.apache.commons.collections.UnboundedFifoBuffer; 30 import org.apache.excalibur.event.*; 31 import org.apache.excalibur.event.impl.DefaultQueue; 32 33 import EDU.oswego.cs.dl.util.concurrent.ReentrantLock; 34 35 68 public class CommandManager extends AbstractLogEnabled 69 implements EventPipeline, Disposable, EnqueuePredicate { 70 private final Queue m_queue; 71 private final HashMap m_signalHandlers; 72 private final ReentrantLock m_mutex; 73 private final EventHandler m_eventHandler; 74 private final Source[] m_sources; 75 private CommandFailureHandler m_failureHandler; 76 private boolean m_isAccepting; 77 78 81 public CommandManager() { 82 m_queue = new DefaultQueue(); 83 m_signalHandlers = new HashMap (); 84 m_mutex = new ReentrantLock(); 85 m_eventHandler = new CommandEventHandler(Collections.unmodifiableMap(m_signalHandlers)); 86 m_sources = new Source[]{m_queue}; 87 m_failureHandler = NullCommandFailureHandler.SHARED_INSTANCE; 88 m_queue.setEnqueuePredicate(this); 89 m_isAccepting = true; 90 91 enableLogging(new NullLogger()); 93 } 94 95 102 public void setCommandFailureHandler(final CommandFailureHandler handler) { 103 if (null == handler) throw new NullPointerException ("handler"); 104 m_failureHandler = handler; 105 } 106 107 112 protected CommandFailureHandler getCommandFailureHandler() { 113 return m_failureHandler; 114 } 115 116 121 public final Sink getCommandSink() { 122 return m_queue; 123 } 124 125 134 public final void registerSignalHandler(Signal signal, EventHandler handler) { 135 try { 136 m_mutex.acquire(); 137 ArrayList handlers = (ArrayList ) m_signalHandlers.get(signal.getClass()); 138 139 if (null == handlers) { 140 handlers = new ArrayList (); 141 } 142 143 if (!handlers.contains(handler)) { 144 handlers.add(handler); 145 146 m_signalHandlers.put(signal.getClass(), handlers); 147 } 148 } catch (InterruptedException ie) { 149 } finally { 151 m_mutex.release(); 152 } 153 } 154 155 162 public final void deregisterSignalHandler(Signal signal, EventHandler handler) { 163 try { 164 m_mutex.acquire(); 165 ArrayList handlers = (ArrayList ) m_signalHandlers.get(signal.getClass()); 166 167 if (null != handlers) { 168 if (handlers.remove(handler)) { 169 m_signalHandlers.put(signal.getClass(), handlers); 170 } 171 172 if (0 == handlers.size()) { 173 m_signalHandlers.remove(signal.getClass()); 174 } 175 } 176 } catch (InterruptedException ie) { 177 } finally { 179 m_mutex.release(); 180 } 181 } 182 183 187 public void dispose() { 188 m_isAccepting = false; 189 Object [] remainingElements = m_queue.dequeueAll(); 190 for (int i = 0; i < remainingElements.length; i++) { 191 getEventHandler().handleEvent(remainingElements[i]); 192 } 193 } 194 195 201 public final Source[] getSources() { 202 return m_sources; 203 } 204 205 211 public final EventHandler getEventHandler() { 212 return m_eventHandler; 213 } 214 215 private final class CommandEventHandler implements EventHandler { 216 private final Map m_signalHandlers; 217 private final Buffer m_delayedCommands = new UnboundedFifoBuffer(); 218 219 protected CommandEventHandler(Map signalHandlers) { 220 m_signalHandlers = signalHandlers; 221 } 222 223 public final void handleEvents(Object [] elements) { 224 for (int i = 0; i < elements.length; i++) { 225 handleEvent(elements[i]); 226 } 227 228 int size = m_delayedCommands.size(); 229 for (int i = 0; i < size; i++) { 230 DelayedCommandInfo command = (DelayedCommandInfo) m_delayedCommands.remove(); 231 232 if (System.currentTimeMillis() >= command.m_nextRunTime) { 233 try { 234 command.m_command.execute(); 235 } catch (Exception e) { 236 if (getLogger().isWarnEnabled()) { 237 getLogger().warn("Exception during Command.execute()", e); 238 } 239 } 240 241 command.m_numExecutions++; 242 243 if (command.m_repeatable) { 244 RepeatedCommand cmd = (RepeatedCommand) command.m_command; 245 int numRepeats = cmd.getNumberOfRepeats(); 246 247 if ((numRepeats < 1) || (command.m_numExecutions < numRepeats)) { 248 command.m_nextRunTime = System.currentTimeMillis() + 249 cmd.getRepeatInterval(); 250 m_delayedCommands.add(command); 251 } 252 } 253 } else { 254 m_delayedCommands.add(command); 256 } 257 } 258 } 259 260 public final void handleEvent(Object element) { 261 if (!(element instanceof Signal)) { 262 return; 263 } 264 265 if (!(element instanceof Command)) { 266 ArrayList handlers = (ArrayList ) m_signalHandlers.get(element.getClass()); 267 268 if (null != handlers) { 269 Iterator i = handlers.iterator(); 270 271 while (i.hasNext()) { 272 EventHandler handler = (EventHandler) i.next(); 273 handler.handleEvent(element); 274 } 275 } 276 277 return; 278 } 279 280 if (element instanceof DelayedCommand) { 281 DelayedCommandInfo commandInfo = new DelayedCommandInfo(); 282 commandInfo.m_command = (DelayedCommand) element; 283 commandInfo.m_nextRunTime = System.currentTimeMillis() + 284 commandInfo.m_command.getDelayInterval(); 285 commandInfo.m_numExecutions = 0; 286 commandInfo.m_repeatable = element instanceof RepeatedCommand; 287 288 m_delayedCommands.add(commandInfo); 289 return; 290 } 291 292 try { 293 ((Command) element).execute(); 294 } catch (Exception e) { 295 boolean stopProcessing = 296 getCommandFailureHandler().handleCommandFailure((Command) element, e); 297 298 301 if (stopProcessing) { 302 m_isAccepting = false; 303 m_queue.dequeueAll(); 304 } 305 } 306 } 307 } 308 309 private static final class DelayedCommandInfo { 310 protected DelayedCommand m_command; 311 protected long m_nextRunTime; 312 protected int m_numExecutions; 313 protected boolean m_repeatable; 314 } 315 316 326 public boolean accept(Object element, Sink modifyingSink) { 327 return m_isAccepting; 328 } 329 330 340 public boolean accept(Object elements[], Sink modifyingSink) { 341 return m_isAccepting; 342 } 343 } 344 | Popular Tags |