1 16 17 package org.dbunit.util.concurrent; 18 19 24 25 public class BoundedBuffer implements BoundedChannel { 26 27 protected final Object [] array_; 29 protected int takePtr_ = 0; protected int putPtr_ = 0; 31 32 protected int usedSlots_ = 0; protected int emptySlots_; 35 38 protected final Object putMonitor_ = new Object (); 39 40 44 public BoundedBuffer(int capacity) throws IllegalArgumentException { 45 if (capacity <= 0) throw new IllegalArgumentException (); 46 array_ = new Object [capacity]; 47 emptySlots_ = capacity; 48 } 49 50 53 54 public BoundedBuffer() { 55 this(DefaultChannelCapacity.get()); 56 } 57 58 63 public synchronized int size() { return usedSlots_; } 64 65 public int capacity() { return array_.length; } 66 67 protected void incEmptySlots() { 68 synchronized(putMonitor_) { 69 ++emptySlots_; 70 putMonitor_.notify(); 71 } 72 } 73 74 protected synchronized void incUsedSlots() { 75 ++usedSlots_; 76 notify(); 77 } 78 79 protected final void insert(Object x) { --emptySlots_; 81 array_[putPtr_] = x; 82 if (++putPtr_ >= array_.length) putPtr_ = 0; 83 } 84 85 protected final Object extract() { --usedSlots_; 87 Object old = array_[takePtr_]; 88 array_[takePtr_] = null; 89 if (++takePtr_ >= array_.length) takePtr_ = 0; 90 return old; 91 } 92 93 public Object peek() { 94 synchronized(this) { 95 if (usedSlots_ > 0) 96 return array_[takePtr_]; 97 else 98 return null; 99 } 100 } 101 102 103 public void put(Object x) throws InterruptedException { 104 if (x == null) throw new IllegalArgumentException (); 105 if (Thread.interrupted()) throw new InterruptedException (); 106 107 synchronized(putMonitor_) { 108 while (emptySlots_ <= 0) { 109 try { putMonitor_.wait(); } 110 catch (InterruptedException ex) { 111 putMonitor_.notify(); 112 throw ex; 113 } 114 } 115 insert(x); 116 } 117 incUsedSlots(); 118 } 119 120 public boolean offer(Object x, long msecs) throws InterruptedException { 121 if (x == null) throw new IllegalArgumentException (); 122 if (Thread.interrupted()) throw new InterruptedException (); 123 124 synchronized(putMonitor_) { 125 long start = (msecs <= 0)? 0 : System.currentTimeMillis(); 126 long waitTime = msecs; 127 while (emptySlots_ <= 0) { 128 if (waitTime <= 0) return false; 129 try { putMonitor_.wait(waitTime); } 130 catch (InterruptedException ex) { 131 putMonitor_.notify(); 132 throw ex; 133 } 134 waitTime = msecs - (System.currentTimeMillis() - start); 135 } 136 insert(x); 137 } 138 incUsedSlots(); 139 return true; 140 } 141 142 143 144 public Object take() throws InterruptedException { 145 if (Thread.interrupted()) throw new InterruptedException (); 146 Object old = null; 147 synchronized(this) { 148 while (usedSlots_ <= 0) { 149 try { wait(); } 150 catch (InterruptedException ex) { 151 notify(); 152 throw ex; 153 } 154 } 155 old = extract(); 156 } 157 incEmptySlots(); 158 return old; 159 } 160 161 public Object poll(long msecs) throws InterruptedException { 162 if (Thread.interrupted()) throw new InterruptedException (); 163 Object old = null; 164 synchronized(this) { 165 long start = (msecs <= 0)? 0 : System.currentTimeMillis(); 166 long waitTime = msecs; 167 168 while (usedSlots_ <= 0) { 169 if (waitTime <= 0) return null; 170 try { wait(waitTime); } 171 catch (InterruptedException ex) { 172 notify(); 173 throw ex; 174 } 175 waitTime = msecs - (System.currentTimeMillis() - start); 176 177 } 178 old = extract(); 179 } 180 incEmptySlots(); 181 return old; 182 } 183 184 } 185 186 187 | Popular Tags |