1 17 18 package org.dbunit.util.concurrent; 19 20 45 46 public class BoundedLinkedQueue implements BoundedChannel { 47 48 55 56 57 58 62 protected LinkedNode head_; 63 64 67 protected LinkedNode last_; 68 69 70 73 74 protected final Object putGuard_ = new Object (); 75 76 79 80 protected final Object takeGuard_ = new Object (); 81 82 83 84 protected int capacity_; 85 86 87 100 101 protected int putSidePutPermits_; 102 103 104 protected int takeSidePutPermits_ = 0; 105 106 107 111 public BoundedLinkedQueue(int capacity) { 112 if (capacity <= 0) throw new IllegalArgumentException (); 113 capacity_ = capacity; 114 putSidePutPermits_ = capacity; 115 head_ = new LinkedNode(null); 116 last_ = head_; 117 } 118 119 122 123 public BoundedLinkedQueue() { 124 this(DefaultChannelCapacity.get()); 125 } 126 127 132 protected final int reconcilePutPermits() { 133 putSidePutPermits_ += takeSidePutPermits_; 134 takeSidePutPermits_ = 0; 135 return putSidePutPermits_; 136 } 137 138 139 140 public synchronized int capacity() { return capacity_; } 141 142 143 150 public synchronized int size() { 151 157 return capacity_ - (takeSidePutPermits_ + putSidePutPermits_); 158 } 159 160 161 169 170 public void setCapacity(int newCapacity) { 171 if (newCapacity <= 0) throw new IllegalArgumentException (); 172 synchronized (putGuard_) { 173 synchronized(this) { 174 takeSidePutPermits_ += (newCapacity - capacity_); 175 capacity_ = newCapacity; 176 177 reconcilePutPermits(); 179 notifyAll(); 180 } 181 } 182 } 183 184 185 186 protected synchronized Object extract() { 187 synchronized(head_) { 188 Object x = null; 189 LinkedNode first = head_.next; 190 if (first != null) { 191 x = first.value; 192 first.value = null; 193 head_ = first; 194 ++takeSidePutPermits_; 195 notify(); 196 } 197 return x; 198 } 199 } 200 201 public Object peek() { 202 synchronized(head_) { 203 LinkedNode first = head_.next; 204 if (first != null) 205 return first.value; 206 else 207 return null; 208 } 209 } 210 211 public Object take() throws InterruptedException { 212 if (Thread.interrupted()) throw new InterruptedException (); 213 Object x = extract(); 214 if (x != null) 215 return x; 216 else { 217 synchronized(takeGuard_) { 218 try { 219 for (;;) { 220 x = extract(); 221 if (x != null) { 222 return x; 223 } 224 else { 225 takeGuard_.wait(); 226 } 227 } 228 } 229 catch(InterruptedException ex) { 230 takeGuard_.notify(); 231 throw ex; 232 } 233 } 234 } 235 } 236 237 public Object poll(long msecs) throws InterruptedException { 238 if (Thread.interrupted()) throw new InterruptedException (); 239 Object x = extract(); 240 if (x != null) 241 return x; 242 else { 243 synchronized(takeGuard_) { 244 try { 245 long waitTime = msecs; 246 long start = (msecs <= 0)? 0: System.currentTimeMillis(); 247 for (;;) { 248 x = extract(); 249 if (x != null || waitTime <= 0) { 250 return x; 251 } 252 else { 253 takeGuard_.wait(waitTime); 254 waitTime = msecs - (System.currentTimeMillis() - start); 255 } 256 } 257 } 258 catch(InterruptedException ex) { 259 takeGuard_.notify(); 260 throw ex; 261 } 262 } 263 } 264 } 265 266 267 protected final void allowTake() { 268 synchronized(takeGuard_) { 269 takeGuard_.notify(); 270 } 271 } 272 273 274 278 protected void insert(Object x) { 279 --putSidePutPermits_; 280 LinkedNode p = new LinkedNode(x); 281 synchronized(last_) { 282 last_.next = p; 283 last_ = p; 284 } 285 } 286 287 288 291 292 public void put(Object x) throws InterruptedException { 293 if (x == null) throw new IllegalArgumentException (); 294 if (Thread.interrupted()) throw new InterruptedException (); 295 296 synchronized(putGuard_) { 297 298 if (putSidePutPermits_ <= 0) { synchronized(this) { 300 if (reconcilePutPermits() <= 0) { 301 try { 302 for(;;) { 303 wait(); 304 if (reconcilePutPermits() > 0) { 305 break; 306 } 307 } 308 } 309 catch (InterruptedException ex) { 310 notify(); 311 throw ex; 312 } 313 } 314 } 315 } 316 insert(x); 317 } 318 allowTake(); 320 } 321 322 public boolean offer(Object x, long msecs) throws InterruptedException { 323 if (x == null) throw new IllegalArgumentException (); 324 if (Thread.interrupted()) throw new InterruptedException (); 325 326 synchronized(putGuard_) { 327 328 if (putSidePutPermits_ <= 0) { 329 synchronized(this) { 330 if (reconcilePutPermits() <= 0) { 331 if (msecs <= 0) 332 return false; 333 else { 334 try { 335 long waitTime = msecs; 336 long start = System.currentTimeMillis(); 337 338 for(;;) { 339 wait(waitTime); 340 if (reconcilePutPermits() > 0) { 341 break; 342 } 343 else { 344 waitTime = msecs - (System.currentTimeMillis() - start); 345 if (waitTime <= 0) { 346 return false; 347 } 348 } 349 } 350 } 351 catch (InterruptedException ex) { 352 notify(); 353 throw ex; 354 } 355 } 356 } 357 } 358 } 359 360 insert(x); 361 } 362 363 allowTake(); 364 return true; 365 } 366 367 public boolean isEmpty() { 368 synchronized(head_) { 369 return head_.next == null; 370 } 371 } 372 373 } 374 | Popular Tags |