KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FRAG2


1 // $Id: FRAG2.java,v 1.15 2005/04/20 20:25:46 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.Address;
6 import org.jgroups.Event;
7 import org.jgroups.Message;
8 import org.jgroups.View;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.util.Range;
11 import org.jgroups.util.Util;
12
13 import java.util.*;
14
15
16 /**
17  * Fragmentation layer. Fragments messages larger than frag_size into smaller packets.
18  * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended
19  * to the messages as a header (and removed at the receiving side).<p>
20  * Each fragment is identified by (a) the sender (part of the message to which the header is appended),
21  * (b) the fragmentation ID (which is unique per FRAG2 layer (monotonically increasing) and (c) the
22  * fragement ID which ranges from 0 to number_of_fragments-1.<p>
23  * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and
24  * multicast messages.<br/>
25  * Compared to FRAG, this protocol does <em>not</em> need to serialize the message in order to break it into
26  * smaller fragments: it looks only at the message's buffer, which is a byte[] array anyway. We assume that the
27  * size addition for headers and src and dest address is minimal when the transport finally has to serialize the
28  * message, so we add a constant (1000 bytes).
29  * @author Bela Ban
30  * @version $Id: FRAG2.java,v 1.15 2005/04/20 20:25:46 belaban Exp $
31  */

