1 9 package javolution.realtime; 10 11 import javolution.JavolutionError; 12 13 82 public final class ConcurrentContext extends Context { 83 84 87 private static final int QUEUE_SIZE = 256; 88 89 92 private static final int ARGS_SIZE = 6; 93 94 97 private Logic[] _logics = new Logic[QUEUE_SIZE]; 98 99 102 private Object[][] _args = new Object[QUEUE_SIZE][]; 103 104 107 private Object[][][] _argsPool = new Object[QUEUE_SIZE][ARGS_SIZE][]; 108 109 112 private int _logicsCount; 113 114 117 private static final LocalReference ENABLED 118 = new LocalReference(new Boolean(true)); 119 120 124 private int _concurrency; 125 126 130 private int _threadsDone; 131 132 135 private Throwable _error; 136 137 140 ConcurrentContext() { 141 } 142 143 149 public static void setEnabled(boolean enabled) { 150 ENABLED.set(new Boolean(enabled)); 151 } 152 153 160 public static boolean isEnabled() { 161 return ((Boolean) ENABLED.get()).booleanValue(); 162 } 163 164 167 public static void enter() { 168 ConcurrentContext ctx = (ConcurrentContext) push(CONCURRENT_CONTEXT_CLASS); 169 if (ctx == null) { 170 ctx = new ConcurrentContext(); 171 push(ctx); 172 } 173 } 174 private static final Class CONCURRENT_CONTEXT_CLASS = new ConcurrentContext().getClass(); 175 176 187 public static void execute(Logic logic) { 188 ConcurrentContext ctx = (ConcurrentContext) currentContext(); 189 if (ctx._logicsCount >= QUEUE_SIZE) { 190 ctx.flush(); 191 } 192 ctx._args[ctx._logicsCount] = Logic.NO_ARG; 193 ctx._logics[ctx._logicsCount++] = logic; 194 } 195 196 205 public static void execute(Logic logic, Object arg0) { 206 ConcurrentContext ctx = (ConcurrentContext) currentContext(); 207 if (ctx._logicsCount >= QUEUE_SIZE) { 208 ctx.flush(); 209 } 210 Object[] args = ctx.getArgs(1); 211 args[0] = arg0; 212 ctx._logics[ctx._logicsCount++] = logic; 213 } 214 215 225 public static void execute(Logic logic, Object arg0, Object arg1) { 226 ConcurrentContext ctx = (ConcurrentContext) currentContext(); 227 if (ctx._logicsCount >= QUEUE_SIZE) { 228 ctx.flush(); 229 } 230 Object[] args = ctx.getArgs(2); 231 args[0] = arg0; 232 args[1] = arg1; 233 ctx._logics[ctx._logicsCount++] = logic; 234 } 235 236 247 public static void execute(Logic logic, Object arg0, Object arg1, 248 Object arg2) { 249 ConcurrentContext ctx = (ConcurrentContext) currentContext(); 250 if (ctx._logicsCount >= QUEUE_SIZE) { 251 ctx.flush(); 252 } 253 Object[] args = ctx.getArgs(3); 254 args[0] = arg0; 255 args[1] = arg1; 256 args[2] = arg2; 257 ctx._logics[ctx._logicsCount++] = logic; 258 } 259 260 272 public static void execute(Logic logic, Object arg0, Object arg1, 273 Object arg2, Object arg3) { 274 ConcurrentContext ctx = (ConcurrentContext) currentContext(); 275 if (ctx._logicsCount >= QUEUE_SIZE) { 276 ctx.flush(); 277 } 278 Object[] args = ctx.getArgs(4); 279 args[0] = arg0; 280 args[1] = arg1; 281 args[2] = arg2; 282 args[3] = arg3; 283 ctx._logics[ctx._logicsCount++] = logic; 284 } 285 286 299 public static void execute(Logic logic, Object arg0, Object arg1, 300 Object arg2, Object arg3, Object arg4) { 301 ConcurrentContext ctx = (ConcurrentContext) currentContext(); 302 if (ctx._logicsCount >= QUEUE_SIZE) { 303 ctx.flush(); 304 } 305 Object[] args = ctx.getArgs(5); 306 args[0] = arg0; 307 args[1] = arg1; 308 args[2] = arg2; 309 args[3] = arg3; 310 args[4] = arg4; 311 ctx._logics[ctx._logicsCount++] = logic; 312 } 313 314 328 public static void execute(Logic logic, Object arg0, Object arg1, 329 Object arg2, Object arg3, Object arg4, Object arg5) { 330 ConcurrentContext ctx = (ConcurrentContext) currentContext(); 331 if (ctx._logicsCount >= QUEUE_SIZE) { 332 ctx.flush(); 333 } 334 Object[] args = ctx.getArgs(6); 335 args[0] = arg0; 336 args[1] = arg1; 337 args[2] = arg2; 338 args[3] = arg3; 339 args[4] = arg4; 340 args[5] = arg5; 341 ctx._logics[ctx._logicsCount++] = logic; 342 } 343 344 354 public static void exit() throws ConcurrentException { 355 ConcurrentContext ctx = (ConcurrentContext) Context.currentContext(); 356 ctx.flush(); Context.pop(); 358 359 if (ctx._error != null) { 361 ConcurrentException error = new ConcurrentException(ctx._error); 362 ctx._error = null; throw error; 364 } 365 } 366 367 372 synchronized void setError(Throwable error) { 373 if (_error == null) { _error = error; 375 } } 377 378 384 boolean executeNext() { 385 int index; 386 synchronized (this) { 387 if (_logicsCount > 0) { 388 index = --_logicsCount; 389 } else { 390 _threadsDone++; 391 this.notify(); 392 return false; 393 } 394 } 395 try { 396 Object[] args = _args[index]; 397 _logics[index].run(args); 398 _logics[index] = null; 399 for (int j = args.length; j > 0;) { 400 args[--j] = null; 401 } 402 } catch (Throwable error) { 403 setError(error); 404 } 405 return true; 406 } 407 408 411 private void flush() { 412 PoolContext.enter(); 415 try { 416 _concurrency = ConcurrentContext.isEnabled() ? ConcurrentThread 417 .execute(this) : 0; 418 while (executeNext()) { 419 ((PoolContext) Context.currentContext()).recyclePools(); 420 } 421 synchronized (this) { 422 while (_threadsDone <= _concurrency) { 423 this.wait(); 424 } } 426 } catch (InterruptedException e) { 427 throw new JavolutionError(e); 428 } finally { 429 _threadsDone = 0; 430 PoolContext.exit(); 431 } 432 } 433 434 440 private Object[] getArgs(int length) { 441 Object[] args = _argsPool[_logicsCount][length - 1]; 442 if (args == null) { 443 args = new Object[length]; 444 _argsPool[_logicsCount][length - 1] = args; 445 } 446 _args[_logicsCount] = args; 447 return args; 448 } 449 450 protected void dispose() { 452 _logics = null; 453 _args = null; 454 _argsPool = null; 455 } 456 457 461 public static abstract class Logic implements Runnable { 462 463 466 public final void run() { 467 run(NO_ARG); 468 } 469 470 private static final Object[] NO_ARG = new Object[0]; 471 472 481 public abstract void run(Object[] args); 482 } 483 484 } | Popular Tags |