KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tctest > QueueMultiplexer


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tctest;
5
6 import com.tctest.util.ThreadUtil;
7
8 import java.util.ArrayList JavaDoc;
9 import java.util.Iterator JavaDoc;
10 import java.util.List JavaDoc;
11 import java.util.concurrent.BlockingQueue JavaDoc;
12 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
13
14 public class QueueMultiplexer
15 {
16     private static final int MAX_BUFFER = 10;
17     
18     private transient BlockingQueue JavaDoc<Object JavaDoc> inputQueue;
19
20     private List JavaDoc<BlockingQueue JavaDoc<Object JavaDoc>> outputQueues = new ArrayList JavaDoc<BlockingQueue JavaDoc<Object JavaDoc>>();
21     
22     public BlockingQueue JavaDoc<Object JavaDoc> getNewOutputQueue()
23     {
24         synchronized (outputQueues) {
25             BlockingQueue JavaDoc<Object JavaDoc> q = new LinkedBlockingQueue JavaDoc<Object JavaDoc>();
26             outputQueues.add(q);
27             outputQueues.notify();
28             return q;
29         }
30     }
31     
32     private class OutputQueueWriter implements Runnable JavaDoc
33     {
34         private BlockingQueue JavaDoc<Object JavaDoc> outputQueue;
35         
36         public OutputQueueWriter(BlockingQueue JavaDoc<Object JavaDoc> outputQueue)
37         {
38             this.outputQueue = outputQueue;
39         }
40         
41         public void run()
42         {
43             ArrayList JavaDoc<Object JavaDoc> l = new ArrayList JavaDoc<Object JavaDoc>(MAX_BUFFER);
44             
45             while (true) {
46                 try {
47                     l.clear();
48                     Object JavaDoc item = inputQueue.take();
49                     inputQueue.drainTo(l, MAX_BUFFER-1);
50                     l.add(0, item);
51                     outputQueue.addAll(l);
52                 } catch (InterruptedException JavaDoc ie) {
53                     //
54
}
55             }
56         }
57     }
58     
59     public void putAll(Object JavaDoc item)
60     {
61         synchronized (outputQueues) {
62             for (Iterator JavaDoc<BlockingQueue JavaDoc<Object JavaDoc>> i = outputQueues.iterator(); i.hasNext();) {
63                 BlockingQueue JavaDoc<Object JavaDoc> queue = i.next();
64                 queue.offer(item);
65             }
66         }
67     }
68
69     private class WaitForReaders implements Runnable JavaDoc
70     {
71         public void run()
72         {
73            int readers = 0;
74            
75            System.out.println("Waiting for readers...");
76            while (true) {
77                BlockingQueue JavaDoc<Object JavaDoc> q = null;
78                
79                synchronized (outputQueues) {
80                    while (outputQueues.size() == readers) {
81                        try {
82                            outputQueues.wait();
83                        } catch (InterruptedException JavaDoc ie) {
84                            //
85
}
86                    }
87                    // an output queue was added, so spin up a thread
88
// to write to it
89
q = outputQueues.get(readers++);
90                }
91                
92                ThreadUtil.startDaemonThread(new OutputQueueWriter(q));
93                System.out.println("Started queue reader");
94            }
95         }
96     }
97
98     public void start(BlockingQueue JavaDoc<Object JavaDoc> queue)
99     {
100         synchronized (this) {
101             this.inputQueue = queue;
102         }
103         ThreadUtil.startDaemonThread(new WaitForReaders());
104     }
105 }
106
Popular Tags