32 public class FRAG2 extends Protocol {
33
34     /** The max number of bytes in a message. If a message's buffer is bigger, it will be fragmented */
35     int frag_size=1500;
36
37     /** Number of bytes that we think the headers plus src and dest will take up when
38         message is serialized by transport. This will be subtracted from frag_size */

39     int overhead=50;
40
41     /*the fragmentation list contains a fragmentation table per sender
42      *this way it becomes easier to clean up if a sender (member) leaves or crashes
43      */

44     private final FragmentationList fragment_list=new FragmentationList();
45     private int curr_id=1;
46     private final Vector members=new Vector(11);
47     private static final String JavaDoc name="FRAG2";
48
49
50     public final String JavaDoc getName() {
51         return name;
52     }
53
54
55     /** Setup the Protocol instance acording to the configuration string */
56     public boolean setProperties(Properties props) {
57         String JavaDoc str;
58
59         super.setProperties(props);
60         str=props.getProperty("frag_size");
61         if(str != null) {
62             frag_size=Integer.parseInt(str);
63             props.remove("frag_size");
64         }
65
66         str=props.getProperty("overhead");
67         if(str != null) {
68             overhead=Integer.parseInt(str);
69             props.remove("overhead");
70         }
71
72         int old_frag_size=frag_size;
73         frag_size-=overhead;
74         if(frag_size <=0) {
75             log.error("frag_size=" + old_frag_size + ", overhead=" + overhead +
76                       ", new frag_size=" + frag_size + ": new frag_size is invalid");
77             return false;
78         }
79
80         if(log.isInfoEnabled())
81             log.info("frag_size=" + old_frag_size + ", overhead=" + overhead + ", new frag_size=" + frag_size);
82
83         if(props.size() > 0) {
84             System.err.println("FRAG2.setProperties(): the following properties are not recognized:");
85             props.list(System.out);
86             return false;
87         }
88         return true;
89     }
90
91
92
93
94     /**
95      * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only
96      * add a header if framentation is needed !
97      */

98     public void down(Event evt) {
99         switch(evt.getType()) {
100
101             case Event.MSG:
102                 Message msg=(Message)evt.getArg();
103                 long size=msg.getLength();
104                 if(size > frag_size) {
105                     if(log.isTraceEnabled()) {
106                         StringBuffer JavaDoc sb=new StringBuffer JavaDoc("message's buffer size is ");
107                         sb.append(size).append(", will fragment ").append("(frag_size=");
108                         sb.append(frag_size).append(')');
109                         log.trace(sb.toString());
110                     }
111                     fragment(msg); // Fragment and pass down
112
return;
113                 }
114                 break;
115
116             case Event.VIEW_CHANGE:
117                 //don't do anything if this dude is sending out the view change
118
//we are receiving a view change,
119
//in here we check for the
120
View view=(View)evt.getArg();
121                 Vector new_mbrs=view.getMembers(), left_mbrs;
122                 Address mbr;
123
124                 left_mbrs=Util.determineLeftMembers(members, new_mbrs);
125                 members.clear();
126                 members.addAll(new_mbrs);
127
128                 for(int i=0; i < left_mbrs.size(); i++) {
129                     mbr=(Address)left_mbrs.elementAt(i);
130                     //the new view doesn't contain the sender, he must have left,
131
//hence we will clear all his fragmentation tables
132
fragment_list.remove(mbr);
133                     if(log.isTraceEnabled()) log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table");
134                 }
135                 break;
136
137             case Event.CONFIG:
138                 passDown(evt);
139                  if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
140                 handleConfigEvent((HashMap)evt.getArg());
141                 return;
142         }
143
144         passDown(evt); // Pass on to the layer below us
145
}
146
147
148     /**
149      * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up
150      * the stack.
151      * todo: Filip catch the view change event so that we can clean up old members
152      */

153     public void up(Event evt) {
154         switch(evt.getType()) {
155
156             case Event.MSG:
157                 Message msg=(Message)evt.getArg();
158                 Object JavaDoc obj=msg.getHeader(name);
159                 if(obj != null && obj instanceof FragHeader) { // needs to be defragmented
160
unfragment(msg); // Unfragment and possibly pass up
161
return;
162                 }
163                 break;
164
165             case Event.CONFIG:
166                 passUp(evt);
167                  if(log.isInfoEnabled()) log.info("received CONFIG event: " + evt.getArg());
168                 handleConfigEvent((HashMap)evt.getArg());
169                 return;
170         }
171
172         passUp(evt); // Pass up to the layer above us by default
173
}
174
175
176     /** Send all fragments as separate messages (with same ID !).
177      Example:
178      <pre>
179      Given the generated ID is 2344, number of fragments=3, message {dst,src,buf}
180      would be fragmented into:
181
182      [2344,3,0]{dst,src,buf1},
183      [2344,3,1]{dst,src,buf2} and
184      [2344,3,2]{dst,src,buf3}
185      </pre>
186      */

187     void fragment(Message msg) {
188         byte[] buffer;
189         List fragments;
190         Event evt;
191         FragHeader hdr;
192         Message frag_msg=null;
193         Address dest=msg.getDest();
194         long id=curr_id++; // used as seqnos
195
int num_frags=0;
196         StringBuffer JavaDoc sb;
197         Range r;
198
199         try {
200             buffer=msg.getBuffer();
201             fragments=Util.computeFragOffsets(buffer, frag_size);
202             num_frags=fragments.size();
203
204             if(log.isTraceEnabled()) {
205                 sb=new StringBuffer JavaDoc("fragmenting packet to ");
206                 sb.append((dest != null ? dest.toString() : "<all members>")).append(" (size=").append(buffer.length);
207                 sb.append(") into ").append(num_frags).append(" fragment(s) [frag_size=").append(frag_size).append(']');
208                 log.trace(sb.toString());
209             }
210
211             for(int i=0; i < fragments.size(); i++) {
212                 r=(Range)fragments.get(i);
213                 // Copy the original msg (needed because we need to copy the headers too)
214
frag_msg=msg.copy(false); // don't copy the buffer, only src, dest and headers
215
frag_msg.setBuffer(buffer, (int)r.low, (int)r.high);
216                 hdr=new FragHeader(id, i, num_frags);
217                 frag_msg.putHeader(name, hdr);
218                 evt=new Event(Event.MSG, frag_msg);
219                 passDown(evt);
220             }
221         }
222         catch(Exception JavaDoc e) {
223             if(log.isErrorEnabled()) log.error("exception is " + e);
224         }
225     }
226
227
228     /**
229      1. Get all the fragment buffers
230      2. When all are received -> Assemble them into one big buffer
231      3. Read headers and byte buffer from big buffer
232      4. Set headers and buffer in msg
233      5. Pass msg up the stack
234      */

235     void unfragment(Message msg) {
236         FragmentationTable frag_table=null;
237         Address sender=msg.getSrc();
238         Message assembled_msg;
239         FragHeader hdr=(FragHeader)msg.removeHeader(name);
240
241         frag_table=fragment_list.get(sender);
242         if(frag_table == null) {
243             frag_table=new FragmentationTable(sender);
244             try {
245                 fragment_list.add(sender, frag_table);
246             }
247             catch(IllegalArgumentException JavaDoc x) { // the entry has already been added, probably in parallel from another thread
248
frag_table=fragment_list.get(sender);
249             }
250         }
251         assembled_msg=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg);
252         if(assembled_msg != null) {
253             try {
254                 if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg);
255                 assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
256
passUp(new Event(Event.MSG, assembled_msg));
257             }
258             catch(Exception JavaDoc e) {
259                 if(log.isErrorEnabled()) log.error("exception is " + e);
260             }
261         }
262     }
263
264
265     void handleConfigEvent(HashMap map) {
266         if(map == null) return;
267         if(map.containsKey("frag_size")) {
268             frag_size=((Integer JavaDoc)map.get("frag_size")).intValue();
269             if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size);
270         }
271     }
272
273
274
275
276     /**
277      * A fragmentation list keeps a list of fragmentation tables
278      * sorted by an Address ( the sender ).
279      * This way, if the sender disappears or leaves the group half way
280      * sending the content, we can simply remove this members fragmentation
281      * table and clean up the memory of the receiver.
282      * We do not have to do the same for the sender, since the sender doesn't keep a fragmentation table
283      */

