KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > SMACK


1 // $Id: SMACK.java,v 1.8 2004/09/23 16:29:42 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.AckMcastSenderWindow;
8 import org.jgroups.stack.AckReceiverWindow;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.util.Util;
11
12 import java.io.IOException JavaDoc;
13 import java.io.ObjectInput JavaDoc;
14 import java.io.ObjectOutput JavaDoc;
15 import java.util.HashMap JavaDoc;
16 import java.util.Iterator JavaDoc;
17 import java.util.Properties JavaDoc;
18 import java.util.Vector JavaDoc;
19
20
21
22
23 /**
24  * Simple Multicast ACK protocol. A positive acknowledgment-based protocol for reliable delivery of
25  * multicast messages, which does not need any group membership service.
26  * Basically works as follows:
27  * <ul>
28  * <li>Sender S sends multicast message M</li>
29  * <li>When member P receives M, it sends back a unicast ack to S</li>
30  * <li>When S receives the ack from P, it checks whether P is in its
31  * membership list. If not, P will be added. This is necessary to retransmit the next message
32  * sent to P.</li>
33  * <li>When S sends a multicast message M, all members are added to a
34  * retransmission entry (containing all members to which the message
35  * was sent), which is added to a hashmap (keyed by seqno). Whenever
36  * an ack is received from receiver X, X will be removed from the
37  * retransmission list for the given seqno. When the retransmission
38  * list is empty, the seqno will be removed from the hashmap.</li>
39  * <li>A retransmitter thread in the sender periodically retransmits
40  * (either via unicast, or multicast) messages for which no ack has
41  * been received yet</li>
42  * <li>When a max number of (unsuccessful) retransmissions have been
43  * exceeded, all remaining members for that seqno are removed from
44  * the local membership, and the seqno is removed from the hashmap,
45  * ceasing all retransmissions</li>
46  * </ul>
47  * Advantage of this protocol: no group membership necessary, fast.
48  * @author Bela Ban Aug 2002
49  * @version $Revision: 1.8 $
50  * todo: initial mcast to announce new member (for view change)
51  * todo: fix membership bug: start a, b, kill b, restart b: b will be suspected by a
52  */

