1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.*; 7 import org.jgroups.stack.AckMcastSenderWindow; 8 import org.jgroups.stack.AckReceiverWindow; 9 import org.jgroups.stack.Protocol; 10 import org.jgroups.util.Util; 11 12 import java.io.IOException ; 13 import java.io.ObjectInput ; 14 import java.io.ObjectOutput ; 15 import java.util.HashMap ; 16 import java.util.Iterator ; 17 import java.util.Properties ; 18 import java.util.Vector ; 19 20 21 22 23 53 public class SMACK extends Protocol implements AckMcastSenderWindow.RetransmitCommand { 54 long[] timeout={1000,2000,3000}; int max_xmits=10; final Vector members=new Vector (); AckMcastSenderWindow sender_win=null; 58 final HashMap receivers=new HashMap (); final HashMap xmit_table=new HashMap (); Address local_addr=null; long seqno=1; long vid=1; boolean print_local_addr=true; 64 static final String name="SMACK"; 65 66 67 68 69 70 public SMACK() { 71 ; 72 } 73 74 public String getName() { 75 return name; 76 } 77 78 79 public boolean setProperties(Properties props) { 80 String str; 81 long[] tmp; 82 83 super.setProperties(props); 84 str=props.getProperty("print_local_addr"); 85 if(str != null) { 86 print_local_addr=Boolean.valueOf(str).booleanValue(); 87 props.remove("print_local_addr"); 88 } 89 90 str=props.getProperty("timeout"); 91 if(str != null) { 92 tmp=Util.parseCommaDelimitedLongs(str); 93 props.remove("timeout"); 94 if(tmp != null && tmp.length > 0) 95 timeout=tmp; 96 } 97 98 str=props.getProperty("max_xmits"); 99 if(str != null) { 100 max_xmits=Integer.parseInt(str); 101 props.remove("max_xmits"); 102 } 103 104 105 if(props.size() > 0) { 106 System.err.println("SMACK.setProperties(): the following properties are not recognized:"); 107 props.list(System.out); 108 return false; 109 } 110 return true; 111 } 112 113 114 public void stop() { 115 AckReceiverWindow win; 116 if(sender_win != null) { 117 sender_win.stop(); 118 sender_win=null; 119 } 120 for(Iterator it=receivers.values().iterator(); it.hasNext();) { 121 win=(AckReceiverWindow)it.next(); 122 win.reset(); 123 } 124 receivers.clear(); 125 } 126 127 128 public void up(Event evt) { 129 Address sender; 130 131 switch(evt.getType()) { 132 133 case Event.SET_LOCAL_ADDRESS: 134 local_addr=(Address)evt.getArg(); 135 addMember(local_addr); 136 if(print_local_addr) { 137 System.out.println("\n-------------------------------------------------------\n" + 138 "GMS: address is " + local_addr + 139 "\n-------------------------------------------------------"); 140 } 141 break; 142 143 case Event.CONNECT_OK: 144 passUp(evt); 145 sender_win=new AckMcastSenderWindow(this, timeout); 146 147 Message join_msg=new Message(); 149 join_msg.putHeader(name, new SmackHeader(SmackHeader.JOIN_ANNOUNCEMENT, -1)); 150 passDown(new Event(Event.MSG, join_msg)); 151 return; 152 153 case Event.SUSPECT: 154 155 if(log.isInfoEnabled()) log.info("removing suspected member " + evt.getArg()); 156 removeMember((Address)evt.getArg()); 157 break; 158 159 case Event.MSG: 160 Message msg=(Message)evt.getArg(), tmp_msg; 161 if(msg == null) break; 162 sender=msg.getSrc(); 163 SmackHeader hdr=(SmackHeader)msg.removeHeader(name); 164 if(hdr == null) break; 166 switch(hdr.type) { 167 case SmackHeader.MCAST: Long tmp_seqno; 169 AckReceiverWindow win; 170 Message ack_msg=new Message(sender, null, null); 171 172 ack_msg.putHeader(name, new SmackHeader(SmackHeader.ACK, hdr.seqno)); 173 passDown(new Event(Event.MSG, ack_msg)); 174 175 tmp_seqno=new Long (hdr.seqno); 176 177 if(log.isTraceEnabled()) 178 log.trace("received #" + tmp_seqno + " from " + sender); 179 180 win=(AckReceiverWindow)receivers.get(sender); 181 if(win == null) { 182 addMember(sender); 183 win=new AckReceiverWindow(hdr.seqno); 184 receivers.put(sender, win); 185 } 186 win.add(hdr.seqno, msg); 187 188 while((tmp_msg=win.remove()) != null) 190 passUp(new Event(Event.MSG, tmp_msg)); 191 return; 192 193 case SmackHeader.ACK: 194 addMember(msg.getSrc()); 195 sender_win.ack(hdr.seqno, msg.getSrc()); 196 sender_win.clearStableMessages(); 197 if(log.isTraceEnabled()) 198 log.trace("received ack for #" + hdr.seqno + " from " + msg.getSrc()); 199 return; 200 201 case SmackHeader.JOIN_ANNOUNCEMENT: 202 203 if(log.isInfoEnabled()) log.info("received join announcement by " + msg.getSrc()); 204 205 if(!containsMember(sender)) { 206 Message join_rsp=new Message(sender, null, null); 207 join_rsp.putHeader(name, new SmackHeader(SmackHeader.JOIN_ANNOUNCEMENT, -1)); 208 passDown(new Event(Event.MSG, join_rsp)); 209 } 210 addMember(sender); 211 return; 212 213 case SmackHeader.LEAVE_ANNOUNCEMENT: 214 215 if(log.isInfoEnabled()) log.info("received leave announcement by " + msg.getSrc()); 216 217 removeMember(sender); 218 return; 219 220 default: 221 if(log.isWarnEnabled()) log.warn("detected SmackHeader with invalid type: " + hdr); 222 break; 223 } 224 break; 225 } 226 227 passUp(evt); 228 } 229 230 231 public void down(Event evt) { 232 Message leave_msg; 233 234 switch(evt.getType()) { 235 236 case Event.DISCONNECT: 237 leave_msg=new Message(); 238 leave_msg.putHeader(name, new SmackHeader(SmackHeader.LEAVE_ANNOUNCEMENT, -1)); 239 passDown(new Event(Event.MSG, leave_msg)); 240 break; 242 243 case Event.CONNECT: 244 246 249 256 break; 257 258 259 case Event.MSG: 261 Message msg=(Message)evt.getArg(); 262 if(msg == null) break; 263 if(msg.getDest() == null || msg.getDest().isMulticastAddress()) { 264 msg.putHeader(name, new SmackHeader(SmackHeader.MCAST, seqno)); 265 sender_win.add(seqno, msg, (Vector )members.clone()); 266 if(log.isTraceEnabled()) log.trace("sending mcast #" + seqno); 267 seqno++; 268 } 269 break; 270 } 271 272 passDown(evt); 273 } 274 275 276 277 278 279 public void retransmit(long seqno, Message msg, Address dest) { 280 msg.setDest(dest); 281 282 if(log.isInfoEnabled()) log.info(seqno + ", msg=" + msg); 283 passDown(new Event(Event.MSG, msg)); 284 } 285 286 287 288 289 290 291 public static class SmackHeader extends Header { 292 public static final int MCAST=1; 293 public static final int ACK=2; 294 public static final int JOIN_ANNOUNCEMENT=3; 295 public static final int LEAVE_ANNOUNCEMENT=4; 296 297 int type=0; 298 long seqno=-1; 299 300 public SmackHeader() { 301 ; 302 } 303 304 public SmackHeader(int type, long seqno) { 305 this.type=type; 306 this.seqno=seqno; 307 } 308 309 310 public void writeExternal(ObjectOutput out) throws IOException { 311 out.writeInt(type); 312 out.writeLong(seqno); 313 } 314 315 316 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 317 type=in.readInt(); 318 seqno=in.readLong(); 319 } 320 321 322 public String toString() { 323 switch(type) { 324 case MCAST: 325 return "MCAST"; 326 case ACK: 327 return "ACK"; 328 case JOIN_ANNOUNCEMENT: 329 return "JOIN_ANNOUNCEMENT"; 330 case LEAVE_ANNOUNCEMENT: 331 return "LEAVE_ANNOUNCEMENT"; 332 default: 333 return "<unknown>"; 334 } 335 } 336 } 337 338 339 340 void addMember(Address mbr) { 341 synchronized(members) { 342 if(mbr != null && !members.contains(mbr)) { 343 Object tmp; 344 View new_view; 345 members.addElement(mbr); 346 tmp=members.clone(); 347 if(log.isTraceEnabled()) 348 log.trace("added " + mbr + ", members=" + tmp); 349 new_view=new View(new ViewId(local_addr, vid++), (Vector )tmp); 350 passUp(new Event(Event.VIEW_CHANGE, new_view)); 351 passDown(new Event(Event.VIEW_CHANGE, new_view)); 352 } 353 } 354 } 355 356 void removeMember(Address mbr) { 357 synchronized(members) { 358 if(mbr != null) { 359 Object tmp; 360 View new_view; 361 members.removeElement(mbr); 362 tmp=members.clone(); 363 if(log.isTraceEnabled()) 364 log.trace("removed " + mbr + ", members=" + tmp); 365 new_view=new View(new ViewId(local_addr, vid++), (Vector )tmp); 366 passUp(new Event(Event.VIEW_CHANGE, new_view)); 367 passDown(new Event(Event.VIEW_CHANGE, new_view)); 368 if(sender_win != null) 369 sender_win.remove(mbr); } 371 } 372 } 373 374 375 boolean containsMember(Address mbr) { 376 synchronized(members) { 377 return mbr != null && members.contains(mbr); 378 } 379 } 380 381 382 383 } 384 | Popular Tags |