284     static class FragmentationList {
285         /* * HashMap<Address,FragmentationTable>, initialize the hashtable to hold all the fragmentation
286          * tables (11 is the best growth capacity to start with)
287          */

288         private final HashMap frag_tables=new HashMap(11);
289
290
291         /**
292          * Adds a fragmentation table for this particular sender
293          * If this sender already has a fragmentation table, an IllegalArgumentException
294          * will be thrown.
295          * @param sender - the address of the sender, cannot be null
296          * @param table - the fragmentation table of this sender, cannot be null
297          * @exception IllegalArgumentException if an entry for this sender already exist
298          */

299         public void add(Address sender, FragmentationTable table) throws IllegalArgumentException JavaDoc {
300             FragmentationTable healthCheck;
301
302             synchronized(frag_tables) {
303                 healthCheck=(FragmentationTable)frag_tables.get(sender);
304                 if(healthCheck == null) {
305                     frag_tables.put(sender, table);
306                 }
307                 else {
308                     throw new IllegalArgumentException JavaDoc("Sender <" + sender + "> already exists in the fragementation list.");
309                 }
310             }
311         }
312
313         /**
314          * returns a fragmentation table for this sender
315          * returns null if the sender doesn't have a fragmentation table
316          * @return the fragmentation table for this sender, or null if no table exist
317          */

318         public FragmentationTable get(Address sender) {
319             synchronized(frag_tables) {
320                 return (FragmentationTable)frag_tables.get(sender);
321             }
322         }
323
324
325         /**
326          * returns true if this sender already holds a
327          * fragmentation for this sender, false otherwise
328          * @param sender - the sender, cannot be null
329          * @return true if this sender already has a fragmentation table
330          */

331         public boolean containsSender(Address sender) {
332             synchronized(frag_tables) {
333                 return frag_tables.containsKey(sender);
334             }
335         }
336
337         /**
338          * removes the fragmentation table from the list.
339          * after this operation, the fragementation list will no longer
340          * hold a reference to this sender's fragmentation table
341          * @param sender - the sender who's fragmentation table you wish to remove, cannot be null
342          * @return true if the table was removed, false if the sender doesn't have an entry
343          */

344         public boolean remove(Address sender) {
345             synchronized(frag_tables) {
346                 boolean result=containsSender(sender);
347                 frag_tables.remove(sender);
348                 return result;
349             }
350         }
351
352         /**
353          * returns a list of all the senders that have fragmentation tables
354          * opened.
355          * @return an array of all the senders in the fragmentation list
356          */

357         public Address[] getSenders() {
358             Address[] result;
359             int index=0;
360
361             synchronized(frag_tables) {
362                 result=new Address[frag_tables.size()];
363                 for(Iterator it=frag_tables.keySet().iterator(); it.hasNext();) {
364                     result[index++]=(Address)it.next();
365                 }
366             }
367             return result;
368         }
369
370         public String JavaDoc toString() {
371             Map.Entry entry;
372             StringBuffer JavaDoc buf=new StringBuffer JavaDoc("Fragmentation list contains ");
373             synchronized(frag_tables) {
374                 buf.append(frag_tables.size()).append(" tables\n");
375                 for(Iterator it=frag_tables.entrySet().iterator(); it.hasNext();) {
376                     entry=(Map.Entry)it.next();
377                     buf.append(entry.getKey()).append(": " ).append(entry.getValue()).append("\n");
378                 }
379             }
380             return buf.toString();
381         }
382
383     }
384
385     /**
386      * Keeps track of the fragments that are received.
387      * Reassembles fragements into entire messages when all fragments have been received.
388      * The fragmentation holds a an array of byte arrays for a unique sender
389      * The first dimension of the array is the order of the fragmentation, in case the arrive out of order
390      */

