1 package org.jgroups.util; 2 3 import org.jgroups.Address; 4 5 import java.util.LinkedList ; 6 import java.util.Map ; 7 import java.util.concurrent.*; 8 import java.util.concurrent.atomic.AtomicInteger ; 9 10 19 public class FIFOMessageQueue<K, V> { 20 21 final BlockingQueue<V> queue=new LinkedBlockingQueue<V>(); 22 23 25 final ConcurrentMap<Address,ConcurrentMap<K,Entry<V>>> queues=new ConcurrentHashMap<Address,ConcurrentMap<K,Entry<V>>>(); 26 27 private final AtomicInteger size=new AtomicInteger (0); 28 29 30 public V take() throws InterruptedException { 31 V retval=queue.take(); 32 if(retval != null) 33 size.decrementAndGet(); 34 return retval; 35 } 36 37 38 public V poll(long timeout) throws InterruptedException { 39 V retval=queue.poll(timeout, TimeUnit.MILLISECONDS); 40 if(retval != null) 41 size.decrementAndGet(); 42 return retval; 43 } 44 45 46 47 public void put(Address sender, K dest, V el) throws InterruptedException { 48 if(sender == null) { 49 size.incrementAndGet(); 50 queue.add(el); 51 return; 52 } 53 54 ConcurrentMap<K,Entry<V>> dests=queues.get(sender); 55 if(dests == null) { 56 dests=new ConcurrentHashMap<K,Entry<V>>(); 57 if(queues.putIfAbsent(sender, dests) != null) dests=queues.get(sender); 59 } 60 61 Entry<V> entry=dests.get(dest); 62 if(entry == null) { 63 entry=new Entry<V>(); 64 if(dests.putIfAbsent(dest, entry) != null) 65 entry=dests.get(dest); 66 } 67 68 synchronized(entry) { 69 size.incrementAndGet(); 70 if(entry.ready) { 71 entry.ready=false; 72 queue.add(el); 73 } 74 else { 75 entry.list.add(el); 76 } 77 } 78 } 79 80 81 public void done(Address sender, K dest) { 82 if(sender == null) 83 return; 84 Map <K,Entry<V>> dests=queues.get(sender); 85 if(dests == null) return; 86 87 Entry<V> entry=dests.get(dest); 88 if(entry != null) { 89 V el=null; 90 synchronized(entry) { 91 if(!entry.list.isEmpty()) { 92 el=entry.list.removeFirst(); 93 queue.add(el); 94 } 95 else { 96 entry.ready=true; 97 } 98 } 99 } 100 } 101 102 public int size() { 103 return size.get(); 104 } 105 106 public String toString() { 107 StringBuilder sb=new StringBuilder (); 108 sb.append("queue: ").append(queue).append("\nqueues:\n"); 109 for(ConcurrentMap.Entry<Address,ConcurrentMap<K,Entry<V>>> entry: queues.entrySet()) { 110 sb.append("sender ").append(entry.getKey()).append(":\n"); 111 for(Map.Entry <K,Entry<V>> entry2: entry.getValue().entrySet()) { 112 sb.append(entry2.getKey()).append(": ").append(entry2.getValue().list).append("\n"); 113 } 114 } 115 return sb.toString(); 116 } 117 118 119 120 static class Entry<T> { 121 boolean ready=true; 122 LinkedList <T> list=new LinkedList <T>(); 123 } 124 125 } 126 | Popular Tags |