KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > UNICAST


1 // $Id: UNICAST.java,v 1.20 2005/04/20 13:56:02 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.AckReceiverWindow;
7 import org.jgroups.stack.AckSenderWindow;
8 import org.jgroups.stack.Protocol;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.Util;
11 import org.jgroups.util.Streamable;
12
13 import java.io.*;
14 import java.util.*;
15
16
17
18 /**
19  * Reliable unicast layer. Uses acknowledgement scheme similar to TCP to provide lossless transmission
20  * of unicast messages (for reliable multicast see NAKACK layer). When a message is sent to a peer for
21  * the first time, we add the pair &lt;peer_addr, Entry&gt; to the hashtable (peer address is the key). All
22  * messages sent to that peer will be added to hashtable.peer_addr.sent_msgs. When we receive a
23  * message from a peer for the first time, another entry will be created and added to the hashtable
24  * (unless already existing). Msgs will then be added to hashtable.peer_addr.received_msgs.<p> This
25  * layer is used to reliably transmit point-to-point messages, that is, either messages sent to a
26  * single receiver (vs. messages multicast to a group) or for example replies to a multicast message. The
27  * sender uses an <code>AckSenderWindow</code> which retransmits messages for which it hasn't received
28  * an ACK, the receiver uses <code>AckReceiverWindow</code> which keeps track of the lowest seqno
29  * received so far, and keeps messages in order.<p>
30  * Messages in both AckSenderWindows and AckReceiverWindows will be removed. A message will be removed from
31  * AckSenderWindow when an ACK has been received for it and messages will be removed from AckReceiverWindow
32  * whenever a message is received: the new message is added and then we try to remove as many messages as
33  * possible (until we stop at a gap, or there are no more messages).
34  * @author Bela Ban
35  */

