1 3 package org.jgroups.protocols; 4 5 import org.jgroups.Address; 6 import org.jgroups.Event; 7 import org.jgroups.Message; 8 import org.jgroups.View; 9 import org.jgroups.stack.Protocol; 10 import org.jgroups.util.ExposedByteArrayOutputStream; 11 import org.jgroups.util.Util; 12 13 import java.io.ByteArrayInputStream ; 14 import java.io.DataInputStream ; 15 import java.io.DataOutputStream ; 16 import java.util.*; 17 18 19 20 33 public class FRAG extends Protocol { 34 private int frag_size=8192; 36 39 private final FragmentationList fragment_list=new FragmentationList(); 40 private int curr_id=1; 41 private final ExposedByteArrayOutputStream bos=new ExposedByteArrayOutputStream(1024); 42 private final Vector members=new Vector(11); 43 private final static String name="FRAG"; 44 45 46 public String getName() { 47 return name; 48 } 49 50 51 54 public boolean setProperties(Properties props) { 55 String str; 56 57 super.setProperties(props); 58 str=props.getProperty("frag_size"); 59 if(str != null) { 60 frag_size=Integer.parseInt(str); 61 props.remove("frag_size"); 62 } 63 64 if(props.size() > 0) { 65 System.err.println("FRAG.setProperties(): the following properties are not recognized:"); 66 props.list(System.out); 67 return false; 68 } 69 return true; 70 } 71 72 73 77 public void down(Event evt) { 78 switch(evt.getType()) { 79 80 case Event.MSG: 81 Message msg=(Message)evt.getArg(); 82 long size=msg.size(); 83 if(size > frag_size) { 84 if(log.isTraceEnabled()) { 85 StringBuffer sb=new StringBuffer ("message size is "); 86 sb.append(size).append(", will fragment (frag_size=").append(frag_size).append(')'); 87 log.trace(sb.toString()); 88 } 89 fragment(msg); return; 91 } 92 break; 93 94 case Event.VIEW_CHANGE: 95 View view=(View)evt.getArg(); 99 Vector new_mbrs=view.getMembers(), left_mbrs; 100 Address mbr; 101 102 left_mbrs=Util.determineLeftMembers(members, new_mbrs); 103 members.clear(); 104 members.addAll(new_mbrs); 105 106 for(int i=0; i < left_mbrs.size(); i++) { 107 mbr=(Address)left_mbrs.elementAt(i); 108 fragment_list.remove(mbr); 111 if(log.isTraceEnabled()) 112 log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table"); 113 } 114 break; 115 116 case Event.CONFIG: 117 passDown(evt); 118 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); 119 handleConfigEvent((HashMap)evt.getArg()); 120 return; 121 } 122 123 passDown(evt); } 125 126 127 130 public void up(Event evt) { 131 switch(evt.getType()) { 132 133 case Event.MSG: 134 Message msg=(Message)evt.getArg(); 135 Object obj=msg.getHeader(name); 136 137 if(obj != null && obj instanceof FragHeader) { unfragment(msg); return; 140 } 141 break; 142 143 case Event.CONFIG: 144 passUp(evt); 145 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); 146 handleConfigEvent((HashMap)evt.getArg()); 147 return; 148 } 149 150 passUp(evt); } 152 153 154 166 private void fragment(Message msg) { 167 DataOutputStream out=null; 168 byte[] buffer; 169 byte[] fragments[]; 170 Event evt; 171 FragHeader hdr; 172 Message frag_msg; 173 Address dest=msg.getDest(), SRC=msg.getSrc(); 174 long id=curr_id++; int num_frags; 176 177 try { 178 bos.reset(); 180 out=new DataOutputStream (bos); 181 msg.writeTo(out); 182 out.flush(); 183 buffer=bos.getRawBuffer(); 184 fragments=Util.fragmentBuffer(buffer, frag_size, bos.size()); 185 num_frags=fragments.length; 186 187 if(log.isTraceEnabled()) { 188 StringBuffer sb=new StringBuffer (); 189 sb.append("fragmenting packet to ").append(dest != null ? dest.toString() : "<all members>"); 190 sb.append(" (size=").append(buffer.length).append(") into ").append(num_frags); 191 sb.append(" fragment(s) [frag_size=").append(frag_size).append(']'); 192 log.trace(sb.toString()); 193 } 194 195 for(int i=0; i < num_frags; i++) { 196 frag_msg=new Message(dest, src, fragments[i]); 197 hdr=new FragHeader(id, i, num_frags); 198 frag_msg.putHeader(name, hdr); 199 evt=new Event(Event.MSG, frag_msg); 200 passDown(evt); 201 } 202 } 203 catch(Exception e) { 204 log.error("exception is " + e); 205 } 206 finally { 207 Util.closeOutputStream(out); 208 } 209 } 210 211 212 219 private void unfragment(Message msg) { 220 FragmentationTable frag_table=null; 221 Address sender=msg.getSrc(); 222 Message assembled_msg; 223 FragHeader hdr=(FragHeader)msg.removeHeader(name); 224 byte[] m; 225 ByteArrayInputStream bis; 226 DataInputStream in=null; 227 228 frag_table=fragment_list.get(sender); 229 if(frag_table == null) { 230 frag_table=new FragmentationTable(sender); 231 try { 232 fragment_list.add(sender, frag_table); 233 } 234 catch(IllegalArgumentException x) { frag_table=fragment_list.get(sender); 236 } 237 } 238 m=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg.getBuffer()); 239 if(m != null) { 240 try { 241 bis=new ByteArrayInputStream (m); 242 in=new DataInputStream (bis); 243 assembled_msg=new Message(); 244 assembled_msg.readFrom(in); 245 if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg); 246 assembled_msg.setSrc(sender); passUp(new Event(Event.MSG, assembled_msg)); 248 } 249 catch(Exception e) { 250 log.error("exception is " + e); 251 } 252 finally { 253 Util.closeInputStream(in); 254 } 255 } 256 } 257 258 259 void handleConfigEvent(HashMap map) { 260 if(map == null) return; 261 if(map.containsKey("frag_size")) { 262 frag_size=((Integer )map.get("frag_size")).intValue(); 263 if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size); 264 } 265 } 266 267 268 269 270 278 static class FragmentationList { 279 283 private final HashMap frag_tables=new HashMap(11); 284 285 286 294 public void add(Address sender, FragmentationTable table) throws IllegalArgumentException { 295 FragmentationTable healthCheck; 296 297 synchronized(frag_tables) { 298 healthCheck=(FragmentationTable)frag_tables.get(sender); 299 if(healthCheck == null) { 300 frag_tables.put(sender, table); 301 } 302 else { 303 throw new IllegalArgumentException ("Sender <" + sender + "> already exists in the fragementation list."); 304 } 305 } 306 } 307 308 313 public FragmentationTable get(Address sender) { 314 synchronized(frag_tables) { 315 return (FragmentationTable)frag_tables.get(sender); 316 } 317 } 318 319 320 326 public boolean containsSender(Address sender) { 327 synchronized(frag_tables) { 328 return frag_tables.containsKey(sender); 329 } 330 } 331 332 339 public boolean remove(Address sender) { 340 synchronized(frag_tables) { 341 boolean result=containsSender(sender); 342 frag_tables.remove(sender); 343 return result; 344 } 345 } 346 347 351 public Address[] getSenders() { 352 Address[] result; 353 int index=0; 354 355 synchronized(frag_tables) { 356 result=new Address[frag_tables.size()]; 357 for(Iterator it=frag_tables.keySet().iterator(); it.hasNext();) { 358 result[index++]=(Address)it.next(); 359 } 360 } 361 return result; 362 } 363 364 public String toString() { 365 Map.Entry entry; 366 StringBuffer buf=new StringBuffer ("Fragmentation list contains "); 367 synchronized(frag_tables) { 368 buf.append(frag_tables.size()).append(" tables\n"); 369 for(Iterator it=frag_tables.entrySet().iterator(); it.hasNext();) { 370 entry=(Map.Entry)it.next(); 371 buf.append(entry.getKey()).append(": " ).append(entry.getValue()).append("\n"); 372 } 373 } 374 return buf.toString(); 375 } 376 377 } 378 379 385 static class FragmentationTable { 386 private final Address sender; 387 388 private final Hashtable h=new Hashtable(11); 390 391 public FragmentationTable(Address sender) { 392 this.sender=sender; 393 } 394 395 396 402 static class Entry { 403 int tot_frags=0; 405 byte[] fragments[]=null; 407 int number_of_frags_recvd=0; 409 long msg_id=-1; 411 412 417 Entry(long msg_id, int tot_frags) { 418 this.msg_id=msg_id; 419 this.tot_frags=tot_frags; 420 fragments=new byte[tot_frags][]; 421 for(int i=0; i < tot_frags; i++) { 422 fragments[i]=null; 423 } 424 } 425 426 432 public void set(int frag_id, byte[] frag) { 433 fragments[frag_id]=frag; 434 number_of_frags_recvd++; 435 } 436 437 441 public boolean isComplete() { 442 443 if(number_of_frags_recvd < tot_frags) { 444 return false; 445 } 446 447 for(int i=0; i < fragments.length; i++) { 448 if(fragments[i] == null) 449 return false; 450 } 451 452 return true; 453 } 454 455 461 public byte[] assembleBuffer() { 462 return Util.defragmentBuffer(fragments); 463 } 464 465 468 public String toString() { 469 StringBuffer ret=new StringBuffer (); 470 ret.append("[tot_frags=" + tot_frags + ", number_of_frags_recvd=" + number_of_frags_recvd + ']'); 471 return ret.toString(); 472 } 473 474 public int hashCode() { 475 return super.hashCode(); 476 } 477 } 478 479 480 491 public synchronized byte[] add(long id, int frag_id, int tot_frags, byte[] fragment) { 492 493 494 byte[] retval=null; 495 496 Entry e=(Entry)h.get(new Long (id)); 497 498 if(e == null) { e=new Entry(id, tot_frags); 500 h.put(new Long (id), e); 501 } 502 503 e.set(frag_id, fragment); 504 if(e.isComplete()) { 505 retval=e.assembleBuffer(); 506 h.remove(new Long (id)); 507 } 508 509 return retval; 510 } 511 512 public void reset() { 513 } 514 515 public String toString() { 516 StringBuffer buf=new StringBuffer ("Fragmentation Table Sender:").append(sender).append("\n\t"); 517 java.util.Enumeration e=this.h.elements(); 518 while(e.hasMoreElements()) { 519 Entry entry=(Entry)e.nextElement(); 520 int count=0; 521 for(int i=0; i < entry.fragments.length; i++) { 522 if(entry.fragments[i] != null) { 523 count++; 524 } 525 } 526 buf.append("Message ID:").append(entry.msg_id).append("\n\t"); 527 buf.append("Total Frags:").append(entry.tot_frags).append("\n\t"); 528 buf.append("Frags Received:").append(count).append("\n\n"); 529 } 530 return buf.toString(); 531 } 532 } 533 534 } 535 536 537 | Popular Tags |