Code - Class EDU.oswego.cs.dl.util.concurrent.misc.CVBuffer


1
2 package EDU.oswego.cs.dl.util.concurrent.misc;
3 import EDU.oswego.cs.dl.util.concurrent.*;
4
5
6 public class CVBuffer implements BoundedChannel {
7   private final Mutex mutex;
8   private final CondVar notFull;
9   private final CondVar notEmpty;
10   private int count = 0;
11   private int takePtr = 0;
12   private int putPtr = 0;
13   private final Object[] array;
14
15   public CVBuffer(int cap) {
16     array = new Object[cap];
17     mutex = new Mutex();
18     notFull = new CondVar(mutex);
19     notEmpty = new CondVar(mutex);
20   }
21
22   public CVBuffer() {
23     this(DefaultChannelCapacity.get());
24   }
25
26   public int capacity() { return array.length; }
27
28   public void put(Object x) throws InterruptedException {
29     mutex.acquire();
30     try {
31       while (count == array.length) {
32         notFull.await();
33       }
34       array[putPtr] = x;
35       putPtr = (putPtr + 1) % array.length;
36       ++count;
37       notEmpty.signal();
38     }
39     finally {
40       mutex.release();
41     }
42   }
43
44   public Object take() throws InterruptedException {
45     Object x = null;
46     mutex.acquire();
47     try {
48       while (count == 0) {
49         notEmpty.await();
50       }
51       x = array[takePtr];
52       array[takePtr] = null;
53       takePtr = (takePtr + 1) % array.length;
54       --count;
55       notFull.signal();
56     }
57     finally {
58       mutex.release();
59     }
60     return x;
61   }
62     
63   public boolean offer(Object x, long msecs) throws InterruptedException {
64     mutex.acquire();
65     try {
66       if (count == array.length) {
67         notFull.timedwait(msecs);
68         if (count == array.length)
69           return false;
70       }
71       array[putPtr] = x;
72       putPtr = (putPtr + 1) % array.length;
73       ++count;
74       notEmpty.signal();
75       return true;
76     }
77     finally {
78       mutex.release();
79     }
80   }
81   
82   public Object poll(long msecs) throws InterruptedException {
83     Object x = null;
84     mutex.acquire();
85     try {
86       if (count == 0) {
87         notEmpty.timedwait(msecs);
88         if (count == 0)
89           return null;
90       }
91       x = array[takePtr];
92       array[takePtr] = null;
93       takePtr = (takePtr + 1) % array.length;
94       --count;
95       notFull.signal();
96     }
97     finally {
98       mutex.release();
99     }
100     return x;
101   }
102
103   public Object peek() {
104     try {
105       mutex.acquire();
106       try {
107         if (count == 0)
108           return null;
109         else
110           return array[takePtr];
111       }
112       finally {
113         mutex.release();
114       }
115     }
116     catch (InterruptedException ex) {
117       Thread.currentThread().interrupt();
118       return null;
119     }
120   }
121
122 }
123
124

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates