1 33 package org.jruby; 34 35 import java.util.HashMap ; 36 import java.util.Map ; 37 38 import org.jruby.exceptions.RaiseException; 39 import org.jruby.exceptions.ThreadKill; 40 import org.jruby.internal.runtime.NativeThread; 41 import org.jruby.internal.runtime.ThreadService; 42 import org.jruby.runtime.Block; 43 import org.jruby.runtime.CallbackFactory; 44 import org.jruby.runtime.ObjectAllocator; 45 import org.jruby.runtime.builtin.IRubyObject; 46 47 60 public class RubyThread extends RubyObject { 61 private NativeThread threadImpl; 62 private Map threadLocalVariables = new HashMap (); 63 private boolean abortOnException; 64 private IRubyObject finalResult; 65 private RaiseException exitingException; 66 private IRubyObject receivedException; 67 private RubyThreadGroup threadGroup; 68 69 private ThreadService threadService; 70 private Object hasStartedLock = new Object (); 71 private boolean hasStarted = false; 72 private volatile boolean isStopped = false; 73 public Object stopLock = new Object (); 74 75 private volatile boolean killed = false; 76 public Object killLock = new Object (); 77 private RubyThread joinedByCriticalThread; 78 79 public static RubyClass createThreadClass(Ruby runtime) { 80 RubyClass threadClass = runtime.defineClass("Thread", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR); 84 CallbackFactory callbackFactory = runtime.callbackFactory(RubyThread.class); 85 86 threadClass.defineFastMethod("[]", callbackFactory.getFastMethod("aref", RubyKernel.IRUBY_OBJECT)); 87 threadClass.defineFastMethod("[]=", callbackFactory.getFastMethod("aset", RubyKernel.IRUBY_OBJECT, RubyKernel.IRUBY_OBJECT)); 88 threadClass.defineFastMethod("abort_on_exception", callbackFactory.getFastMethod("abort_on_exception")); 89 threadClass.defineFastMethod("abort_on_exception=", callbackFactory.getFastMethod("abort_on_exception_set", RubyKernel.IRUBY_OBJECT)); 90 threadClass.defineFastMethod("alive?", callbackFactory.getFastMethod("is_alive")); 91 threadClass.defineFastMethod("group", callbackFactory.getFastMethod("group")); 92 threadClass.defineFastMethod("join", callbackFactory.getFastOptMethod("join")); 93 threadClass.defineFastMethod("value", callbackFactory.getFastMethod("value")); 94 threadClass.defineFastMethod("key?", callbackFactory.getFastMethod("has_key", RubyKernel.IRUBY_OBJECT)); 95 threadClass.defineFastMethod("priority", callbackFactory.getFastMethod("priority")); 96 threadClass.defineFastMethod("priority=", callbackFactory.getFastMethod("priority_set", RubyKernel.IRUBY_OBJECT)); 97 threadClass.defineFastMethod("raise", callbackFactory.getFastMethod("raise", RubyKernel.IRUBY_OBJECT)); 98 threadClass.defineFastMethod("run", callbackFactory.getFastMethod("run")); 99 threadClass.defineFastMethod("status", callbackFactory.getFastMethod("status")); 100 threadClass.defineFastMethod("stop?", callbackFactory.getFastMethod("isStopped")); 101 threadClass.defineFastMethod("wakeup", callbackFactory.getFastMethod("wakeup")); 102 threadClass.defineFastMethod("kill", callbackFactory.getFastMethod("kill")); 105 threadClass.defineMethod("exit", callbackFactory.getMethod("exit")); 106 107 threadClass.getMetaClass().defineFastMethod("current", callbackFactory.getFastSingletonMethod("current")); 108 threadClass.getMetaClass().defineMethod("fork", callbackFactory.getOptSingletonMethod("newInstance")); 109 threadClass.getMetaClass().defineMethod("new", callbackFactory.getOptSingletonMethod("newInstance")); 110 threadClass.getMetaClass().defineFastMethod("list", callbackFactory.getFastSingletonMethod("list")); 111 threadClass.getMetaClass().defineFastMethod("pass", callbackFactory.getFastSingletonMethod("pass")); 112 threadClass.getMetaClass().defineMethod("start", callbackFactory.getOptSingletonMethod("start")); 113 threadClass.getMetaClass().defineFastMethod("critical=", callbackFactory.getFastSingletonMethod("critical_set", RubyBoolean.class)); 114 threadClass.getMetaClass().defineFastMethod("critical", callbackFactory.getFastSingletonMethod("critical")); 115 threadClass.getMetaClass().defineFastMethod("stop", callbackFactory.getFastSingletonMethod("stop")); 116 threadClass.getMetaClass().defineMethod("kill", callbackFactory.getSingletonMethod("s_kill", RubyThread.class)); 117 threadClass.getMetaClass().defineMethod("exit", callbackFactory.getSingletonMethod("s_exit")); 118 threadClass.getMetaClass().defineFastMethod("abort_on_exception", callbackFactory.getFastSingletonMethod("abort_on_exception")); 119 threadClass.getMetaClass().defineFastMethod("abort_on_exception=", callbackFactory.getFastSingletonMethod("abort_on_exception_set", RubyKernel.IRUBY_OBJECT)); 120 121 RubyThread rubyThread = new RubyThread(runtime, threadClass); 122 rubyThread.hasStarted = true; 124 rubyThread.threadImpl = new NativeThread(rubyThread, Thread.currentThread()); 126 runtime.getThreadService().setMainThread(rubyThread); 127 128 threadClass.getMetaClass().defineFastMethod("main", callbackFactory.getFastSingletonMethod("main")); 129 130 return threadClass; 131 } 132 133 149 public static IRubyObject newInstance(IRubyObject recv, IRubyObject[] args, Block block) { 150 return startThread(recv, args, true, block); 151 } 152 153 158 public static RubyThread start(IRubyObject recv, IRubyObject[] args, Block block) { 159 return startThread(recv, args, false, block); 160 } 161 162 public static RubyThread adopt(IRubyObject recv, Thread t) { 163 return adoptThread(recv, t, Block.NULL_BLOCK); 164 } 165 166 private static RubyThread adoptThread(final IRubyObject recv, Thread t, Block block) { 167 final Ruby runtime = recv.getRuntime(); 168 final RubyThread rubyThread = new RubyThread(runtime, (RubyClass) recv, false); 169 170 rubyThread.threadImpl = new NativeThread(rubyThread, t); 171 runtime.getThreadService().registerNewThread(rubyThread); 172 173 runtime.getCurrentContext().preAdoptThread(); 174 175 rubyThread.callInit(new IRubyObject[0], block); 176 177 rubyThread.notifyStarted(); 178 179 return rubyThread; 180 } 181 182 private static RubyThread startThread(final IRubyObject recv, final IRubyObject[] args, boolean callInit, Block block) { 183 if (!block.isGiven()) throw recv.getRuntime().newThreadError("must be called with a block"); 184 185 RubyThread rubyThread = new RubyThread(recv.getRuntime(), (RubyClass) recv); 186 187 if (callInit) rubyThread.callInit(args, block); 188 189 rubyThread.threadImpl = new NativeThread(rubyThread, args, block); 190 rubyThread.threadImpl.start(); 191 192 rubyThread.ensureStarted(); 194 195 return rubyThread; 196 } 197 198 public void cleanTerminate(IRubyObject result) { 199 try { 200 finalResult = result; 201 isStopped = true; 202 waitIfCriticalized(); 203 } catch (InterruptedException ie) { 204 } 206 } 207 208 public void waitIfCriticalized() throws InterruptedException { 209 RubyThread criticalThread = getRuntime().getThreadService().getCriticalThread(); 210 if (criticalThread != null && criticalThread != this && criticalThread != joinedByCriticalThread) { 211 synchronized (criticalThread) { 212 criticalThread.wait(); 213 } 214 } 215 } 216 217 public void notifyStarted() { 218 assert isCurrent(); 219 synchronized (hasStartedLock) { 220 hasStarted = true; 221 hasStartedLock.notifyAll(); 222 } 223 } 224 225 public void pollThreadEvents() { 226 pollReceivedExceptions(); 228 229 criticalizeOrDieIfKilled(); 231 } 232 233 private void pollReceivedExceptions() { 234 if (receivedException != null) { 235 IRubyObject raiseException = receivedException; 237 receivedException = null; 238 RubyModule kernelModule = getRuntime().getModule("Kernel"); 239 kernelModule.callMethod(getRuntime().getCurrentContext(), "raise", raiseException); 240 } 241 } 242 243 public void criticalizeOrDieIfKilled() { 244 try { 245 waitIfCriticalized(); 246 } catch (InterruptedException ie) { 247 throw new ThreadKill(); 248 } 249 dieIfKilled(); 250 } 251 252 private RubyThread(Ruby runtime, RubyClass type) { 253 super(runtime, type); 254 this.threadService = runtime.getThreadService(); 255 RubyThreadGroup defaultThreadGroup = (RubyThreadGroup)runtime.getClass("ThreadGroup").getConstant("Default"); 257 defaultThreadGroup.add(this, Block.NULL_BLOCK); 258 finalResult = runtime.getNil(); 259 } 260 261 private RubyThread(Ruby runtime, RubyClass type, boolean narf) { 262 super(runtime, type); 263 this.threadService = runtime.getThreadService(); 264 265 RubyThreadGroup defaultThreadGroup = (RubyThreadGroup)runtime.getClass("ThreadGroup").getConstant("Default"); 267 defaultThreadGroup.add(this, Block.NULL_BLOCK); 268 finalResult = runtime.getNil(); 269 } 270 271 277 public static RubyBoolean abort_on_exception(IRubyObject recv) { 278 Ruby runtime = recv.getRuntime(); 279 return runtime.isGlobalAbortOnExceptionEnabled() ? recv.getRuntime().getTrue() : recv.getRuntime().getFalse(); 280 } 281 282 public static IRubyObject abort_on_exception_set(IRubyObject recv, IRubyObject value) { 283 recv.getRuntime().setGlobalAbortOnExceptionEnabled(value.isTrue()); 284 return value; 285 } 286 287 public static RubyThread current(IRubyObject recv) { 288 return recv.getRuntime().getCurrentContext().getThread(); 289 } 290 291 public static RubyThread main(IRubyObject recv) { 292 return recv.getRuntime().getThreadService().getMainThread(); 293 } 294 295 public static IRubyObject pass(IRubyObject recv) { 296 Ruby runtime = recv.getRuntime(); 297 ThreadService ts = runtime.getThreadService(); 298 RubyThread criticalThread = ts.getCriticalThread(); 299 RubyThread currentThread = ts.getCurrentContext().getThread(); 300 301 if (criticalThread == currentThread) { 302 ts.setCritical(false); 304 } 305 306 Thread.yield(); 307 308 if (criticalThread != null) { 309 ts.setCritical(true); 311 } 312 313 return recv.getRuntime().getNil(); 314 } 315 316 public static RubyArray list(IRubyObject recv) { 317 RubyThread[] activeThreads = recv.getRuntime().getThreadService().getActiveRubyThreads(); 318 319 return recv.getRuntime().newArrayNoCopy(activeThreads); 320 } 321 322 public IRubyObject aref(IRubyObject key) { 323 String name = keyName(key); 324 if (!threadLocalVariables.containsKey(name)) { 325 return getRuntime().getNil(); 326 } 327 return (IRubyObject) threadLocalVariables.get(name); 328 } 329 330 public IRubyObject aset(IRubyObject key, IRubyObject value) { 331 String name = keyName(key); 332 threadLocalVariables.put(name, value); 333 return value; 334 } 335 336 private String keyName(IRubyObject key) { 337 String name; 338 if (key instanceof RubySymbol) { 339 name = key.asSymbol(); 340 } else if (key instanceof RubyString) { 341 name = ((RubyString) key).toString(); 342 } else { 343 throw getRuntime().newArgumentError(key.inspect() + " is not a symbol"); 344 } 345 return name; 346 } 347 348 public RubyBoolean abort_on_exception() { 349 return abortOnException ? getRuntime().getTrue() : getRuntime().getFalse(); 350 } 351 352 public IRubyObject abort_on_exception_set(IRubyObject val) { 353 abortOnException = val.isTrue(); 354 return val; 355 } 356 357 public RubyBoolean is_alive() { 358 return threadImpl.isAlive() ? getRuntime().getTrue() : getRuntime().getFalse(); 359 } 360 361 public RubyThread join(IRubyObject[] args) { 362 if (isCurrent()) { 363 throw getRuntime().newThreadError("thread tried to join itself"); 364 } 365 ensureStarted(); 366 try { 367 RubyThread criticalThread = getRuntime().getThreadService().getCriticalThread(); 368 if (criticalThread != null) { 369 joinedByCriticalThread = criticalThread; 371 threadImpl.interrupt(); } 373 threadImpl.join(); 374 } catch (InterruptedException iExcptn) { 375 assert false : iExcptn; 376 } 377 if (exitingException != null) { 378 throw exitingException; 379 } 380 return this; 381 } 382 383 public IRubyObject value() { 384 join(new IRubyObject[0]); 385 return finalResult; 386 } 387 388 public IRubyObject group() { 389 if (threadGroup == null) { 390 return getRuntime().getNil(); 391 } 392 393 return threadGroup; 394 } 395 396 void setThreadGroup(RubyThreadGroup rubyThreadGroup) { 397 threadGroup = rubyThreadGroup; 398 } 399 400 public RubyBoolean has_key(IRubyObject key) { 401 String name = keyName(key); 402 return getRuntime().newBoolean(threadLocalVariables.containsKey(name)); 403 } 404 405 public static IRubyObject critical_set(IRubyObject receiver, RubyBoolean value) { 406 receiver.getRuntime().getThreadService().setCritical(value.isTrue()); 407 408 return value; 409 } 410 411 public static IRubyObject critical(IRubyObject receiver) { 412 return receiver.getRuntime().newBoolean(receiver.getRuntime().getThreadService().getCriticalThread() != null); 413 } 414 415 public static IRubyObject stop(IRubyObject receiver) { 416 RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread(); 417 Object stopLock = rubyThread.stopLock; 418 419 synchronized (stopLock) { 420 try { 421 rubyThread.isStopped = true; 422 receiver.getRuntime().getThreadService().setCritical(false); 424 425 stopLock.wait(); 426 } catch (InterruptedException ie) { 427 } 429 rubyThread.isStopped = false; 430 } 431 432 return receiver.getRuntime().getNil(); 433 } 434 435 public static IRubyObject s_kill(IRubyObject receiver, RubyThread rubyThread, Block block) { 436 return rubyThread.kill(); 437 } 438 439 public static IRubyObject s_exit(IRubyObject receiver, Block block) { 440 RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread(); 441 442 rubyThread.killed = true; 443 receiver.getRuntime().getThreadService().setCritical(false); 445 446 throw new ThreadKill(); 447 } 448 449 public RubyBoolean isStopped() { 450 return getRuntime().newBoolean(isStopped); 452 } 453 454 public RubyThread wakeup() { 455 synchronized (stopLock) { 456 stopLock.notifyAll(); 457 } 458 459 return this; 460 } 461 462 public RubyFixnum priority() { 463 return getRuntime().newFixnum(threadImpl.getPriority()); 464 } 465 466 public IRubyObject priority_set(IRubyObject priority) { 467 int iPriority = RubyNumeric.fix2int(priority); 469 470 if (iPriority < Thread.MIN_PRIORITY) { 471 iPriority = Thread.MIN_PRIORITY; 472 } else if (iPriority > Thread.MAX_PRIORITY) { 473 iPriority = Thread.MAX_PRIORITY; 474 } 475 476 threadImpl.setPriority(iPriority); 477 return priority; 478 } 479 480 public IRubyObject raise(IRubyObject exc) { 481 receivedException = exc; 482 483 485 487 return this; 488 } 489 490 public IRubyObject run() { 491 if (isStopped) { 493 synchronized (stopLock) { 494 isStopped = false; 495 stopLock.notifyAll(); 496 } 497 } 498 499 503 return this; 504 } 505 506 public void sleep(long millis) throws InterruptedException { 507 try { 508 synchronized (stopLock) { 509 isStopped = true; 510 stopLock.wait(millis); 511 } 512 } finally { 513 isStopped = false; 514 } 515 } 516 517 public IRubyObject status() { 518 if (threadImpl.isAlive()) { 519 if (isStopped) { 520 return getRuntime().newString("sleep"); 521 } else if (killed) { 522 return getRuntime().newString("aborting"); 523 } 524 525 return getRuntime().newString("run"); 526 } else if (exitingException != null) { 527 return getRuntime().getNil(); 528 } else { 529 return getRuntime().newBoolean(false); 530 } 531 } 532 533 public IRubyObject kill() { 534 synchronized (this) { 536 if (killed) return this; 537 538 killed = true; 539 540 threadImpl.interrupt(); try { 542 if (!threadImpl.isInterrupted()) { 543 threadImpl.join(); 546 } 547 } catch (InterruptedException ie) { 548 throw new ThreadKill(); 549 } 550 } 551 552 return this; 553 } 554 555 public IRubyObject exit(Block block) { 556 return kill(); 557 } 558 559 public void dieIfKilled() { 560 if (killed) throw new ThreadKill(); 561 } 562 563 private boolean isCurrent() { 564 return threadImpl.isCurrent(); 565 } 566 567 private void ensureStarted() { 568 573 574 if (!hasStarted) { 576 synchronized (hasStartedLock) { 577 while (!hasStarted) { 578 try { 579 hasStartedLock.wait(); 580 } catch (InterruptedException iExcptn) { 581 } 583 } 584 } 585 } 586 } 587 588 public void exceptionRaised(RaiseException exception) { 589 assert isCurrent(); 590 591 Ruby runtime = exception.getException().getRuntime(); 592 if (abortOnException(runtime)) { 593 RubyException re = RubyException.newException(getRuntime(), getRuntime().getClass("SystemExit"), exception.getMessage()); 597 re.setInstanceVariable("status", getRuntime().newFixnum(1)); 598 threadService.getMainThread().raise(re); 599 } else { 600 exitingException = exception; 601 } 602 } 603 604 private boolean abortOnException(Ruby runtime) { 605 return (runtime.isGlobalAbortOnExceptionEnabled() || abortOnException); 606 } 607 608 public static RubyThread mainThread(IRubyObject receiver) { 609 return receiver.getRuntime().getThreadService().getMainThread(); 610 } 611 } 612 | Popular Tags |