1 15 16 17 package EDU.oswego.cs.dl.util.concurrent; 18 19 23 24 25 public abstract class QueuedSemaphore extends Semaphore { 26 27 protected final WaitQueue wq_; 28 29 QueuedSemaphore(WaitQueue q, long initialPermits) { 30 super(initialPermits); 31 wq_ = q; 32 } 33 34 public void acquire() throws InterruptedException { 35 if (Thread.interrupted()) throw new InterruptedException (); 36 if (precheck()) return; 37 WaitQueue.WaitNode w = new WaitQueue.WaitNode(); 38 w.doWait(this); 39 } 40 41 public boolean attempt(long msecs) throws InterruptedException { 42 if (Thread.interrupted()) throw new InterruptedException (); 43 if (precheck()) return true; 44 if (msecs <= 0) return false; 45 46 WaitQueue.WaitNode w = new WaitQueue.WaitNode(); 47 return w.doTimedWait(this, msecs); 48 } 49 50 protected synchronized boolean precheck() { 51 boolean pass = (permits_ > 0); 52 if (pass) --permits_; 53 return pass; 54 } 55 56 protected synchronized boolean recheck(WaitQueue.WaitNode w) { 57 boolean pass = (permits_ > 0); 58 if (pass) --permits_; 59 else wq_.insert(w); 60 return pass; 61 } 62 63 64 protected synchronized WaitQueue.WaitNode getSignallee() { 65 WaitQueue.WaitNode w = wq_.extract(); 66 if (w == null) ++permits_; return w; 68 } 69 70 public void release() { 71 for (;;) { 72 WaitQueue.WaitNode w = getSignallee(); 73 if (w == null) return; if (w.signal()) return; } 76 } 77 78 79 public void release(long n) { 80 if (n < 0) throw new IllegalArgumentException ("Negative argument"); 81 82 for (long i = 0; i < n; ++i) release(); 83 } 84 85 89 90 protected static abstract class WaitQueue { 91 92 protected abstract void insert(WaitNode w); protected abstract WaitNode extract(); 95 protected static class WaitNode { 96 boolean waiting = true; 97 WaitNode next = null; 98 99 protected synchronized boolean signal() { 100 boolean signalled = waiting; 101 if (signalled) { 102 waiting = false; 103 notify(); 104 } 105 return signalled; 106 } 107 108 protected synchronized boolean doTimedWait(QueuedSemaphore sem, 109 long msecs) 110 throws InterruptedException { 111 if (sem.recheck(this) || !waiting) 112 return true; 113 else if (msecs <= 0) { 114 waiting = false; 115 return false; 116 } 117 else { 118 long waitTime = msecs; 119 long start = System.currentTimeMillis(); 120 121 try { 122 for (;;) { 123 wait(waitTime); 124 if (!waiting) return true; 126 else { 127 waitTime = msecs - (System.currentTimeMillis() - start); 128 if (waitTime <= 0) { waiting = false; 130 return false; 131 } 132 } 133 } 134 } 135 catch(InterruptedException ex) { 136 if (waiting) { waiting = false; throw ex; 139 } 140 else { Thread.currentThread().interrupt(); 142 return true; 143 } 144 } 145 } 146 } 147 148 protected synchronized void doWait(QueuedSemaphore sem) 149 throws InterruptedException { 150 if (!sem.recheck(this)) { 151 try { 152 while (waiting) wait(); 153 } 154 catch(InterruptedException ex) { 155 if (waiting) { waiting = false; throw ex; 158 } 159 else { Thread.currentThread().interrupt(); 161 return; 162 } 163 } 164 } 165 } 166 } 167 168 } 169 170 171 } 172 | Popular Tags |