1 3 package org.jgroups.protocols; 4 5 import org.jgroups.*; 6 import org.jgroups.util.CondVar; 7 import org.jgroups.util.Streamable; 8 import org.jgroups.stack.Protocol; 9 10 import java.io.*; 11 import java.util.*; 12 13 23 public class FC extends Protocol { 24 25 26 Address local_addr=null; 27 28 30 final HashMap sent=new HashMap(11); 31 32 36 final HashMap received=new HashMap(11); 37 38 39 final Vector members=new Vector(11); 40 41 42 final List creditors=new ArrayList(11); 43 44 46 long max_credits=50000; 47 48 50 double min_threshold=0.25; 51 52 54 long min_credits=0; 55 56 57 CondVar blocking=new CondVar("blocking", Boolean.FALSE, sent); 59 static final String name="FC"; 60 61 long start_blocking=0, stop_blocking=0; 62 63 64 65 66 public String getName() { 67 return name; 68 } 69 70 71 public boolean setProperties(Properties props) { 72 String str; 73 boolean min_credits_set=false; 74 75 super.setProperties(props); 76 str=props.getProperty("max_credits"); 77 if(str != null) { 78 max_credits=Long.parseLong(str); 79 props.remove("max_credits"); 80 } 81 82 str=props.getProperty("min_threshold"); 83 if(str != null) { 84 min_threshold=Double.parseDouble(str); 85 props.remove("min_threshold"); 86 } 87 88 str=props.getProperty("min_credits"); 89 if(str != null) { 90 min_credits=Long.parseLong(str); 91 props.remove("min_credits"); 92 min_credits_set=true; 93 } 94 95 if(!min_credits_set) 96 min_credits=(long)((double)max_credits * min_threshold); 97 98 if(props.size() > 0) { 99 System.err.println("FC.setProperties(): the following properties are not recognized:"); 100 props.list(System.out); 101 return false; 102 } 103 return true; 104 } 105 106 107 108 public void down(final Event evt) { 109 switch(evt.getType()) { 110 case Event.VIEW_CHANGE: 111 new Thread () { 114 public void run() { 115 handleViewChange(((View)evt.getArg()).getMembers()); 116 } 117 }.start(); 118 break; 119 case Event.MSG: 120 waitUntilEnoughCreditsAvailable(evt); 122 return; 123 } 124 passDown(evt); } 126 127 128 129 130 public void up(Event evt) { 131 switch(evt.getType()) { 132 case Event.SET_LOCAL_ADDRESS: 133 local_addr=(Address)evt.getArg(); 134 break; 135 case Event.VIEW_CHANGE: 136 handleViewChange(((View)evt.getArg()).getMembers()); 137 break; 138 case Event.MSG: 139 Message msg=(Message)evt.getArg(); 140 FcHeader hdr=(FcHeader)msg.removeHeader(name); 141 if(hdr != null) { 142 if(hdr.type == FcHeader.REPLENISH) { 143 handleCredit(msg.getSrc()); 144 return; } 146 } 147 else { 148 adjustCredit(msg); 149 } 150 break; 151 } 152 passUp(evt); 153 } 154 155 156 157 void handleCredit(Address src) { 158 if(src == null) return; 159 160 synchronized(sent) { 161 if(log.isTraceEnabled()) 162 log.trace("received replenishment message from " + src + ", old credit was " + sent.get(src) + 163 ", new credits are " + max_credits + ". Creditors are\n" + printCreditors()); 164 165 sent.put(src, new Long (max_credits)); 166 if(creditors.size() > 0) { removeCreditor(src); 168 if(creditors.size() == 0 && blocking.get().equals(Boolean.TRUE)) { 169 unblockSender(); } 171 } 172 } 173 } 174 175 176 180 void adjustCredit(Message msg) { 181 Address SRC=msg.getSrc(); 182 long size=Math.max(24, msg.getLength()); 183 184 if(src == null) { 185 if(log.isErrorEnabled()) log.error("src is null"); 186 return; 187 } 188 189 synchronized(received) { 190 if(log.isTraceEnabled()) log.trace("credit for " + src + " is " + received.get(src)); 191 if(decrementCredit(received, src, size) == false) { 192 received.put(src, new Long (max_credits)); 193 if(log.isTraceEnabled()) log.trace("sending replenishment message to " + src); 195 sendCredit(src); 196 } 197 } 198 } 199 200 201 202 void sendCredit(Address dest) { 203 Message msg=new Message(dest, null, null); 204 FcHeader hdr=new FcHeader(FcHeader.REPLENISH); 205 msg.putHeader(name, hdr); 206 passDown(new Event(Event.MSG, msg)); 207 } 208 209 210 216 void waitUntilEnoughCreditsAvailable(Event evt) { 217 Message msg=(Message)evt.getArg(); 218 219 synchronized(sent) { passDown(evt); if(decrMessage(msg) == false) { 223 if(log.isTraceEnabled()) 224 log.trace("blocking due to insufficient credits, creditors=\n" + printCreditors()); 225 start_blocking=System.currentTimeMillis(); 226 blocking.set(Boolean.TRUE); 227 blocking.waitUntil(Boolean.FALSE); } 229 } 230 } 231 232 233 240 private boolean decrMessage(Message msg) { 241 Address dest; 242 long size; 243 boolean success=true; 244 245 249 if(msg == null) { 250 if(log.isErrorEnabled()) log.error("msg is null"); 251 return true; } 253 dest=msg.getDest(); 254 size=Math.max(24, msg.getLength()); 255 if(dest != null && !dest.isMulticastAddress()) { if(log.isTraceEnabled()) log.trace("credit for " + dest + " is " + sent.get(dest)); 257 if(decrementCredit(sent, dest, size)) { 258 return true; 259 } 260 else { 261 addCreditor(dest); 262 return false; 263 } 264 } 265 else { for(Iterator it=members.iterator(); it.hasNext();) { 267 dest=(Address)it.next(); 268 if(log.isTraceEnabled()) log.trace("credit for " + dest + " is " + sent.get(dest)); 269 if(decrementCredit(sent, dest, size) == false) { 270 addCreditor(dest); 271 success=false; 272 } 273 } 274 } 275 return success; 276 } 277 278 279 280 281 282 private void unblockSender() { 283 if(log.isTraceEnabled()) log.trace("setting blocking=false"); 287 blocking.set(Boolean.FALSE); 288 printBlockTime(); 289 } 290 291 private void printBlockTime() { 292 stop_blocking=System.currentTimeMillis(); 293 long diff=stop_blocking - start_blocking; 294 stop_blocking=start_blocking=0; 295 if(log.isTraceEnabled()) 296 log.trace("blocking time was " + diff + "ms"); 297 } 298 299 private String printCreditors() { 300 StringBuffer sb=new StringBuffer (); 304 for(Iterator it=creditors.iterator(); it.hasNext();) { 305 Address creditor=(Address)it.next(); 306 sb.append(creditor).append(": ").append(getCredits(sent, creditor)).append(" credits\n"); 307 } 308 return sb.toString(); 309 } 310 311 private void addCreditor(Address mbr) { 312 if(mbr != null && !creditors.contains(mbr)) 313 creditors.add(mbr); 314 } 315 316 private void removeCreditor(Address mbr) { 317 if(mbr != null) 318 creditors.remove(mbr); 319 } 320 321 private long getCredits(Map map, Address mbr) { 322 Long tmp=(Long )map.get(mbr); 323 if(tmp == null) { 324 map.put(mbr, new Long (max_credits)); 325 return max_credits; 326 } 327 return tmp.longValue(); 328 } 329 330 331 332 333 334 340 private boolean decrementCredit(HashMap map, Address dest, long credits_required) { 341 long credits_left, new_credits_left; 342 Long tmp=(Long )map.get(dest); 343 344 if(tmp != null) { 345 credits_left=tmp.longValue(); 346 new_credits_left=Math.max(0, credits_left - credits_required); 347 map.put(dest, new Long (new_credits_left)); 348 349 if(new_credits_left >= min_credits + credits_required) { 350 return true; 351 } 352 else { 353 if(log.isTraceEnabled()) { 354 StringBuffer sb=new StringBuffer (); 355 sb.append("not enough credits left for ").append(dest).append(": left=").append(new_credits_left); 356 sb.append(", required+min_credits=").append((credits_required +min_credits)).append(", required="); 357 sb.append(credits_required).append(", min_credits=").append(min_credits); 358 log.trace(sb.toString()); 359 } 360 return false; 361 } 362 } 363 return true; 364 } 365 366 367 void handleViewChange(Vector mbrs) { 368 Address addr; 369 if(mbrs == null) return; 370 371 if(log.isTraceEnabled()) log.trace("new membership: " + mbrs); 372 members.clear(); 373 members.addAll(mbrs); 374 375 synchronized(received) { 376 for(int i=0; i < mbrs.size(); i++) { 378 addr=(Address) mbrs.elementAt(i); 379 if(!received.containsKey(addr)) 380 received.put(addr, new Long (max_credits)); 381 } 382 for(Iterator it=received.keySet().iterator(); it.hasNext();) { 384 addr=(Address) it.next(); 385 if(!mbrs.contains(addr)) 386 it.remove(); 387 } 388 } 389 390 synchronized(sent) { 391 for(int i=0; i < mbrs.size(); i++) { 393 addr=(Address) mbrs.elementAt(i); 394 if(!sent.containsKey(addr)) 395 sent.put(addr, new Long (max_credits)); 396 } 397 for(Iterator it=sent.keySet().iterator(); it.hasNext();) { 399 addr=(Address)it.next(); 400 if(!mbrs.contains(addr)) 401 it.remove(); } 403 404 405 406 for(Iterator it=creditors.iterator(); it.hasNext();) { 408 Address creditor=(Address) it.next(); 409 if(!mbrs.contains(creditor)) 410 it.remove(); 411 } 412 413 if(log.isTraceEnabled()) log.trace("creditors are\n" + printCreditors()); 414 if(creditors.size() == 0 && blocking.get().equals(Boolean.TRUE)) 415 unblockSender(); 416 } 417 } 418 419 420 421 430 444 453 public static class FcHeader extends Header implements Streamable { 454 public static final byte REPLENISH = 1; 455 byte type = REPLENISH; 456 457 public FcHeader() { 458 459 } 460 461 public FcHeader(byte type) { 462 this.type=type; 463 } 464 465 466 467 public long size() { 468 return Global.BYTE_SIZE; 469 } 470 471 472 public void writeExternal(ObjectOutput out) throws IOException { 473 out.writeByte(type); 474 } 475 476 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 477 type=in.readByte(); 478 } 479 480 public void writeTo(DataOutputStream out) throws IOException { 481 out.writeByte(type); 482 } 483 484 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 485 type=in.readByte(); 486 } 487 488 } 489 490 491 } 492 | Popular Tags |