KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FC


1 // $Id: FC.java,v 1.20 2005/04/20 20:25:46 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.util.CondVar;
7 import org.jgroups.util.Streamable;
8 import org.jgroups.stack.Protocol;
9
10 import java.io.*;
11 import java.util.*;
12
13 /**
14  * Simple flow control protocol based on a credit system. Each sender has a number of credits (bytes
15  * to send). When the credits have been exhausted, the sender blocks. Each receiver also keeps track of
16  * how many credits it has received from a sender. When credits for a sender fall below a threshold,
17  * the receiver sends more credits to the sender. Works for both unicast and multicast messages.<br>
18  * Note that this protocol must be located towards the top of the stack, or all down_threads from JChannel to this
19  * protocol must be set to false ! This is in order to block JChannel.send()/JChannel.down().
20  * @author Bela Ban
21  * @version $Revision: 1.20 $
22  */

23 public class FC extends Protocol {
24
25     /** My own address */
26     Address local_addr=null;
27
28     /** HashMap<Address,Long>: keys are members, values are credits left. For each send, the
29      * number of credits is decremented by the message size */

30     final HashMap sent=new HashMap(11);
31
32     /** HashMap<Address,Long>: keys are members, values are credits left (in bytes).
33      * For each receive, the credits for the sender are decremented by the size of the received message.
34      * When the credits are 0, we refill and send a CREDIT message to the sender. Sender blocks until CREDIT
35      * is received after reaching <tt>min_credits</tt> credits. */

36     final HashMap received=new HashMap(11);
37
38     /** We cache the membership */
39     final Vector members=new Vector(11);
40
41     /** List of members from whom we expect credits */
42     final List creditors=new ArrayList(11);
43
44     /** Max number of bytes to send per receiver until an ack must
45      * be received before continuing sending */

46     long max_credits=50000;
47
48     /** If credits fall below this limit, we send more credits to the sender. (We also send when
49      * credits are exhausted (0 credits left)) */

50     double min_threshold=0.25;
51
52     /** Computed as <tt>max_credits</tt> times <tt>min_theshold</tt>. If explicitly set, this will
53      * override the above computation */

54     long min_credits=0;
55
56     /** Current mode. True if channel was sent a BLOCK_SEND event, false if UNBLOCK_EVENT was sent */
57     CondVar blocking=new CondVar("blocking", Boolean.FALSE, sent); // we're using the sender's map as sync
58

59     static final String JavaDoc name="FC";
60
61     long start_blocking=0, stop_blocking=0;
62
63
64
65
66     public String JavaDoc getName() {
67         return name;
68     }
69
70
71     public boolean setProperties(Properties props) {
72         String JavaDoc str;
73         boolean min_credits_set=false;
74
75         super.setProperties(props);
76         str=props.getProperty("max_credits");
77         if(str != null) {
78             max_credits=Long.parseLong(str);
79             props.remove("max_credits");
80         }
81
82         str=props.getProperty("min_threshold");
83         if(str != null) {
84             min_threshold=Double.parseDouble(str);
85             props.remove("min_threshold");
86         }
87
88         str=props.getProperty("min_credits");
89         if(str != null) {
90             min_credits=Long.parseLong(str);
91             props.remove("min_credits");
92             min_credits_set=true;
93         }
94
95         if(!min_credits_set)
96             min_credits=(long)((double)max_credits * min_threshold);
97
98         if(props.size() > 0) {
99             System.err.println("FC.setProperties(): the following properties are not recognized:");
100             props.list(System.out);
101             return false;
102         }
103         return true;
104     }
105
106
107
108     public void down(final Event evt) {
109         switch(evt.getType()) {
110             case Event.VIEW_CHANGE:
111                 // this has to be run in a separate thread because waitUntilEnoughCreditsAvailable() might block,
112
// and the view change could potentially unblock it
113
new Thread JavaDoc() {
114                     public void run() {
115                         handleViewChange(((View)evt.getArg()).getMembers());
116                     }
117                 }.start();
118                 break;
119             case Event.MSG:
120                 // blocks until enought credits are available to send message
121
waitUntilEnoughCreditsAvailable(evt);
122                 return;
123         }
124         passDown(evt); // this could potentially use the lower protocol's thread which may block
125
}
126
127
128
129
130     public void up(Event evt) {
131         switch(evt.getType()) {
132             case Event.SET_LOCAL_ADDRESS:
133                 local_addr=(Address)evt.getArg();
134                 break;
135             case Event.VIEW_CHANGE:
136                 handleViewChange(((View)evt.getArg()).getMembers());
137                 break;
138             case Event.MSG:
139                 Message msg=(Message)evt.getArg();
140                 FcHeader hdr=(FcHeader)msg.removeHeader(name);
141                 if(hdr != null) {
142                     if(hdr.type == FcHeader.REPLENISH) {
143                         handleCredit(msg.getSrc());
144                         return; // don't pass message up
145
}
146                 }
147                 else {
148                     adjustCredit(msg);
149                 }
150                 break;
151         }
152         passUp(evt);
153     }
154
155
156
157     void handleCredit(Address src) {
158         if(src == null) return;
159
160         synchronized(sent) {
161             if(log.isTraceEnabled())
162                 log.trace("received replenishment message from " + src + ", old credit was " + sent.get(src) +
163                           ", new credits are " + max_credits + ". Creditors are\n" + printCreditors());
164
165             sent.put(src, new Long JavaDoc(max_credits));
166             if(creditors.size() > 0) { // we are blocked because we expect credit from one or more members
167
removeCreditor(src);
168                 if(creditors.size() == 0 && blocking.get().equals(Boolean.TRUE)) {
169                     unblockSender(); // triggers sent.notifyAll()...
170
}
171             }
172         }
173     }
174
175
176     /**
177      * Check whether sender has enough credits left. If not, send him some more
178      * @param msg
179      */

180     void adjustCredit(Message msg) {
181         Address SRC=msg.getSrc();
182         long size=Math.max(24, msg.getLength());
183
184         if(src == null) {
185             if(log.isErrorEnabled()) log.error("src is null");
186             return;
187         }
188
189         synchronized(received) {
190             if(log.isTraceEnabled()) log.trace("credit for " + src + " is " + received.get(src));
191             if(decrementCredit(received, src, size) == false) {
192                 received.put(src, new Long JavaDoc(max_credits));
193                 // not enough credits left
194
if(log.isTraceEnabled()) log.trace("sending replenishment message to " + src);
195                 sendCredit(src);
196             }
197         }
198     }
199
200
201
202     void sendCredit(Address dest) {
203         Message msg=new Message(dest, null, null);
204         FcHeader hdr=new FcHeader(FcHeader.REPLENISH);
205         msg.putHeader(name, hdr);
206         passDown(new Event(Event.MSG, msg));
207     }
208
209
210     /**
211      * Checks whether enough credits are available to send message. If not, blocks until enough credits
212      * are available
213      * @param evt Guaranteed to be a Message
214      * @return
215      */

216     void waitUntilEnoughCreditsAvailable(Event evt) {
217         Message msg=(Message)evt.getArg();
218
219         // not enough credits, block until replenished with credits
220
synchronized(sent) { // 'sent' is the same lock as blocking.getLock()...
221
passDown(evt); // let this one go, but block on the next message if not sufficient credit
222
if(decrMessage(msg) == false) {
223                 if(log.isTraceEnabled())
224                     log.trace("blocking due to insufficient credits, creditors=\n" + printCreditors());
225                 start_blocking=System.currentTimeMillis();
226                 blocking.set(Boolean.TRUE);
227                 blocking.waitUntil(Boolean.FALSE); // waits on 'sent'
228
}
229         }
230     }
231
232
233     /**
234      * Try to decrement the credits needed for this message and return true if successful, or false otherwise.
235      * For unicast destinations, the credits required are subtracted from the unicast destination member, for
236      * multicast messages the credits are subtracted from all current members in the group.
237      * @param msg
238      * @return false: will block, true: will not block
239      */

240     private boolean decrMessage(Message msg) {
241         Address dest;
242         long size;
243         boolean success=true;
244
245         // ******************************************************************************************************
246
// this method is called by waitUntilEnoughCredits() which syncs on 'sent', so we don't need to sync here
247
// ******************************************************************************************************
248

249         if(msg == null) {
250             if(log.isErrorEnabled()) log.error("msg is null");
251             return true; // don't block !
252
}
253         dest=msg.getDest();
254         size=Math.max(24, msg.getLength());
255         if(dest != null && !dest.isMulticastAddress()) { // unicast destination
256
if(log.isTraceEnabled()) log.trace("credit for " + dest + " is " + sent.get(dest));
257             if(decrementCredit(sent, dest, size)) {
258                 return true;
259             }
260             else {
261                 addCreditor(dest);
262                 return false;
263             }
264         }
265         else { // multicast destination
266
for(Iterator it=members.iterator(); it.hasNext();) {
267                 dest=(Address)it.next();
268                 if(log.isTraceEnabled()) log.trace("credit for " + dest + " is " + sent.get(dest));
269                 if(decrementCredit(sent, dest, size) == false) {
270                     addCreditor(dest);
271                     success=false;
272                 }
273             }
274         }
275         return success;
276     }
277
278
279
280
281     /** If message queueing is enabled, sends queued messages and unlocks sender (if successful) */
282     private void unblockSender() {
283         // **********************************************************************
284
// always called with 'sent' lock acquired, so we don't need to sync here
285
// **********************************************************************
286
if(log.isTraceEnabled()) log.trace("setting blocking=false");
287         blocking.set(Boolean.FALSE);
288         printBlockTime();
289     }
290
291     private void printBlockTime() {
292         stop_blocking=System.currentTimeMillis();
293         long diff=stop_blocking - start_blocking;
294         stop_blocking=start_blocking=0;
295         if(log.isTraceEnabled())
296             log.trace("blocking time was " + diff + "ms");
297     }
298
299     private String JavaDoc printCreditors() {
300         // **********************************************************************
301
// always called with 'sent' lock acquired, so we don't need to sync here
302
// **********************************************************************
303
StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
304         for(Iterator it=creditors.iterator(); it.hasNext();) {
305             Address creditor=(Address)it.next();
306             sb.append(creditor).append(": ").append(getCredits(sent, creditor)).append(" credits\n");
307         }
308         return sb.toString();
309     }
310
311     private void addCreditor(Address mbr) {
312         if(mbr != null && !creditors.contains(mbr))
313             creditors.add(mbr);
314     }
315
316     private void removeCreditor(Address mbr) {
317         if(mbr != null)
318             creditors.remove(mbr);
319     }
320
321     private long getCredits(Map map, Address mbr) {
322         Long JavaDoc tmp=(Long JavaDoc)map.get(mbr);
323         if(tmp == null) {
324             map.put(mbr, new Long JavaDoc(max_credits));
325             return max_credits;
326         }
327         return tmp.longValue();
328     }
329
330
331
332
333
334     /**
335      * Find the credits associated with <tt>dest</tt> and decrement its credits by the message size.
336      * @param map
337      * @param dest
338      * @return Whether the required credits could successfully be subtracted from the credits left
339      */

340     private boolean decrementCredit(HashMap map, Address dest, long credits_required) {
341         long credits_left, new_credits_left;
342         Long JavaDoc tmp=(Long JavaDoc)map.get(dest);
343
344         if(tmp != null) {
345             credits_left=tmp.longValue();
346             new_credits_left=Math.max(0, credits_left - credits_required);
347             map.put(dest, new Long JavaDoc(new_credits_left));
348
349             if(new_credits_left >= min_credits + credits_required) {
350                 return true;
351             }
352             else {
353                 if(log.isTraceEnabled()) {
354                     StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
355                     sb.append("not enough credits left for ").append(dest).append(": left=").append(new_credits_left);
356                     sb.append(", required+min_credits=").append((credits_required +min_credits)).append(", required=");
357                     sb.append(credits_required).append(", min_credits=").append(min_credits);
358                     log.trace(sb.toString());
359                 }
360                 return false;
361             }
362         }
363         return true;
364     }
365
366
367     void handleViewChange(Vector mbrs) {
368         Address addr;
369         if(mbrs == null) return;
370
371         if(log.isTraceEnabled()) log.trace("new membership: " + mbrs);
372         members.clear();
373         members.addAll(mbrs);
374
375         synchronized(received) {
376             // add members not in membership to received hashmap (with full credits)
377
for(int i=0; i < mbrs.size(); i++) {
378                 addr=(Address) mbrs.elementAt(i);
379                 if(!received.containsKey(addr))
380                     received.put(addr, new Long JavaDoc(max_credits));
381             }
382             // remove members that left
383
for(Iterator it=received.keySet().iterator(); it.hasNext();) {
384                 addr=(Address) it.next();
385                 if(!mbrs.contains(addr))
386                     it.remove();
387             }
388         }
389
390         synchronized(sent) {
391             // add members not in membership to sent hashmap (with full credits)
392
for(int i=0; i < mbrs.size(); i++) {
393                 addr=(Address) mbrs.elementAt(i);
394                 if(!sent.containsKey(addr))
395                     sent.put(addr, new Long JavaDoc(max_credits));
396             }
397             // remove members that left
398
for(Iterator it=sent.keySet().iterator(); it.hasNext();) {
399                 addr=(Address)it.next();
400                 if(!mbrs.contains(addr))
401                     it.remove(); // modified the underlying map
402
}
403
404
405
406             // remove all creditors which are not in the new view
407
for(Iterator it=creditors.iterator(); it.hasNext();) {
408                 Address creditor=(Address) it.next();
409                 if(!mbrs.contains(creditor))
410                     it.remove();
411             }
412
413             if(log.isTraceEnabled()) log.trace("creditors are\n" + printCreditors());
414             if(creditors.size() == 0 && blocking.get().equals(Boolean.TRUE))
415                 unblockSender();
416         }
417     }
418
419
420
421 // private String dumpSentMessages() {
422
// StringBuffer sb=new StringBuffer();
423
// for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {
424
// Map.Entry entry=(Map.Entry)it.next();
425
// sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
426
// }
427
// return sb.toString();
428
// }
429

430 // private String dumpReceivedMessages() {
431
// Map tmp;
432
//
433
// synchronized(received) {
434
// tmp=(Map)received.clone();
435
// }
436
// StringBuffer sb=new StringBuffer();
437
// for(Iterator it=tmp.entrySet().iterator(); it.hasNext();) {
438
// Map.Entry entry=(Map.Entry)it.next();
439
// sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
440
// }
441
// return sb.toString();
442
// }
443

444 // private String dumpMessages() {
445
// StringBuffer sb=new StringBuffer();
446
// sb.append("sent:\n").append(sent).append('\n');
447
// synchronized(received) {
448
// sb.append("received:\n").append(received).append('\n');
449
// }
450
// return sb.toString();
451
// }
452

453     public static class FcHeader extends Header implements Streamable {
454         public static final byte REPLENISH = 1;
455         byte type = REPLENISH;
456
457         public FcHeader() {
458
459         }
460
461         public FcHeader(byte type) {
462             this.type=type;
463         }
464
465
466
467         public long size() {
468             return Global.BYTE_SIZE;
469         }
470
471
472         public void writeExternal(ObjectOutput out) throws IOException {
473             out.writeByte(type);
474         }
475
476         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
477             type=in.readByte();
478         }
479
480         public void writeTo(DataOutputStream out) throws IOException {
481             out.writeByte(type);
482         }
483
484         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
485             type=in.readByte();
486         }
487
488     }
489
490
491 }
492
Popular Tags