1 17 package org.apache.servicemix.jbi.util; 18 19 import java.util.ArrayList ; 20 import java.util.List ; 21 22 public class BoundedLinkedQueue { 23 24 public static class LinkedNode { 25 public Object value; 26 27 public LinkedNode next; 28 29 public LinkedNode() { 30 } 31 32 public LinkedNode(Object x) { 33 value = x; 34 } 35 36 public LinkedNode(Object x, LinkedNode n) { 37 value = x; 38 next = n; 39 } 40 } 41 42 48 49 53 protected LinkedNode head_; 54 55 58 protected LinkedNode last_; 59 60 63 64 protected final Object putGuard_ = new Object (); 65 66 69 70 protected final Object takeGuard_ = new Object (); 71 72 73 protected int capacity_; 74 75 86 87 protected int putSidePutPermits_; 88 89 90 protected int takeSidePutPermits_ = 0; 91 92 93 protected volatile boolean closed; 94 95 101 public BoundedLinkedQueue(int capacity) { 102 if (capacity <= 0) 103 throw new IllegalArgumentException (); 104 capacity_ = capacity; 105 putSidePutPermits_ = capacity; 106 head_ = new LinkedNode(null); 107 last_ = head_; 108 } 109 110 113 114 public BoundedLinkedQueue() { 115 this(1024); 116 } 117 118 123 protected final int reconcilePutPermits() { 124 putSidePutPermits_ += takeSidePutPermits_; 125 takeSidePutPermits_ = 0; 126 return putSidePutPermits_; 127 } 128 129 130 public synchronized int capacity() { 131 return capacity_; 132 } 133 134 141 public synchronized int size() { 142 148 return capacity_ - (takeSidePutPermits_ + putSidePutPermits_); 149 } 150 151 159 160 public void setCapacity(int newCapacity) { 161 if (newCapacity <= 0) 162 throw new IllegalArgumentException (); 163 synchronized (putGuard_) { 164 synchronized (this) { 165 takeSidePutPermits_ += (newCapacity - capacity_); 166 capacity_ = newCapacity; 167 168 reconcilePutPermits(); 170 notifyAll(); 171 } 172 } 173 } 174 175 176 protected synchronized Object extract() { 177 synchronized (head_) { 178 Object x = null; 179 LinkedNode first = head_.next; 180 if (first != null) { 181 x = first.value; 182 first.value = null; 183 head_ = first; 184 ++takeSidePutPermits_; 185 notify(); 186 } 187 return x; 188 } 189 } 190 191 public Object peek() { 192 if (closed) 193 throw new IllegalStateException ("Channel is closed"); 194 synchronized (head_) { 195 LinkedNode first = head_.next; 196 if (first != null) 197 return first.value; 198 else 199 return null; 200 } 201 } 202 203 public Object take() throws InterruptedException { 204 if (Thread.interrupted()) 205 throw new InterruptedException (); 206 if (closed) 207 throw new IllegalStateException ("Channel is closed"); 208 209 Object x = extract(); 210 if (x != null) 211 return x; 212 else { 213 synchronized (takeGuard_) { 214 try { 215 for (;;) { 216 x = extract(); 217 if (x != null) { 218 return x; 219 } else { 220 if (closed) 221 throw new IllegalStateException ("Channel is closed"); 222 takeGuard_.wait(); 223 } 224 } 225 } catch (InterruptedException ex) { 226 takeGuard_.notify(); 227 throw ex; 228 } 229 } 230 } 231 } 232 233 public Object poll(long msecs) throws InterruptedException { 234 if (Thread.interrupted()) 235 throw new InterruptedException (); 236 if (closed) 237 throw new IllegalStateException ("Channel is closed"); 238 239 Object x = extract(); 240 if (x != null) 241 return x; 242 else { 243 synchronized (takeGuard_) { 244 try { 245 long waitTime = msecs; 246 long start = (msecs <= 0) ? 0 : System.currentTimeMillis(); 247 for (;;) { 248 x = extract(); 249 if (x != null || waitTime <= 0) { 250 return x; 251 } else { 252 if (closed) 253 throw new IllegalStateException ("Channel is closed"); 254 takeGuard_.wait(waitTime); 255 waitTime = msecs 256 - (System.currentTimeMillis() - start); 257 } 258 } 259 } catch (InterruptedException ex) { 260 takeGuard_.notify(); 261 throw ex; 262 } 263 } 264 } 265 } 266 267 268 protected final void allowTake() { 269 synchronized (takeGuard_) { 270 takeGuard_.notify(); 271 } 272 } 273 274 277 protected void insert(Object x) { 278 --putSidePutPermits_; 279 LinkedNode p = new LinkedNode(x); 280 synchronized (last_) { 281 last_.next = p; 282 last_ = p; 283 } 284 } 285 286 289 290 public void put(Object x) throws InterruptedException { 291 if (x == null) 292 throw new IllegalArgumentException (); 293 if (Thread.interrupted()) 294 throw new InterruptedException (); 295 if (closed) 296 throw new IllegalStateException ("Channel is closed"); 297 298 synchronized (putGuard_) { 299 300 if (putSidePutPermits_ <= 0) { synchronized (this) { 302 if (reconcilePutPermits() <= 0) { 303 try { 304 for (;;) { 305 if (closed) 306 throw new IllegalStateException ("Channel is closed"); 307 wait(); 308 if (reconcilePutPermits() > 0) { 309 break; 310 } 311 } 312 } catch (InterruptedException ex) { 313 notify(); 314 throw ex; 315 } 316 } 317 } 318 } 319 insert(x); 320 } 321 allowTake(); 323 } 324 325 public boolean offer(Object x, long msecs) throws InterruptedException { 326 if (x == null) 327 throw new IllegalArgumentException (); 328 if (Thread.interrupted()) 329 throw new InterruptedException (); 330 if (closed) 331 throw new IllegalStateException ("Channel is closed"); 332 333 synchronized (putGuard_) { 334 335 if (putSidePutPermits_ <= 0) { 336 synchronized (this) { 337 if (reconcilePutPermits() <= 0) { 338 if (msecs <= 0) 339 return false; 340 else { 341 try { 342 long waitTime = msecs; 343 long start = System.currentTimeMillis(); 344 345 for (;;) { 346 if (closed) 347 throw new IllegalStateException ("Channel is closed"); 348 wait(waitTime); 349 if (reconcilePutPermits() > 0) { 350 break; 351 } else { 352 waitTime = msecs 353 - (System.currentTimeMillis() - start); 354 if (waitTime <= 0) { 355 return false; 356 } 357 } 358 } 359 } catch (InterruptedException ex) { 360 notify(); 361 throw ex; 362 } 363 } 364 } 365 } 366 } 367 368 insert(x); 369 } 370 371 allowTake(); 372 return true; 373 } 374 375 public boolean isEmpty() { 376 synchronized (head_) { 377 return head_.next == null; 378 } 379 } 380 381 public synchronized List closeAndFlush() { 382 closed = true; 384 synchronized (putGuard_) { 386 synchronized (this) { 387 takeSidePutPermits_ -= capacity_; 388 capacity_ = 0; 389 390 reconcilePutPermits(); 392 notifyAll(); 393 } 394 } 395 synchronized (takeGuard_) { 396 takeGuard_.notifyAll(); 397 } 398 ArrayList l = new ArrayList (); 399 Object o; 400 while ((o = extract()) != null) { 401 l.add(o); 402 } 403 return l; 404 } 405 406 } 407 | Popular Tags |