KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > util > FIFOMessageQueue


1 package org.jgroups.util;
2
3 import org.jgroups.Address;
4
5 import java.util.LinkedList JavaDoc;
6 import java.util.Map JavaDoc;
7 import java.util.concurrent.*;
8 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
9
10 /**
11  * Blocking queue which can only process 1 message per service concurrently, establishing FIFO order per sender. Example:
12  * if message A1, A2, A3, B1, B2 (where A and B are service names for services on top of a Multiplexer) arrive at the
13  * same time, then this class will deliver A1 and B1 concurrently (ie. pass them up to the thread pool for processing).
14  * Only when A1 is done will A2 be processed, same for B2: it will get processed when B1 is done. Thus, messages
15  * for different services are processed concurrently; messages from the same service are processed FIFO.
16  * @author Bela Ban
17  * @version $Id: FIFOMessageQueue.java,v 1.8 2007/06/29 10:57:40 belaban Exp $
18  */

19 public class FIFOMessageQueue<K, V> {
20     /** Used for consolidated takes */
21     final BlockingQueue<V> queue=new LinkedBlockingQueue<V>();
22
23     /** One queue per sender and destination. This is a two level hashmap, with sender's addresses as keys and hashmaps
24      * as values. Those hashmaps have destinations (K) as keys and Entries (list of Vs) as values */

25     final ConcurrentMap<Address,ConcurrentMap<K,Entry<V>>> queues=new ConcurrentHashMap<Address,ConcurrentMap<K,Entry<V>>>();
26
27     private final AtomicInteger JavaDoc size=new AtomicInteger JavaDoc(0);
28
29
30     public V take() throws InterruptedException JavaDoc {
31         V retval=queue.take();
32         if(retval != null)
33             size.decrementAndGet();
34         return retval;
35     }
36
37
38     public V poll(long timeout) throws InterruptedException JavaDoc {
39         V retval=queue.poll(timeout, TimeUnit.MILLISECONDS);
40         if(retval != null)
41             size.decrementAndGet();
42         return retval;
43     }
44
45
46
47     public void put(Address sender, K dest, V el) throws InterruptedException JavaDoc {
48         if(sender == null) {
49             size.incrementAndGet();
50             queue.add(el);
51             return;
52         }
53
54         ConcurrentMap<K,Entry<V>> dests=queues.get(sender);
55         if(dests == null) {
56             dests=new ConcurrentHashMap<K,Entry<V>>();
57             if(queues.putIfAbsent(sender, dests) != null) // already existed (someone else inserted the key/value mapping)
58
dests=queues.get(sender);
59         }
60
61         Entry<V> entry=dests.get(dest);
62         if(entry == null) {
63             entry=new Entry<V>();
64             if(dests.putIfAbsent(dest, entry) != null)
65                 entry=dests.get(dest);
66         }
67
68         synchronized(entry) {
69             size.incrementAndGet();
70             if(entry.ready) {
71                 entry.ready=false;
72                 queue.add(el);
73             }
74             else {
75                 entry.list.add(el);
76             }
77         }
78     }
79
80
81     public void done(Address sender, K dest) {
82         if(sender == null)
83             return;
84         Map JavaDoc<K,Entry<V>> dests=queues.get(sender);
85         if(dests == null) return;
86
87         Entry<V> entry=dests.get(dest);
88         if(entry != null) {
89             V el=null;
90             synchronized(entry) {
91                 if(!entry.list.isEmpty()) {
92                     el=entry.list.removeFirst();
93                     queue.add(el);
94                 }
95                 else {
96                     entry.ready=true;
97                 }
98             }
99         }
100     }
101
102     public int size() {
103         return size.get();
104     }
105
106     public String JavaDoc toString() {
107         StringBuilder JavaDoc sb=new StringBuilder JavaDoc();
108         sb.append("queue: ").append(queue).append("\nqueues:\n");
109         for(ConcurrentMap.Entry<Address,ConcurrentMap<K,Entry<V>>> entry: queues.entrySet()) {
110             sb.append("sender ").append(entry.getKey()).append(":\n");
111             for(Map.Entry JavaDoc<K,Entry<V>> entry2: entry.getValue().entrySet()) {
112                 sb.append(entry2.getKey()).append(": ").append(entry2.getValue().list).append("\n");
113             }
114         }
115         return sb.toString();
116     }
117
118
119
120     static class Entry<T> {
121         boolean ready=true;
122         LinkedList JavaDoc<T> list=new LinkedList JavaDoc<T>();
123     }
124     
125 }
126
Popular Tags