53 public class SMACK extends Protocol implements AckMcastSenderWindow.RetransmitCommand {
54     long[] timeout={1000,2000,3000}; // retransmit timeouts (for AckMcastSenderWindow)
55
int max_xmits=10; // max retransmissions (if still no ack, member will be removed)
56
final Vector JavaDoc members=new Vector JavaDoc(); // contains Addresses
57
AckMcastSenderWindow sender_win=null;
58     final HashMap JavaDoc receivers=new HashMap JavaDoc(); // keys=sender (Address), values=AckReceiverWindow
59
final HashMap JavaDoc xmit_table=new HashMap JavaDoc(); // keeps track of num xmits / member (keys: mbr, val:num)
60
Address local_addr=null; // my own address
61
long seqno=1; // seqno for msgs sent by this sender
62
long vid=1; // for the fake view changes
63
boolean print_local_addr=true;
64     static final String JavaDoc name="SMACK";
65     
66     
67
68
69
70     public SMACK() {
71         ;
72     }
73
74     public String JavaDoc getName() {
75         return name;
76     }
77
78
79     public boolean setProperties(Properties JavaDoc props) {
80         String JavaDoc str;
81         long[] tmp;
82
83         super.setProperties(props);
84         str=props.getProperty("print_local_addr");
85         if(str != null) {
86             print_local_addr=Boolean.valueOf(str).booleanValue();
87             props.remove("print_local_addr");
88         }
89
90         str=props.getProperty("timeout");
91         if(str != null) {
92             tmp=Util.parseCommaDelimitedLongs(str);
93             props.remove("timeout");
94             if(tmp != null && tmp.length > 0)
95                 timeout=tmp;
96         }
97
98         str=props.getProperty("max_xmits");
99         if(str != null) {
100             max_xmits=Integer.parseInt(str);
101             props.remove("max_xmits");
102         }
103
104
105         if(props.size() > 0) {
106             System.err.println("SMACK.setProperties(): the following properties are not recognized:");
107             props.list(System.out);
108             return false;
109         }
110         return true;
111     }
112
113
114     public void stop() {
115         AckReceiverWindow win;
116         if(sender_win != null) {
117             sender_win.stop();
118             sender_win=null;
119         }
120         for(Iterator JavaDoc it=receivers.values().iterator(); it.hasNext();) {
121             win=(AckReceiverWindow)it.next();
122             win.reset();
123         }
124         receivers.clear();
125     }
126
127
128     public void up(Event evt) {
129         Address sender;
130
131         switch(evt.getType()) {
132
133             case Event.SET_LOCAL_ADDRESS:
134                 local_addr=(Address)evt.getArg();
135                 addMember(local_addr);
136                 if(print_local_addr) {
137                     System.out.println("\n-------------------------------------------------------\n" +
138                                        "GMS: address is " + local_addr +
139                                        "\n-------------------------------------------------------");
140                 }
141                 break;
142
143             case Event.CONNECT_OK:
144                 passUp(evt);
145                 sender_win=new AckMcastSenderWindow(this, timeout);
146
147                 // send join announcement
148
Message join_msg=new Message();
149                 join_msg.putHeader(name, new SmackHeader(SmackHeader.JOIN_ANNOUNCEMENT, -1));
150                 passDown(new Event(Event.MSG, join_msg));
151                 return;
152
153             case Event.SUSPECT:
154
155                     if(log.isInfoEnabled()) log.info("removing suspected member " + evt.getArg());
156                 removeMember((Address)evt.getArg());
157                 break;
158
159             case Event.MSG:
160                 Message msg=(Message)evt.getArg(), tmp_msg;
161                 if(msg == null) break;
162                 sender=msg.getSrc();
163                 SmackHeader hdr=(SmackHeader)msg.removeHeader(name);
164                 if(hdr == null) // is probably a unicast message
165
break;
166                 switch(hdr.type) {
167                     case SmackHeader.MCAST: // send an ack, then pass up (if not already received)
168
Long JavaDoc tmp_seqno;
169                         AckReceiverWindow win;
170                         Message ack_msg=new Message(sender, null, null);
171
172                         ack_msg.putHeader(name, new SmackHeader(SmackHeader.ACK, hdr.seqno));
173                         passDown(new Event(Event.MSG, ack_msg));
174
175                         tmp_seqno=new Long JavaDoc(hdr.seqno);
176
177                         if(log.isTraceEnabled())
178                             log.trace("received #" + tmp_seqno + " from " + sender);
179
180                         win=(AckReceiverWindow)receivers.get(sender);
181                         if(win == null) {
182                             addMember(sender);
183                             win=new AckReceiverWindow(hdr.seqno);
184                             receivers.put(sender, win);
185                         }
186                         win.add(hdr.seqno, msg);
187
188                         // now remove as many messages as possible
189
while((tmp_msg=win.remove()) != null)
190                             passUp(new Event(Event.MSG, tmp_msg));
191                         return;
192
193                     case SmackHeader.ACK:
194                         addMember(msg.getSrc());
195                         sender_win.ack(hdr.seqno, msg.getSrc());
196                         sender_win.clearStableMessages();
197                         if(log.isTraceEnabled())
198                             log.trace("received ack for #" + hdr.seqno + " from " + msg.getSrc());
199                         return;
200
201                     case SmackHeader.JOIN_ANNOUNCEMENT:
202
203                             if(log.isInfoEnabled()) log.info("received join announcement by " + msg.getSrc());
204
205                         if(!containsMember(sender)) {
206                             Message join_rsp=new Message(sender, null, null);
207                             join_rsp.putHeader(name, new SmackHeader(SmackHeader.JOIN_ANNOUNCEMENT, -1));
208                             passDown(new Event(Event.MSG, join_rsp));
209                         }
210                         addMember(sender);
211                         return;
212
213                     case SmackHeader.LEAVE_ANNOUNCEMENT:
214
215                             if(log.isInfoEnabled()) log.info("received leave announcement by " + msg.getSrc());
216
217                         removeMember(sender);
218                         return;
219
220                     default:
221                         if(log.isWarnEnabled()) log.warn("detected SmackHeader with invalid type: " + hdr);
222                         break;
223                 }
224                 break;
225         }
226
227         passUp(evt);
228     }
229
230
231     public void down(Event evt) {
232         Message leave_msg;
233
234         switch(evt.getType()) {
235
236             case Event.DISCONNECT:
237                 leave_msg=new Message();
238                 leave_msg.putHeader(name, new SmackHeader(SmackHeader.LEAVE_ANNOUNCEMENT, -1));
239                 passDown(new Event(Event.MSG, leave_msg));
240                 // passUp(new Event(Event.DISCONNECT_OK));
241
break;
242
243             case Event.CONNECT:
244                 //passUp(new Event(Event.CONNECT_OK));
245

246                 // Do not send JOIN_ANOUNCEMENT here, don't know yet if the transport is OK.
247
// Send it later when handling CONNECT_OK from below
248

249 // sender_win=new AckMcastSenderWindow(this, timeout);
250
// // send join announcement
251
// Message join_msg=new Message();
252
// join_msg.putHeader(name, new SmackHeader(SmackHeader.JOIN_ANNOUNCEMENT, -1));
253
// passDown(new Event(Event.MSG, join_msg));
254
// return;
255

256                 break;
257
258
259 // add a header with the current sequence number and increment seqno
260
case Event.MSG:
261                 Message msg=(Message)evt.getArg();
262                 if(msg == null) break;
263                 if(msg.getDest() == null || msg.getDest().isMulticastAddress()) {
264                     msg.putHeader(name, new SmackHeader(SmackHeader.MCAST, seqno));
265                     sender_win.add(seqno, msg, (Vector JavaDoc)members.clone());
266                     if(log.isTraceEnabled()) log.trace("sending mcast #" + seqno);
267                     seqno++;
268                 }
269                 break;
270         }
271
272         passDown(evt);
273     }
274
275
276
277     /* ----------------------- Interface AckMcastSenderWindow.RetransmitCommand -------------------- */
278
279     public void retransmit(long seqno, Message msg, Address dest) {
280         msg.setDest(dest);
281
282             if(log.isInfoEnabled()) log.info(seqno + ", msg=" + msg);
283         passDown(new Event(Event.MSG, msg));
284     }
285
286     /* -------------------- End of Interface AckMcastSenderWindow.RetransmitCommand ---------------- */
287
288
289
290
291     public static class SmackHeader extends Header {
292         public static final int MCAST=1;
293         public static final int ACK=2;
294         public static final int JOIN_ANNOUNCEMENT=3;
295         public static final int LEAVE_ANNOUNCEMENT=4;
296
297         int type=0;
298         long seqno=-1;
299
300         public SmackHeader() {
301             ;
302         }
303
304         public SmackHeader(int type, long seqno) {
305             this.type=type;
306             this.seqno=seqno;
307         }
308
309
310         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
311             out.writeInt(type);
312             out.writeLong(seqno);
313         }
314
315
316         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
317             type=in.readInt();
318             seqno=in.readLong();
319         }
320
321
322         public String JavaDoc toString() {
323             switch(type) {
324                 case MCAST:
325                     return "MCAST";
326                 case ACK:
327                     return "ACK";
328                 case JOIN_ANNOUNCEMENT:
329                     return "JOIN_ANNOUNCEMENT";
330                 case LEAVE_ANNOUNCEMENT:
331                     return "LEAVE_ANNOUNCEMENT";
332                 default:
333                     return "<unknown>";
334             }
335         }
336     }
337
338
339     /* ------------------------------------- Private methods --------------------------------------- */
340     void addMember(Address mbr) {
341         synchronized(members) {
342             if(mbr != null && !members.contains(mbr)) {
343                 Object JavaDoc tmp;
344                 View new_view;
345                 members.addElement(mbr);
346                 tmp=members.clone();
347                 if(log.isTraceEnabled())
348                     log.trace("added " + mbr + ", members=" + tmp);
349                 new_view=new View(new ViewId(local_addr, vid++), (Vector JavaDoc)tmp);
350                 passUp(new Event(Event.VIEW_CHANGE, new_view));
351                 passDown(new Event(Event.VIEW_CHANGE, new_view));
352             }
353         }
354     }
355
356     void removeMember(Address mbr) {
357         synchronized(members) {
358             if(mbr != null) {
359                 Object JavaDoc tmp;
360                 View new_view;
361                 members.removeElement(mbr);
362                 tmp=members.clone();
363                 if(log.isTraceEnabled())
364                     log.trace("removed " + mbr + ", members=" + tmp);
365                 new_view=new View(new ViewId(local_addr, vid++), (Vector JavaDoc)tmp);
366                 passUp(new Event(Event.VIEW_CHANGE, new_view));
367                 passDown(new Event(Event.VIEW_CHANGE, new_view));
368                 if(sender_win != null)
369                     sender_win.remove(mbr); // causes retransmissions to mbr to stop
370
}
371         }
372     }
373
374
375     boolean containsMember(Address mbr) {
376         synchronized(members) {
377             return mbr != null && members.contains(mbr);
378         }
379     }
380
381     /* --------------------------------- End of Private methods ------------------------------------ */
382
383 }
384
Popular Tags