391     static class FragmentationTable {
392         private final Address sender;
393         /* the hashtable that holds the fragmentation entries for this sender*/
394         private final Hashtable h=new Hashtable(11); // keys: frag_ids, vals: Entrys
395

396
397         public FragmentationTable(Address sender) {
398             this.sender=sender;
399         }
400
401
402         /**
403          * inner class represents an entry for a message
404          * each entry holds an array of byte arrays sorted
405          * once all the byte buffer entries have been filled
406          * the fragmentation is considered complete.
407          */

408         static class Entry {
409             //the total number of fragment in this message
410
int tot_frags=0;
411             // each fragment is a byte buffer
412
Message fragments[]=null;
413             //the number of fragments we have received
414
int number_of_frags_recvd=0;
415             // the message ID
416
long msg_id=-1;
417
418             /**
419              * Creates a new entry
420              * @param tot_frags the number of fragments to expect for this message
421              */

422             Entry(long msg_id, int tot_frags) {
423                 this.msg_id=msg_id;
424                 this.tot_frags=tot_frags;
425                 fragments=new Message[tot_frags];
426                 for(int i=0; i < tot_frags; i++)
427                     fragments[i]=null;
428             }
429
430             /**
431              * adds on fragmentation buffer to the message
432              * @param frag_id the number of the fragment being added 0..(tot_num_of_frags - 1)
433              * @param frag the byte buffer containing the data for this fragmentation, should not be null
434              */

435             public void set(int frag_id, Message frag) {
436                  // don't count an already received fragment (should not happen though because the
437
// reliable transmission protocol(s) below should weed out duplicates
438
if(fragments[frag_id] == null) {
439                     fragments[frag_id]=frag;
440                     number_of_frags_recvd++;
441                 }
442             }
443
444             /** returns true if this fragmentation is complete
445              * ie, all fragmentations have been received for this buffer
446              *
447              */

448             public boolean isComplete() {
449                 /*first make the simple check*/
450                 if(number_of_frags_recvd < tot_frags) {
451                     return false;
452                 }
453                 /*then double check just in case*/
454                 for(int i=0; i < fragments.length; i++) {
455                     if(fragments[i] == null)
456                         return false;
457                 }
458                 /*all fragmentations have been received*/
459                 return true;
460             }
461
462             /**
463              * Assembles all the fragments into one buffer. Takes all Messages, and combines their buffers into one
464              * buffer.
465              * This method does not check if the fragmentation is complete (use {@link #isComplete()} to verify
466              * before calling this method)
467              * @return the complete message in one buffer
468              *
469              */

470             public Message assembleMessage() {
471                 Message retval=null;
472                 byte[] combined_buffer, tmp;
473                 int combined_length=0, length, offset;
474                 Message fragment;
475                 int index=0;
476
477                 for(int i=0; i < fragments.length; i++) {
478                     fragment=fragments[i];
479                     combined_length+=fragment.getLength();
480                 }
481
482                 combined_buffer=new byte[combined_length];
483                 for(int i=0; i < fragments.length; i++) {
484                     fragment=fragments[i];
485                     tmp=fragment.getRawBuffer();
486                     length=fragment.getLength();
487                     offset=fragment.getOffset();
488                     System.arraycopy(tmp, offset, combined_buffer, index, length);
489                     index+=length;
490                 }
491
492                 retval=fragments[0].copy(false);
493                 retval.setBuffer(combined_buffer);
494                 return retval;
495             }
496
497             /**
498              * debug only
499              */

500             public String JavaDoc toString() {
501                 StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
502                 ret.append("[tot_frags=" + tot_frags + ", number_of_frags_recvd=" + number_of_frags_recvd + ']');
503                 return ret.toString();
504             }
505
506             public int hashCode() {
507                 return super.hashCode();
508             }
509         }
510
511
512         /**
513          * Creates a new entry if not yet present. Adds the fragment.
514          * If all fragements for a given message have been received,
515          * an entire message is reassembled and returned.
516          * Otherwise null is returned.
517          * @param id - the message ID, unique for a sender
518          * @param frag_id the index of this fragmentation (0..tot_frags-1)
519          * @param tot_frags the total number of fragmentations expected
520          * @param fragment - the byte buffer for this fragment
521          */

522         public synchronized Message add(long id, int frag_id, int tot_frags, Message fragment) {
523             Message retval=null;
524
525             Entry e=(Entry)h.get(new Long JavaDoc(id));
526
527             if(e == null) { // Create new entry if not yet present
528
e=new Entry(id, tot_frags);
529                 h.put(new Long JavaDoc(id), e);
530             }
531
532             e.set(frag_id, fragment);
533             if(e.isComplete()) {
534                 retval=e.assembleMessage();
535                 h.remove(new Long JavaDoc(id));
536             }
537
538             return retval;
539         }
540
541
542         public void reset() {
543         }
544
545         public String JavaDoc toString() {
546             StringBuffer JavaDoc buf=new StringBuffer JavaDoc("Fragmentation Table Sender:").append(sender).append("\n\t");
547             java.util.Enumeration JavaDoc e=this.h.elements();
548             while(e.hasMoreElements()) {
549                 Entry entry=(Entry)e.nextElement();
550                 int count=0;
551                 for(int i=0; i < entry.fragments.length; i++) {
552                     if(entry.fragments[i] != null) {
553                         count++;
554                     }
555                 }
556                 buf.append("Message ID:").append(entry.msg_id).append("\n\t");
557                 buf.append("Total Frags:").append(entry.tot_frags).append("\n\t");
558                 buf.append("Frags Received:").append(count).append("\n\n");
559             }
560             return buf.toString();
561         }
562     }
563
564 }
565
566
567
Popular Tags