KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > client > impl > SimpleDeliveryManager


1 package com.ubermq.jms.client.impl;
2
3 import com.ubermq.jms.client.*;
4 import com.ubermq.kernel.*;
5 import com.ubermq.util.*;
6
7 import java.util.*;
8 import javax.jms.Message JavaDoc;
9 import EDU.oswego.cs.dl.util.concurrent.*;
10
11 /**
12  * A simple delivery manager implementation that performs reordering
13  * and de-duplication on behalf of a subscriber.
14  */

15 public class SimpleDeliveryManager
16     implements IDeliveryManager
17 {
18     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(SimpleDeliveryManager.class);
19     
20     /**
21      * A map from senderId -> sender state information
22      */

23     private final HashMap senders;
24
25     private static final int SENDERS_INITIAL_MAP_SIZE = 100;
26
27     /**
28      * A clock daemon thread that will periodically flush the sender queues.
29      * we do this every 750 ms.
30      */

31     private static ClockDaemon cd = new ClockDaemon();
32     private static final int UNSEQUENCED_FLUSH_INTERVAL =
33         Integer.valueOf(Configurator.getProperty(ClientConfig.SUB_UNSEQUENCED_FLUSH_TIMEOUT, "750")).intValue();
34     static {
35         cd.setThreadFactory(new ThreadFactory() {
36                     public Thread JavaDoc newThread(Runnable JavaDoc command)
37                     {
38                         Thread JavaDoc t = new Thread JavaDoc(command, "SimpleDeliveryManager Resequencing Thread");
39                         t.setDaemon(true);
40                         return t;
41                     }
42                 });
43     }
44
45     /**
46      * Represents a sender and the last message sequence number we have seen
47      * from that sender.
48      */

49     private final class SimpleSenderState
50     {
51         private long lastSequence; // the last known sequence number
52
private TreeSet outgoingQueue; // the queue of outgoing messages.
53

54         SimpleSenderState(long initialSequence)
55         {
56             lastSequence = initialSequence;
57             outgoingQueue = new TreeSet();
58         }
59
60         long getLastSequence() {return lastSequence;}
61
62         synchronized void deliver(long sequence,
63                                   Message JavaDoc msg,
64                                   IMessageSender s)
65         {
66             if (sequence == lastSequence + 1)
67                 sendImmediately(sequence, msg, s);
68             else if (sequence <= lastSequence)
69                 drop();
70             else // sequence > lastSequence + 1
71
{
72                 enqueue(sequence, msg, s);
73             }
74         }
75
76         private void sendImmediately(long sequence, Message JavaDoc msg, IMessageSender s)
77         {
78             // deliver the message and update the last known sequence #
79
s.sendMessage(msg);
80             lastSequence = sequence;
81
82             // maybe flush enqueued items after this one.
83
flush();
84         }
85
86         private void drop()
87         {
88         }
89
90         private void enqueue(long sequence, Message JavaDoc msg, IMessageSender s)
91         {
92             outgoingQueue.add(new OrderedMessage(sequence, msg, s));
93         }
94
95         private void flush()
96         {
97             long nextSequence = lastSequence + 1;
98             OrderedMessage om;
99
100             while(outgoingQueue.size() > 0 &&
101                       (om = (OrderedMessage)outgoingQueue.first()).sequence == nextSequence)
102             {
103                 outgoingQueue.remove(om);
104                 om.send();
105
106                 log.debug("flushing out of order, sequence=" + nextSequence);
107
108                 lastSequence++;
109                 nextSequence++;
110             }
111
112         }
113
114         final synchronized void flushQueue()
115         {
116             if (outgoingQueue.size() == 0) return;
117
118             lastSequence = ((OrderedMessage)outgoingQueue.first()).sequence - 1;
119             flush();
120         }
121     }
122
123     private final class OrderedMessage
124         implements Comparable JavaDoc
125     {
126         private long sequence;
127         private Message JavaDoc payload;
128         private IMessageSender sender;
129
130         OrderedMessage(long sequence, Message JavaDoc payload, IMessageSender sender)
131         {
132             this.sequence = sequence;
133             this.payload = payload;
134             this.sender = sender;
135         }
136
137         public int compareTo(Object JavaDoc o)
138         {
139             OrderedMessage om = (OrderedMessage)o;
140
141             if (om.sequence == sequence) return 0;
142             else return (sequence > om.sequence) ? 1 : -1;
143         }
144
145         public boolean equals(Object JavaDoc o)
146         {
147             OrderedMessage om = (OrderedMessage)o;
148             return om.sequence == sequence;
149         }
150
151         public int hashCode()
152         {
153             return (int)(sequence ^ (sequence >>> 32));
154         }
155
156         public long getSequence() {return sequence;}
157         public void send() {sender.sendMessage(payload);}
158     }
159
160     /**
161      * construct a simple delivery manager,
162      * capable of handling multiple senders.
163      */

164     public SimpleDeliveryManager()
165     {
166         senders = new HashMap(SENDERS_INITIAL_MAP_SIZE);
167         cd.executePeriodically(UNSEQUENCED_FLUSH_INTERVAL,
168                                new Runnable JavaDoc() {
169                     public void run()
170                     {
171                         synchronized(senders)
172                         {
173                             Iterator iter = senders.values().iterator();
174                             while (iter.hasNext())
175                             {
176                                 SimpleSenderState element = (SimpleSenderState)iter.next();
177                                 element.flushQueue();
178                             }
179                         }
180                     }
181                 }, false);
182     }
183
184     /**
185      * request delivery of the message at some time in the future.
186      * at this point, the caller can forget about the message and
187      * assume the message sender specified may be called back
188      * at some future time. It is up to the implementation whether
189      * the object will be called back at all.
190      * <P>
191      * we track the last sequence sent for a given sender, L. the incoming
192      * sequence is S
193      * 1. if S = L + 1, the message is in order. deliver immediately
194      * 2. if S > L + 1, the message is out of order. enqueue.
195      * 3. if S = L, the message is a dup.
196      * 4. if S < L, the message may be a dup or may be filling a gap.
197      */

198     public void deliver(long senderId,
199                         long sequence,
200                         Message JavaDoc msg,
201                         IMessageSender s)
202     {
203         SimpleSenderState senderState = ((SimpleSenderState)senders.get(new Long JavaDoc(senderId)));
204         if (senderState == null) {
205             senderState = new SimpleSenderState(sequence - 1); // allow this incoming msg to go out
206
synchronized(senders)
207             {
208                 senders.put(new Long JavaDoc(senderId),
209                             senderState);
210             }
211         }
212
213         senderState.deliver(sequence,
214                             msg,
215                             s);
216     }
217
218     public IDeliveryManager newInstance()
219     {
220         return new SimpleDeliveryManager();
221     }
222
223
224 }
225
Popular Tags