KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > SFC


package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;
import org.jgroups.util.Streamable;
import org.jgroups.util.BoundedList;

import java.util.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
import java.io.*;

17 /**
18  * Simple flow control protocol. After max_credits bytes sent to the group (or an individual member), the sender blocks
19  * until it receives an ack from all members that they indeed received max_credits bytes.
20  * Design in doc/design/SimpleFlowControl.txt<br/>
21  * <em>Note that SFC supports only flow control for multicast messages; unicast flow control is not supported ! Use FC if
22  * unicast flow control is required.</em>
23  * @author Bela Ban
24  * @version $Id: SFC.java,v 1.17 2007/06/15 11:42:16 belaban Exp $
25  */

26 public class SFC extends Protocol {
27     static final String JavaDoc name="SFC";
28
29     /** Max number of bytes to send per receiver until an ack must be received before continuing sending */
30     private long max_credits=2000000;
31
32     private Long JavaDoc MAX_CREDITS;
33
34     private static final Long JavaDoc ZERO_CREDITS=new Long JavaDoc(0);
35
36     /** Current number of credits available to send */
37     @GuardedBy("lock")
38     private long curr_credits_available;
39
40     /** Map which keeps track of bytes received from senders */
41     @GuardedBy("received_lock")
42     private final Map<Address,Long JavaDoc> received=new HashMap<Address,Long JavaDoc>(12);
43
44     /** Set of members which have requested credits but from whom we have not yet received max_credits bytes */
45     @GuardedBy("received_lock")
46     private final Set<Address> pending_requesters=new HashSet<Address>();
47
48     /** Set of members from whom we haven't yet received credits */
49     @GuardedBy("lock")
50     private final Set<Address> pending_creditors=new HashSet<Address>();
51
52
53     private final Lock JavaDoc lock=new ReentrantLock JavaDoc();
54     /** Lock protecting access to received and pending_requesters */
55     private final Lock JavaDoc received_lock=new ReentrantLock JavaDoc();
56
57
58     /** Used to wait for and signal when credits become available again */
59     private final Condition JavaDoc credits_available=lock.newCondition();
60
61     /** Number of milliseconds after which we send a new credit request if we are waiting for credit responses */
62     private long max_block_time=5000;
63
64     /** Last time a thread woke up from blocking and had to request credit */
65     private long last_blocked_request=0L;
66
67     private final List<Address> members=new LinkedList<Address>();
68
69     private boolean running=true;
70
71     @GuardedBy("lock") long start, stop;
72
73
74
75     // ---------------------- Management information -----------------------
76
long num_blockings=0;
77     long num_bytes_sent=0;
78     long num_credit_requests_sent=0;
79     long num_credit_requests_received=0;
80     long num_replenishments_received=0;
81     long num_replenishments_sent=0;
82     long total_block_time=0;
83     final BoundedList blockings=new BoundedList(50);
84
85
86     public void resetStats() {
87         super.resetStats();
88         num_blockings=total_block_time=num_replenishments_received=num_credit_requests_sent=num_bytes_sent=0;
89         num_replenishments_sent=num_credit_requests_received=0;
90         blockings.removeAll();
91     }
92
93     public long getMaxCredits() {return max_credits;}
94     public long getCredits() {return curr_credits_available;}
95     public long getBytesSent() {return num_bytes_sent;}
96     public long getBlockings() {return num_blockings;}
97     public long getCreditRequestsSent() {return num_credit_requests_sent;}
98     public long getCreditRequestsReceived() {return num_credit_requests_received;}
99     public long getReplenishmentsReceived() {return num_replenishments_received;}
100     public long getReplenishmentsSent() {return num_replenishments_sent;}
101     public long getTotalBlockingTime() {return total_block_time;}
102     public double getAverageBlockingTime() {return num_blockings == 0? 0 : total_block_time / num_blockings;}
103
104
105     public Map<String JavaDoc,Object JavaDoc> dumpStats() {
106         Map<String JavaDoc,Object JavaDoc> retval=super.dumpStats();
107         if(retval == null)
108             retval=new HashMap<String JavaDoc,Object JavaDoc>();
109         return retval;
110     }
111
112     public String JavaDoc printBlockingTimes() {
113         return blockings.toString();
114     }
115
116     public String JavaDoc printReceived() {
117         received_lock.lock();
118         try {
119             return received.toString();
120         }
121         finally {
122             received_lock.unlock();
123         }
124     }
125
126     public String JavaDoc printPendingCreditors() {
127         lock.lock();
128         try {
129             return pending_creditors.toString();
130         }
131         finally {
132             lock.unlock();
133         }
134     }
135
136     public String JavaDoc printPendingRequesters() {
137         received_lock.lock();
138         try {
139             return pending_requesters.toString();
140         }
141         finally {
142             received_lock.unlock();
143         }
144     }
145
146     public void unblock() {
147         lock.lock();
148         try {
149             curr_credits_available=max_credits;
150             credits_available.signalAll();
151         }
152         finally {
153             lock.unlock();
154         }
155     }
156
157     // ------------------- End of management information ----------------------
158

159
160     public final String JavaDoc getName() {
161         return name;
162     }
163
164     public boolean setProperties(Properties props) {
165         String JavaDoc str;
166         super.setProperties(props);
167
168         str=props.getProperty("max_block_time");
169         if(str != null) {
170             max_block_time=Long.parseLong(str);
171             props.remove("max_block_time");
172         }
173
174         str=props.getProperty("max_credits");
175         if(str != null) {
176             max_credits=Long.parseLong(str);
177             props.remove("max_credits");
178         }
179
180         Util.checkBufferSize("SFC.max_credits", max_credits);
181         MAX_CREDITS=new Long JavaDoc(max_credits);
182         curr_credits_available=max_credits;
183
184         if(!props.isEmpty()) {
185             log.error("the following properties are not recognized: " + props);
186             return false;
187         }
188         return true;
189     }
190
191
192
193     public Object JavaDoc down(Event evt) {
194         switch(evt.getType()) {
195             case Event.MSG:
196                 Message msg=(Message)evt.getArg();
197                 Address dest=msg.getDest();
198                 if(dest != null && !dest.isMulticastAddress()) // only handle multicast messages
199
break;
200
201                 boolean send_credit_request=false;
202                 lock.lock();
203                 try {
204                     while(curr_credits_available <=0 && running) {
205                         if(log.isTraceEnabled())
206                             log.trace("blocking (current credits=" + curr_credits_available + ")");
207                         try {
208                             num_blockings++;
209                             // will be signalled when we have credit responses from all members
210
boolean rc=credits_available.await(max_block_time, TimeUnit.MILLISECONDS);
211                             if(rc || (curr_credits_available <=0 && running)) {
212                                 if(log.isTraceEnabled())
213                                     log.trace("returned from await but credits still unavailable (credits=" +curr_credits_available +")");
214                                 long now=System.currentTimeMillis();
215                                 if(now - last_blocked_request >= max_block_time) {
216                                     last_blocked_request=now;
217                                     lock.unlock(); // send the credit request without holding the lock
218
try {
219                                         sendCreditRequest(true);
220                                     }
221                                     finally {
222                                         lock.lock(); // now acquire the lock again
223
}
224                                 }
225                             }
226                             else {
227                                 // reset the last_blocked_request stamp so the
228
// next timed out block will for sure send a request
229
last_blocked_request=0;
230                             }
231                         }
232                         catch(InterruptedException JavaDoc e) {
233                             // bela June 16 2007: http://jira.jboss.com/jira/browse/JGRP-536
234
// if(log.isWarnEnabled())
235
// log.warn("thread was interrupted", e);
236
// Thread.currentThread().interrupt(); // pass the exception on to the caller
237
// return null;
238
}
239                     }
240
241                     // when we get here, curr_credits_available is guaranteed to be > 0
242
int len=msg.getLength();
243                     num_bytes_sent+=len;
244                     curr_credits_available-=len; // we'll block on insufficient credits on the next down() call
245
if(curr_credits_available <=0) {
246                         pending_creditors.clear();
247                         synchronized(members) {
248                             pending_creditors.addAll(members);
249                         }
250                         send_credit_request=true;
251                     }
252                 }
253                 finally {
254                     lock.unlock();
255                 }
256
257                 // we don't need to protect send_credit_request because a thread above either (a) decrements the credits
258
// by the msg length and sets send_credit_request to true or (b) blocks because there are no credits
259
// available. So only 1 thread can ever set send_credit_request at any given time
260
if(send_credit_request) {
261                     if(log.isTraceEnabled())
262                         log.trace("sending credit request to group");
263                     start=System.nanoTime(); // only 1 thread is here at any given time
264
Object JavaDoc ret=down_prot.down(evt); // send the message before the credit request
265
sendCreditRequest(false); // do this outside of the lock
266
return ret;
267                 }
268                 break;
269
270             case Event.VIEW_CHANGE:
271                 handleViewChange((View)evt.getArg());
272                 break;
273
274             case Event.SUSPECT:
275                 handleSuspect((Address)evt.getArg());
276                 break;
277         }
278
279         return down_prot.down(evt);
280     }
281
282
283
284     public Object JavaDoc up(Event evt) {
285         switch(evt.getType()) {
286
287             case Event.MSG:
288                 Message msg=(Message)evt.getArg();
289                 Header hdr=(Header)msg.getHeader(name);
290                 Address sender=msg.getSrc();
291                 if(hdr != null) {
292                     switch(hdr.type) {
293                         case Header.CREDIT_REQUEST:
294                             handleCreditRequest(sender, false);
295                             break;
296                         case Header.URGENT_CREDIT_REQUEST:
297                             handleCreditRequest(sender, true);
298                             break;
299                         case Header.REPLENISH:
300                             handleCreditResponse(sender);
301                             break;
302                         default:
303                             if(log.isErrorEnabled())
304                                 log.error("unknown header type " + hdr.type);
305                             break;
306                     }
307                     return null; // we don't pass the request further up
308
}
309
310                 Address dest=msg.getDest();
311                 if(dest != null && !dest.isMulticastAddress()) // we don't handle unicast messages
312
break;
313
314                 handleMessage(msg, sender);
315                 break;
316
317             case Event.VIEW_CHANGE:
318                 handleViewChange((View)evt.getArg());
319                 break;
320
321             case Event.SUSPECT:
322                 handleSuspect((Address)evt.getArg());
323                 break;
324         }
325         return up_prot.up(evt);
326     }
327
328
329
330
331     public void start() throws Exception JavaDoc {
332         super.start();
333         running=true;
334     }
335
336
337     public void stop() {
338         super.stop();
339         running=false;
340         lock.lock();
341         try {
342             credits_available.signalAll();
343         }
344         finally {
345             lock.unlock();
346         }
347     }
348
349
350     private void handleMessage(Message msg, Address sender) {
351         int len=msg.getLength(); // we don't care about headers, this is faster than size()
352

353         Long JavaDoc new_val;
354         boolean send_credit_response=false;
355
356         received_lock.lock();
357         try {
358             Long JavaDoc credits=received.get(sender);
359             if(credits == null) {
360                 new_val=MAX_CREDITS;
361                 received.put(sender, new_val);
362             }
363             else {
364                 new_val=credits.longValue() + len;
365                 received.put(sender, new_val);
366             }
367             // if(log.isTraceEnabled())
368
// log.trace("received " + len + " bytes from " + sender + ": total=" + new_val + " bytes");
369

370             // see whether we have any pending credit requests
371
if(!pending_requesters.isEmpty()
372                     && pending_requesters.contains(sender)
373                     && new_val.longValue() >= max_credits) {
374                 pending_requesters.remove(sender);
375                 if(log.isTraceEnabled())
376                     log.trace("removed " + sender + " from credit requesters; sending credits");
377                 received.put(sender, ZERO_CREDITS);
378                 send_credit_response=true;
379             }
380         }
381         finally {
382             received_lock.unlock();
383         }
384
385         if(send_credit_response) // send outside of the monitor
386
sendCreditResponse(sender);
387     }
388     
389
390     private void handleCreditRequest(Address sender, boolean urgent) {
391         boolean send_credit_response=false;
392
393         received_lock.lock();
394         try {
395             num_credit_requests_received++;
396             Long JavaDoc bytes=received.get(sender);
397             if(log.isTraceEnabled())
398                 log.trace("received credit request from " + sender + " (total received: " + bytes + " bytes");
399
400             if(bytes == null) {
401                 if(log.isErrorEnabled())
402                     log.error("received credit request from " + sender + ", but sender is not in received hashmap;" +
403                             " adding it");
404                 send_credit_response=true;
405             }
406             else {
407                 if(bytes.longValue() < max_credits && !urgent) {
408                     if(log.isTraceEnabled())
409                         log.trace("adding " + sender + " to pending credit requesters");
410                     pending_requesters.add(sender);
411                 }
412                 else {
413                     send_credit_response=true;
414                 }
415             }
416             if(send_credit_response)
417                 received.put(sender, ZERO_CREDITS);
418         }
419         finally{
420             received_lock.unlock();
421         }
422
423         if(send_credit_response) {
424             sendCreditResponse(sender);
425         }
426     }
427
428     private void handleCreditResponse(Address sender) {
429         lock.lock();
430         try {
431             num_replenishments_received++;
432             if(pending_creditors.remove(sender) && pending_creditors.isEmpty()) {
433                 curr_credits_available=max_credits;
434                 stop=System.nanoTime();
435                 long diff=(stop-start)/1000000L;
436                 if(log.isTraceEnabled())
437                     log.trace("replenished credits to " + curr_credits_available +
438                             " (total blocking time=" + diff + " ms)");
439                 blockings.add(new Long JavaDoc(diff));
440                 total_block_time+=diff;
441                 credits_available.signalAll();
442             }
443         }
444         finally{
445             lock.unlock();
446         }
447     }
448
449
450
451     private void handleViewChange(View view) {
452         List<Address> mbrs=view != null? view.getMembers() : null;
453         if(mbrs != null) {
454             synchronized(members) {
455                 members.clear();
456                 members.addAll(mbrs);
457             }
458         }
459
460         lock.lock();
461         try {
462             // remove all members which left from pending_creditors
463
if(pending_creditors.retainAll(members) && pending_creditors.isEmpty()) {
464                 // the collection was changed and is empty now as a result of retainAll()
465
curr_credits_available=max_credits;
466                 if(log.isTraceEnabled())
467                     log.trace("replenished credits to " + curr_credits_available);
468                 credits_available.signalAll();
469             }
470         }
471         finally {
472             lock.unlock();
473         }
474
475         received_lock.lock();
476         try {
477             // remove left members
478
received.keySet().retainAll(members);
479
480             // add new members with *full* credits (see doc/design/SimpleFlowControl.txt for reason)
481
for(Address mbr: members) {
482                 if(!received.containsKey(mbr))
483                     received.put(mbr, MAX_CREDITS);
484             }
485
486             // remove left members from pending credit requesters
487
pending_requesters.retainAll(members);
488         }
489         finally{
490             received_lock.unlock();
491         }
492     }
493
494
495     private void handleSuspect(Address suspected_mbr) {
496         // this is the same as a credit response - we cannot block forever for a crashed member
497
handleCreditResponse(suspected_mbr);
498     }
499
500
501     private void sendCreditRequest(boolean urgent) {
502         Message credit_req=new Message();
503         // credit_req.setFlag(Message.OOB); // we need to receive the credit request after regular messages
504
byte type=urgent? Header.URGENT_CREDIT_REQUEST : Header.CREDIT_REQUEST;
505         credit_req.putHeader(name, new Header(type));
506         num_credit_requests_sent++;
507         down_prot.down(new Event(Event.MSG, credit_req));
508     }
509
510     private void sendCreditResponse(Address dest) {
511         Message credit_rsp=new Message(dest);
512         credit_rsp.setFlag(Message.OOB);
513         Header hdr=new Header(Header.REPLENISH);
514         credit_rsp.putHeader(name, hdr);
515         if(log.isTraceEnabled())
516             log.trace("sending credit response to " + dest);
517         num_replenishments_sent++;
518         down_prot.down(new Event(Event.MSG, credit_rsp));
519     }
520
521
522
523     public static class Header extends org.jgroups.Header implements Streamable {
524         public static final byte CREDIT_REQUEST = 1; // the sender of the message is the requester
525
public static final byte REPLENISH = 2; // the sender of the message is the creditor
526
public static final byte URGENT_CREDIT_REQUEST = 3;
527
528         byte type=CREDIT_REQUEST;
529
530         public Header() {
531
532         }
533
534         public Header(byte type) {
535             this.type=type;
536         }
537
538         public int size() {
539             return Global.BYTE_SIZE;
540         }
541
542         public void writeExternal(ObjectOutput out) throws IOException {
543             out.writeByte(type);
544         }
545
546         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
547             type=in.readByte();
548         }
549
550         public void writeTo(DataOutputStream out) throws IOException {
551             out.writeByte(type);
552         }
553
554         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
555             type=in.readByte();
556         }
557
558         public String JavaDoc toString() {
559             switch(type) {
560                 case REPLENISH: return "REPLENISH";
561                 case CREDIT_REQUEST: return "CREDIT_REQUEST";
562                 case URGENT_CREDIT_REQUEST: return "URGENT_CREDIT_REQUEST";
563                 default: return "<invalid type>";
564             }
565         }
566     }
567
568
569 }
570
Popular Tags