36 public class UNICAST extends Protocol implements AckSenderWindow.RetransmitCommand {
37     boolean operational=false;
38     final Vector members=new Vector(11);
39     final HashMap connections=new HashMap(11); // Object (sender or receiver) -- Entries
40
long[] timeout={400, 800,1600,3200}; // for AckSenderWindow: max time to wait for missing acks
41
Address local_addr=null;
42     TimeScheduler timer=null; // used for retransmissions (passed to AckSenderWindow)
43

44     // if UNICAST is used without GMS, don't consult the membership on retransmit() if use_gms=false
45
// default is true
46
boolean use_gms=true;
47     int window_size=-1; // sliding window: max number of msgs in table (disabled by default)
48
int min_threshold=-1; // num under which table has to fall before we resume adding msgs
49
final static String JavaDoc name="UNICAST";
50
51
52
53
54     class Entry {
55         AckReceiverWindow received_msgs=null; // stores all msgs rcvd by a certain peer in seqno-order
56
AckSenderWindow sent_msgs=null; // stores (and retransmits) msgs sent by us to a certain peer
57
long sent_msgs_seqno=getInitialSeqno(); // seqno for msgs sent by us
58

59
60         void reset() {
61             if(sent_msgs != null)
62                 sent_msgs.reset();
63             if(received_msgs != null)
64                 received_msgs.reset();
65         }
66
67
68         public String JavaDoc toString() {
69             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
70             if(sent_msgs != null)
71                 sb.append("sent_msgs=" + sent_msgs + '\n');
72             if(received_msgs != null)
73                 sb.append("received_msgs=" + received_msgs + '\n');
74             return sb.toString();
75         }
76     }
77
78
79
80
81     /** All protocol names have to be unique ! */
82     public String JavaDoc getName() {return name;}
83
84
85     public boolean setProperties(Properties props) {
86         String JavaDoc str;
87         long[] tmp;
88
89         super.setProperties(props);
90         str=props.getProperty("timeout");
91         if(str != null) {
92         tmp=Util.parseCommaDelimitedLongs(str);
93         if(tmp != null && tmp.length > 0)
94         timeout=tmp;
95             props.remove("timeout");
96         }
97
98         str=props.getProperty("window_size");
99         if(str != null) {
100             window_size=Integer.parseInt(str);
101             props.remove("window_size");
102         }
103
104         str=props.getProperty("min_threshold");
105         if(str != null) {
106             min_threshold=Integer.parseInt(str);
107             props.remove("min_threshold");
108         }
109
110         str=props.getProperty("use_gms");
111         if(str != null) {
112             use_gms=Boolean.valueOf(str).booleanValue();
113             props.remove("use_gms");
114         }
115
116         if(props.size() > 0) {
117             System.err.println("UNICAST.setProperties(): these properties are not recognized:");
118             props.list(System.out);
119             return false;
120         }
121
122         // Some sanity checks
123
if((window_size > 0 && min_threshold <= 0) || (window_size <= 0 && min_threshold > 0)) {
124             log.error("window_size and min_threshold have to be both set if one of them is set");
125             return false;
126         }
127         if(window_size > 0 && min_threshold > 0 && window_size < min_threshold) {
128             log.error("min_threshold (" + min_threshold + ") has to be less than window_size (" + window_size + ')');
129             return false;
130         }
131         return true;
132     }
133
134     public void start() throws Exception JavaDoc {
135         timer=stack != null ? stack.timer : null;
136         if(timer == null)
137             throw new Exception JavaDoc("UNICAST.start(): timer is null");
138     }
139
140     public void stop() {
141         removeAllConnections();
142         operational=false;
143     }
144
145
146     public void up(Event evt) {
147         Message msg;
148         Address dst, src;
149         UnicastHeader hdr;
150
151         switch(evt.getType()) {
152
153         case Event.MSG:
154             msg=(Message)evt.getArg();
155             dst=msg.getDest();
156
157             if(dst == null || dst.isMulticastAddress()) // only handle unicast messages
158
break; // pass up
159

160             hdr=(UnicastHeader)msg.removeHeader(name);
161             if(hdr == null) break;
162             src=msg.getSrc();
163             switch(hdr.type) {
164             case UnicastHeader.DATA: // received regular message
165
sendAck(src, hdr.seqno);
166                 handleDataReceived(src, hdr.seqno, hdr.first, msg);
167                 break;
168             case UnicastHeader.DATA_ACK: // received ACK for previously sent message
169
handleAckReceived(src, hdr.seqno);
170                 break;
171             default:
172                 log.error("UnicastHeader type " + hdr.type + " not known !");
173                 break;
174             }
175             return;
176
177         case Event.SET_LOCAL_ADDRESS:
178             local_addr=(Address)evt.getArg();
179             break;
180         }
181
182         passUp(evt); // Pass up to the layer above us
183
}
184
185
186
187
188
189     public void down(Event evt) {
190         Message msg;
191         Object JavaDoc dst, mbr;
192         Entry entry;
193         UnicastHeader hdr;
194
195         switch (evt.getType()) {
196
197         case Event.MSG: // Add UnicastHeader, add to AckSenderWindow and pass down
198
msg = (Message) evt.getArg();
199             dst = msg.getDest();
200
201             /* only handle unicast messages */
202             if (dst == null || ((Address) dst).isMulticastAddress()) {
203                 break;
204             }
205
206             synchronized(connections) {
207                 entry = (Entry) connections.get(dst);
208                 if (entry == null) {
209                     entry = new Entry();
210                     connections.put(dst, entry);
211                 }
212             }
213
214             hdr = new UnicastHeader(UnicastHeader.DATA, entry.sent_msgs_seqno);
215             if (entry.sent_msgs == null) { // first msg to peer 'dst'
216
hdr.first = true;
217                 entry.sent_msgs = new AckSenderWindow(this, timeout, this);
218                 if (window_size > 0)
219                     entry.sent_msgs.setWindowSize(window_size, min_threshold);
220             }
221             msg.putHeader(name, hdr);
222             if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> DATA(" + dst + ": #" +
223                                                entry.sent_msgs_seqno + ", first=" + hdr.first + ')');
224
225             if (Global.copy)
226                 entry.sent_msgs.add(entry.sent_msgs_seqno, msg.copy()); // add *including* UnicastHeader
227
else
228                 entry.sent_msgs.add(entry.sent_msgs_seqno, msg); // add *including* UnicastHeader
229

230             entry.sent_msgs_seqno++;
231             msg=null;
232             return; // AckSenderWindow will send message for us
233

234         case Event.BECOME_SERVER:
235             operational = true;
236             break;
237
238         case Event.VIEW_CHANGE: // remove connections to peers that are not members anymore !
239
Vector new_members = ((View) evt.getArg()).getMembers();
240             Vector left_members;
241             synchronized (members) {
242                 left_members = Util.determineLeftMembers(members, new_members);
243                 members.removeAllElements();
244                 if (new_members != null)
245                     members.addAll(new_members);
246             }
247
248             // Remove all connections for members that left between the current view and the new view
249
// See DESIGN for details
250
if (use_gms && left_members.size() > 0) {
251                 for (int i = 0; i < left_members.size(); i++) {
252                     mbr = left_members.elementAt(i);
253                     removeConnection(mbr);
254                 }
255             }
256             break;
257         }
258
259         passDown(evt); // Pass on to the layer below us
260
}
261
262
263     /** Removes and resets from connection table (which is already locked) */
264     private void removeConnection(Object JavaDoc mbr) {
265         Entry entry;
266
267         synchronized(connections) {
268             entry=(Entry)connections.remove(mbr);
269         }
270         if(entry != null) {
271             entry.reset();
272             if(log.isTraceEnabled()) log.trace("removed " + mbr + " from connection table");
273         }
274     }
275
276
277     private void removeAllConnections() {
278         Entry entry;
279
280         synchronized(connections) {
281             for(Iterator it=connections.values().iterator(); it.hasNext();) {
282                 entry=(Entry)it.next();
283                 entry.reset();
284             }
285             connections.clear();
286         }
287     }
288
289
290
291
292     /** Returns random initial sequence number between 1 and 100 */
293     static long getInitialSeqno() {
294         long ret=(long)((Math.random() * 100) % 100);
295         return ret;
296     }
297
298
299
300     /** Called by AckSenderWindow to resend messages for which no ACK has been received yet */
301     public void retransmit(long seqno, Message msg) {
302         Object JavaDoc dst=msg.getDest();
303
304         // bela Dec 23 2002:
305
// this will remove a member on a MERGE request, e.g. A and B merge: when A sends the unicast
306
// request to B and there's a retransmit(), B will be removed !
307

308         // if(use_gms && !members.contains(dst) && !prev_members.contains(dst)) {
309
//
310
// if(log.isWarnEnabled()) log.warn("UNICAST.retransmit()", "seqno=" + seqno + ": dest " + dst +
311
// " is not member any longer; removing entry !");
312

313         // synchronized(connections) {
314
// removeConnection(dst);
315
// }
316
// return;
317
// }
318

319         if(log.isTraceEnabled())
320             log.trace("[" + local_addr + "] --> XMIT(" + dst + ": #" + seqno + ')');
321
322     if(Global.copy)
323         passDown(new Event(Event.MSG, msg.copy()));
324     else
325         passDown(new Event(Event.MSG, msg));
326     }
327
328
329
330
331
332     /**
333      * Check whether the hashtable contains an entry e for <code>sender</code> (create if not). If
334      * e.received_msgs is null and <code>first</code> is true: create a new AckReceiverWindow(seqno) and
335      * add message. Set e.received_msgs to the new window. Else just add the message. If first is false,
336      * but we don't yet have hashtable.received_msgs, then just discard the message. If first is true, but
337      * hashtable.received_msgs already exists, also discard the message (redundant message).
338      */

339     void handleDataReceived(Object JavaDoc sender, long seqno, boolean first, Message msg) {
340         Entry entry;
341         Message m;
342
343         if(log.isTraceEnabled())
344             log.trace("[" + local_addr + "] <-- DATA(" + sender + ": #" + seqno + ", first=" + first);
345
346         synchronized(connections) {
347             entry=(Entry)connections.get(sender);
348             if(entry == null) {
349                 entry=new Entry();
350                 connections.put(sender, entry);
351             }
352         }
353
354         if(entry.received_msgs == null) {
355             if(first)
356                 entry.received_msgs=new AckReceiverWindow(seqno);
357             else {
358                 if(operational) {
359                     if(log.isWarnEnabled())
360                         log.warn("[" + local_addr + "] seqno " + seqno + " from " +
361                                 sender + " is not tagged as the first message sent by " + sender +
362                                 "; however, the table for received messages from " + sender +
363                                 " is still null ! We probably haven't received the first message from "
364                                 + sender + " ! Discarding message (operational=" + operational + ')');
365                     return;
366                 }
367             }
368         }
369
370         if(entry.received_msgs != null) {
371             entry.received_msgs.add(seqno, msg);
372         
373             // Try to remove (from the AckReceiverWindow) as many messages as possible as pass them up
374
while((m=entry.received_msgs.remove()) != null)
375                 passUp(new Event(Event.MSG, m));
376         }
377     }
378
379
380
381
382     /** Add the ACK to hashtable.sender.sent_msgs */
383     private void handleAckReceived(Object JavaDoc sender, long seqno) {
384         Entry entry;
385         AckSenderWindow win;
386
387         if(log.isTraceEnabled()) log.trace("[" + local_addr + "] <-- ACK(" + sender + ": #" + seqno + ')');
388         synchronized(connections) {
389             entry=(Entry)connections.get(sender);
390         }
391         if(entry == null || entry.sent_msgs == null)
392             return;
393         win=entry.sent_msgs;
394         win.ack(seqno); // removes message from retransmission
395
}
396
397
398
399     void sendAck(Address dst, long seqno) {
400         Message ack=new Message(dst, null, null);
401         ack.putHeader(name, new UnicastHeader(UnicastHeader.DATA_ACK, seqno));
402         if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> ACK(" + dst + ": #" + seqno + ')');
403         passDown(new Event(Event.MSG, ack));
404     }
405
406
407
408
409
410
411     public static class UnicastHeader extends Header implements Streamable {
412         static final byte DATA=0;
413         static final byte DATA_ACK=1;
414     
415         byte type=DATA;
416         long seqno=0; // First msg is 0
417
boolean first=false;
418
419
420         public UnicastHeader() {} // used for externalization
421

422         public UnicastHeader(byte type, long seqno) {
423             this.type=type == DATA_ACK ? DATA_ACK : DATA;
424             this.seqno=seqno;
425         }
426     
427         public String JavaDoc toString() {
428             return "[UNICAST: " + type2Str(type) + ", seqno=" + seqno + ']';
429         }
430     
431         public static String JavaDoc type2Str(byte t) {
432             switch(t) {
433                 case DATA: return "DATA";
434                 case DATA_ACK: return "DATA_ACK";
435                 default: return "<unknown>";
436             }
437         }
438
439         public long size() {
440             return (2 * Global.BYTE_SIZE) + Global.LONG_SIZE;
441         }
442
443
444         public void writeExternal(ObjectOutput out) throws IOException {
445             out.writeByte(type);
446             out.writeLong(seqno);
447             out.writeBoolean(first);
448         }
449     
450     
451     
452         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
453             type=in.readByte();
454             seqno=in.readLong();
455             first=in.readBoolean();
456         }
457
458         public void writeTo(DataOutputStream out) throws IOException {
459             out.writeByte(type);
460             out.writeLong(seqno);
461             out.writeBoolean(first);
462         }
463
464         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
465             type=in.readByte();
466             seqno=in.readLong();
467             first=in.readBoolean();
468         }
469     }
470     
471     
472 }
473
Popular Tags