1 2 package EDU.oswego.cs.dl.util.concurrent.misc; 3 import EDU.oswego.cs.dl.util.concurrent.*; 4 5 6 public class CVBuffer implements BoundedChannel { 7 private final Mutex mutex; 8 private final CondVar notFull; 9 private final CondVar notEmpty; 10 private int count = 0; 11 private int takePtr = 0; 12 private int putPtr = 0; 13 private final Object [] array; 14 15 public CVBuffer(int cap) { 16 array = new Object [cap]; 17 mutex = new Mutex(); 18 notFull = new CondVar(mutex); 19 notEmpty = new CondVar(mutex); 20 } 21 22 public CVBuffer() { 23 this(DefaultChannelCapacity.get()); 24 } 25 26 public int capacity() { return array.length; } 27 28 public void put(Object x) throws InterruptedException { 29 mutex.acquire(); 30 try { 31 while (count == array.length) { 32 notFull.await(); 33 } 34 array[putPtr] = x; 35 putPtr = (putPtr + 1) % array.length; 36 ++count; 37 notEmpty.signal(); 38 } 39 finally { 40 mutex.release(); 41 } 42 } 43 44 public Object take() throws InterruptedException { 45 Object x = null; 46 mutex.acquire(); 47 try { 48 while (count == 0) { 49 notEmpty.await(); 50 } 51 x = array[takePtr]; 52 array[takePtr] = null; 53 takePtr = (takePtr + 1) % array.length; 54 --count; 55 notFull.signal(); 56 } 57 finally { 58 mutex.release(); 59 } 60 return x; 61 } 62 63 public boolean offer(Object x, long msecs) throws InterruptedException { 64 mutex.acquire(); 65 try { 66 if (count == array.length) { 67 notFull.timedwait(msecs); 68 if (count == array.length) 69 return false; 70 } 71 array[putPtr] = x; 72 putPtr = (putPtr + 1) % array.length; 73 ++count; 74 notEmpty.signal(); 75 return true; 76 } 77 finally { 78 mutex.release(); 79 } 80 } 81 82 public Object poll(long msecs) throws InterruptedException { 83 Object x = null; 84 mutex.acquire(); 85 try { 86 if (count == 0) { 87 notEmpty.timedwait(msecs); 88 if (count == 0) 89 return null; 90 } 91 x = array[takePtr]; 92 array[takePtr] = null; 93 takePtr = (takePtr + 1) % array.length; 94 --count; 95 notFull.signal(); 96 } 97 finally { 98 mutex.release(); 99 } 100 return x; 101 } 102 103 public Object peek() { 104 try { 105 mutex.acquire(); 106 try { 107 if (count == 0) 108 return null; 109 else 110 return array[takePtr]; 111 } 112 finally { 113 mutex.release(); 114 } 115 } 116 catch (InterruptedException ex) { 117 Thread.currentThread().interrupt(); 118 return null; 119 } 120 } 121 122 } 123 124 | Popular Tags |