KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FRAG


1 // $Id: FRAG.java,v 1.21 2005/04/20 13:50:05 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.Address;
6 import org.jgroups.Event;
7 import org.jgroups.Message;
8 import org.jgroups.View;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.util.ExposedByteArrayOutputStream;
11 import org.jgroups.util.Util;
12
13 import java.io.ByteArrayInputStream JavaDoc;
14 import java.io.DataInputStream JavaDoc;
15 import java.io.DataOutputStream JavaDoc;
16 import java.util.*;
17
18
19
/**
 * Fragmentation layer. Fragments messages larger than frag_size into smaller packets.
 * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended
 * to the messages as a header (and removed at the receiving side).<p>
 * Each fragment is identified by (a) the sender (part of the message to which the header is appended),
 * (b) the fragmentation ID (which is unique per FRAG layer and monotonically increasing) and (c) the
 * fragment ID, which ranges from 0 to number_of_fragments-1.<p>
 * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and
 * multicast messages.
 * @author Bela Ban
 * @author Filip Hanik
 * @version $Id: FRAG.java,v 1.21 2005/04/20 13:50:05 belaban Exp $
 */

33 public class FRAG extends Protocol {
34     private int frag_size=8192; // conservative value
35

36     /*the fragmentation list contains a fragmentation table per sender
37      *this way it becomes easier to clean up if a sender (member) leaves or crashes
38      */

39     private final FragmentationList fragment_list=new FragmentationList();
40     private int curr_id=1;
41     private final ExposedByteArrayOutputStream bos=new ExposedByteArrayOutputStream(1024);
42     private final Vector members=new Vector(11);
43     private final static String JavaDoc name="FRAG";
44
45
46     public String JavaDoc getName() {
47         return name;
48     }
49
50
51     /**
52      * Setup the Protocol instance acording to the configuration string
53      */

54     public boolean setProperties(Properties props) {
55         String JavaDoc str;
56         
57         super.setProperties(props);
58         str=props.getProperty("frag_size");
59         if(str != null) {
60             frag_size=Integer.parseInt(str);
61             props.remove("frag_size");
62         }
63
64         if(props.size() > 0) {
65             System.err.println("FRAG.setProperties(): the following properties are not recognized:");
66             props.list(System.out);
67             return false;
68         }
69         return true;
70     }
71
72
73     /**
74      * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only
75      * add a header if framentation is needed !
76      */

77     public void down(Event evt) {
78         switch(evt.getType()) {
79
80             case Event.MSG:
81                 Message msg=(Message)evt.getArg();
82                 long size=msg.size();
83                 if(size > frag_size) {
84                     if(log.isTraceEnabled()) {
85                         StringBuffer JavaDoc sb=new StringBuffer JavaDoc("message size is ");
86                         sb.append(size).append(", will fragment (frag_size=").append(frag_size).append(')');
87                         log.trace(sb.toString());
88                     }
89                     fragment(msg); // Fragment and pass down
90
return;
91                 }
92                 break;
93
94             case Event.VIEW_CHANGE:
95                 //don't do anything if this dude is sending out the view change
96
//we are receiving a view change,
97
//in here we check for the
98
View view=(View)evt.getArg();
99                 Vector new_mbrs=view.getMembers(), left_mbrs;
100                 Address mbr;
101
102                 left_mbrs=Util.determineLeftMembers(members, new_mbrs);
103                 members.clear();
104                 members.addAll(new_mbrs);
105
106                 for(int i=0; i < left_mbrs.size(); i++) {
107                     mbr=(Address)left_mbrs.elementAt(i);
108                     //the new view doesn't contain the sender, he must have left,
109
//hence we will clear all his fragmentation tables
110
fragment_list.remove(mbr);
111                     if(log.isTraceEnabled())
112                         log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table");
113                 }
114                 break;
115
116             case Event.CONFIG:
117                 passDown(evt);
118                 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
119                 handleConfigEvent((HashMap)evt.getArg());
120                 return;
121         }
122
123         passDown(evt); // Pass on to the layer below us
124
}
125
126
127     /**
128      * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up the stack.
129      */

130     public void up(Event evt) {
131         switch(evt.getType()) {
132
133             case Event.MSG:
134                 Message msg=(Message)evt.getArg();
135                 Object JavaDoc obj=msg.getHeader(name);
136
137                 if(obj != null && obj instanceof FragHeader) { // needs to be defragmented
138
unfragment(msg); // Unfragment and possibly pass up
139
return;
140                 }
141                 break;
142
143             case Event.CONFIG:
144                 passUp(evt);
145                 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
146                 handleConfigEvent((HashMap)evt.getArg());
147                 return;
148         }
149
150         passUp(evt); // Pass up to the layer above us by default
151
}
152
153
154     /**
155      * Send all fragments as separate messages (with same ID !).
156      * Example:
157      * <pre>
158      * Given the generated ID is 2344, number of fragments=3, message {dst,src,buf}
159      * would be fragmented into:
160      * <p/>
161      * [2344,3,0]{dst,src,buf1},
162      * [2344,3,1]{dst,src,buf2} and
163      * [2344,3,2]{dst,src,buf3}
164      * </pre>
165      */

166     private void fragment(Message msg) {
167         DataOutputStream JavaDoc out=null;
168         byte[] buffer;
169         byte[] fragments[];
170         Event evt;
171         FragHeader hdr;
172         Message frag_msg;
173         Address dest=msg.getDest(), SRC=msg.getSrc();
174         long id=curr_id++; // used as seqnos
175
int num_frags;
176
177         try {
178             // Write message into a byte buffer and fragment it
179
bos.reset();
180             out=new DataOutputStream JavaDoc(bos);
181             msg.writeTo(out);
182             out.flush();
183             buffer=bos.getRawBuffer();
184             fragments=Util.fragmentBuffer(buffer, frag_size, bos.size());
185             num_frags=fragments.length;
186
187             if(log.isTraceEnabled()) {
188                 StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
189                 sb.append("fragmenting packet to ").append(dest != null ? dest.toString() : "<all members>");
190                 sb.append(" (size=").append(buffer.length).append(") into ").append(num_frags);
191                 sb.append(" fragment(s) [frag_size=").append(frag_size).append(']');
192                 log.trace(sb.toString());
193             }
194
195             for(int i=0; i < num_frags; i++) {
196                 frag_msg=new Message(dest, src, fragments[i]);
197                 hdr=new FragHeader(id, i, num_frags);
198                 frag_msg.putHeader(name, hdr);
199                 evt=new Event(Event.MSG, frag_msg);
200                 passDown(evt);
201             }
202         }
203         catch(Exception JavaDoc e) {
204             log.error("exception is " + e);
205         }
206         finally {
207             Util.closeOutputStream(out);
208         }
209     }
210
211
212     /**
213      * 1. Get all the fragment buffers
214      * 2. When all are received -> Assemble them into one big buffer
215      * 3. Read headers and byte buffer from big buffer
216      * 4. Set headers and buffer in msg
217      * 5. Pass msg up the stack
218      */

219     private void unfragment(Message msg) {
220         FragmentationTable frag_table=null;
221         Address sender=msg.getSrc();
222         Message assembled_msg;
223         FragHeader hdr=(FragHeader)msg.removeHeader(name);
224         byte[] m;
225         ByteArrayInputStream JavaDoc bis;
226         DataInputStream JavaDoc in=null;
227
228         frag_table=fragment_list.get(sender);
229         if(frag_table == null) {
230             frag_table=new FragmentationTable(sender);
231             try {
232                 fragment_list.add(sender, frag_table);
233             }
234             catch(IllegalArgumentException JavaDoc x) { // the entry has already been added, probably in parallel from another thread
235
frag_table=fragment_list.get(sender);
236             }
237         }
238         m=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg.getBuffer());
239         if(m != null) {
240             try {
241                 bis=new ByteArrayInputStream JavaDoc(m);
242                 in=new DataInputStream JavaDoc(bis);
243                 assembled_msg=new Message();
244                 assembled_msg.readFrom(in);
245                 if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg);
246                 assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
247
passUp(new Event(Event.MSG, assembled_msg));
248             }
249             catch(Exception JavaDoc e) {
250                 log.error("exception is " + e);
251             }
252             finally {
253                 Util.closeInputStream(in);
254             }
255         }
256     }
257
258
259     void handleConfigEvent(HashMap map) {
260         if(map == null) return;
261         if(map.containsKey("frag_size")) {
262             frag_size=((Integer JavaDoc)map.get("frag_size")).intValue();
263             if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size);
264         }
265     }
266
267
268
269
270     /**
271      * A fragmentation list keeps a list of fragmentation tables
272      * sorted by an Address ( the sender ).
273      * This way, if the sender disappears or leaves the group half way
274      * sending the content, we can simply remove this members fragmentation
275      * table and clean up the memory of the receiver.
276      * We do not have to do the same for the sender, since the sender doesn't keep a fragmentation table
277      */

