KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > util > timeout > HashedTimeoutPriorityQueueImpl


1 /*
2  * JBoss, Home of Professional Open Source
3  * Copyright 2006, JBoss Inc., and individual contributors as indicated
4  * by the @authors tag. See the copyright.txt in the distribution for a
5  * full listing of individual contributors.
6  *
7  * This is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as
9  * published by the Free Software Foundation; either version 2.1 of
10  * the License, or (at your option) any later version.
11  *
12  * This software is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this software; if not, write to the Free
19  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21  */

22 package org.jboss.util.timeout;
23
24 import org.jboss.util.JBossStringBuilder;
25
26 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
27
28 /**
29  * HashedTimeoutPriorityQueueImpl.
30  *
31  * This is a balanced binary tree. If nonempty, the root is at index 1,
32  * and all nodes are at indices 1..size. Nodes with index greater than
33  * size are null. Index 0 is never used.
34  * Children of the node at index <code>j</code> are at <code>j*2</code>
35  * and <code>j*2+1</code>. The children of a node always fire the timeout
36  * no earlier than the node.
37  *
38  *
39  * Or, more formally:
40  *
41  * Only indices <code>1</code>..<code>size</code> of this array are used.
42  * All other indices contain the null reference.
43  * This array represents a balanced binary tree.
44  *
45  * If <code>size</code> is <code>0</code> the tree is empty, otherwise
46  * the root of the tree is at index <code>1</code>.
47  *
48  * Given an arbitrary node at index <code>n</code> that is not the root
49  * node, the parent node of <code>n</code> is at index <code>n/2</code>.
50  *
51  * Given an arbitrary node at index <code>n</code>; if
52  * <code>2*n <= size</code> the node at <code>n</code> has its left child
53  * at index <code>2*n</code>, otherwise the node at <code>n</code> has
54  * no left child.
55  *
56  * Given an arbitrary node at index <code>n</code>; if
57  * <code>2*n+1 <= size</code> the node at <code>n</code> has its right child
58  * at index <code>2*n+1</code>, otherwise the node at <code>n</code> has
59  * no right child.
60  *
61  * The priority function is called T. Given a node <code>n</code>,
62  * <code>T(n)</code> denotes the absolute time (in milliseconds since
63  * the epoch) that the timeout for node <code>n</code> should happen.
64  * Smaller values of <code>T</code> mean higher priority.
65  *
66  * The tree satisfies the following invariant:
67  * <i>
68  * For any node <code>n</code> in the tree:
69  * If node <code>n</code> has a left child <code>l</code>,
70  * <code>T(n) <= T(l)</code>.
71  * If node <code>n</code> has a right child <code>r</code>,
72  * <code>T(n) <= T(r)</code>.
73  * </i>
74  *
75  *
76  * The invariant may be temporarily broken while executing synchronized
77  * on <code>this</code> instance, but is always reestablished before
78  * leaving the synchronized code.
79  *
80  * The node at index <code>1</code> is always the first node to timeout,
81  * as can be deduced from the invariant.
82  *
83  * For the following algorithm pseudocode, the operation
84  * <code>swap(n,m)</code> denotes the exchange of the nodes at indices
85  * <code>n</code> and <code>m</code> in the tree.
86  *
87  * Insertion of a new node happens as follows:
88  * <pre>
89  * IF size = q.length THEN
90  * "expand q array to be larger";
91  * ENDIF
92  * size <- size + 1;
93  * q[size] <- "new node";
94  * n <- size;
95  * WHILE n > 1 AND T(n/2) > T(n) DO
96  * swap(n/2, n);
97  * n <- n/2;
98  * ENDWHILE
99  * </pre>
100  * Proof that this insertion algorithm respects the invariant is left to
101  * the interested reader.
102  *
103  * The removal algorithm is a bit more complicated. To remove the node
104  * at index <code>n</code>:
105  * <pre>
106  * swap(n, size);
107  * size <- size - 1;
108  * IF n > 1 AND T(n/2) > T(n) THEN
109  * WHILE n > 1 AND T(n/2) > T(n) DO
110  * swap(n/2, n);
111  * n <- n/2;
112  * ENDWHILE
113  * ELSE
114  * WHILE 2*n <= size DO
115  * IF 2*n+1 <= size THEN
116  * // Both children present
117  * IF T(2*n) <= T(2*n+1) THEN
118  * IF T(n) <= T(2*n) THEN
119  * EXIT;
120  * ENDIF
121  * swap(n, 2*n);
122  * n <- 2*n;
123  * ELSE
124  * IF T(n) <= T(2*n+1) THEN
125  * EXIT;
126  * ENDIF
127  * swap(n, 2*n+1);
128  * n <- 2*n+1;
129  * ENDIF
130  * ELSE
131  * // Only left child, right child not present.
132  * IF T(n) <= T(2*n) THEN
133  * EXIT;
134  * ENDIF
135  * swap(n, 2*n);
136  * n <- 2*n;
137  * ENDIF
138  * ENDWHILE
139  * ENDIF
140  * </pre>
141  * Proof that this removal algorithm respects the invariant is left to
142  * the interested reader. Really, I am not going to prove it here.
143  *
144  * If you are interested, you can find this data structure and its
145  * associated operations in most textbooks on algorithmics.
146  *
147  * @see checkTree
148  *
149  * @author <a HREF="osh@sparre.dk">Ole Husgaard</a>
150  * @author <a HREF="dimitris@jboss.org">Dimitris Andreadis</a>
151  * @author <a HREF="genman@maison-otaku.net">Elias Ross</a>
152  * @author <a HREF="adrian@jboss.com">Adrian Brock</a>
153  * @version $Revision: 1958 $
154  */

