KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > util > timeout > TimeoutPriorityQueueImpl


1 /*
2  * JBoss, Home of Professional Open Source
3  * Copyright 2006, JBoss Inc., and individual contributors as indicated
4  * by the @authors tag. See the copyright.txt in the distribution for a
5  * full listing of individual contributors.
6  *
7  * This is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as
9  * published by the Free Software Foundation; either version 2.1 of
10  * the License, or (at your option) any later version.
11  *
12  * This software is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this software; if not, write to the Free
19  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21  */

22 package org.jboss.util.timeout;
23
24 /**
25  * TimeoutPriorityQueueImpl.
26  *
27  * This is a balanced binary tree. If nonempty, the root is at index 1,
28  * and all nodes are at indices 1..size. Nodes with index greater than
29  * size are null. Index 0 is never used.
30  * Children of the node at index <code>j</code> are at <code>j*2</code>
31  * and <code>j*2+1</code>. The children of a node always fire the timeout
32  * no earlier than the node.
33  *
34  *
35  * Or, more formally:
36  *
37  * Only indices <code>1</code>..<code>size</code> of this array are used.
38  * All other indices contain the null reference.
39  * This array represent a balanced binary tree.
40  *
41  * If <code>size</code> is <code>0</code> the tree is empty, otherwise
42  * the root of the tree is at index <code>1</code>.
43  *
44  * Given an arbitrary node at index <code>n</code> that is not the root
45  * node, the parent node of <code>n</code> is at index <code>n/2</code>.
46  *
47  * Given an arbitrary node at index <code>n</code>; if
48  * <code>2*n <= size</code> the node at <code>n</code> has its left child
49  * at index <code>2*n</code>, otherwise the node at <code>n</code> has
50  * no left child.
51  *
52  * Given an arbitrary node at index <code>n</code>; if
53  * <code>2*n+1 <= size</code> the node at <code>n</code> has its right child
54  * at index <code>2*n+1</code>, otherwise the node at <code>n</code> has
55  * no right child.
56  *
57  * The priority function is called T. Given a node <code>n</code>,
58  * <code>T(n)</code> denotes the absolute time (in milliseconds since
59  * the epoch) that the timeout for node <code>n</code> should happen.
60  * Smaller values of <code>T</code> means higher priority.
61  *
62  * The tree satisfies the following invariant:
63  * <i>
64  * For any node <code>n</code> in the tree:
65  * If node <code>n</code> has a left child <code>l</code>,
66  * <code>T(n) <= T(l)</code>.
67  * If node <code>n</code> has a right child <code>r</code>,
68  * <code>T(n) <= T(r)</code>.
69  * </i>
70  *
71  *
72  * The invariant may be temporarily broken while executing synchronized
73  * on <code>this</code> instance, but is always reestablished before
74  * leaving the synchronized code.
75  *
76  * The node at index <code>1</code> is always the first node to timeout,
77  * as can be deduced from the invariant.
78  *
79  * For the following algorithm pseudocode, the operation
80  * <code>swap(n,m)</code> denotes the exchange of the nodes at indices
81  * <code>n</code> and <code>m</code> in the tree.
82  *
83  * Insertion of a new node happend as follows:
84  * <pre>
85  * IF size = q.length THEN
86  * "expand q array to be larger";
87  * ENDIF
88  * size <- size + 1;
89  * q[size] <- "new node";
90  * n <- size;
91  * WHILE n > 1 AND T(n/2) > T(n) DO
92  * swap(n/2, n);
93  * n <- n/2;
94  * ENDWHILE
95  * </pre>
96  * Proof that this insertion algorithm respects the invariant is left to
97  * the interested reader.
98  *
99  * The removal algorithm is a bit more complicated. To remove the node
100  * at index <code>n</code>:
101  * <pre>
102  * swap(n, size);
103  * size <- size - 1;
104  * IF n > 1 AND T(n/2) > T(n) THEN
105  * WHILE n > 1 AND T(n/2) > T(n) DO
106  * swap(n/2, n);
107  * n <- n/2;
108  * ENDWHILE
109  * ELSE
110  * WHILE 2*n <= size DO
111  * IF 2*n+1 <= size THEN
112  * // Both children present
113  * IF T(2*n) <= T(2*n+1) THEN
114  * IF T(n) <= T(2*n) THEN
115  * EXIT;
116  * ENDIF
117  * swap(n, 2*n);
118  * n <- 2*n;
119  * ELSE
120  * IF T(n) <= T(2*n+1) THEN
121  * EXIT;
122  * ENDIF
123  * swap(n, 2*n+1);
124  * n <- 2*n+1;
125  * ENDIF
126  * ELSE
127  * // Only left child, right child not present.
128  * IF T(n) <= T(2*n) THEN
129  * EXIT;
130  * ENDIF
131  * swap(n, 2*n);
132  * n <- 2*n;
133  * ENDIF
134  * ENDWHILE
135  * ENDIF
136  * </pre>
137  * Proof that this removal algorithm respects the invariant is left to
138  * the interested reader. Really, I am not going to prove it here.
139  *
140  * If you are interested, you can find this data structure and its
141  * associated operations in most textbooks on algorithmics.
142  *
143  * @see checkTree
144  *
145  * @author <a HREF="osh@sparre.dk">Ole Husgaard</a>
146  * @author <a HREF="dimitris@jboss.org">Dimitris Andreadis</a>
147  * @author <a HREF="genman@maison-otaku.net">Elias Ross</a>
148  * @author <a HREF="adrian@jboss.com">Adrian Brock</a>
149  * @version $Revision: 1958 $
150  */