278     static class FragmentationList {
279         /* initialize the hashtable to hold all the fragmentation tables
280          * 11 is the best growth capacity to start with<br/>
281          * HashMap<Address,FragmentationTable>
282          */

283         private final HashMap frag_tables=new HashMap(11);
284
285
286         /**
287          * Adds a fragmentation table for this particular sender
288          * If this sender already has a fragmentation table, an IllegalArgumentException
289          * will be thrown.
290          * @param sender - the address of the sender, cannot be null
291          * @param table - the fragmentation table of this sender, cannot be null
292          * @throws IllegalArgumentException if an entry for this sender already exist
293          */

294         public void add(Address sender, FragmentationTable table) throws IllegalArgumentException JavaDoc {
295             FragmentationTable healthCheck;
296
297             synchronized(frag_tables) {
298                 healthCheck=(FragmentationTable)frag_tables.get(sender);
299                 if(healthCheck == null) {
300                     frag_tables.put(sender, table);
301                 }
302                 else {
303                     throw new IllegalArgumentException JavaDoc("Sender <" + sender + "> already exists in the fragementation list.");
304                 }
305             }
306         }
307
308         /**
309          * returns a fragmentation table for this sender
310          * returns null if the sender doesn't have a fragmentation table
311          * @return the fragmentation table for this sender, or null if no table exist
312          */

313         public FragmentationTable get(Address sender) {
314             synchronized(frag_tables) {
315                 return (FragmentationTable)frag_tables.get(sender);
316             }
317         }
318
319
320         /**
321          * returns true if this sender already holds a
322          * fragmentation for this sender, false otherwise
323          * @param sender - the sender, cannot be null
324          * @return true if this sender already has a fragmentation table
325          */

326         public boolean containsSender(Address sender) {
327             synchronized(frag_tables) {
328                 return frag_tables.containsKey(sender);
329             }
330         }
331
332         /**
333          * removes the fragmentation table from the list.
334          * after this operation, the fragementation list will no longer
335          * hold a reference to this sender's fragmentation table
336          * @param sender - the sender who's fragmentation table you wish to remove, cannot be null
337          * @return true if the table was removed, false if the sender doesn't have an entry
338          */

339         public boolean remove(Address sender) {
340             synchronized(frag_tables) {
341                 boolean result=containsSender(sender);
342                 frag_tables.remove(sender);
343                 return result;
344             }
345         }
346
347         /**
348          * returns a list of all the senders that have fragmentation tables opened.
349          * @return an array of all the senders in the fragmentation list
350          */

351         public Address[] getSenders() {
352             Address[] result;
353             int index=0;
354
355             synchronized(frag_tables) {
356                 result=new Address[frag_tables.size()];
357                 for(Iterator it=frag_tables.keySet().iterator(); it.hasNext();) {
358                     result[index++]=(Address)it.next();
359                 }
360             }
361             return result;
362         }
363
364         public String JavaDoc toString() {
365             Map.Entry entry;
366             StringBuffer JavaDoc buf=new StringBuffer JavaDoc("Fragmentation list contains ");
367             synchronized(frag_tables) {
368                 buf.append(frag_tables.size()).append(" tables\n");
369                 for(Iterator it=frag_tables.entrySet().iterator(); it.hasNext();) {
370                     entry=(Map.Entry)it.next();
371                     buf.append(entry.getKey()).append(": " ).append(entry.getValue()).append("\n");
372                 }
373             }
374             return buf.toString();
375         }
376
377     }
378
379     /**
380      * Keeps track of the fragments that are received.
381      * Reassembles fragements into entire messages when all fragments have been received.
382      * The fragmentation holds a an array of byte arrays for a unique sender
383      * The first dimension of the array is the order of the fragmentation, in case the arrive out of order
384      */