155 public class HashedTimeoutPriorityQueueImpl implements TimeoutPriorityQueue
156 {
// Code commented out with the mark "INV:" consists of runtime checks
// of invariants that are not needed for a production system.
// For problem solving, you can remove these comments.
//
// Multithreading notes:
//
// While a TimeoutImpl is enqueued, its index field contains the index
// of the instance in the queue; that is, for 1 <= n <= size,
// q[n].index = n.
// Modifications of an enqueued TimeoutImpl instance may only happen
// in code synchronized on the TimeoutFactory instance that has it
// enqueued.
// Modifications on the priority queue may only happen while running in
// code synchronized on the TimeoutFactory instance that holds the queue.
// When a TimeoutImpl instance is no longer enqueued, its index field
// changes to one of the negative constants declared in the TimeoutImpl
// class.
//
// Cancellation may race with the timeout.
// To avoid problems with this, the TimeoutImpl index field is set to
// TimeoutImpl.TIMEOUT when the TimeoutImpl is taken out of the queue.
// Finally the index field is set to TimeoutImpl.DONE, and
// the TimeoutImpl instance is discarded.

180    /** The lock object */
181    private Object JavaDoc topLock = new Object JavaDoc();
182
183    /** The top element */
184    private TimeoutExtImpl top;
185    
186    /** The hashed queues */
187    private InternalPriorityQueue[] queues;
188
189    private SynchronizedBoolean cancelled = new SynchronizedBoolean(false);
190    
191    /**
192     * Create a new TimeoutPriorityQueueImpl.
193     */

194    public HashedTimeoutPriorityQueueImpl()
195    {
196       queues = new InternalPriorityQueue[40];
197       for (int i = 0; i < queues.length; ++ i)
198          queues[i] = new InternalPriorityQueue();
199    }
200
201    public TimeoutExt offer(long time, TimeoutTarget target)
202    {
203       if (cancelled.get())
204          throw new IllegalStateException JavaDoc("TimeoutPriorityQueue has been cancelled");
205       if (time < 0)
206          throw new IllegalArgumentException JavaDoc("Negative time");
207       if (target == null)
208          throw new IllegalArgumentException JavaDoc("Null timeout target");
209
210       TimeoutExtImpl timeout = new TimeoutExtImpl();
211       timeout.time = time;
212       timeout.target = target;
213       int index = timeout.hashCode() % queues.length;
214       return queues[index].offer(timeout);
215    }
216
217    public TimeoutExt take()
218    {
219       return poll(-1);
220    }
221
222    public TimeoutExt poll()
223    {
224       return poll(1);
225    }
226    
227    public TimeoutExt poll(long wait)
228    {
229       long endWait = -1;
230       if (wait > 0)
231          endWait = System.currentTimeMillis() + wait;
232       // Look for work
233
synchronized (topLock)
234       {
235          while (cancelled.get() == false && (wait >= 0 || endWait == -1))
236          {
237             if (top == null)
238             {
239                try
240                {
241                   if (endWait == -1)
242                      topLock.wait();
243                   else
244                      topLock.wait(wait);
245                }
246                catch (InterruptedException JavaDoc ex)
247                {
248                }
249             }
250             else
251             {
252                long now = System.currentTimeMillis();
253                if (top.time > now)
254                {
255                   long waitForFirst = top.time - now;
256                   if (endWait != -1 && waitForFirst > wait)
257                      waitForFirst = wait;
258                   try
259                   {
260                      topLock.wait(waitForFirst);
261                   }
262                   catch (InterruptedException JavaDoc ex)
263                   {
264                   }
265                }
266                if (cancelled.get() == false && top != null && top.time <= System.currentTimeMillis())
267                {
268                   TimeoutExtImpl result = top;
269                   result.queue = null;
270                   result.index = TimeoutExtImpl.TIMEOUT;
271                   top = null;
272                   recalculateTop(false);
273                   return result;
274                }
275             }
276             if (endWait != -1)
277                wait = endWait - System.currentTimeMillis();
278          }
279       }
280       return null;
281    }
282    
283    public TimeoutExt peek()
284    {
285       synchronized (topLock)
286       {
287          return top;
288       }
289    }
290
291    public boolean remove(TimeoutExt timeout)
292    {
293       TimeoutExtImpl timeoutImpl = (TimeoutExtImpl) timeout;
294       
295       // Fast way of doing it
296
InternalPriorityQueue queue = timeoutImpl.queue;
297       if (queue != null && queue.remove(timeoutImpl))
298          return true;
299
300       synchronized (topLock)
301       {
302          // Check the top element
303
if (top == timeout)
304          {
305             top.done();
306             top = null;
307             recalculateTop(true);
308             return true;
309          }
310          
311          // Double check, it might have been the top then
312
// just got moved back into the queue
313
queue = timeoutImpl.queue;
314          if (queue != null)
315             return queue.remove(timeoutImpl);
316       }
317       return false;
318    }
319
320    public void clear()
321    {
322       synchronized (topLock)
323       {
324          if (cancelled.get())
325             return;
326          
327          // cleanup queues
328
for (int i = 1; i < queues.length; ++i)
329             queues[i].clear();
330
331          // cleanup the top
332
top = cleanupTimeoutExtImpl(top);
333       }
334    }
335
336    public void cancel()
337    {
338       synchronized (topLock)
339       {
340          if (cancelled.get())
341             return;
342
343          clear();
344          topLock.notifyAll();
345       }
346    }
347
348    public int size()
349    {
350       int size = 0;
351       if (top != null)
352          size =1;
353       for (int i = 0; i < queues.length; ++i)
354          size += queues[i].size();
355       return size;
356    }
357    
358    /**
359     * Whether the queue is cancelled
360     *
361     * @return true when cancelled
362     */

363    public boolean isCancelled()
364    {
365       return cancelled.get();
366    }
367
368    private void recalculateTop(boolean notify)
369    {
370       for (int i = 0; i < queues.length; ++i)
371          queues[i].compareAndSwapWithTop(notify);
372    }
373
374    /**
375     * Recursive cleanup of a TimeoutImpl
376     *
377     * @return null
378     */

379    private TimeoutExtImpl cleanupTimeoutExtImpl(TimeoutExtImpl timeout)
380    {
381       if (timeout != null)
382          timeout.target = null;
383       return null;
384    }
385
386    /**
387     * Debugging helper.
388     */

389    private void assertExpr(boolean expr)
390    {
391       if (!expr)
392          throw new IllegalStateException JavaDoc("***** assert failed *****");
393    }
394
395    /**
396     * Internal priority queue
397     */

398    private class InternalPriorityQueue
399    {
400       /** The lock object */
401       private Object JavaDoc lock = new Object JavaDoc();
402
403       /** The size of the timeout queue. */
404       private int size;
405
406       /** The timeouts */
407       private TimeoutExtImpl[] queue;
408
/**
 * Create a new, empty InternalPriorityQueue with room for 16 array slots.
 */
InternalPriorityQueue()
{
   size = 0;
   queue = new TimeoutExtImpl[16];
}
417
418       TimeoutExt offer(TimeoutExtImpl timeout)
419       {
420          boolean checkTop = false;
421          synchronized (lock)
422          {
423             // INV: checkTree();
424
// INV: assertExpr(size < queue.length);
425
if (++size == queue.length)
426             {
427                TimeoutExtImpl[] newQ = new TimeoutExtImpl[2 * queue.length];
428                System.arraycopy(queue, 0, newQ, 0, queue.length);
429                queue = newQ;
430             }
431             // INV: assertExpr(size < queue.length);
432
// INV: assertExpr(queue[size] == null);
433
queue[size] = timeout;
434             timeout.queue = this;
435             timeout.index = size;
436             normalizeUp(size);
437             if (timeout.index == 1)
438                checkTop = true;
439             // INV: checkTree();
440
}
441          if (checkTop)
442          {
443             synchronized (topLock)
444             {
445                compareAndSwapWithTop(true);
446             }
447          }
448          return timeout;
449       }
450
451       boolean compareAndSwapWithTop(boolean notify)
452       {
453          synchronized (lock)
454          {
455             if (size == 0)
456                return false;
457
458             if (top == null)
459             {
460                top = removeNode(1);
461                top.queue = null;
462                top.index = TimeoutExtImpl.TOP;
463                if (notify)
464                   topLock.notify();
465                return top != null;
466             }
467             
468             if (top.time > queue[1].time)
469             {
470                TimeoutExtImpl temp = top;
471                top = queue[1];
472                top.queue = null;
473                top.index = TimeoutExtImpl.TOP;
474                queue[1] = temp;
475                temp.queue = this;
476                temp.index = 1;
477                if (size > 1)
478                   normalizeDown(1);
479                if (notify)
480                   topLock.notify();
481             }
482          }
483          return false;
484       }
485       
486       boolean remove(TimeoutExt timeout)
487       {
488          synchronized (lock)
489          {
490             TimeoutExtImpl timeoutImpl = (TimeoutExtImpl) timeout;
491             if (timeoutImpl.queue == this && timeoutImpl.index > 0)
492             {
493                // Active timeout, remove it.
494
// INV: assertExpr(queue[timeoutImpl.index] == timeout);
495
// INV: checkTree();
496
removeNode(timeoutImpl.index);
497                // INV: checkTree();
498
timeoutImpl.queue = null;
499                timeoutImpl.index = TimeoutExtImpl.DONE;
500
501                // execution cancelled
502
return true;
503             }
504             else
505             {
506                // has already been executed (DONE) or
507
// is currently executing (TIMEOUT)
508
return false;
509             }
510          }
511       }
512
513       public void clear()
514       {
515          synchronized (lock)
516          {
517             if (cancelled.get())
518                return;
519
520             // cleanup queue
521
for (int i = 1; i <= size; ++i)
522                queue[i] = cleanupTimeoutExtImpl(queue[i]);
523          }
524       }
525
526       public void cancel()
527       {
528          synchronized (lock)
529          {
530             if (cancelled.get())
531                return;
532             clear();
533          }
534       }
535
536       public int size()
537       {
538          return size;
539       }
540
541       /**
542        * A new node has been added at index <code>index</code>.
543        * Normalize the tree by moving the new node up the tree.
544        *
545        * @return true if the tree was modified.
546        */

547       private boolean normalizeUp(int index)
548       {
549          // INV: assertExpr(index > 0);
550
// INV: assertExpr(index <= size);
551
// INV: assertExpr(queue[index] != null);
552
if (index == 1)
553             return false; // at root
554
boolean ret = false;
555          long t = queue[index].time;
556          int p = index >> 1;
557          while (queue[p].time > t)
558          {
559             // INV: assertExpr(queue[index].time == t);
560
swap(p, index);
561             ret = true;
562             if (p == 1)
563                break; // at root
564
index = p;
565             p >>= 1;
566          }
567          return ret;
568       }
569       
570       void normalizeDown(int index)
571       {
572          long t = queue[index].time;
573          int c = index << 1;
574          while (c <= size)
575          {
576             // INV: assertExpr(q[index].time == t);
577
TimeoutExtImpl l = queue[c];
578             // INV: assertExpr(l != null);
579
// INV: assertExpr(l.index == c);
580
if (c + 1 <= size)
581             {
582                // two children, swap with smallest
583
TimeoutExtImpl r = queue[c + 1];
584                // INV: assertExpr(r != null);
585
// INV: assertExpr(r.index == c+1);
586
if (l.time <= r.time)
587                {
588                   if (t <= l.time)
589                      break; // done
590
swap(index, c);
591                   index = c;
592                }
593                else
594                {
595                   if (t <= r.time)
596                      break; // done
597
swap(index, c + 1);
598                   index = c + 1;
599                }
600             }
601             else
602             { // one child
603
if (t <= l.time)
604                   break; // done
605
swap(index, c);
606                index = c;
607             }
608             c = index << 1;
609          }
610       }
611
612       /**
613        * Swap two nodes in the tree.
614        *
615        * @param a the first index
616        * @param b the second index
617        */

618       private void swap(int a, int b)
619       {
620          // INV: assertExpr(a > 0);
621
// INV: assertExpr(a <= size);
622
// INV: assertExpr(b > 0);
623
// INV: assertExpr(b <= size);
624
// INV: assertExpr(queue[a] != null);
625
// INV: assertExpr(queue[b] != null);
626
// INV: assertExpr(queue[a].index == a);
627
// INV: assertExpr(queue[b].index == b);
628
TimeoutExtImpl temp = queue[a];
629          queue[a] = queue[b];
630          queue[a].index = a;
631          queue[b] = temp;
632          queue[b].index = b;
633       }
634
635       /**
636        * Remove a node from the tree and normalize.
637        *
638        * @param index the index in the queue
639        * @return the removed node.
640        */

641       private TimeoutExtImpl removeNode(int index)
642       {
643          // INV: assertExpr(index > 0);
644
// INV: assertExpr(index <= size);
645
TimeoutExtImpl res = queue[index];
646          // INV: assertExpr(res != null);
647
// INV: assertExpr(res.index == index);
648
if (index == size)
649          {
650             --size;
651             queue[index] = null;
652             return res;
653          }
654          swap(index, size); // Exchange removed node with last leaf node
655
--size;
656          // INV: assertExpr(res.index == size + 1);
657
queue[res.index] = null;
658          if (normalizeUp(index))
659             return res; // Node moved up, so it shouldn't move down
660
normalizeDown(index);
661          return res;
662       }
663
664       /**
665        * Check invariants of the queue.
666        */

667       void checkTree()
668       {
669          assertExpr(size >= 0);
670          assertExpr(size < queue.length);
671          assertExpr(queue[0] == null);
672          if (size > 0)
673          {
674             assertExpr(queue[1] != null);
675             assertExpr(queue[1].index == 1);
676             assertExpr(queue[1].queue == this);
677             for (int i = 2; i <= size; ++i)
678             {
679                assertExpr(queue[i] != null);
680                assertExpr(queue[i].index == i);
681                assertExpr(queue[i].queue == this);
682                assertExpr(queue[i >> 1].time <= queue[i].time); // parent fires first
683
}
684             for (int i = size + 1; i < queue.length; ++i)
685                assertExpr(queue[i] == null);
686          }
687       }
688       
689    }
690    
691    /**
692     * Our private Timeout implementation.
693     */

694    private class TimeoutExtImpl implements TimeoutExt
695    {
696       /** Top */
697       static final int TOP = 0;
698
699       /** Done */
700       static final int DONE = -1;
701
702       /** In timeout */
703       static final int TIMEOUT = -2;
704
705       /** The internal priority queue */
706       InternalPriorityQueue queue;
707       
708       /** Index in the queue */
709       int index;
710
711       /** Time of the timeout */
712       long time;
713
714       /** The timeout target */
715       TimeoutTarget target;
716
717       public long getTime()
718       {
719          return time;
720       }
721
722       public TimeoutTarget getTimeoutTarget()
723       {
724          return target;
725       }
726
727       public void done()
728       {
729          queue = null;
730          index = DONE;
731       }
732       
733       public boolean cancel()
734       {
735          return remove(this);
736       }
737    }
738    
739    public String JavaDoc dump()
740    {
741       JBossStringBuilder buffer = new JBossStringBuilder();
742       buffer.append("TOP=");
743       if (top == null)
744          buffer.append("null");
745       else
746          buffer.append(top.time);
747       buffer.append(" size=").append(size()).append('\n');
748       for (int i = 0; i < queues.length; ++i)
749       {
750          buffer.append(i).append("=");
751          for (int j = 1; j <= queues[i].size; ++j)
752             buffer.append(queues[i].queue[j].time).append(',');
753          buffer.append('\n');
754       }
755       return buffer.toString();
756    }
757 }
758
Popular Tags