package com.tctest;

import com.tctest.util.ThreadUtil;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Moves items from a single input queue onto a growing set of output queues.
 *
 * <p>Every output queue handed out by {@link #getNewOutputQueue()} gets its own
 * writer thread once {@link #start(BlockingQueue)} has been called. Each writer
 * takes items from the shared input queue, so a given input item ends up on
 * exactly one output queue (whichever writer took it). {@link #putAll(Object)}
 * is the broadcast path: it offers an item to every output queue directly.
 */
public class QueueMultiplexer
{
    /** Maximum number of items a writer moves from the input queue in one batch. */
    private static final int MAX_BUFFER = 10;

    /** Source queue shared by all writers; assigned in {@link #start(BlockingQueue)}. */
    private transient BlockingQueue<Object> inputQueue;

    /** All output queues created so far; also the monitor guarding the list itself. */
    private List<BlockingQueue<Object>> outputQueues = new ArrayList<BlockingQueue<Object>>();

    /**
     * Creates a new unbounded output queue, registers it, and wakes a thread
     * waiting on the registry so a writer can be started for it.
     *
     * @return the newly registered output queue
     */
    public BlockingQueue<Object> getNewOutputQueue()
    {
        synchronized (outputQueues) {
            BlockingQueue<Object> q = new LinkedBlockingQueue<Object>();
            outputQueues.add(q);
            outputQueues.notify();
            return q;
        }
    }

    /**
     * Copies items from the input queue to one output queue, in batches of at
     * most {@link #MAX_BUFFER} items, until the thread is interrupted.
     */
    private class OutputQueueWriter implements Runnable
    {
        private final BlockingQueue<Object> outputQueue;

        public OutputQueueWriter(BlockingQueue<Object> outputQueue)
        {
            this.outputQueue = outputQueue;
        }

        public void run()
        {
            List<Object> batch = new ArrayList<Object>(MAX_BUFFER);

            while (true) {
                try {
                    batch.clear();
                    // Block for the first item, then grab whatever else is
                    // immediately available, keeping the batch at MAX_BUFFER or less.
                    batch.add(inputQueue.take());
                    inputQueue.drainTo(batch, MAX_BUFFER - 1);
                    outputQueue.addAll(batch);
                } catch (InterruptedException ie) {
                    // take() was interrupted before removing anything, so no
                    // item is lost. Restore the flag and stop instead of
                    // silently swallowing the interrupt and looping forever.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Offers the given item to every output queue registered so far.
     *
     * <p>The result of {@code offer} is not checked; the queues created by
     * {@link #getNewOutputQueue()} are unbounded {@code LinkedBlockingQueue}s.
     *
     * @param item the item to place on each output queue
     */
    public void putAll(Object item)
    {
        synchronized (outputQueues) {
            for (BlockingQueue<Object> queue : outputQueues) {
                queue.offer(item);
            }
        }
    }

    /**
     * Waits for output queues to be registered and starts one
     * {@link OutputQueueWriter} thread for each, in registration order.
     * Queues registered before this task starts are picked up as well.
     */
    private class WaitForReaders implements Runnable
    {
        public void run()
        {
            // Number of output queues that already have a writer thread.
            int readers = 0;

            System.out.println("Waiting for readers...");
            while (true) {
                BlockingQueue<Object> q = null;

                synchronized (outputQueues) {
                    // Loop guards against spurious wakeups.
                    while (outputQueues.size() == readers) {
                        try {
                            outputQueues.wait();
                        } catch (InterruptedException ie) {
                            // Restore the flag and stop rather than swallowing
                            // the interrupt and waiting again.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    q = outputQueues.get(readers++);
                }

                // Start the writer outside the monitor so registration is not blocked.
                ThreadUtil.startDaemonThread(new OutputQueueWriter(q));
                System.out.println("Started queue reader");
            }
        }
    }

    /**
     * Sets the input queue and starts the background task that launches a
     * writer thread for every registered output queue.
     *
     * @param queue the queue that writers will take items from
     */
    public void start(BlockingQueue<Object> queue)
    {
        synchronized (this) {
            this.inputQueue = queue;
        }
        ThreadUtil.startDaemonThread(new WaitForReaders());
    }
}