1 15 16 package EDU.oswego.cs.dl.util.concurrent; 17 import java.lang.reflect.*; 18 19 24 25 public abstract class SemaphoreControlledChannel implements BoundedChannel { 26 protected final Semaphore putGuard_; 27 protected final Semaphore takeGuard_; 28 protected int capacity_; 29 30 35 36 public SemaphoreControlledChannel(int capacity) 37 throws IllegalArgumentException { 38 if (capacity <= 0) throw new IllegalArgumentException (); 39 capacity_ = capacity; 40 putGuard_ = new Semaphore(capacity); 41 takeGuard_ = new Semaphore(0); 42 } 43 44 45 58 public SemaphoreControlledChannel(int capacity, Class semaphoreClass) 59 throws IllegalArgumentException , 60 NoSuchMethodException , 61 SecurityException , 62 InstantiationException , 63 IllegalAccessException , 64 InvocationTargetException { 65 if (capacity <= 0) throw new IllegalArgumentException (); 66 capacity_ = capacity; 67 Class [] longarg = { Long.TYPE }; 68 Constructor ctor = semaphoreClass.getDeclaredConstructor(longarg); 69 Long [] cap = { new Long (capacity) }; 70 putGuard_ = (Semaphore)(ctor.newInstance(cap)); 71 Long [] zero = { new Long (0) }; 72 takeGuard_ = (Semaphore)(ctor.newInstance(zero)); 73 } 74 75 76 77 public int capacity() { return capacity_; } 78 79 84 85 public int size() { return (int)(takeGuard_.permits()); } 86 87 90 protected abstract void insert(Object x); 91 92 95 protected abstract Object extract(); 96 97 public void put(Object x) throws InterruptedException { 98 if (x == null) throw new IllegalArgumentException (); 99 if (Thread.interrupted()) throw new InterruptedException (); 100 putGuard_.acquire(); 101 try { 102 insert(x); 103 takeGuard_.release(); 104 } 105 catch (ClassCastException ex) { 106 putGuard_.release(); 107 throw ex; 108 } 109 } 110 111 public boolean offer(Object x, long msecs) throws InterruptedException { 112 if (x == null) throw new IllegalArgumentException (); 113 if (Thread.interrupted()) throw new InterruptedException (); 114 if (!putGuard_.attempt(msecs)) 115 return false; 116 else { 117 try { 118 insert(x); 119 takeGuard_.release(); 120 return true; 121 } 122 catch (ClassCastException ex) { 123 putGuard_.release(); 124 throw ex; 125 } 126 } 127 } 128 129 public Object take() throws InterruptedException { 130 if (Thread.interrupted()) throw new InterruptedException (); 131 takeGuard_.acquire(); 132 try { 133 Object x = extract(); 134 putGuard_.release(); 135 return x; 136 } 137 catch (ClassCastException ex) { 138 takeGuard_.release(); 139 throw ex; 140 } 141 } 142 143 public Object poll(long msecs) throws InterruptedException { 144 if (Thread.interrupted()) throw new InterruptedException (); 145 if (!takeGuard_.attempt(msecs)) 146 return null; 147 else { 148 try { 149 Object x = extract(); 150 putGuard_.release(); 151 return x; 152 } 153 catch (ClassCastException ex) { 154 takeGuard_.release(); 155 throw ex; 156 } 157 } 158 } 159 160 } 161 | Popular Tags |