1 9 package javolution.context; 10 11 import j2me.lang.ThreadLocal; 12 import j2me.lang.UnsupportedOperationException; 13 14 import javolution.context.ConcurrentExecutor.Status; 15 import javolution.lang.Configurable; 16 import javolution.lang.Reflection; 17 18 153 public class ConcurrentContext extends Context { 154 155 158 private static Factory FACTORY = new Factory() { 159 protected Object create() { 160 return new ConcurrentContext(); 161 } 162 }; 163 164 169 public static final ConfigurableCONCURRENCY = new Configurable( 170 concurrency()); 171 172 175 private static final LocalContext.Reference ENABLED = new LocalContext.Reference( 176 new Boolean (true)); 177 178 181 private static final ThreadLocal ARGUMENTS = new ThreadLocal (); 182 183 186 private static final Class CLASS = new ConcurrentContext().getClass(); 187 188 191 private static transient ConcurrentExecutor[] _DefaultExecutors; 192 193 197 private int _initiatedCount; 198 199 202 private int _completedCount; 203 204 207 private Throwable _error; 208 209 213 private ConcurrentExecutor[] _executors; 214 215 218 private boolean _isEnabled; 219 220 223 private ConcurrentExecutor[] _inheritedExecutors; 224 225 230 public ConcurrentContext() { 231 this(null); 232 } 233 234 239 public ConcurrentContext(ConcurrentExecutor[] executors) { 240 _executors = executors; 241 } 242 243 251 public static ConcurrentExecutor[] getDefaultExecutors() { 252 if (_DefaultExecutors != null) 253 return _DefaultExecutors; 254 synchronized (CLASS) { if (_DefaultExecutors != null) 256 return _DefaultExecutors; int concurrency = ((Integer ) CONCURRENCY.get()).intValue(); 258 ConcurrentThread[] executors = new ConcurrentThread[concurrency]; 259 for (int i = 0; i < concurrency; i++) { 260 executors[i] = new ConcurrentThread(); 261 executors[i].start(); 262 } 263 _DefaultExecutors = executors; 264 return _DefaultExecutors; 265 } 266 } 267 268 273 public static void setDefaultExecutors(ConcurrentExecutor[] executors) { 274 _DefaultExecutors = executors; 275 } 276 277 283 final ConcurrentExecutor[] getExecutors() { 284 if (_executors != null) 285 return _executors; 286 for (Context ctx = this.getOuter(); ctx != null; ctx = ctx.getOuter()) { 287 if (ctx instanceof ConcurrentContext) { 288 ConcurrentContext that = (ConcurrentContext) ctx; 289 if (that._executors != null) 290 return that._executors; 291 } 292 } 293 return getDefaultExecutors(); 294 } 295 296 299 public void clear() { 300 if (_executors != null) { 301 for (int i = 0; i < _executors.length; i++) { 302 _executors[i].terminate(); 303 } 304 _executors = null; 305 } 306 } 307 308 314 public staticContext current() { 315 Context ctx = Context.current(); 316 while (ctx != null) { 317 if (ctx instanceof ConcurrentContext) 318 return (ConcurrentContext) ctx; 319 ctx = ctx.getOuter(); 320 } 321 return null; 322 } 323 324 329 public static void enter() { 330 ConcurrentContext ctx = (ConcurrentContext) FACTORY.object(); 331 ctx._isInternal = true; 332 ctx._isEnabled = ConcurrentContext.isEnabled(); 333 Context.enter(ctx); 334 } 335 336 private transient boolean _isInternal; 337 338 344 public static void exit() { 345 ConcurrentContext ctx = (ConcurrentContext) Context.current(); 346 if (!ctx._isInternal) 347 throw new UnsupportedOperationException ( 348 "The context to exit must be specified"); 349 ctx._isInternal = false; 350 Context.exitNoCheck(ctx); 351 FACTORY.recycle(ctx); 352 } 353 354 361 public static void setEnabled(boolean enabled) { 362 ENABLED.set(enabled ? TRUE : FALSE); 363 } 364 365 private static final Boolean TRUE = new Boolean (true); 367 private static final Boolean FALSE = new Boolean (false); 369 377 public static boolean isEnabled() { 378 return ((Boolean ) ENABLED.get()).booleanValue(); 379 } 380 381 390 public static void execute(Logic logic) { 391 ConcurrentContext ctx = (ConcurrentContext) current(); 392 393 StatusImpl status = (StatusImpl) StatusImpl.FACTORY.object(); 394 status._args = status._args0; 395 396 if ((ctx != null) && (ctx.doExecute(logic, status))) 397 return; ARGUMENTS.set(status._args); 400 logic.run(); 401 StatusImpl.FACTORY.recycle(status); 402 } 403 404 412 public static void execute(Logic logic, Object arg0) { 413 ConcurrentContext ctx = (ConcurrentContext) current(); 414 415 StatusImpl status = (StatusImpl) StatusImpl.FACTORY.object(); 416 status._args = status._args1; 417 status._args[0] = arg0; 418 419 if ((ctx != null) && (ctx.doExecute(logic, status))) 420 return; ARGUMENTS.set(status._args); 423 logic.run(); 424 StatusImpl.FACTORY.recycle(status); 425 } 426 427 437 public static void execute(Logic logic, Object arg0, Object arg1) { 438 ConcurrentContext ctx = (ConcurrentContext) current(); 439 440 StatusImpl status = (StatusImpl) StatusImpl.FACTORY.object(); 441 status._args = status._args2; 442 status._args[0] = arg0; 443 status._args[1] = arg1; 444 445 if ((ctx != null) && (ctx.doExecute(logic, status))) 446 return; ARGUMENTS.set(status._args); 449 logic.run(); 450 StatusImpl.FACTORY.recycle(status); 451 } 452 453 464 public static void execute(Logic logic, Object arg0, Object arg1, 465 Object arg2) { 466 ConcurrentContext ctx = (ConcurrentContext) current(); 467 468 StatusImpl status = (StatusImpl) StatusImpl.FACTORY.object(); 469 status._args = status._args3; 470 status._args[0] = arg0; 471 status._args[1] = arg1; 472 status._args[2] = arg2; 473 474 if ((ctx != null) && (ctx.doExecute(logic, status))) 475 return; ARGUMENTS.set(status._args); 478 logic.run(); 479 StatusImpl.FACTORY.recycle(status); 480 } 481 482 494 public static void execute(Logic logic, Object arg0, Object arg1, 495 Object arg2, Object arg3) { 496 ConcurrentContext ctx = (ConcurrentContext) current(); 497 498 StatusImpl status = (StatusImpl) StatusImpl.FACTORY.object(); 499 status._args = status._args4; 500 status._args[0] = arg0; 501 status._args[1] = arg1; 502 status._args[2] = arg2; 503 status._args[3] = arg3; 504 505 if ((ctx != null) && (ctx.doExecute(logic, status))) 506 return; ARGUMENTS.set(status._args); 509 logic.run(); 510 StatusImpl.FACTORY.recycle(status); 511 } 512 513 526 public static void execute(Logic logic, Object arg0, Object arg1, 527 Object arg2, Object arg3, Object arg4) { 528 ConcurrentContext ctx = (ConcurrentContext) current(); 529 530 StatusImpl status = (StatusImpl) StatusImpl.FACTORY.object(); 531 status._args = status._args5; 532 status._args[0] = arg0; 533 status._args[1] = arg1; 534 status._args[2] = arg2; 535 status._args[3] = arg3; 536 status._args[4] = arg4; 537 538 if ((ctx != null) && (ctx.doExecute(logic, status))) 539 return; ARGUMENTS.set(status._args); 542 logic.run(); 543 StatusImpl.FACTORY.recycle(status); 544 } 545 546 560 public static void execute(Logic logic, Object arg0, Object arg1, 561 Object arg2, Object arg3, Object arg4, Object arg5) { 562 ConcurrentContext ctx = (ConcurrentContext) current(); 563 564 StatusImpl status = (StatusImpl) StatusImpl.FACTORY.object(); 565 status._args = status._args6; 566 status._args[0] = arg0; 567 status._args[1] = arg1; 568 status._args[2] = arg2; 569 status._args[3] = arg3; 570 status._args[4] = arg4; 571 status._args[5] = arg5; 572 573 if ((ctx != null) && (ctx.doExecute(logic, status))) 574 return; ARGUMENTS.set(status._args); 577 logic.run(); 578 StatusImpl.FACTORY.recycle(status); 579 } 580 581 protected void enterAction() { 583 _inheritedExecutors = getExecutors(); 584 _error = null; 585 _initiatedCount = 0; 586 _completedCount = 0; 587 } 588 589 protected void exitAction() { 591 synchronized (this) { 592 while (_initiatedCount != _completedCount) { 593 try { 594 this.wait(); 595 } catch (InterruptedException e) { 596 throw new ConcurrentException(e); 597 } 598 } 599 } 600 if (_error != null) { 602 if (_error instanceof RuntimeException ) 603 throw ((RuntimeException ) _error); 604 if (_error instanceof Error ) 605 throw ((Error ) _error); 606 throw new ConcurrentException(_error); } 608 } 609 610 private boolean doExecute(Logic logic, StatusImpl status) { 611 if (_isEnabled) { 612 status._context = this; 613 ConcurrentExecutor[] executors = _inheritedExecutors; 614 for (int i = 0; i < executors.length; i++) { 615 if (executors[i].execute(logic, status)) { 616 _initiatedCount++; 617 return true; } 619 } 620 } 621 return false; 622 } 623 624 628 public static abstract class Logic implements Runnable { 629 630 635 public Object [] getArguments() { 636 return (Object []) ARGUMENTS.get(); 637 } 638 639 644 publicObject getArgument(int i) { 645 return (Object ) ((Object []) ARGUMENTS.get())[i]; 646 } 647 648 652 public abstract void run(); 653 654 } 655 656 662 private static Integer concurrency() { 663 Reflection.Method availableProcessors = Reflection 664 .getMethod("java.lang.Runtime.availableProcessors()"); 665 if (availableProcessors != null) { 666 Integer processors = (Integer ) availableProcessors.invoke(Runtime 667 .getRuntime()); 668 return new Integer (processors.intValue()); 669 } else { 670 return new Integer (1); 671 } 672 } 673 674 680 private static class StatusImpl extends RealtimeObject implements Status { 681 682 private static StatusImpl.Factory FACTORY = new Factory() { 683 protected Object create() { 684 return new StatusImpl(); 685 } 686 protected void cleanup(Object status) { 687 ((StatusImpl)status).reset(); 688 } 689 690 }; 691 692 volatile Object [] _args; 693 694 Object [] _args0 = new Object [0]; 695 696 Object [] _args1 = new Object [1]; 697 698 Object [] _args2 = new Object [2]; 699 700 Object [] _args3 = new Object [3]; 701 702 Object [] _args4 = new Object [4]; 703 704 Object [] _args5 = new Object [5]; 705 706 Object [] _args6 = new Object [6]; 707 708 ConcurrentContext _context; 709 710 public void started() { 711 ARGUMENTS.set(_args); 712 Context.setCurrent(_context); 713 _context.getLocalPools().activatePools(); 714 } 715 716 public void completed() { 717 _context.getLocalPools().deactivatePools(); 718 719 synchronized (_context) { 721 _context._completedCount++; 722 _context.notify(); 723 } 724 } 725 726 public void error(Throwable error) { 727 synchronized (_context) { 728 if (_context._error == null) { 729 _context._error = error; 730 } 731 } 732 } 733 734 void reset() { 735 for (int i = 0; i < _args.length; i++) { 736 _args[i] = null; } 738 _context = null; 739 } 740 } 741 } | Popular Tags |