385     static class FragmentationTable {
386         private final Address sender;
387         /* the hashtable that holds the fragmentation entries for this sender*/
388         private final Hashtable h=new Hashtable(11); // keys: frag_ids, vals: Entrys
389

390
391         public FragmentationTable(Address sender) {
392             this.sender=sender;
393         }
394
395
396         /**
397          * inner class represents an entry for a message
398          * each entry holds an array of byte arrays sorted
399          * once all the byte buffer entries have been filled
400          * the fragmentation is considered complete.
401          */

402         static class Entry {
403             //the total number of fragment in this message
404
int tot_frags=0;
405             // each fragment is a byte buffer
406
byte[] fragments[]=null;
407             //the number of fragments we have received
408
int number_of_frags_recvd=0;
409             // the message ID
410
long msg_id=-1;
411
412             /**
413              * Creates a new entry
414              *
415              * @param tot_frags the number of fragments to expect for this message
416              */

417             Entry(long msg_id, int tot_frags) {
418                 this.msg_id=msg_id;
419                 this.tot_frags=tot_frags;
420                 fragments=new byte[tot_frags][];
421                 for(int i=0; i < tot_frags; i++) {
422                     fragments[i]=null;
423                 }
424             }
425
426             /**
427              * adds on fragmentation buffer to the message
428              *
429              * @param frag_id the number of the fragment being added 0..(tot_num_of_frags - 1)
430              * @param frag the byte buffer containing the data for this fragmentation, should not be null
431              */

432             public void set(int frag_id, byte[] frag) {
433                 fragments[frag_id]=frag;
434                 number_of_frags_recvd++;
435             }
436
437             /**
438              * returns true if this fragmentation is complete
439              * ie, all fragmentations have been received for this buffer
440              */

441             public boolean isComplete() {
442                 /*first make the simple check*/
443                 if(number_of_frags_recvd < tot_frags) {
444                     return false;
445                 }
446                 /*then double check just in case*/
447                 for(int i=0; i < fragments.length; i++) {
448                     if(fragments[i] == null)
449                         return false;
450                 }
451                 /*all fragmentations have been received*/
452                 return true;
453             }
454
455             /**
456              * Assembles all the fragmentations into one buffer
457              * this method does not check if the fragmentation is complete
458              *
459              * @return the complete message in one buffer
460              */

461             public byte[] assembleBuffer() {
462                 return Util.defragmentBuffer(fragments);
463             }
464
465             /**
466              * debug only
467              */

468             public String JavaDoc toString() {
469                 StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
470                 ret.append("[tot_frags=" + tot_frags + ", number_of_frags_recvd=" + number_of_frags_recvd + ']');
471                 return ret.toString();
472             }
473
474             public int hashCode() {
475                 return super.hashCode();
476             }
477         }
478
479
480         /**
481          * Creates a new entry if not yet present. Adds the fragment.
482          * If all fragements for a given message have been received,
483          * an entire message is reassembled and returned.
484          * Otherwise null is returned.
485          *
486          * @param id - the message ID, unique for a sender
487          * @param frag_id the index of this fragmentation (0..tot_frags-1)
488          * @param tot_frags the total number of fragmentations expected
489          * @param fragment - the byte buffer for this fragment
490          */

491         public synchronized byte[] add(long id, int frag_id, int tot_frags, byte[] fragment) {
492
493             /*initialize the return value to default not complete */
494             byte[] retval=null;
495
496             Entry e=(Entry)h.get(new Long JavaDoc(id));
497
498             if(e == null) { // Create new entry if not yet present
499
e=new Entry(id, tot_frags);
500                 h.put(new Long JavaDoc(id), e);
501             }
502
503             e.set(frag_id, fragment);
504             if(e.isComplete()) {
505                 retval=e.assembleBuffer();
506                 h.remove(new Long JavaDoc(id));
507             }
508
509             return retval;
510         }
511
512         public void reset() {
513         }
514
515         public String JavaDoc toString() {
516             StringBuffer JavaDoc buf=new StringBuffer JavaDoc("Fragmentation Table Sender:").append(sender).append("\n\t");
517             java.util.Enumeration JavaDoc e=this.h.elements();
518             while(e.hasMoreElements()) {
519                 Entry entry=(Entry)e.nextElement();
520                 int count=0;
521                 for(int i=0; i < entry.fragments.length; i++) {
522                     if(entry.fragments[i] != null) {
523                         count++;
524                     }
525                 }
526                 buf.append("Message ID:").append(entry.msg_id).append("\n\t");
527                 buf.append("Total Frags:").append(entry.tot_frags).append("\n\t");
528                 buf.append("Frags Received:").append(count).append("\n\n");
529             }
530             return buf.toString();
531         }
532     }
533
534 }
535
536
537
Popular Tags