1 28 29 30 31 package org.jruby.libraries; 32 33 import java.io.IOException ; 34 import java.util.LinkedList ; 35 36 import org.jruby.Ruby; 37 import org.jruby.RubyObject; 38 import org.jruby.RubyClass; 39 import org.jruby.RubyBoolean; 40 import org.jruby.RubyThread; 41 import org.jruby.RubyInteger; 42 import org.jruby.RubyNumeric; 43 import org.jruby.exceptions.RaiseException; 44 import org.jruby.runtime.Block; 45 import org.jruby.runtime.CallbackFactory; 46 import org.jruby.runtime.ObjectAllocator; 47 import org.jruby.runtime.load.Library; 48 import org.jruby.runtime.builtin.IRubyObject; 49 50 53 public class ThreadLibrary implements Library { 54 public void load(final Ruby runtime) throws IOException { 55 Mutex.setup(runtime); 56 ConditionVariable.setup(runtime); 57 Queue.setup(runtime); 58 SizedQueue.setup(runtime); 59 } 60 61 public static class Mutex extends RubyObject { 62 private RubyThread owner = null; 63 64 public static Mutex newInstance(IRubyObject recv, IRubyObject[] args, Block block) { 65 Mutex result = new Mutex(recv.getRuntime(), (RubyClass)recv); 66 result.callInit(args, block); 67 return result; 68 } 69 70 public Mutex(Ruby runtime, RubyClass type) { 71 super(runtime, type); 72 } 73 74 public static void setup(Ruby runtime) { 75 RubyClass cMutex = runtime.defineClass("Mutex", runtime.getClass("Object"), new ObjectAllocator() { 76 public IRubyObject allocate(Ruby runtime, RubyClass klass) { 77 return new Mutex(runtime, klass); 78 } 79 }); 80 CallbackFactory cb = runtime.callbackFactory(Mutex.class); 81 cMutex.getMetaClass().defineMethod("new", cb.getOptSingletonMethod("newInstance")); 82 cMutex.defineFastMethod("locked?", cb.getFastMethod("locked_p")); 83 cMutex.defineFastMethod("try_lock", cb.getFastMethod("try_lock")); 84 cMutex.defineFastMethod("lock", cb.getFastMethod("lock")); 85 cMutex.defineFastMethod("unlock", cb.getFastMethod("unlock")); 86 cMutex.defineMethod("synchronize", cb.getMethod("synchronize")); 87 } 88 89 public synchronized RubyBoolean locked_p() { 90 return ( owner != null ? getRuntime().getTrue() : getRuntime().getFalse() ); 91 } 92 93 public RubyBoolean try_lock() throws InterruptedException { 94 if (Thread.interrupted()) { 95 throw new InterruptedException (); 96 } 97 synchronized (this) { 98 if ( owner != null ) { 99 return getRuntime().getFalse(); 100 } 101 lock(); 102 } 103 return getRuntime().getTrue(); 104 } 105 106 public IRubyObject lock() throws InterruptedException { 107 if (Thread.interrupted()) { 108 throw new InterruptedException (); 109 } 110 synchronized (this) { 111 try { 112 while ( owner != null ) { 113 wait(); 114 } 115 owner = getRuntime().getCurrentContext().getThread(); 116 } catch (InterruptedException ex) { 117 if ( owner == null ) { 118 notify(); 119 } 120 throw ex; 121 } 122 } 123 return this; 124 } 125 126 public synchronized RubyBoolean unlock() { 127 if ( owner != null ) { 128 owner = null; 129 notify(); 130 return getRuntime().getTrue(); 131 } else { 132 return getRuntime().getFalse(); 133 } 134 } 135 136 public IRubyObject synchronize(Block block) throws InterruptedException { 137 try { 138 lock(); 139 return getRuntime().getCurrentContext().yield(null, block); 140 } finally { 141 unlock(); 142 } 143 } 144 } 145 146 public static class ConditionVariable extends RubyObject { 147 public static ConditionVariable newInstance(IRubyObject recv, IRubyObject[] args, Block block) { 148 ConditionVariable result = new ConditionVariable(recv.getRuntime(), (RubyClass)recv); 149 result.callInit(args, block); 150 return result; 151 } 152 153 public ConditionVariable(Ruby runtime, RubyClass type) { 154 super(runtime, type); 155 } 156 157 public static void setup(Ruby runtime) { 158 RubyClass cConditionVariable = runtime.defineClass("ConditionVariable", runtime.getClass("Object"), new ObjectAllocator() { 159 public IRubyObject allocate(Ruby runtime, RubyClass klass) { 160 return new ConditionVariable(runtime, klass); 161 } 162 }); 163 CallbackFactory cb = runtime.callbackFactory(ConditionVariable.class); 164 cConditionVariable.getMetaClass().defineMethod("new", cb.getOptSingletonMethod("newInstance")); 165 cConditionVariable.defineFastMethod("wait", cb.getFastMethod("wait_ruby", Mutex.class)); 166 cConditionVariable.defineFastMethod("broadcast", cb.getFastMethod("broadcast")); 167 cConditionVariable.defineFastMethod("signal", cb.getFastMethod("signal")); 168 } 169 170 public IRubyObject wait_ruby(Mutex mutex) throws InterruptedException { 171 if (Thread.interrupted()) { 172 throw new InterruptedException (); 173 } 174 try { 175 synchronized (this) { 176 mutex.unlock(); 177 try { 178 wait(); 179 } catch (InterruptedException e) { 180 notify(); 181 throw e; 182 } 183 } 184 } finally { 185 mutex.lock(); 186 } 187 return getRuntime().getNil(); 188 } 189 190 public synchronized IRubyObject broadcast() { 191 notifyAll(); 192 return getRuntime().getNil(); 193 } 194 195 public synchronized IRubyObject signal() { 196 notify(); 197 return getRuntime().getNil(); 198 } 199 } 200 201 public static class Queue extends RubyObject { 202 private LinkedList entries; 203 204 public static IRubyObject newInstance(IRubyObject recv, IRubyObject[] args, Block block) { 205 Queue result = new Queue(recv.getRuntime(), (RubyClass)recv); 206 result.callInit(args, block); 207 return result; 208 } 209 210 public Queue(Ruby runtime, RubyClass type) { 211 super(runtime, type); 212 entries = new LinkedList (); 213 } 214 215 public static void setup(Ruby runtime) { 216 RubyClass cQueue = runtime.defineClass("Queue", runtime.getClass("Object"), new ObjectAllocator() { 217 public IRubyObject allocate(Ruby runtime, RubyClass klass) { 218 return new Queue(runtime, klass); 219 } 220 }); 221 CallbackFactory cb = runtime.callbackFactory(Queue.class); 222 cQueue.getMetaClass().defineMethod("new", cb.getOptSingletonMethod("newInstance")); 223 224 cQueue.defineFastMethod("clear", cb.getFastMethod("clear")); 225 cQueue.defineFastMethod("empty?", cb.getFastMethod("empty_p")); 226 cQueue.defineFastMethod("length", cb.getFastMethod("length")); 227 cQueue.defineFastMethod("num_waiting", cb.getFastMethod("num_waiting")); 228 cQueue.defineFastMethod("pop", cb.getFastOptMethod("pop")); 229 cQueue.defineFastMethod("push", cb.getFastMethod("push", IRubyObject.class)); 230 231 cQueue.defineAlias("<<", "push"); 232 cQueue.defineAlias("deq", "pop"); 233 cQueue.defineAlias("shift", "pop"); 234 cQueue.defineAlias("size", "length"); 235 } 236 237 public synchronized IRubyObject clear() { 238 entries.clear(); 239 return getRuntime().getNil(); 240 } 241 242 public synchronized RubyBoolean empty_p() { 243 return ( entries.size() == 0 ? getRuntime().getTrue() : getRuntime().getFalse() ); 244 } 245 246 public synchronized RubyNumeric length() { 247 return RubyNumeric.int2fix(getRuntime(), entries.size()); 248 } 249 250 public int num_waiting() { return 0; } 251 252 public synchronized IRubyObject pop(IRubyObject[] args) { 253 boolean should_block = true; 254 if ( checkArgumentCount(args, 0, 1) == 1 ) { 255 should_block = args[0].isTrue(); 256 } 257 if ( !should_block && entries.size() == 0 ) { 258 throw new RaiseException(getRuntime(), getRuntime().getClass("ThreadError"), "queue empty", false); 259 } 260 while ( entries.size() == 0 ) { 261 try { 262 wait(); 263 } catch (InterruptedException e) { 264 } 265 } 266 return (IRubyObject)entries.removeFirst(); 267 } 268 269 public synchronized IRubyObject push(IRubyObject value) { 270 entries.addLast(value); 271 notify(); 272 return getRuntime().getNil(); 273 } 274 } 275 276 277 public static class SizedQueue extends Queue { 278 private int capacity; 279 280 public static IRubyObject newInstance(IRubyObject recv, IRubyObject[] args, Block block) { 281 SizedQueue result = new SizedQueue(recv.getRuntime(), (RubyClass)recv); 282 result.callInit(args, block); 283 return result; 284 } 285 286 public SizedQueue(Ruby runtime, RubyClass type) { 287 super(runtime, type); 288 capacity = 1; 289 } 290 291 public static void setup(Ruby runtime) { 292 RubyClass cSizedQueue = runtime.defineClass("SizedQueue", runtime.getClass("Queue"), new ObjectAllocator() { 293 public IRubyObject allocate(Ruby runtime, RubyClass klass) { 294 return new SizedQueue(runtime, klass); 295 } 296 }); 297 CallbackFactory cb = runtime.callbackFactory(SizedQueue.class); 298 cSizedQueue.getMetaClass().defineMethod("new", cb.getOptSingletonMethod("newInstance")); 299 300 cSizedQueue.defineFastMethod("initialize", cb.getFastMethod("max_set", RubyInteger.class)); 301 302 cSizedQueue.defineFastMethod("clear", cb.getFastMethod("clear")); 303 cSizedQueue.defineFastMethod("max", cb.getFastMethod("max")); 304 cSizedQueue.defineFastMethod("max=", cb.getFastMethod("max_set", RubyInteger.class)); 305 cSizedQueue.defineFastMethod("pop", cb.getFastOptMethod("pop")); 306 cSizedQueue.defineFastMethod("push", cb.getFastMethod("push", IRubyObject.class)); 307 308 cSizedQueue.defineAlias("<<", "push"); 309 cSizedQueue.defineAlias("deq", "pop"); 310 cSizedQueue.defineAlias("shift", "pop"); 311 } 312 313 public synchronized IRubyObject clear() { 314 super.clear(); 315 notifyAll(); 316 return getRuntime().getNil(); 317 } 318 319 public synchronized RubyNumeric max() { 320 return RubyNumeric.int2fix(getRuntime(), capacity); 321 } 322 323 public synchronized IRubyObject max_set(RubyInteger arg) { 324 int new_capacity = RubyNumeric.fix2int(arg); 325 if ( new_capacity <= 0 ) { 326 getRuntime().newArgumentError("queue size must be positive"); 327 } 328 int difference; 329 if ( new_capacity > capacity ) { 330 difference = new_capacity - capacity; 331 } else { 332 difference = 0; 333 } 334 capacity = new_capacity; 335 if ( difference > 0 ) { 336 notifyAll(); 337 } 338 return getRuntime().getNil(); 339 } 340 341 public synchronized IRubyObject pop(IRubyObject args[]) { 342 IRubyObject result = super.pop(args); 343 notifyAll(); 344 return result; 345 } 346 347 public synchronized IRubyObject push(IRubyObject value) { 348 while ( RubyNumeric.fix2int(length()) >= capacity ) { 349 try { 350 wait(); 351 } catch (InterruptedException e) { 352 } 353 } 354 super.push(value); 355 notifyAll(); 356 return getRuntime().getNil(); 357 } 358 } 359 } 360 | Popular Tags |