KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jruby > RubyThread


/***** BEGIN LICENSE BLOCK *****
 * Version: CPL 1.0/GPL 2.0/LGPL 2.1
 *
 * The contents of this file are subject to the Common Public
 * License Version 1.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of
 * the License at http://www.eclipse.org/legal/cpl-v10.html
 *
 * Software distributed under the License is distributed on an "AS
 * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * rights and limitations under the License.
 *
 * Copyright (C) 2002 Jason Voegele <jason@jvoegele.com>
 * Copyright (C) 2002-2004 Anders Bengtsson <ndrsbngtssn@yahoo.se>
 * Copyright (C) 2002-2004 Jan Arne Petersen <jpetersen@uni-bonn.de>
 * Copyright (C) 2004 Thomas E Enebo <enebo@acm.org>
 * Copyright (C) 2004-2005 Charles O Nutter <headius@headius.com>
 * Copyright (C) 2004 Stefan Matthias Aust <sma@3plus4.de>
 *
 * Alternatively, the contents of this file may be used under the terms of
 * either of the GNU General Public License Version 2 or later (the "GPL"),
 * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
 * in which case the provisions of the GPL or the LGPL are applicable instead
 * of those above. If you wish to allow use of your version of this file only
 * under the terms of either the GPL or the LGPL, and not to allow others to
 * use your version of this file under the terms of the CPL, indicate your
 * decision by deleting the provisions above and replace them with the notice
 * and other provisions required by the GPL or the LGPL. If you do not delete
 * the provisions above, a recipient may use your version of this file under
 * the terms of any one of the CPL, the GPL or the LGPL.
 ***** END LICENSE BLOCK *****/

33 package org.jruby;
34
35 import java.util.HashMap JavaDoc;
36 import java.util.Map JavaDoc;
37
38 import org.jruby.exceptions.RaiseException;
39 import org.jruby.exceptions.ThreadKill;
40 import org.jruby.internal.runtime.NativeThread;
41 import org.jruby.internal.runtime.ThreadService;
42 import org.jruby.runtime.Block;
43 import org.jruby.runtime.CallbackFactory;
44 import org.jruby.runtime.ObjectAllocator;
45 import org.jruby.runtime.builtin.IRubyObject;
46
47 /**
48  * Implementation of Ruby's <code>Thread</code> class. Each Ruby thread is
49  * mapped to an underlying Java Virtual Machine thread.
50  * <p>
51  * Thread encapsulates the behavior of a thread of execution, including the main
52  * thread of the Ruby script. In the descriptions that follow, the parameter
53  * <code>aSymbol</code> refers to a symbol, which is either a quoted string or a
54  * <code>Symbol</code> (such as <code>:name</code>).
55  *
56  * Note: For CVS history, see ThreadClass.java.
57  *
58  * @author Jason Voegele (jason@jvoegele.com)
59  */

