1 3 package org.jgroups.protocols; 4 5 import org.jgroups.*; 6 import org.jgroups.stack.AckReceiverWindow; 7 import org.jgroups.stack.AckSenderWindow; 8 import org.jgroups.stack.Protocol; 9 import org.jgroups.util.TimeScheduler; 10 import org.jgroups.util.Util; 11 import org.jgroups.util.Streamable; 12 13 import java.io.*; 14 import java.util.*; 15 16 17 18 36 public class UNICAST extends Protocol implements AckSenderWindow.RetransmitCommand { 37 boolean operational=false; 38 final Vector members=new Vector(11); 39 final HashMap connections=new HashMap(11); long[] timeout={400, 800,1600,3200}; Address local_addr=null; 42 TimeScheduler timer=null; 44 boolean use_gms=true; 47 int window_size=-1; int min_threshold=-1; final static String name="UNICAST"; 50 51 52 53 54 class Entry { 55 AckReceiverWindow received_msgs=null; AckSenderWindow sent_msgs=null; long sent_msgs_seqno=getInitialSeqno(); 59 60 void reset() { 61 if(sent_msgs != null) 62 sent_msgs.reset(); 63 if(received_msgs != null) 64 received_msgs.reset(); 65 } 66 67 68 public String toString() { 69 StringBuffer sb=new StringBuffer (); 70 if(sent_msgs != null) 71 sb.append("sent_msgs=" + sent_msgs + '\n'); 72 if(received_msgs != null) 73 sb.append("received_msgs=" + received_msgs + '\n'); 74 return sb.toString(); 75 } 76 } 77 78 79 80 81 82 public String getName() {return name;} 83 84 85 public boolean setProperties(Properties props) { 86 String str; 87 long[] tmp; 88 89 super.setProperties(props); 90 str=props.getProperty("timeout"); 91 if(str != null) { 92 tmp=Util.parseCommaDelimitedLongs(str); 93 if(tmp != null && tmp.length > 0) 94 timeout=tmp; 95 props.remove("timeout"); 96 } 97 98 str=props.getProperty("window_size"); 99 if(str != null) { 100 window_size=Integer.parseInt(str); 101 props.remove("window_size"); 102 } 103 104 str=props.getProperty("min_threshold"); 105 if(str != null) { 106 min_threshold=Integer.parseInt(str); 107 props.remove("min_threshold"); 108 } 109 110 str=props.getProperty("use_gms"); 111 if(str != null) { 112 use_gms=Boolean.valueOf(str).booleanValue(); 113 props.remove("use_gms"); 114 } 115 116 if(props.size() > 0) { 117 System.err.println("UNICAST.setProperties(): these properties are not recognized:"); 118 props.list(System.out); 119 return false; 120 } 121 122 if((window_size > 0 && min_threshold <= 0) || (window_size <= 0 && min_threshold > 0)) { 124 log.error("window_size and min_threshold have to be both set if one of them is set"); 125 return false; 126 } 127 if(window_size > 0 && min_threshold > 0 && window_size < min_threshold) { 128 log.error("min_threshold (" + min_threshold + ") has to be less than window_size (" + window_size + ')'); 129 return false; 130 } 131 return true; 132 } 133 134 public void start() throws Exception { 135 timer=stack != null ? stack.timer : null; 136 if(timer == null) 137 throw new Exception ("UNICAST.start(): timer is null"); 138 } 139 140 public void stop() { 141 removeAllConnections(); 142 operational=false; 143 } 144 145 146 public void up(Event evt) { 147 Message msg; 148 Address dst, src; 149 UnicastHeader hdr; 150 151 switch(evt.getType()) { 152 153 case Event.MSG: 154 msg=(Message)evt.getArg(); 155 dst=msg.getDest(); 156 157 if(dst == null || dst.isMulticastAddress()) break; 160 hdr=(UnicastHeader)msg.removeHeader(name); 161 if(hdr == null) break; 162 src=msg.getSrc(); 163 switch(hdr.type) { 164 case UnicastHeader.DATA: sendAck(src, hdr.seqno); 166 handleDataReceived(src, hdr.seqno, hdr.first, msg); 167 break; 168 case UnicastHeader.DATA_ACK: handleAckReceived(src, hdr.seqno); 170 break; 171 default: 172 log.error("UnicastHeader type " + hdr.type + " not known !"); 173 break; 174 } 175 return; 176 177 case Event.SET_LOCAL_ADDRESS: 178 local_addr=(Address)evt.getArg(); 179 break; 180 } 181 182 passUp(evt); } 184 185 186 187 188 189 public void down(Event evt) { 190 Message msg; 191 Object dst, mbr; 192 Entry entry; 193 UnicastHeader hdr; 194 195 switch (evt.getType()) { 196 197 case Event.MSG: msg = (Message) evt.getArg(); 199 dst = msg.getDest(); 200 201 202 if (dst == null || ((Address) dst).isMulticastAddress()) { 203 break; 204 } 205 206 synchronized(connections) { 207 entry = (Entry) connections.get(dst); 208 if (entry == null) { 209 entry = new Entry(); 210 connections.put(dst, entry); 211 } 212 } 213 214 hdr = new UnicastHeader(UnicastHeader.DATA, entry.sent_msgs_seqno); 215 if (entry.sent_msgs == null) { hdr.first = true; 217 entry.sent_msgs = new AckSenderWindow(this, timeout, this); 218 if (window_size > 0) 219 entry.sent_msgs.setWindowSize(window_size, min_threshold); 220 } 221 msg.putHeader(name, hdr); 222 if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> DATA(" + dst + ": #" + 223 entry.sent_msgs_seqno + ", first=" + hdr.first + ')'); 224 225 if (Global.copy) 226 entry.sent_msgs.add(entry.sent_msgs_seqno, msg.copy()); else 228 entry.sent_msgs.add(entry.sent_msgs_seqno, msg); 230 entry.sent_msgs_seqno++; 231 msg=null; 232 return; 234 case Event.BECOME_SERVER: 235 operational = true; 236 break; 237 238 case Event.VIEW_CHANGE: Vector new_members = ((View) evt.getArg()).getMembers(); 240 Vector left_members; 241 synchronized (members) { 242 left_members = Util.determineLeftMembers(members, new_members); 243 members.removeAllElements(); 244 if (new_members != null) 245 members.addAll(new_members); 246 } 247 248 if (use_gms && left_members.size() > 0) { 251 for (int i = 0; i < left_members.size(); i++) { 252 mbr = left_members.elementAt(i); 253 removeConnection(mbr); 254 } 255 } 256 break; 257 } 258 259 passDown(evt); } 261 262 263 264 private void removeConnection(Object mbr) { 265 Entry entry; 266 267 synchronized(connections) { 268 entry=(Entry)connections.remove(mbr); 269 } 270 if(entry != null) { 271 entry.reset(); 272 if(log.isTraceEnabled()) log.trace("removed " + mbr + " from connection table"); 273 } 274 } 275 276 277 private void removeAllConnections() { 278 Entry entry; 279 280 synchronized(connections) { 281 for(Iterator it=connections.values().iterator(); it.hasNext();) { 282 entry=(Entry)it.next(); 283 entry.reset(); 284 } 285 connections.clear(); 286 } 287 } 288 289 290 291 292 293 static long getInitialSeqno() { 294 long ret=(long)((Math.random() * 100) % 100); 295 return ret; 296 } 297 298 299 300 301 public void retransmit(long seqno, Message msg) { 302 Object dst=msg.getDest(); 303 304 308 313 319 if(log.isTraceEnabled()) 320 log.trace("[" + local_addr + "] --> XMIT(" + dst + ": #" + seqno + ')'); 321 322 if(Global.copy) 323 passDown(new Event(Event.MSG, msg.copy())); 324 else 325 passDown(new Event(Event.MSG, msg)); 326 } 327 328 329 330 331 332 339 void handleDataReceived(Object sender, long seqno, boolean first, Message msg) { 340 Entry entry; 341 Message m; 342 343 if(log.isTraceEnabled()) 344 log.trace("[" + local_addr + "] <-- DATA(" + sender + ": #" + seqno + ", first=" + first); 345 346 synchronized(connections) { 347 entry=(Entry)connections.get(sender); 348 if(entry == null) { 349 entry=new Entry(); 350 connections.put(sender, entry); 351 } 352 } 353 354 if(entry.received_msgs == null) { 355 if(first) 356 entry.received_msgs=new AckReceiverWindow(seqno); 357 else { 358 if(operational) { 359 if(log.isWarnEnabled()) 360 log.warn("[" + local_addr + "] seqno " + seqno + " from " + 361 sender + " is not tagged as the first message sent by " + sender + 362 "; however, the table for received messages from " + sender + 363 " is still null ! We probably haven't received the first message from " 364 + sender + " ! Discarding message (operational=" + operational + ')'); 365 return; 366 } 367 } 368 } 369 370 if(entry.received_msgs != null) { 371 entry.received_msgs.add(seqno, msg); 372 373 while((m=entry.received_msgs.remove()) != null) 375 passUp(new Event(Event.MSG, m)); 376 } 377 } 378 379 380 381 382 383 private void handleAckReceived(Object sender, long seqno) { 384 Entry entry; 385 AckSenderWindow win; 386 387 if(log.isTraceEnabled()) log.trace("[" + local_addr + "] <-- ACK(" + sender + ": #" + seqno + ')'); 388 synchronized(connections) { 389 entry=(Entry)connections.get(sender); 390 } 391 if(entry == null || entry.sent_msgs == null) 392 return; 393 win=entry.sent_msgs; 394 win.ack(seqno); } 396 397 398 399 void sendAck(Address dst, long seqno) { 400 Message ack=new Message(dst, null, null); 401 ack.putHeader(name, new UnicastHeader(UnicastHeader.DATA_ACK, seqno)); 402 if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> ACK(" + dst + ": #" + seqno + ')'); 403 passDown(new Event(Event.MSG, ack)); 404 } 405 406 407 408 409 410 411 public static class UnicastHeader extends Header implements Streamable { 412 static final byte DATA=0; 413 static final byte DATA_ACK=1; 414 415 byte type=DATA; 416 long seqno=0; boolean first=false; 418 419 420 public UnicastHeader() {} 422 public UnicastHeader(byte type, long seqno) { 423 this.type=type == DATA_ACK ? DATA_ACK : DATA; 424 this.seqno=seqno; 425 } 426 427 public String toString() { 428 return "[UNICAST: " + type2Str(type) + ", seqno=" + seqno + ']'; 429 } 430 431 public static String type2Str(byte t) { 432 switch(t) { 433 case DATA: return "DATA"; 434 case DATA_ACK: return "DATA_ACK"; 435 default: return "<unknown>"; 436 } 437 } 438 439 public long size() { 440 return (2 * Global.BYTE_SIZE) + Global.LONG_SIZE; 441 } 442 443 444 public void writeExternal(ObjectOutput out) throws IOException { 445 out.writeByte(type); 446 out.writeLong(seqno); 447 out.writeBoolean(first); 448 } 449 450 451 452 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 453 type=in.readByte(); 454 seqno=in.readLong(); 455 first=in.readBoolean(); 456 } 457 458 public void writeTo(DataOutputStream out) throws IOException { 459 out.writeByte(type); 460 out.writeLong(seqno); 461 out.writeBoolean(first); 462 } 463 464 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 465 type=in.readByte(); 466 seqno=in.readLong(); 467 first=in.readBoolean(); 468 } 469 } 470 471 472 } 473 | Popular Tags |