1 3 package org.jgroups.protocols; 4 5 import org.jgroups.Address; 6 import org.jgroups.Event; 7 import org.jgroups.Message; 8 import org.jgroups.View; 9 import org.jgroups.stack.Protocol; 10 import org.jgroups.util.Range; 11 import org.jgroups.util.Util; 12 13 import java.util.*; 14 15 16 32 public class FRAG2 extends Protocol { 33 34 35 int frag_size=1500; 36 37 39 int overhead=50; 40 41 44 private final FragmentationList fragment_list=new FragmentationList(); 45 private int curr_id=1; 46 private final Vector members=new Vector(11); 47 private static final String name="FRAG2"; 48 49 50 public final String getName() { 51 return name; 52 } 53 54 55 56 public boolean setProperties(Properties props) { 57 String str; 58 59 super.setProperties(props); 60 str=props.getProperty("frag_size"); 61 if(str != null) { 62 frag_size=Integer.parseInt(str); 63 props.remove("frag_size"); 64 } 65 66 str=props.getProperty("overhead"); 67 if(str != null) { 68 overhead=Integer.parseInt(str); 69 props.remove("overhead"); 70 } 71 72 int old_frag_size=frag_size; 73 frag_size-=overhead; 74 if(frag_size <=0) { 75 log.error("frag_size=" + old_frag_size + ", overhead=" + overhead + 76 ", new frag_size=" + frag_size + ": new frag_size is invalid"); 77 return false; 78 } 79 80 if(log.isInfoEnabled()) 81 log.info("frag_size=" + old_frag_size + ", overhead=" + overhead + ", new frag_size=" + frag_size); 82 83 if(props.size() > 0) { 84 System.err.println("FRAG2.setProperties(): the following properties are not recognized:"); 85 props.list(System.out); 86 return false; 87 } 88 return true; 89 } 90 91 92 93 94 98 public void down(Event evt) { 99 switch(evt.getType()) { 100 101 case Event.MSG: 102 Message msg=(Message)evt.getArg(); 103 long size=msg.getLength(); 104 if(size > frag_size) { 105 if(log.isTraceEnabled()) { 106 StringBuffer sb=new StringBuffer ("message's buffer size is "); 107 sb.append(size).append(", will fragment ").append("(frag_size="); 108 sb.append(frag_size).append(')'); 109 log.trace(sb.toString()); 110 } 111 fragment(msg); return; 113 } 114 break; 115 116 case Event.VIEW_CHANGE: 117 View view=(View)evt.getArg(); 121 Vector new_mbrs=view.getMembers(), left_mbrs; 122 Address mbr; 123 124 left_mbrs=Util.determineLeftMembers(members, new_mbrs); 125 members.clear(); 126 members.addAll(new_mbrs); 127 128 for(int i=0; i < left_mbrs.size(); i++) { 129 mbr=(Address)left_mbrs.elementAt(i); 130 fragment_list.remove(mbr); 133 if(log.isTraceEnabled()) log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table"); 134 } 135 break; 136 137 case Event.CONFIG: 138 passDown(evt); 139 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); 140 handleConfigEvent((HashMap)evt.getArg()); 141 return; 142 } 143 144 passDown(evt); } 146 147 148 153 public void up(Event evt) { 154 switch(evt.getType()) { 155 156 case Event.MSG: 157 Message msg=(Message)evt.getArg(); 158 Object obj=msg.getHeader(name); 159 if(obj != null && obj instanceof FragHeader) { unfragment(msg); return; 162 } 163 break; 164 165 case Event.CONFIG: 166 passUp(evt); 167 if(log.isInfoEnabled()) log.info("received CONFIG event: " + evt.getArg()); 168 handleConfigEvent((HashMap)evt.getArg()); 169 return; 170 } 171 172 passUp(evt); } 174 175 176 187 void fragment(Message msg) { 188 byte[] buffer; 189 List fragments; 190 Event evt; 191 FragHeader hdr; 192 Message frag_msg=null; 193 Address dest=msg.getDest(); 194 long id=curr_id++; int num_frags=0; 196 StringBuffer sb; 197 Range r; 198 199 try { 200 buffer=msg.getBuffer(); 201 fragments=Util.computeFragOffsets(buffer, frag_size); 202 num_frags=fragments.size(); 203 204 if(log.isTraceEnabled()) { 205 sb=new StringBuffer ("fragmenting packet to "); 206 sb.append((dest != null ? dest.toString() : "<all members>")).append(" (size=").append(buffer.length); 207 sb.append(") into ").append(num_frags).append(" fragment(s) [frag_size=").append(frag_size).append(']'); 208 log.trace(sb.toString()); 209 } 210 211 for(int i=0; i < fragments.size(); i++) { 212 r=(Range)fragments.get(i); 213 frag_msg=msg.copy(false); frag_msg.setBuffer(buffer, (int)r.low, (int)r.high); 216 hdr=new FragHeader(id, i, num_frags); 217 frag_msg.putHeader(name, hdr); 218 evt=new Event(Event.MSG, frag_msg); 219 passDown(evt); 220 } 221 } 222 catch(Exception e) { 223 if(log.isErrorEnabled()) log.error("exception is " + e); 224 } 225 } 226 227 228 235 void unfragment(Message msg) { 236 FragmentationTable frag_table=null; 237 Address sender=msg.getSrc(); 238 Message assembled_msg; 239 FragHeader hdr=(FragHeader)msg.removeHeader(name); 240 241 frag_table=fragment_list.get(sender); 242 if(frag_table == null) { 243 frag_table=new FragmentationTable(sender); 244 try { 245 fragment_list.add(sender, frag_table); 246 } 247 catch(IllegalArgumentException x) { frag_table=fragment_list.get(sender); 249 } 250 } 251 assembled_msg=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg); 252 if(assembled_msg != null) { 253 try { 254 if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg); 255 assembled_msg.setSrc(sender); passUp(new Event(Event.MSG, assembled_msg)); 257 } 258 catch(Exception e) { 259 if(log.isErrorEnabled()) log.error("exception is " + e); 260 } 261 } 262 } 263 264 265 void handleConfigEvent(HashMap map) { 266 if(map == null) return; 267 if(map.containsKey("frag_size")) { 268 frag_size=((Integer )map.get("frag_size")).intValue(); 269 if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size); 270 } 271 } 272 273 274 275 276 284 static class FragmentationList { 285 288 private final HashMap frag_tables=new HashMap(11); 289 290 291 299 public void add(Address sender, FragmentationTable table) throws IllegalArgumentException { 300 FragmentationTable healthCheck; 301 302 synchronized(frag_tables) { 303 healthCheck=(FragmentationTable)frag_tables.get(sender); 304 if(healthCheck == null) { 305 frag_tables.put(sender, table); 306 } 307 else { 308 throw new IllegalArgumentException ("Sender <" + sender + "> already exists in the fragementation list."); 309 } 310 } 311 } 312 313 318 public FragmentationTable get(Address sender) { 319 synchronized(frag_tables) { 320 return (FragmentationTable)frag_tables.get(sender); 321 } 322 } 323 324 325 331 public boolean containsSender(Address sender) { 332 synchronized(frag_tables) { 333 return frag_tables.containsKey(sender); 334 } 335 } 336 337 344 public boolean remove(Address sender) { 345 synchronized(frag_tables) { 346 boolean result=containsSender(sender); 347 frag_tables.remove(sender); 348 return result; 349 } 350 } 351 352 357 public Address[] getSenders() { 358 Address[] result; 359 int index=0; 360 361 synchronized(frag_tables) { 362 result=new Address[frag_tables.size()]; 363 for(Iterator it=frag_tables.keySet().iterator(); it.hasNext();) { 364 result[index++]=(Address)it.next(); 365 } 366 } 367 return result; 368 } 369 370 public String toString() { 371 Map.Entry entry; 372 StringBuffer buf=new StringBuffer ("Fragmentation list contains "); 373 synchronized(frag_tables) { 374 buf.append(frag_tables.size()).append(" tables\n"); 375 for(Iterator it=frag_tables.entrySet().iterator(); it.hasNext();) { 376 entry=(Map.Entry)it.next(); 377 buf.append(entry.getKey()).append(": " ).append(entry.getValue()).append("\n"); 378 } 379 } 380 return buf.toString(); 381 } 382 383 } 384 385 391 static class FragmentationTable { 392 private final Address sender; 393 394 private final Hashtable h=new Hashtable(11); 396 397 public FragmentationTable(Address sender) { 398 this.sender=sender; 399 } 400 401 402 408 static class Entry { 409 int tot_frags=0; 411 Message fragments[]=null; 413 int number_of_frags_recvd=0; 415 long msg_id=-1; 417 418 422 Entry(long msg_id, int tot_frags) { 423 this.msg_id=msg_id; 424 this.tot_frags=tot_frags; 425 fragments=new Message[tot_frags]; 426 for(int i=0; i < tot_frags; i++) 427 fragments[i]=null; 428 } 429 430 435 public void set(int frag_id, Message frag) { 436 if(fragments[frag_id] == null) { 439 fragments[frag_id]=frag; 440 number_of_frags_recvd++; 441 } 442 } 443 444 448 public boolean isComplete() { 449 450 if(number_of_frags_recvd < tot_frags) { 451 return false; 452 } 453 454 for(int i=0; i < fragments.length; i++) { 455 if(fragments[i] == null) 456 return false; 457 } 458 459 return true; 460 } 461 462 470 public Message assembleMessage() { 471 Message retval=null; 472 byte[] combined_buffer, tmp; 473 int combined_length=0, length, offset; 474 Message fragment; 475 int index=0; 476 477 for(int i=0; i < fragments.length; i++) { 478 fragment=fragments[i]; 479 combined_length+=fragment.getLength(); 480 } 481 482 combined_buffer=new byte[combined_length]; 483 for(int i=0; i < fragments.length; i++) { 484 fragment=fragments[i]; 485 tmp=fragment.getRawBuffer(); 486 length=fragment.getLength(); 487 offset=fragment.getOffset(); 488 System.arraycopy(tmp, offset, combined_buffer, index, length); 489 index+=length; 490 } 491 492 retval=fragments[0].copy(false); 493 retval.setBuffer(combined_buffer); 494 return retval; 495 } 496 497 500 public String toString() { 501 StringBuffer ret=new StringBuffer (); 502 ret.append("[tot_frags=" + tot_frags + ", number_of_frags_recvd=" + number_of_frags_recvd + ']'); 503 return ret.toString(); 504 } 505 506 public int hashCode() { 507 return super.hashCode(); 508 } 509 } 510 511 512 522 public synchronized Message add(long id, int frag_id, int tot_frags, Message fragment) { 523 Message retval=null; 524 525 Entry e=(Entry)h.get(new Long (id)); 526 527 if(e == null) { e=new Entry(id, tot_frags); 529 h.put(new Long (id), e); 530 } 531 532 e.set(frag_id, fragment); 533 if(e.isComplete()) { 534 retval=e.assembleMessage(); 535 h.remove(new Long (id)); 536 } 537 538 return retval; 539 } 540 541 542 public void reset() { 543 } 544 545 public String toString() { 546 StringBuffer buf=new StringBuffer ("Fragmentation Table Sender:").append(sender).append("\n\t"); 547 java.util.Enumeration e=this.h.elements(); 548 while(e.hasMoreElements()) { 549 Entry entry=(Entry)e.nextElement(); 550 int count=0; 551 for(int i=0; i < entry.fragments.length; i++) { 552 if(entry.fragments[i] != null) { 553 count++; 554 } 555 } 556 buf.append("Message ID:").append(entry.msg_id).append("\n\t"); 557 buf.append("Total Frags:").append(entry.tot_frags).append("\n\t"); 558 buf.append("Frags Received:").append(count).append("\n\n"); 559 } 560 return buf.toString(); 561 } 562 } 563 564 } 565 566 567 | Popular Tags |