151 public class TimeoutPriorityQueueImpl implements TimeoutPriorityQueue
152 {
153    // Code commented out with the mark "INV:" are runtime checks
154
// of invariants that are not needed for a production system.
155
// For problem solving, you can remove these comments.
156
// Multithreading notes:
157
//
158
// While a TimeoutImpl is enqueued, its index field contains the index
159
// of the instance in the queue; that is, for 1 <= n <= size,
160
// q[n].index = n.
161
// Modifications of an enqueued TimeoutImpl instance may only happen
162
// in code synchronized on the TimeoutFactory instance that has it
163
// enqueued.
164
// Modifications on the priority queue may only happen while running in
165
// code synchronized on the TimeoutFactory instance that holds the queue.
166
// When a TimeoutImpl instance is no longer enqueued, its index field
167
// changes to one of the negative constants declared in the TimeoutImpl
168
// class.
169
//
170
// Cancellation may race with the timeout.
171
// To avoid problems with this, the TimeoutImpl index field is set to
172
// TimeoutImpl.TIMEOUT when the TimeoutImpl is taken out of the queue.
173
// Finally the index field is set to TimeoutImpl.DONE, and
174
// the TimeoutImpl instance is discarded.
175

176    /** The lock object */
177    private Object JavaDoc lock = new Object JavaDoc();
178
179    /** The size of the timeout queue. */
180    private int size;
181
182    /** The timeouts */
183    private TimeoutExtImpl[] queue;
184
185    /**
186     * Create a new TimeoutPriorityQueueImpl.
187     */

188    public TimeoutPriorityQueueImpl()
189    {
190       queue = new TimeoutExtImpl[16];
191       size = 0;
192    }
193
194    public TimeoutExt offer(long time, TimeoutTarget target)
195    {
196       if (queue == null)
197          throw new IllegalStateException JavaDoc("TimeoutPriorityQueue has been cancelled");
198       if (time < 0)
199          throw new IllegalArgumentException JavaDoc("Negative time");
200       if (target == null)
201          throw new IllegalArgumentException JavaDoc("Null timeout target");
202
203       synchronized (lock)
204       {
205          // INV: checkTree();
206
// INV: assertExpr(size < queue.length);
207
if (++size == queue.length)
208          {
209             TimeoutExtImpl[] newQ = new TimeoutExtImpl[2 * queue.length];
210             System.arraycopy(queue, 0, newQ, 0, queue.length);
211             queue = newQ;
212          }
213          // INV: assertExpr(size < queue.length);
214
// INV: assertExpr(queue[size] == null);
215
TimeoutExtImpl timeout;
216          timeout = queue[size] = new TimeoutExtImpl();
217          timeout.index = size;
218          timeout.time = time;
219          timeout.target = target;
220          normalizeUp(size);
221          if (timeout.index == 1)
222             lock.notify();
223          // INV: checkTree();
224
return timeout;
225       }
226    }
227
228    public TimeoutExt take()
229    {
230       return poll(-1);
231    }
232
233    public TimeoutExt poll()
234    {
235       return poll(1);
236    }
237
238    public TimeoutExt poll(long wait)
239    {
240       long endWait = -1;
241       if (wait > 0)
242          endWait = System.currentTimeMillis() + wait;
243       // Look for work
244
synchronized (lock)
245       {
246          while (queue != null && (wait >= 0 || endWait == -1))
247          {
248             if (size == 0)
249             {
250                try
251                {
252                   if (endWait == -1)
253                      lock.wait();
254                   else
255                      lock.wait(wait);
256                }
257                catch (InterruptedException JavaDoc ex)
258                {
259                }
260             }
261             else
262             {
263                long now = System.currentTimeMillis();
264                if (queue[1].time > now)
265                {
266                   long waitForFirst = queue[1].time - now;
267                   if (endWait != -1 && waitForFirst > wait)
268                      waitForFirst = wait;
269                   try
270                   {
271                      lock.wait(waitForFirst);
272                   }
273                   catch (InterruptedException JavaDoc ex)
274                   {
275                   }
276                }
277                if (size > 0 && queue != null && queue[1].time <= System.currentTimeMillis())
278                {
279                   TimeoutExtImpl result = removeNode(1);
280                   result.index = TimeoutExtImpl.TIMEOUT;
281                   return result;
282                }
283             }
284             if (endWait != -1)
285                wait = endWait - System.currentTimeMillis();
286          }
287       }
288       return null;
289    }
290
291    public TimeoutExt peek()
292    {
293       synchronized (lock)
294       {
295          if (size > 0)
296             return queue[1];
297          else
298             return null;
299       }
300    }
301
302    public boolean remove(TimeoutExt timeout)
303    {
304       TimeoutExtImpl timeoutImpl = (TimeoutExtImpl) timeout;
305       synchronized (lock)
306       {
307          if (timeoutImpl.index > 0)
308          {
309             // Active timeout, remove it.
310
// INV: assertExpr(queue[timeoutImpl.index] == timeout);
311
// INV: checkTree();
312
removeNode(timeoutImpl.index);
313             // INV: checkTree();
314
timeoutImpl.index = TimeoutExtImpl.DONE;
315
316             // execution cancelled
317
return true;
318          }
319          else
320          {
321             // has already been executed (DONE) or
322
// is currently executing (TIMEOUT)
323
return false;
324          }
325       }
326    }
327
328    public void clear()
329    {
330       synchronized (lock)
331       {
332          if (queue == null)
333             return;
334
335          // cleanup queue
336
for (int i = 1; i <= size; ++i)
337             queue[i] = cleanupTimeoutExtImpl(queue[i]);
338       }
339    }
340
341    public void cancel()
342    {
343       synchronized (lock)
344       {
345          if (queue == null)
346             return;
347          clear();
348          queue = null;
349          size = 0;
350          lock.notifyAll();
351       }
352    }
353
354    public int size()
355    {
356       return size;
357    }
358    
359    /**
360     * Whether the queue is cancelled
361     *
362     * @return true when cancelled
363     */

364    public boolean isCancelled()
365    {
366       return queue == null;
367    }
368
369    /**
370     * A new node has been added at index <code>index</code>.
371     * Normalize the tree by moving the new node up the tree.
372     *
373     * @return true if the tree was modified.
374     */

375    private boolean normalizeUp(int index)
376    {
377       // INV: assertExpr(index > 0);
378
// INV: assertExpr(index <= size);
379
// INV: assertExpr(queue[index] != null);
380
if (index == 1)
381          return false; // at root
382
boolean ret = false;
383       long t = queue[index].time;
384       int p = index >> 1;
385       while (queue[p].time > t)
386       {
387          // INV: assertExpr(queue[index].time == t);
388
swap(p, index);
389          ret = true;
390          if (p == 1)
391             break; // at root
392
index = p;
393          p >>= 1;
394       }
395       return ret;
396    }
397
398    /**
399     * Swap two nodes in the tree.
400     *
401     * @param a the first index
402     * @param b the second index
403     */

404    private void swap(int a, int b)
405    {
406       // INV: assertExpr(a > 0);
407
// INV: assertExpr(a <= size);
408
// INV: assertExpr(b > 0);
409
// INV: assertExpr(b <= size);
410
// INV: assertExpr(queue[a] != null);
411
// INV: assertExpr(queue[b] != null);
412
// INV: assertExpr(queue[a].index == a);
413
// INV: assertExpr(queue[b].index == b);
414
TimeoutExtImpl temp = queue[a];
415       queue[a] = queue[b];
416       queue[a].index = a;
417       queue[b] = temp;
418       queue[b].index = b;
419    }
420
421    /**
422     * Remove a node from the tree and normalize.
423     *
424     * @param index the index in the queue
425     * @return the removed node.
426     */

427    private TimeoutExtImpl removeNode(int index)
428    {
429       // INV: assertExpr(index > 0);
430
// INV: assertExpr(index <= size);
431
TimeoutExtImpl res = queue[index];
432       // INV: assertExpr(res != null);
433
// INV: assertExpr(res.index == index);
434
if (index == size)
435       {
436          --size;
437          queue[index] = null;
438          return res;
439       }
440       swap(index, size); // Exchange removed node with last leaf node
441
--size;
442       // INV: assertExpr(res.index == size + 1);
443
queue[res.index] = null;
444       if (normalizeUp(index))
445          return res; // Node moved up, so it shouldn't move down
446
long t = queue[index].time;
447       int c = index << 1;
448       while (c <= size)
449       {
450          // INV: assertExpr(q[index].time == t);
451
TimeoutExtImpl l = queue[c];
452          // INV: assertExpr(l != null);
453
// INV: assertExpr(l.index == c);
454
if (c + 1 <= size)
455          {
456             // two children, swap with smallest
457
TimeoutExtImpl r = queue[c + 1];
458             // INV: assertExpr(r != null);
459
// INV: assertExpr(r.index == c+1);
460
if (l.time <= r.time)
461             {
462                if (t <= l.time)
463                   break; // done
464
swap(index, c);
465                index = c;
466             }
467             else
468             {
469                if (t <= r.time)
470                   break; // done
471
swap(index, c + 1);
472                index = c + 1;
473             }
474          }
475          else
476          { // one child
477
if (t <= l.time)
478                break; // done
479
swap(index, c);
480             index = c;
481          }
482          c = index << 1;
483       }
484       return res;
485    }
486
487    /**
488     * Recursive cleanup of a TimeoutImpl
489     *
490     * @return null
491     */

492    private TimeoutExtImpl cleanupTimeoutExtImpl(TimeoutExtImpl timeout)
493    {
494       if (timeout != null)
495          timeout.target = null;
496       return null;
497    }
498
499    /**
500     * Check invariants of the queue.
501     */

502    void checkTree()
503    {
504       assertExpr(size >= 0);
505       assertExpr(size < queue.length);
506       assertExpr(queue[0] == null);
507       if (size > 0)
508       {
509          assertExpr(queue[1] != null);
510          assertExpr(queue[1].index == 1);
511          for (int i = 2; i <= size; ++i)
512          {
513             assertExpr(queue[i] != null);
514             assertExpr(queue[i].index == i);
515             assertExpr(queue[i >> 1].time <= queue[i].time); // parent fires first
516
}
517          for (int i = size + 1; i < queue.length; ++i)
518             assertExpr(queue[i] == null);
519       }
520    }
521
522    /**
523     * Debugging helper.
524     */

525    private void assertExpr(boolean expr)
526    {
527       if (!expr)
528          throw new IllegalStateException JavaDoc("***** assert failed *****");
529    }
530
531    /**
532     * Our private Timeout implementation.
533     */

534    private class TimeoutExtImpl implements TimeoutExt
535    {
536       /** Done */
537       static final int DONE = -1;
538
539       /** In timeout */
540       static final int TIMEOUT = -2;
541
542       /** Index in the queue */
543       int index;
544
545       /** Time of the timeout */
546       long time;
547
548       /** The timeout target */
549       TimeoutTarget target;
550
551       public long getTime()
552       {
553          return time;
554       }
555
556       public TimeoutTarget getTimeoutTarget()
557       {
558          return target;
559       }
560
561       public void done()
562       {
563          index = DONE;
564       }
565       
566       public boolean cancel()
567       {
568          return remove(this);
569       }
570    }
571 }
572
Popular Tags