60 public class RubyThread extends RubyObject {
61     private NativeThread threadImpl;
62     private Map JavaDoc threadLocalVariables = new HashMap JavaDoc();
63     private boolean abortOnException;
64     private IRubyObject finalResult;
65     private RaiseException exitingException;
66     private IRubyObject receivedException;
67     private RubyThreadGroup threadGroup;
68
69     private ThreadService threadService;
70     private Object JavaDoc hasStartedLock = new Object JavaDoc();
71     private boolean hasStarted = false;
72     private volatile boolean isStopped = false;
73     public Object JavaDoc stopLock = new Object JavaDoc();
74     
75     private volatile boolean killed = false;
76     public Object JavaDoc killLock = new Object JavaDoc();
77     private RubyThread joinedByCriticalThread;
78     
79     public static RubyClass createThreadClass(Ruby runtime) {
80         // FIXME: In order for Thread to play well with the standard 'new' behavior,
81
// it must provide an allocator that can create empty object instances which
82
// initialize then fills with appropriate data.
83
RubyClass threadClass = runtime.defineClass("Thread", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);
84         CallbackFactory callbackFactory = runtime.callbackFactory(RubyThread.class);
85
86         threadClass.defineFastMethod("[]", callbackFactory.getFastMethod("aref", RubyKernel.IRUBY_OBJECT));
87         threadClass.defineFastMethod("[]=", callbackFactory.getFastMethod("aset", RubyKernel.IRUBY_OBJECT, RubyKernel.IRUBY_OBJECT));
88         threadClass.defineFastMethod("abort_on_exception", callbackFactory.getFastMethod("abort_on_exception"));
89         threadClass.defineFastMethod("abort_on_exception=", callbackFactory.getFastMethod("abort_on_exception_set", RubyKernel.IRUBY_OBJECT));
90         threadClass.defineFastMethod("alive?", callbackFactory.getFastMethod("is_alive"));
91         threadClass.defineFastMethod("group", callbackFactory.getFastMethod("group"));
92         threadClass.defineFastMethod("join", callbackFactory.getFastOptMethod("join"));
93         threadClass.defineFastMethod("value", callbackFactory.getFastMethod("value"));
94         threadClass.defineFastMethod("key?", callbackFactory.getFastMethod("has_key", RubyKernel.IRUBY_OBJECT));
95         threadClass.defineFastMethod("priority", callbackFactory.getFastMethod("priority"));
96         threadClass.defineFastMethod("priority=", callbackFactory.getFastMethod("priority_set", RubyKernel.IRUBY_OBJECT));
97         threadClass.defineFastMethod("raise", callbackFactory.getFastMethod("raise", RubyKernel.IRUBY_OBJECT));
98         threadClass.defineFastMethod("run", callbackFactory.getFastMethod("run"));
99         threadClass.defineFastMethod("status", callbackFactory.getFastMethod("status"));
100         threadClass.defineFastMethod("stop?", callbackFactory.getFastMethod("isStopped"));
101         threadClass.defineFastMethod("wakeup", callbackFactory.getFastMethod("wakeup"));
102         // threadClass.defineMethod("value",
103
// callbackFactory.getMethod("value"));
104
threadClass.defineFastMethod("kill", callbackFactory.getFastMethod("kill"));
105         threadClass.defineMethod("exit", callbackFactory.getMethod("exit"));
106         
107         threadClass.getMetaClass().defineFastMethod("current", callbackFactory.getFastSingletonMethod("current"));
108         threadClass.getMetaClass().defineMethod("fork", callbackFactory.getOptSingletonMethod("newInstance"));
109         threadClass.getMetaClass().defineMethod("new", callbackFactory.getOptSingletonMethod("newInstance"));
110         threadClass.getMetaClass().defineFastMethod("list", callbackFactory.getFastSingletonMethod("list"));
111         threadClass.getMetaClass().defineFastMethod("pass", callbackFactory.getFastSingletonMethod("pass"));
112         threadClass.getMetaClass().defineMethod("start", callbackFactory.getOptSingletonMethod("start"));
113         threadClass.getMetaClass().defineFastMethod("critical=", callbackFactory.getFastSingletonMethod("critical_set", RubyBoolean.class));
114         threadClass.getMetaClass().defineFastMethod("critical", callbackFactory.getFastSingletonMethod("critical"));
115         threadClass.getMetaClass().defineFastMethod("stop", callbackFactory.getFastSingletonMethod("stop"));
116         threadClass.getMetaClass().defineMethod("kill", callbackFactory.getSingletonMethod("s_kill", RubyThread.class));
117         threadClass.getMetaClass().defineMethod("exit", callbackFactory.getSingletonMethod("s_exit"));
118         threadClass.getMetaClass().defineFastMethod("abort_on_exception", callbackFactory.getFastSingletonMethod("abort_on_exception"));
119         threadClass.getMetaClass().defineFastMethod("abort_on_exception=", callbackFactory.getFastSingletonMethod("abort_on_exception_set", RubyKernel.IRUBY_OBJECT));
120
121         RubyThread rubyThread = new RubyThread(runtime, threadClass);
122         // set hasStarted to true, otherwise Thread.main.status freezes
123
rubyThread.hasStarted = true;
124         // TODO: need to isolate the "current" thread from class creation
125
rubyThread.threadImpl = new NativeThread(rubyThread, Thread.currentThread());
126         runtime.getThreadService().setMainThread(rubyThread);
127         
128         threadClass.getMetaClass().defineFastMethod("main", callbackFactory.getFastSingletonMethod("main"));
129         
130         return threadClass;
131     }
132
133     /**
134      * <code>Thread.new</code>
135      * <p>
136      * Thread.new( <i>[ arg ]*</i> ) {| args | block } -> aThread
137      * <p>
138      * Creates a new thread to execute the instructions given in block, and
139      * begins running it. Any arguments passed to Thread.new are passed into the
140      * block.
141      * <pre>
142      * x = Thread.new { sleep .1; print "x"; print "y"; print "z" }
143      * a = Thread.new { print "a"; print "b"; sleep .2; print "c" }
144      * x.join # Let the threads finish before
145      * a.join # main thread exits...
146      * </pre>
147      * <i>produces:</i> abxyzc
148      */

149     public static IRubyObject newInstance(IRubyObject recv, IRubyObject[] args, Block block) {
150         return startThread(recv, args, true, block);
151     }
152
153     /**
154      * Basically the same as Thread.new . However, if class Thread is
155      * subclassed, then calling start in that subclass will not invoke the
156      * subclass's initialize method.
157      */

158     public static RubyThread start(IRubyObject recv, IRubyObject[] args, Block block) {
159         return startThread(recv, args, false, block);
160     }
161     
162     public static RubyThread adopt(IRubyObject recv, Thread JavaDoc t) {
163         return adoptThread(recv, t, Block.NULL_BLOCK);
164     }
165
166     private static RubyThread adoptThread(final IRubyObject recv, Thread JavaDoc t, Block block) {
167         final Ruby runtime = recv.getRuntime();
168         final RubyThread rubyThread = new RubyThread(runtime, (RubyClass) recv, false);
169         
170         rubyThread.threadImpl = new NativeThread(rubyThread, t);
171         runtime.getThreadService().registerNewThread(rubyThread);
172         
173         runtime.getCurrentContext().preAdoptThread();
174         
175         rubyThread.callInit(new IRubyObject[0], block);
176         
177         rubyThread.notifyStarted();
178         
179         return rubyThread;
180     }
181
182     private static RubyThread startThread(final IRubyObject recv, final IRubyObject[] args, boolean callInit, Block block) {
183         if (!block.isGiven()) throw recv.getRuntime().newThreadError("must be called with a block");
184
185         RubyThread rubyThread = new RubyThread(recv.getRuntime(), (RubyClass) recv);
186         
187         if (callInit) rubyThread.callInit(args, block);
188
189         rubyThread.threadImpl = new NativeThread(rubyThread, args, block);
190         rubyThread.threadImpl.start();
191         
192         // make sure the thread has started before continuing, so it will appear "runnable" to the rest of Ruby
193
rubyThread.ensureStarted();
194         
195         return rubyThread;
196     }
197     
198     public void cleanTerminate(IRubyObject result) {
199         try {
200             finalResult = result;
201             isStopped = true;
202             waitIfCriticalized();
203         } catch (InterruptedException JavaDoc ie) {
204             // ignore
205
}
206     }
207     
208     public void waitIfCriticalized() throws InterruptedException JavaDoc {
209         RubyThread criticalThread = getRuntime().getThreadService().getCriticalThread();
210         if (criticalThread != null && criticalThread != this && criticalThread != joinedByCriticalThread) {
211             synchronized (criticalThread) {
212                 criticalThread.wait();
213             }
214         }
215     }
216     
217     public void notifyStarted() {
218         assert isCurrent();
219         synchronized (hasStartedLock) {
220             hasStarted = true;
221             hasStartedLock.notifyAll();
222         }
223     }
224
225     public void pollThreadEvents() {
226         // Asserts.isTrue(isCurrent());
227
pollReceivedExceptions();
228         
229         // TODO: should exceptions trump thread control, or vice versa?
230
criticalizeOrDieIfKilled();
231     }
232
233     private void pollReceivedExceptions() {
234         if (receivedException != null) {
235             // clear this so we don't keep re-throwing
236
IRubyObject raiseException = receivedException;
237             receivedException = null;
238             RubyModule kernelModule = getRuntime().getModule("Kernel");
239             kernelModule.callMethod(getRuntime().getCurrentContext(), "raise", raiseException);
240         }
241     }
242
243     public void criticalizeOrDieIfKilled() {
244         try {
245             waitIfCriticalized();
246         } catch (InterruptedException JavaDoc ie) {
247             throw new ThreadKill();
248         }
249         dieIfKilled();
250     }
251
252     private RubyThread(Ruby runtime, RubyClass type) {
253         super(runtime, type);
254         this.threadService = runtime.getThreadService();
255         // set to default thread group
256
RubyThreadGroup defaultThreadGroup = (RubyThreadGroup)runtime.getClass("ThreadGroup").getConstant("Default");
257         defaultThreadGroup.add(this, Block.NULL_BLOCK);
258         finalResult = runtime.getNil();
259     }
260
261     private RubyThread(Ruby runtime, RubyClass type, boolean narf) {
262         super(runtime, type);
263         this.threadService = runtime.getThreadService();
264         
265         // set to default thread group
266
RubyThreadGroup defaultThreadGroup = (RubyThreadGroup)runtime.getClass("ThreadGroup").getConstant("Default");
267         defaultThreadGroup.add(this, Block.NULL_BLOCK);
268         finalResult = runtime.getNil();
269     }
270
271     /**
272      * Returns the status of the global ``abort on exception'' condition. The
273      * default is false. When set to true, will cause all threads to abort (the
274      * process will exit(0)) if an exception is raised in any thread. See also
275      * Thread.abort_on_exception= .
276      */

277     public static RubyBoolean abort_on_exception(IRubyObject recv) {
278         Ruby runtime = recv.getRuntime();
279         return runtime.isGlobalAbortOnExceptionEnabled() ? recv.getRuntime().getTrue() : recv.getRuntime().getFalse();
280     }
281
282     public static IRubyObject abort_on_exception_set(IRubyObject recv, IRubyObject value) {
283         recv.getRuntime().setGlobalAbortOnExceptionEnabled(value.isTrue());
284         return value;
285     }
286
287     public static RubyThread current(IRubyObject recv) {
288         return recv.getRuntime().getCurrentContext().getThread();
289     }
290
291     public static RubyThread main(IRubyObject recv) {
292         return recv.getRuntime().getThreadService().getMainThread();
293     }
294
295     public static IRubyObject pass(IRubyObject recv) {
296         Ruby runtime = recv.getRuntime();
297         ThreadService ts = runtime.getThreadService();
298         RubyThread criticalThread = ts.getCriticalThread();
299         RubyThread currentThread = ts.getCurrentContext().getThread();
300         
301         if (criticalThread == currentThread) {
302             // we're currently the critical thread; decriticalize for pass
303
ts.setCritical(false);
304         }
305         
306         Thread.yield();
307         
308         if (criticalThread != null) {
309             // recriticalize
310
ts.setCritical(true);
311         }
312         
313         return recv.getRuntime().getNil();
314     }
315
316     public static RubyArray list(IRubyObject recv) {
317         RubyThread[] activeThreads = recv.getRuntime().getThreadService().getActiveRubyThreads();
318         
319         return recv.getRuntime().newArrayNoCopy(activeThreads);
320     }
321
322     public IRubyObject aref(IRubyObject key) {
323         String JavaDoc name = keyName(key);
324         if (!threadLocalVariables.containsKey(name)) {
325             return getRuntime().getNil();
326         }
327         return (IRubyObject) threadLocalVariables.get(name);
328     }
329
330     public IRubyObject aset(IRubyObject key, IRubyObject value) {
331         String JavaDoc name = keyName(key);
332         threadLocalVariables.put(name, value);
333         return value;
334     }
335
336     private String JavaDoc keyName(IRubyObject key) {
337         String JavaDoc name;
338         if (key instanceof RubySymbol) {
339             name = key.asSymbol();
340         } else if (key instanceof RubyString) {
341             name = ((RubyString) key).toString();
342         } else {
343             throw getRuntime().newArgumentError(key.inspect() + " is not a symbol");
344         }
345         return name;
346     }
347
348     public RubyBoolean abort_on_exception() {
349         return abortOnException ? getRuntime().getTrue() : getRuntime().getFalse();
350     }
351
352     public IRubyObject abort_on_exception_set(IRubyObject val) {
353         abortOnException = val.isTrue();
354         return val;
355     }
356
357     public RubyBoolean is_alive() {
358         return threadImpl.isAlive() ? getRuntime().getTrue() : getRuntime().getFalse();
359     }
360
361     public RubyThread join(IRubyObject[] args) {
362         if (isCurrent()) {
363             throw getRuntime().newThreadError("thread tried to join itself");
364         }
365         ensureStarted();
366         try {
367             RubyThread criticalThread = getRuntime().getThreadService().getCriticalThread();
368             if (criticalThread != null) {
369                 // set the target thread's joinedBy, so it knows it can execute during a critical section
370
joinedByCriticalThread = criticalThread;
371                 threadImpl.interrupt(); // break target thread out of critical
372
}
373             threadImpl.join();
374         } catch (InterruptedException JavaDoc iExcptn) {
375             assert false : iExcptn;
376         }
377         if (exitingException != null) {
378             throw exitingException;
379         }
380         return this;
381     }
382
383     public IRubyObject value() {
384         join(new IRubyObject[0]);
385         return finalResult;
386     }
387
388     public IRubyObject group() {
389         if (threadGroup == null) {
390             return getRuntime().getNil();
391         }
392         
393         return threadGroup;
394     }
395     
396     void setThreadGroup(RubyThreadGroup rubyThreadGroup) {
397         threadGroup = rubyThreadGroup;
398     }
399
400     public RubyBoolean has_key(IRubyObject key) {
401         String JavaDoc name = keyName(key);
402         return getRuntime().newBoolean(threadLocalVariables.containsKey(name));
403     }
404     
405     public static IRubyObject critical_set(IRubyObject receiver, RubyBoolean value) {
406         receiver.getRuntime().getThreadService().setCritical(value.isTrue());
407         
408         return value;
409     }
410
411     public static IRubyObject critical(IRubyObject receiver) {
412         return receiver.getRuntime().newBoolean(receiver.getRuntime().getThreadService().getCriticalThread() != null);
413     }
414
415     public static IRubyObject stop(IRubyObject receiver) {
416         RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread();
417         Object JavaDoc stopLock = rubyThread.stopLock;
418         
419         synchronized (stopLock) {
420             try {
421                 rubyThread.isStopped = true;
422                 // attempt to decriticalize all if we're the critical thread
423
receiver.getRuntime().getThreadService().setCritical(false);
424
425                 stopLock.wait();
426             } catch (InterruptedException JavaDoc ie) {
427                 // ignore, continue;
428
}
429             rubyThread.isStopped = false;
430         }
431         
432         return receiver.getRuntime().getNil();
433     }
434     
435     public static IRubyObject s_kill(IRubyObject receiver, RubyThread rubyThread, Block block) {
436         return rubyThread.kill();
437     }
438
439     public static IRubyObject s_exit(IRubyObject receiver, Block block) {
440         RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread();
441         
442         rubyThread.killed = true;
443         // attempt to decriticalize all if we're the critical thread
444
receiver.getRuntime().getThreadService().setCritical(false);
445         
446         throw new ThreadKill();
447     }
448
449     public RubyBoolean isStopped() {
450         // not valid for "dead" state
451
return getRuntime().newBoolean(isStopped);
452     }
453     
454     public RubyThread wakeup() {
455         synchronized (stopLock) {
456             stopLock.notifyAll();
457         }
458         
459         return this;
460     }
461     
462     public RubyFixnum priority() {
463         return getRuntime().newFixnum(threadImpl.getPriority());
464     }
465
466     public IRubyObject priority_set(IRubyObject priority) {
467         // FIXME: This should probably do some translation from Ruby priority levels to Java priority levels (until we have green threads)
468
int iPriority = RubyNumeric.fix2int(priority);
469         
470         if (iPriority < Thread.MIN_PRIORITY) {
471             iPriority = Thread.MIN_PRIORITY;
472         } else if (iPriority > Thread.MAX_PRIORITY) {
473             iPriority = Thread.MAX_PRIORITY;
474         }
475         
476         threadImpl.setPriority(iPriority);
477         return priority;
478     }
479
480     public IRubyObject raise(IRubyObject exc) {
481         receivedException = exc;
482
483         // FIXME: correct raise call
484

485         // FIXME: call the IRaiseListener#exceptionRaised method
486

487         return this;
488     }
489     
490     public IRubyObject run() {
491         // if stopped, unstop
492
if (isStopped) {
493             synchronized (stopLock) {
494                 isStopped = false;
495                 stopLock.notifyAll();
496             }
497         }
498         
499         // Abort any sleep()s
500
// CON: Sleep now waits on the same stoplock, so it will have been woken up by the notify above
501
//threadImpl.interrupt();
502

503         return this;
504     }
505     
506     public void sleep(long millis) throws InterruptedException JavaDoc {
507         try {
508             synchronized (stopLock) {
509                 isStopped = true;
510                 stopLock.wait(millis);
511             }
512         } finally {
513             isStopped = false;
514         }
515     }
516
517     public IRubyObject status() {
518         if (threadImpl.isAlive()) {
519             if (isStopped) {
520                 return getRuntime().newString("sleep");
521             } else if (killed) {
522                 return getRuntime().newString("aborting");
523             }
524             
525             return getRuntime().newString("run");
526         } else if (exitingException != null) {
527             return getRuntime().getNil();
528         } else {
529             return getRuntime().newBoolean(false);
530         }
531     }
532
533     public IRubyObject kill() {
534         // need to reexamine this
535
synchronized (this) {
536             if (killed) return this;
537             
538             killed = true;
539             
540             threadImpl.interrupt(); // break out of wait states and blocking IO
541
try {
542                 if (!threadImpl.isInterrupted()) {
543                     // we did not interrupt the thread, so wait for it to complete
544
// TODO: test that this is correct...should killer wait for killee to die?
545
threadImpl.join();
546                 }
547             } catch (InterruptedException JavaDoc ie) {
548                 throw new ThreadKill();
549             }
550         }
551
552         return this;
553     }
554     
555     public IRubyObject exit(Block block) {
556         return kill();
557     }
558     
559     public void dieIfKilled() {
560         if (killed) throw new ThreadKill();
561     }
562
563     private boolean isCurrent() {
564         return threadImpl.isCurrent();
565     }
566
567     private void ensureStarted() {
568         // The JVM's join() method may return immediately
569
// and isAlive() give the wrong result if the thread
570
// hasn't started yet. We give it a chance to start
571
// before we try to do anything.
572

573
574         // Yes, I know double-check locking is broken.
575
if (!hasStarted) {
576             synchronized (hasStartedLock) {
577                 while (!hasStarted) {
578                     try {
579                         hasStartedLock.wait();
580                     } catch (InterruptedException JavaDoc iExcptn) {
581                         // assert false : iExcptn;
582
}
583                 }
584             }
585         }
586     }
587
588     public void exceptionRaised(RaiseException exception) {
589         assert isCurrent();
590
591         Ruby runtime = exception.getException().getRuntime();
592         if (abortOnException(runtime)) {
593             // FIXME: printError explodes on some nullpointer
594
//getRuntime().getRuntime().printError(exception.getException());
595
// TODO: Doesn't SystemExit have its own method to make this less wordy..
596
RubyException re = RubyException.newException(getRuntime(), getRuntime().getClass("SystemExit"), exception.getMessage());
597             re.setInstanceVariable("status", getRuntime().newFixnum(1));
598             threadService.getMainThread().raise(re);
599         } else {
600             exitingException = exception;
601         }
602     }
603
604     private boolean abortOnException(Ruby runtime) {
605         return (runtime.isGlobalAbortOnExceptionEnabled() || abortOnException);
606     }
607
608     public static RubyThread mainThread(IRubyObject receiver) {
609         return receiver.getRuntime().getThreadService().getMainThread();
610     }
611 }
612
Popular Tags