1 package org.jgroups.protocols; 2 3 import org.jgroups.*; 4 import org.jgroups.annotations.GuardedBy; 5 import org.jgroups.stack.Protocol; 6 import org.jgroups.util.Util; 7 import org.jgroups.util.Streamable; 8 import org.jgroups.util.BoundedList; 9 10 import java.util.*; 11 import java.util.concurrent.locks.Condition ; 12 import java.util.concurrent.locks.Lock ; 13 import java.util.concurrent.locks.ReentrantLock ; 14 import java.util.concurrent.TimeUnit ; 15 import java.io.*; 16 17 26 public class SFC extends Protocol { 27 static final String name="SFC"; 28 29 30 private long max_credits=2000000; 31 32 private Long MAX_CREDITS; 33 34 private static final Long ZERO_CREDITS=new Long (0); 35 36 37 @GuardedBy("lock") 38 private long curr_credits_available; 39 40 41 @GuardedBy("received_lock") 42 private final Map<Address,Long > received=new HashMap<Address,Long >(12); 43 44 45 @GuardedBy("received_lock") 46 private final Set<Address> pending_requesters=new HashSet<Address>(); 47 48 49 @GuardedBy("lock") 50 private final Set<Address> pending_creditors=new HashSet<Address>(); 51 52 53 private final Lock lock=new ReentrantLock (); 54 55 private final Lock received_lock=new ReentrantLock (); 56 57 58 59 private final Condition credits_available=lock.newCondition(); 60 61 62 private long max_block_time=5000; 63 64 65 private long last_blocked_request=0L; 66 67 private final List<Address> members=new LinkedList<Address>(); 68 69 private boolean running=true; 70 71 @GuardedBy("lock") long start, stop; 72 73 74 75 long num_blockings=0; 77 long num_bytes_sent=0; 78 long num_credit_requests_sent=0; 79 long num_credit_requests_received=0; 80 long num_replenishments_received=0; 81 long num_replenishments_sent=0; 82 long total_block_time=0; 83 final BoundedList blockings=new BoundedList(50); 84 85 86 public void resetStats() { 87 super.resetStats(); 88 num_blockings=total_block_time=num_replenishments_received=num_credit_requests_sent=num_bytes_sent=0; 89 num_replenishments_sent=num_credit_requests_received=0; 90 blockings.removeAll(); 91 } 92 93 public long getMaxCredits() {return max_credits;} 94 public long getCredits() {return curr_credits_available;} 95 public long getBytesSent() {return num_bytes_sent;} 96 public long getBlockings() {return num_blockings;} 97 public long getCreditRequestsSent() {return num_credit_requests_sent;} 98 public long getCreditRequestsReceived() {return num_credit_requests_received;} 99 public long getReplenishmentsReceived() {return num_replenishments_received;} 100 public long getReplenishmentsSent() {return num_replenishments_sent;} 101 public long getTotalBlockingTime() {return total_block_time;} 102 public double getAverageBlockingTime() {return num_blockings == 0? 0 : total_block_time / num_blockings;} 103 104 105 public Map<String ,Object > dumpStats() { 106 Map<String ,Object > retval=super.dumpStats(); 107 if(retval == null) 108 retval=new HashMap<String ,Object >(); 109 return retval; 110 } 111 112 public String printBlockingTimes() { 113 return blockings.toString(); 114 } 115 116 public String printReceived() { 117 received_lock.lock(); 118 try { 119 return received.toString(); 120 } 121 finally { 122 received_lock.unlock(); 123 } 124 } 125 126 public String printPendingCreditors() { 127 lock.lock(); 128 try { 129 return pending_creditors.toString(); 130 } 131 finally { 132 lock.unlock(); 133 } 134 } 135 136 public String printPendingRequesters() { 137 received_lock.lock(); 138 try { 139 return pending_requesters.toString(); 140 } 141 finally { 142 received_lock.unlock(); 143 } 144 } 145 146 public void unblock() { 147 lock.lock(); 148 try { 149 curr_credits_available=max_credits; 150 credits_available.signalAll(); 151 } 152 finally { 153 lock.unlock(); 154 } 155 } 156 157 159 160 public final String getName() { 161 return name; 162 } 163 164 public boolean setProperties(Properties props) { 165 String str; 166 super.setProperties(props); 167 168 str=props.getProperty("max_block_time"); 169 if(str != null) { 170 max_block_time=Long.parseLong(str); 171 props.remove("max_block_time"); 172 } 173 174 str=props.getProperty("max_credits"); 175 if(str != null) { 176 max_credits=Long.parseLong(str); 177 props.remove("max_credits"); 178 } 179 180 Util.checkBufferSize("SFC.max_credits", max_credits); 181 MAX_CREDITS=new Long (max_credits); 182 curr_credits_available=max_credits; 183 184 if(!props.isEmpty()) { 185 log.error("the following properties are not recognized: " + props); 186 return false; 187 } 188 return true; 189 } 190 191 192 193 public Object down(Event evt) { 194 switch(evt.getType()) { 195 case Event.MSG: 196 Message msg=(Message)evt.getArg(); 197 Address dest=msg.getDest(); 198 if(dest != null && !dest.isMulticastAddress()) break; 200 201 boolean send_credit_request=false; 202 lock.lock(); 203 try { 204 while(curr_credits_available <=0 && running) { 205 if(log.isTraceEnabled()) 206 log.trace("blocking (current credits=" + curr_credits_available + ")"); 207 try { 208 num_blockings++; 209 boolean rc=credits_available.await(max_block_time, TimeUnit.MILLISECONDS); 211 if(rc || (curr_credits_available <=0 && running)) { 212 if(log.isTraceEnabled()) 213 log.trace("returned from await but credits still unavailable (credits=" +curr_credits_available +")"); 214 long now=System.currentTimeMillis(); 215 if(now - last_blocked_request >= max_block_time) { 216 last_blocked_request=now; 217 lock.unlock(); try { 219 sendCreditRequest(true); 220 } 221 finally { 222 lock.lock(); } 224 } 225 } 226 else { 227 last_blocked_request=0; 230 } 231 } 232 catch(InterruptedException e) { 233 } 239 } 240 241 int len=msg.getLength(); 243 num_bytes_sent+=len; 244 curr_credits_available-=len; if(curr_credits_available <=0) { 246 pending_creditors.clear(); 247 synchronized(members) { 248 pending_creditors.addAll(members); 249 } 250 send_credit_request=true; 251 } 252 } 253 finally { 254 lock.unlock(); 255 } 256 257 if(send_credit_request) { 261 if(log.isTraceEnabled()) 262 log.trace("sending credit request to group"); 263 start=System.nanoTime(); Object ret=down_prot.down(evt); sendCreditRequest(false); return ret; 267 } 268 break; 269 270 case Event.VIEW_CHANGE: 271 handleViewChange((View)evt.getArg()); 272 break; 273 274 case Event.SUSPECT: 275 handleSuspect((Address)evt.getArg()); 276 break; 277 } 278 279 return down_prot.down(evt); 280 } 281 282 283 284 public Object up(Event evt) { 285 switch(evt.getType()) { 286 287 case Event.MSG: 288 Message msg=(Message)evt.getArg(); 289 Header hdr=(Header)msg.getHeader(name); 290 Address sender=msg.getSrc(); 291 if(hdr != null) { 292 switch(hdr.type) { 293 case Header.CREDIT_REQUEST: 294 handleCreditRequest(sender, false); 295 break; 296 case Header.URGENT_CREDIT_REQUEST: 297 handleCreditRequest(sender, true); 298 break; 299 case Header.REPLENISH: 300 handleCreditResponse(sender); 301 break; 302 default: 303 if(log.isErrorEnabled()) 304 log.error("unknown header type " + hdr.type); 305 break; 306 } 307 return null; } 309 310 Address dest=msg.getDest(); 311 if(dest != null && !dest.isMulticastAddress()) break; 313 314 handleMessage(msg, sender); 315 break; 316 317 case Event.VIEW_CHANGE: 318 handleViewChange((View)evt.getArg()); 319 break; 320 321 case Event.SUSPECT: 322 handleSuspect((Address)evt.getArg()); 323 break; 324 } 325 return up_prot.up(evt); 326 } 327 328 329 330 331 public void start() throws Exception { 332 super.start(); 333 running=true; 334 } 335 336 337 public void stop() { 338 super.stop(); 339 running=false; 340 lock.lock(); 341 try { 342 credits_available.signalAll(); 343 } 344 finally { 345 lock.unlock(); 346 } 347 } 348 349 350 private void handleMessage(Message msg, Address sender) { 351 int len=msg.getLength(); 353 Long new_val; 354 boolean send_credit_response=false; 355 356 received_lock.lock(); 357 try { 358 Long credits=received.get(sender); 359 if(credits == null) { 360 new_val=MAX_CREDITS; 361 received.put(sender, new_val); 362 } 363 else { 364 new_val=credits.longValue() + len; 365 received.put(sender, new_val); 366 } 367 370 if(!pending_requesters.isEmpty() 372 && pending_requesters.contains(sender) 373 && new_val.longValue() >= max_credits) { 374 pending_requesters.remove(sender); 375 if(log.isTraceEnabled()) 376 log.trace("removed " + sender + " from credit requesters; sending credits"); 377 received.put(sender, ZERO_CREDITS); 378 send_credit_response=true; 379 } 380 } 381 finally { 382 received_lock.unlock(); 383 } 384 385 if(send_credit_response) sendCreditResponse(sender); 387 } 388 389 390 private void handleCreditRequest(Address sender, boolean urgent) { 391 boolean send_credit_response=false; 392 393 received_lock.lock(); 394 try { 395 num_credit_requests_received++; 396 Long bytes=received.get(sender); 397 if(log.isTraceEnabled()) 398 log.trace("received credit request from " + sender + " (total received: " + bytes + " bytes"); 399 400 if(bytes == null) { 401 if(log.isErrorEnabled()) 402 log.error("received credit request from " + sender + ", but sender is not in received hashmap;" + 403 " adding it"); 404 send_credit_response=true; 405 } 406 else { 407 if(bytes.longValue() < max_credits && !urgent) { 408 if(log.isTraceEnabled()) 409 log.trace("adding " + sender + " to pending credit requesters"); 410 pending_requesters.add(sender); 411 } 412 else { 413 send_credit_response=true; 414 } 415 } 416 if(send_credit_response) 417 received.put(sender, ZERO_CREDITS); 418 } 419 finally{ 420 received_lock.unlock(); 421 } 422 423 if(send_credit_response) { 424 sendCreditResponse(sender); 425 } 426 } 427 428 private void handleCreditResponse(Address sender) { 429 lock.lock(); 430 try { 431 num_replenishments_received++; 432 if(pending_creditors.remove(sender) && pending_creditors.isEmpty()) { 433 curr_credits_available=max_credits; 434 stop=System.nanoTime(); 435 long diff=(stop-start)/1000000L; 436 if(log.isTraceEnabled()) 437 log.trace("replenished credits to " + curr_credits_available + 438 " (total blocking time=" + diff + " ms)"); 439 blockings.add(new Long (diff)); 440 total_block_time+=diff; 441 credits_available.signalAll(); 442 } 443 } 444 finally{ 445 lock.unlock(); 446 } 447 } 448 449 450 451 private void handleViewChange(View view) { 452 List<Address> mbrs=view != null? view.getMembers() : null; 453 if(mbrs != null) { 454 synchronized(members) { 455 members.clear(); 456 members.addAll(mbrs); 457 } 458 } 459 460 lock.lock(); 461 try { 462 if(pending_creditors.retainAll(members) && pending_creditors.isEmpty()) { 464 curr_credits_available=max_credits; 466 if(log.isTraceEnabled()) 467 log.trace("replenished credits to " + curr_credits_available); 468 credits_available.signalAll(); 469 } 470 } 471 finally { 472 lock.unlock(); 473 } 474 475 received_lock.lock(); 476 try { 477 received.keySet().retainAll(members); 479 480 for(Address mbr: members) { 482 if(!received.containsKey(mbr)) 483 received.put(mbr, MAX_CREDITS); 484 } 485 486 pending_requesters.retainAll(members); 488 } 489 finally{ 490 received_lock.unlock(); 491 } 492 } 493 494 495 private void handleSuspect(Address suspected_mbr) { 496 handleCreditResponse(suspected_mbr); 498 } 499 500 501 private void sendCreditRequest(boolean urgent) { 502 Message credit_req=new Message(); 503 byte type=urgent? Header.URGENT_CREDIT_REQUEST : Header.CREDIT_REQUEST; 505 credit_req.putHeader(name, new Header(type)); 506 num_credit_requests_sent++; 507 down_prot.down(new Event(Event.MSG, credit_req)); 508 } 509 510 private void sendCreditResponse(Address dest) { 511 Message credit_rsp=new Message(dest); 512 credit_rsp.setFlag(Message.OOB); 513 Header hdr=new Header(Header.REPLENISH); 514 credit_rsp.putHeader(name, hdr); 515 if(log.isTraceEnabled()) 516 log.trace("sending credit response to " + dest); 517 num_replenishments_sent++; 518 down_prot.down(new Event(Event.MSG, credit_rsp)); 519 } 520 521 522 523 public static class Header extends org.jgroups.Header implements Streamable { 524 public static final byte CREDIT_REQUEST = 1; public static final byte REPLENISH = 2; public static final byte URGENT_CREDIT_REQUEST = 3; 527 528 byte type=CREDIT_REQUEST; 529 530 public Header() { 531 532 } 533 534 public Header(byte type) { 535 this.type=type; 536 } 537 538 public int size() { 539 return Global.BYTE_SIZE; 540 } 541 542 public void writeExternal(ObjectOutput out) throws IOException { 543 out.writeByte(type); 544 } 545 546 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 547 type=in.readByte(); 548 } 549 550 public void writeTo(DataOutputStream out) throws IOException { 551 out.writeByte(type); 552 } 553 554 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 555 type=in.readByte(); 556 } 557 558 public String toString() { 559 switch(type) { 560 case REPLENISH: return "REPLENISH"; 561 case CREDIT_REQUEST: return "CREDIT_REQUEST"; 562 case URGENT_CREDIT_REQUEST: return "URGENT_CREDIT_REQUEST"; 563 default: return "<invalid type>"; 564 } 565 } 566 } 567 568 569 } 570 | Popular Tags |