1 package org.columba.core.command; 17 18 import java.util.ArrayList ; 19 import java.util.List ; 20 import java.util.logging.Logger ; 21 22 import org.columba.api.command.ICommand; 23 import org.columba.core.base.Mutex; 24 import org.columba.core.gui.exception.ExceptionHandler; 25 26 34 public class CommandProcessor implements Runnable { 35 36 private static final Logger LOG = Logger 37 .getLogger("org.columba.api.command"); 39 public final static int MAX_WORKERS = 5; 40 41 List <OperationItem> operationQueue; 42 43 List <Worker> worker; 44 45 private Mutex oneMutex; 46 47 private int timeStamp; 48 49 private boolean stopped = false; 50 51 private static CommandProcessor instance = new CommandProcessor(); 52 53 public CommandProcessor() { 54 this(true); 55 } 56 57 public static CommandProcessor getInstance() { 58 return instance; 59 } 60 61 64 public CommandProcessor(boolean start) { 65 operationQueue = new ArrayList <OperationItem>(10); 66 67 worker = new ArrayList <Worker>(MAX_WORKERS); 68 69 for (int i = 0; i < MAX_WORKERS; i++) { 71 Worker w = new Worker(this); 72 w.addExceptionListener(new ExceptionHandler()); 73 worker.add(w); 74 75 } 76 77 oneMutex = new Mutex(); 78 79 timeStamp = 0; 80 81 if (start) 82 new Thread (this).start(); 83 } 84 85 92 public void addOp(final Command op) { 93 addOp(op, Command.FIRST_EXECUTION); 94 } 95 96 104 public void addOp(final Command op, final int operationMode) { 105 try { 106 oneMutex.lock(); 107 108 LOG.finest("Command " + op.toString() + " added"); 110 int p = operationQueue.size() - 1; 111 OperationItem nextOp; 112 113 while (p != -1) { 119 nextOp = operationQueue.get(p); 120 121 if ((nextOp.getOperation().getPriority() < op.getPriority()) 122 && !nextOp.getOperation().isSynchronize()) { 123 p--; 124 } else { 125 break; 126 } 127 } 128 129 operationQueue.add(p + 1, new OperationItem(op, operationMode)); 130 } finally { 131 oneMutex.release(); 132 } 133 134 wakeUp(); 135 } 136 137 145 private boolean canBeProcessed(final OperationItem opItem) { 146 return opItem.getOperation().canBeProcessed(); 147 } 148 149 154 private OperationItem nextOpItem() { 155 OperationItem nextOp = null; 156 157 for (int i = 0; i < operationQueue.size() && nextOp == null; i++) { 158 nextOp = operationQueue.get(i); 159 160 if ((i != 0) && (nextOp.getOperation().isSynchronize())) { 161 nextOp = null; 162 163 break; 166 } 167 168 try { 169 if (!canBeProcessed(nextOp)) { 170 nextOp = null; 171 } 172 } catch (RuntimeException e) { 173 operationQueue.remove(nextOp); 175 nextOp = null; 176 177 LOG.warning("Operation failed: " + e.getMessage()); } 179 180 } 181 182 return nextOp; 183 } 184 185 193 public void operationFinished(final ICommand op, final Worker w) { 194 195 try { 196 oneMutex.lock(); 197 198 worker.add(w); 199 } finally { 200 oneMutex.release(); 201 } 202 203 wakeUp(); 205 } 206 207 214 Worker getWorker(int priority) { 215 Worker result = null; 216 if (worker.size() > 1) { 217 result = worker.remove(0); 218 } else if (worker.size() > 0 && priority >= Command.REALTIME_PRIORITY) { 219 result = worker.remove(0); 220 } 221 222 return result; 223 } 224 225 228 private synchronized void waitForNotify() { 229 try { 230 wait(); 231 } catch (InterruptedException e) { 232 e.printStackTrace(); 233 } 234 } 235 236 private synchronized void wakeUp() { 237 notifyAll(); 238 } 239 240 243 public void run() { 244 boolean sleep; 245 246 while (true && !stopped) { 247 sleep = startOperation(); 248 249 if (sleep) { 250 waitForNotify(); 251 sleep = false; 252 } 253 } 254 255 } 256 257 261 boolean startOperation() { 262 boolean sleep = false; 263 try { 264 oneMutex.lock(); 265 OperationItem _opItem; 266 Worker _worker; 267 _opItem = nextOpItem(); 268 if (_opItem != null && !stopped) { 269 _worker = getWorker(_opItem.getOperation().getPriority()); 270 if (_worker != null && !stopped) { 271 operationQueue.remove(_opItem); 272 273 _worker.process(_opItem.getOperation(), _opItem 274 .getOperationMode(), timeStamp++); 275 276 _worker.start(); 277 } else { 278 sleep = true; 279 } 280 } else { 281 sleep = true; 282 } 283 } finally { 284 oneMutex.release(); 285 } 286 return sleep; 287 } 288 289 public synchronized void stop() { 290 stopped = true; 291 notify(); 292 } 293 294 } 295 296 301 302 class OperationItem { 303 private Command operation; 304 305 private int operationMode; 306 307 public OperationItem(Command op, int opMode) { 308 operation = op; 309 operationMode = opMode; 310 } 311 312 public Command getOperation() { 313 return operation; 314 } 315 316 public int getOperationMode() { 317 return operationMode; 318 } 319 } | Popular Tags |