1 15 16 package org.dbunit.util.concurrent; 17 import java.lang.reflect.Constructor ; 18 import java.lang.reflect.InvocationTargetException ; 19 20 25 26 public abstract class SemaphoreControlledChannel implements BoundedChannel { 27 protected final Semaphore putGuard_; 28 protected final Semaphore takeGuard_; 29 protected int capacity_; 30 31 36 37 public SemaphoreControlledChannel(int capacity) 38 throws IllegalArgumentException { 39 if (capacity <= 0) throw new IllegalArgumentException (); 40 capacity_ = capacity; 41 putGuard_ = new Semaphore(capacity); 42 takeGuard_ = new Semaphore(0); 43 } 44 45 46 59 public SemaphoreControlledChannel(int capacity, Class semaphoreClass) 60 throws IllegalArgumentException , 61 NoSuchMethodException , 62 SecurityException , 63 InstantiationException , 64 IllegalAccessException , 65 InvocationTargetException { 66 if (capacity <= 0) throw new IllegalArgumentException (); 67 capacity_ = capacity; 68 Class [] longarg = { Long.TYPE }; 69 Constructor ctor = semaphoreClass.getDeclaredConstructor(longarg); 70 Long [] cap = { new Long (capacity) }; 71 putGuard_ = (Semaphore)(ctor.newInstance(cap)); 72 Long [] zero = { new Long (0) }; 73 takeGuard_ = (Semaphore)(ctor.newInstance(zero)); 74 } 75 76 77 78 public int capacity() { return capacity_; } 79 80 85 86 public int size() { return (int)(takeGuard_.permits()); } 87 88 91 protected abstract void insert(Object x); 92 93 96 protected abstract Object extract(); 97 98 public void put(Object x) throws InterruptedException { 99 if (x == null) throw new IllegalArgumentException (); 100 if (Thread.interrupted()) throw new InterruptedException (); 101 putGuard_.acquire(); 102 try { 103 insert(x); 104 takeGuard_.release(); 105 } 106 catch (ClassCastException ex) { 107 putGuard_.release(); 108 throw ex; 109 } 110 } 111 112 public boolean offer(Object x, long msecs) throws InterruptedException { 113 if (x == null) throw new IllegalArgumentException (); 114 if (Thread.interrupted()) throw new InterruptedException (); 115 if (!putGuard_.attempt(msecs)) 116 return false; 117 else { 118 try { 119 insert(x); 120 takeGuard_.release(); 121 return true; 122 } 123 catch (ClassCastException ex) { 124 putGuard_.release(); 125 throw ex; 126 } 127 } 128 } 129 130 public Object take() throws InterruptedException { 131 if (Thread.interrupted()) throw new InterruptedException (); 132 takeGuard_.acquire(); 133 try { 134 Object x = extract(); 135 putGuard_.release(); 136 return x; 137 } 138 catch (ClassCastException ex) { 139 takeGuard_.release(); 140 throw ex; 141 } 142 } 143 144 public Object poll(long msecs) throws InterruptedException { 145 if (Thread.interrupted()) throw new InterruptedException (); 146 if (!takeGuard_.attempt(msecs)) 147 return null; 148 else { 149 try { 150 Object x = extract(); 151 putGuard_.release(); 152 return x; 153 } 154 catch (ClassCastException ex) { 155 takeGuard_.release(); 156 throw ex; 157 } 158 } 159 } 160 161 } 162 | Popular Tags |