KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > util > BoundedLinkedQueue


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.util;
18
19 import java.util.ArrayList JavaDoc;
20 import java.util.List JavaDoc;
21
22 public class BoundedLinkedQueue {
23
24     public static class LinkedNode {
25         public Object JavaDoc value;
26
27         public LinkedNode next;
28
29         public LinkedNode() {
30         }
31
32         public LinkedNode(Object JavaDoc x) {
33             value = x;
34         }
35
36         public LinkedNode(Object JavaDoc x, LinkedNode n) {
37             value = x;
38             next = n;
39         }
40     }
41
42     /*
43      * It might be a bit nicer if this were declared as a subclass of
44      * LinkedQueue, or a sibling class of a common abstract class. It shares
45      * much of the basic design and bookkeeping fields. But too many details
46      * differ to make this worth doing.
47      */

48
49     /**
50      * Dummy header node of list. The first actual node, if it exists, is always
51      * at head_.next. After each take, the old first node becomes the head.
52      */

53     protected LinkedNode head_;
54
55     /**
56      * The last node of list. Put() appends to list, so modifies last_
57      */

58     protected LinkedNode last_;
59
60     /**
61      * Helper monitor. Ensures that only one put at a time executes.
62      */

63
64     protected final Object JavaDoc putGuard_ = new Object JavaDoc();
65
66     /**
67      * Helper monitor. Protects and provides wait queue for takes
68      */

69
70     protected final Object JavaDoc takeGuard_ = new Object JavaDoc();
71
72     /** Number of elements allowed * */
73     protected int capacity_;
74
75     /**
76      * One side of a split permit count. The counts represent permits to do a
77      * put. (The queue is full when zero). Invariant: putSidePutPermits_ +
78      * takeSidePutPermits_ = capacity_ - length. (The length is never separately
79      * recorded, so this cannot be checked explicitly.) To minimize contention
80      * between puts and takes, the put side uses up all of its permits before
81      * transfering them from the take side. The take side just increments the
82      * count upon each take. Thus, most puts and take can run independently of
83      * each other unless the queue is empty or full. Initial value is queue
84      * capacity.
85      */

86
87     protected int putSidePutPermits_;
88
89     /** Number of takes since last reconcile * */
90     protected int takeSidePutPermits_ = 0;
91     
92     /** Close flag */
93     protected volatile boolean closed;
94
95     /**
96      * Create a queue with the given capacity
97      *
98      * @exception IllegalArgumentException
99      * if capacity less or equal to zero
100      */

101     public BoundedLinkedQueue(int capacity) {
102         if (capacity <= 0)
103             throw new IllegalArgumentException JavaDoc();
104         capacity_ = capacity;
105         putSidePutPermits_ = capacity;
106         head_ = new LinkedNode(null);
107         last_ = head_;
108     }
109
110     /**
111      * Create a queue with the current default capacity
112      */

113
114     public BoundedLinkedQueue() {
115         this(1024);
116     }
117
118     /**
119      * Move put permits from take side to put side; return the number of put
120      * side permits that are available. Call only under synch on puGuard_ AND
121      * this.
122      */

123     protected final int reconcilePutPermits() {
124         putSidePutPermits_ += takeSidePutPermits_;
125         takeSidePutPermits_ = 0;
126         return putSidePutPermits_;
127     }
128
129     /** Return the current capacity of this queue * */
130     public synchronized int capacity() {
131         return capacity_;
132     }
133
134     /**
135      * Return the number of elements in the queue. This is only a snapshot
136      * value, that may be in the midst of changing. The returned value will be
137      * unreliable in the presence of active puts and takes, and should only be
138      * used as a heuristic estimate, for example for resource monitoring
139      * purposes.
140      */

141     public synchronized int size() {
142         /*
143          * This should ideally synch on putGuard_, but doing so would cause it
144          * to block waiting for an in-progress put, which might be stuck. So we
145          * instead use whatever value of putSidePutPermits_ that we happen to
146          * read.
147          */

148         return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
149     }
150
151     /**
152      * Reset the capacity of this queue. If the new capacity is less than the
153      * old capacity, existing elements are NOT removed, but incoming puts will
154      * not proceed until the number of elements is less than the new capacity.
155      *
156      * @exception IllegalArgumentException
157      * if capacity less or equal to zero
158      */

159
160     public void setCapacity(int newCapacity) {
161         if (newCapacity <= 0)
162             throw new IllegalArgumentException JavaDoc();
163         synchronized (putGuard_) {
164             synchronized (this) {
165                 takeSidePutPermits_ += (newCapacity - capacity_);
166                 capacity_ = newCapacity;
167
168                 // Force immediate reconcilation.
169
reconcilePutPermits();
170                 notifyAll();
171             }
172         }
173     }
174
175     /** Main mechanics for take/poll * */
176     protected synchronized Object JavaDoc extract() {
177         synchronized (head_) {
178             Object JavaDoc x = null;
179             LinkedNode first = head_.next;
180             if (first != null) {
181                 x = first.value;
182                 first.value = null;
183                 head_ = first;
184                 ++takeSidePutPermits_;
185                 notify();
186             }
187             return x;
188         }
189     }
190
191     public Object JavaDoc peek() {
192         if (closed)
193             throw new IllegalStateException JavaDoc("Channel is closed");
194         synchronized (head_) {
195             LinkedNode first = head_.next;
196             if (first != null)
197                 return first.value;
198             else
199                 return null;
200         }
201     }
202
203     public Object JavaDoc take() throws InterruptedException JavaDoc {
204         if (Thread.interrupted())
205             throw new InterruptedException JavaDoc();
206         if (closed)
207             throw new IllegalStateException JavaDoc("Channel is closed");
208         
209         Object JavaDoc x = extract();
210         if (x != null)
211             return x;
212         else {
213             synchronized (takeGuard_) {
214                 try {
215                     for (;;) {
216                         x = extract();
217                         if (x != null) {
218                             return x;
219                         } else {
220                             if (closed)
221                                 throw new IllegalStateException JavaDoc("Channel is closed");
222                             takeGuard_.wait();
223                         }
224                     }
225                 } catch (InterruptedException JavaDoc ex) {
226                     takeGuard_.notify();
227                     throw ex;
228                 }
229             }
230         }
231     }
232
233     public Object JavaDoc poll(long msecs) throws InterruptedException JavaDoc {
234         if (Thread.interrupted())
235             throw new InterruptedException JavaDoc();
236         if (closed)
237             throw new IllegalStateException JavaDoc("Channel is closed");
238
239         Object JavaDoc x = extract();
240         if (x != null)
241             return x;
242         else {
243             synchronized (takeGuard_) {
244                 try {
245                     long waitTime = msecs;
246                     long start = (msecs <= 0) ? 0 : System.currentTimeMillis();
247                     for (;;) {
248                         x = extract();
249                         if (x != null || waitTime <= 0) {
250                             return x;
251                         } else {
252                             if (closed)
253                                 throw new IllegalStateException JavaDoc("Channel is closed");
254                             takeGuard_.wait(waitTime);
255                             waitTime = msecs
256                                     - (System.currentTimeMillis() - start);
257                         }
258                     }
259                 } catch (InterruptedException JavaDoc ex) {
260                     takeGuard_.notify();
261                     throw ex;
262                 }
263             }
264         }
265     }
266
267     /** Notify a waiting take if needed * */
268     protected final void allowTake() {
269         synchronized (takeGuard_) {
270             takeGuard_.notify();
271         }
272     }
273
274     /**
275      * Create and insert a node. Call only under synch on putGuard_
276      */

277     protected void insert(Object JavaDoc x) {
278         --putSidePutPermits_;
279         LinkedNode p = new LinkedNode(x);
280         synchronized (last_) {
281             last_.next = p;
282             last_ = p;
283         }
284     }
285
286     /*
287      * put and offer(ms) differ only in policy before insert/allowTake
288      */

289
290     public void put(Object JavaDoc x) throws InterruptedException JavaDoc {
291         if (x == null)
292             throw new IllegalArgumentException JavaDoc();
293         if (Thread.interrupted())
294             throw new InterruptedException JavaDoc();
295         if (closed)
296             throw new IllegalStateException JavaDoc("Channel is closed");
297
298         synchronized (putGuard_) {
299
300             if (putSidePutPermits_ <= 0) { // wait for permit.
301
synchronized (this) {
302                     if (reconcilePutPermits() <= 0) {
303                         try {
304                             for (;;) {
305                                 if (closed)
306                                     throw new IllegalStateException JavaDoc("Channel is closed");
307                                 wait();
308                                 if (reconcilePutPermits() > 0) {
309                                     break;
310                                 }
311                             }
312                         } catch (InterruptedException JavaDoc ex) {
313                             notify();
314                             throw ex;
315                         }
316                     }
317                 }
318             }
319             insert(x);
320         }
321         // call outside of lock to loosen put/take coupling
322
allowTake();
323     }
324
325     public boolean offer(Object JavaDoc x, long msecs) throws InterruptedException JavaDoc {
326         if (x == null)
327             throw new IllegalArgumentException JavaDoc();
328         if (Thread.interrupted())
329             throw new InterruptedException JavaDoc();
330         if (closed)
331             throw new IllegalStateException JavaDoc("Channel is closed");
332
333         synchronized (putGuard_) {
334
335             if (putSidePutPermits_ <= 0) {
336                 synchronized (this) {
337                     if (reconcilePutPermits() <= 0) {
338                         if (msecs <= 0)
339                             return false;
340                         else {
341                             try {
342                                 long waitTime = msecs;
343                                 long start = System.currentTimeMillis();
344
345                                 for (;;) {
346                                     if (closed)
347                                         throw new IllegalStateException JavaDoc("Channel is closed");
348                                     wait(waitTime);
349                                     if (reconcilePutPermits() > 0) {
350                                         break;
351                                     } else {
352                                         waitTime = msecs
353                                                 - (System.currentTimeMillis() - start);
354                                         if (waitTime <= 0) {
355                                             return false;
356                                         }
357                                     }
358                                 }
359                             } catch (InterruptedException JavaDoc ex) {
360                                 notify();
361                                 throw ex;
362                             }
363                         }
364                     }
365                 }
366             }
367
368             insert(x);
369         }
370
371         allowTake();
372         return true;
373     }
374
375     public boolean isEmpty() {
376         synchronized (head_) {
377             return head_.next == null;
378         }
379     }
380     
381     public synchronized List JavaDoc closeAndFlush() {
382         // Set this queue as closed
383
closed = true;
384         // No more puts is allowed
385
synchronized (putGuard_) {
386             synchronized (this) {
387                 takeSidePutPermits_ -= capacity_;
388                 capacity_ = 0;
389
390                 // Force immediate reconcilation.
391
reconcilePutPermits();
392                 notifyAll();
393             }
394         }
395         synchronized (takeGuard_) {
396             takeGuard_.notifyAll();
397         }
398         ArrayList JavaDoc l = new ArrayList JavaDoc();
399         Object JavaDoc o;
400         while ((o = extract()) != null) {
401             l.add(o);
402         }
403         return l;
404     }
405
406 }
407
Popular Tags