KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > GroupRequest


1 // $Id: GroupRequest.java,v 1.11 2005/01/13 01:06:53 belaban Exp $
2

3 package org.jgroups.blocks;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.Address;
9 import org.jgroups.Message;
10 import org.jgroups.Transport;
11 import org.jgroups.View;
12 import org.jgroups.util.Command;
13 import org.jgroups.util.RspList;
14
15 import java.util.Vector JavaDoc;
16
17
18
19
20 /**
21  * Sends a message to all members of the group and waits for all responses (or timeout). Returns a
22  * boolean value (success or failure). Results (if any) can be retrieved when done.<p>
23  * The supported transport to send requests is currently either a RequestCorrelator or a generic
24  * Transport. One of them has to be given in the constructor. It will then be used to send a
25  * request. When a message is received by either one, the receiveResponse() of this class has to
26  * be called (this class does not actively receive requests/responses itself). Also, when a view change
27  * or suspicion is received, the methods viewChange() or suspect() of this class have to be called.<p>
28  * When started, an array of responses, correlating to the membership, is created. Each response
29  * is added to the corresponding field in the array. When all fields have been set, the algorithm
30  * terminates.
31  * This algorithm can optionally use a suspicion service (failure detector) to detect (and
32  * exclude from the membership) fauly members. If no suspicion service is available, timeouts
33  * can be used instead (see <code>execute()</code>). When done, a list of suspected members
34  * can be retrieved.<p>
35  * Because a channel might deliver requests, and responses to <em>different</em> requests, the
36  * <code>GroupRequest</code> class cannot itself receive and process requests/responses from the
37  * channel. A mechanism outside this class has to do this; it has to determine what the responses
38  * are for the message sent by the <code>execute()</code> method and call <code>receiveResponse()</code>
39  * to do so.<p>
40  * <b>Requirements</b>: lossless delivery, e.g. acknowledgment-based message confirmation.
41  * @author Bela Ban
42  * @version $Revision: 1.11 $
43  */

44 public class GroupRequest implements RspCollector, Command {
45     /** return only first response */
46     public static final int GET_FIRST=1;
47
48     /** return all responses */
49     public static final int GET_ALL=2;
50
51     /** return majority (of all non-faulty members) */
52     public static final int GET_MAJORITY=3;
53
54     /** return majority (of all members, may block) */
55     public static final int GET_ABS_MAJORITY=4;
56
57     /** return n responses (may block) */
58     public static final int GET_N=5;
59
60     /** return no response (async call) */
61     public static final int GET_NONE=6;
62
63     private static final short NOT_RECEIVED=0;
64     private static final short RECEIVED=1;
65     private static final short SUSPECTED=2;
66
67     private Address membership[]=null; // current membership
68
private Object JavaDoc responses[]=null; // responses corresponding to membership
69
private short received[]=null; // status of response for each mbr (see above)
70

71     /** bounded queue of suspected members */
72     private final Vector JavaDoc suspects=new Vector JavaDoc();
73
74     /** list of members, changed by viewChange() */
75     private final Vector JavaDoc members=new Vector JavaDoc();
76
77     /** keep suspects vector bounded */
78     private final int max_suspects=40;
79     protected Message request_msg=null;
80     protected RequestCorrelator corr=null; // either use RequestCorrelator or ...
81
protected Transport transport=null; // Transport (one of them has to be non-null)
82

83     protected int rsp_mode=GET_ALL;
84     protected boolean done=false;
85     protected final Object JavaDoc rsp_mutex=new Object JavaDoc();
86     protected long timeout=0;
87     protected int expected_mbrs=0;
88
89     protected static final Log log=LogFactory.getLog(GroupRequest.class);
90
91     /** to generate unique request IDs (see getRequestId()) */
92     private static long last_req_id=1;
93
94     protected long req_id=-1; // request ID for this request
95

96
97     /**
98      @param m The message to be sent
99      @param corr The request correlator to be used. A request correlator sends requests tagged with
100      a unique ID and notifies the sender when matching responses are received. The
101      reason <code>GroupRequest</code> uses it instead of a <code>Transport</code> is
102      that multiple requests/responses might be sent/received concurrently.
103      @param members The initial membership. This value reflects the membership to which the request
104      is sent (and from which potential responses are expected). Is reset by reset().
105      @param rsp_mode How many responses are expected. Can be
106      <ol>
107      <li><code>GET_ALL</code>: wait for all responses from non-suspected members.
108      A suspicion service might warn
109      us when a member from which a response is outstanding has crashed, so it can
110      be excluded from the responses. If no suspision service is available, a
111      timeout can be used (a value of 0 means wait forever). <em>If a timeout of
112      0 is used, no suspicion service is available and a member from which we
113      expect a response has crashed, this methods blocks forever !</em>.
114      <li><code>GET_FIRST</code>: wait for the first available response.
115      <li><code>GET_MAJORITY</code>: wait for the majority of all responses. The
116      majority is re-computed when a member is suspected.
117      <li><code>GET_ABS_MAJORITY</code>: wait for the majority of
118      <em>all</em> members.
119      This includes failed members, so it may block if no timeout is specified.
120      <li><code>GET_N</CODE>: wait for N members.
121      Return if n is >= membership+suspects.
122      <li><code>GET_NONE</code>: don't wait for any response. Essentially send an
123      asynchronous message to the group members.
124      </ol>
125      */

126     public GroupRequest(Message m, RequestCorrelator corr, Vector JavaDoc members, int rsp_mode) {
127         request_msg=m;
128         this.corr=corr;
129         this.rsp_mode=rsp_mode;
130         reset(members);
131         // suspects.removeAllElements(); // bela Aug 23 2002: made suspects bounded
132
}
133
134
135     /**
136      @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
137      (e.g. if a suspicion service is available; timeouts are not needed).
138      */

139     public GroupRequest(Message m, RequestCorrelator corr, Vector JavaDoc members, int rsp_mode,
140                         long timeout, int expected_mbrs) {
141         this(m, corr, members, rsp_mode);
142         if(timeout > 0)
143             this.timeout=timeout;
144         this.expected_mbrs=expected_mbrs;
145     }
146
147
148     public GroupRequest(Message m, Transport transport, Vector JavaDoc members, int rsp_mode) {
149         request_msg=m;
150         this.transport=transport;
151         this.rsp_mode=rsp_mode;
152         reset(members);
153         // suspects.removeAllElements(); // bela Aug 23 2002: make suspects bounded
154
}
155
156
157     /**
158      * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
159      * (e.g. if a suspicion service is available; timeouts are not needed).
160      */

161     public GroupRequest(Message m, Transport transport, Vector JavaDoc members,
162                         int rsp_mode, long timeout, int expected_mbrs) {
163        this(m, transport, members, rsp_mode);
164        if(timeout > 0)
165           this.timeout=timeout;
166        this.expected_mbrs=expected_mbrs;
167     }
168
169
170     /**
171      * Sends the message. Returns when n responses have been received, or a
172      * timeout has occurred. <em>n</em> can be the first response, all
173      * responses, or a majority of the responses.
174      */

175     public boolean execute() {
176         boolean retval;
177         if(corr == null && transport == null) {
178             if(log.isErrorEnabled()) log.error("both corr and transport are null, cannot send group request");
179             return false;
180         }
181         synchronized(rsp_mutex) {
182             done=false;
183             retval=doExecute(timeout);
184             if(retval == false && log.isTraceEnabled())
185                 log.trace("call did not execute correctly, request is " + toString());
186             done=true;
187             return retval;
188         }
189     }
190
191
192     /**
193      * Resets the group request, so it can be reused for another execution.
194      */

195     public void reset(Message m, int mode, long timeout) {
196         synchronized(rsp_mutex) {
197             done=false;
198             request_msg=m;
199             rsp_mode=mode;
200             this.timeout=timeout;
201             rsp_mutex.notifyAll();
202         }
203     }
204
205
206     public void reset(Message m, final Vector JavaDoc members, int rsp_mode, long timeout, int expected_rsps) {
207         synchronized(rsp_mutex) {
208             reset(m, rsp_mode, timeout);
209             reset(members);
210             // suspects.removeAllElements(); // bela Aug 23 2002: made suspects bounded
211
this.expected_mbrs=expected_rsps;
212             rsp_mutex.notifyAll();
213         }
214     }
215
216     /**
217      * This method sets the <code>membership</code> variable to the value of
218      * <code>members</code>. It requires that the caller already hold the
219      * <code>rsp_mutex</code> lock.
220      * @param mbrs The new list of members
221      */

222     public void reset(Vector JavaDoc mbrs) {
223         if(mbrs != null) {
224             int size=mbrs.size();
225             membership=new Address[size];
226             responses=new Object JavaDoc[size];
227             received=new short[size];
228             for(int i=0; i < size; i++) {
229                 membership[i]=(Address)mbrs.elementAt(i);
230                 responses[i]=null;
231                 received[i]=NOT_RECEIVED;
232             }
233             // maintain local membership
234
this.members.clear();
235             this.members.addAll(mbrs);
236         }
237         else {
238             if(membership != null) {
239                 for(int i=0; i < membership.length; i++) {
240                     responses[i]=null;
241                     received[i]=NOT_RECEIVED;
242                 }
243             }
244         }
245     }
246
247
248     /* ---------------------- Interface RspCollector -------------------------- */
249     /**
250      * <b>Callback</b> (called by RequestCorrelator or Transport).
251      * Adds a response to the response table. When all responses have been received,
252      * <code>execute()</code> returns.
253      */

254     public void receiveResponse(Message m) {
255         Address sender=m.getSrc(), mbr;
256         Object JavaDoc val=null;
257         if(done) {
258             if(log.isWarnEnabled()) log.warn("command is done; cannot add response !");
259             return;
260         }
261         if(suspects != null && suspects.size() > 0 && suspects.contains(sender)) {
262             if(log.isWarnEnabled()) log.warn("received response from suspected member " + sender + "; discarding");
263             return;
264         }
265         if(m.getLength() > 0) {
266             try {
267                 val=m.getObject();
268             }
269             catch(Exception JavaDoc e) {
270                 if(log.isErrorEnabled()) log.error("exception=" + e);
271             }
272         }
273         synchronized(rsp_mutex) {
274             for(int i=0; i < membership.length; i++) {
275                 mbr=membership[i];
276                 if(mbr.equals(sender)) {
277                     if(received[i] == NOT_RECEIVED) {
278                         responses[i]=val;
279                         received[i]=RECEIVED;
280                        if(log.isTraceEnabled())
281                           log.trace("received response for request " + req_id + ", sender=" + sender + ", val=" + val);
282                         rsp_mutex.notifyAll(); // wakes up execute()
283
break;
284                     }
285                 }
286             }
287         }
288         // printReceived();
289
}
290
291
292     /**
293      * <b>Callback</b> (called by RequestCorrelator or Transport).
294      * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected).
295      * This method would probably be called when getting a suspect message from a failure detector
296      * (where available). It is used to exclude faulty members from the response list.
297      */

298     public void suspect(Address suspected_member) {
299         Address mbr;
300         synchronized(rsp_mutex) { // modify 'suspects' and 'responses' array
301
for(int i=0; i < membership.length; i++) {
302                 mbr=membership[i];
303                 if(mbr.equals(suspected_member)) {
304                     addSuspect(suspected_member);
305                     responses[i]=null;
306                     received[i]=SUSPECTED;
307                     rsp_mutex.notifyAll();
308                     break;
309                 }
310             }
311         }
312         // printReceived();
313
}
314
315
316     /**
317      * Any member of 'membership' that is not in the new view is flagged as
318      * SUSPECTED. Any member in the new view that is <em>not</em> in the
319      * membership (ie, the set of responses expected for the current RPC) will
320      * <em>not</em> be added to it. If we did this we might run into the
321      * following problem:
322      * <ul>
323      * <li>Membership is {A,B}
324      * <li>A sends a synchronous group RPC (which sleeps for 60 secs in the
325      * invocation handler)
326      * <li>C joins while A waits for responses from A and B
327      * <li>If this would generate a new view {A,B,C} and if this expanded the
328      * response set to {A,B,C}, A would wait forever on C's response because C
329      * never received the request in the first place, therefore won't send a
330      * response.
331      * </ul>
332      */

333     public void viewChange(View new_view) {
334         Address mbr;
335         Vector JavaDoc mbrs=new_view != null? new_view.getMembers() : null;
336         if(membership == null || membership.length == 0 || mbrs == null)
337             return;
338
339         synchronized(rsp_mutex) {
340             this.members.clear();
341             this.members.addAll(mbrs);
342             for(int i=0; i < membership.length; i++) {
343                 mbr=membership[i];
344                 if(!mbrs.contains(mbr)) {
345                     addSuspect(mbr);
346                     responses[i]=null;
347                     received[i]=SUSPECTED;
348                 }
349             }
350             rsp_mutex.notifyAll();
351         }
352     }
353
354
355     /* -------------------- End of Interface RspCollector ----------------------------------- */
356
357
358
359     /** Returns the results as a RspList */
360     public RspList getResults() {
361         RspList retval=new RspList();
362         Address sender;
363         synchronized(rsp_mutex) {
364             for(int i=0; i < membership.length; i++) {
365                 sender=membership[i];
366                 switch(received[i]) {
367                     case SUSPECTED:
368                         retval.addSuspect(sender);
369                         break;
370                     case RECEIVED:
371                         retval.addRsp(sender, responses[i]);
372                         break;
373                     case NOT_RECEIVED:
374                         retval.addNotReceived(sender);
375                         break;
376                 }
377             }
378             return retval;
379         }
380     }
381
382
383     public String JavaDoc toString() {
384         StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
385         ret.append("[GroupRequest:\n");
386         ret.append("req_id=").append(req_id).append('\n');
387         ret.append("members: ");
388         for(int i=0; i < membership.length; i++)
389             ret.append(membership[i] + " ");
390         ret.append("\nresponses: ");
391         for(int i=0; i < responses.length; i++)
392             ret.append(responses[i] + " ");
393         ret.append("\nreceived: ");
394         for(int i=0; i < received.length; i++)
395             ret.append(receivedToString(received[i]) + " ");
396         if(suspects.size() > 0)
397             ret.append("\nsuspects: " + suspects);
398         ret.append("\nrequest_msg: " + request_msg);
399         ret.append("\nrsp_mode: " + rsp_mode);
400         ret.append("\ndone: " + done);
401         ret.append("\ntimeout: " + timeout);
402         ret.append("\nexpected_mbrs: " + expected_mbrs);
403         ret.append("\n]");
404         return ret.toString();
405     }
406
407
408     public int getNumSuspects() {
409         return suspects.size();
410     }
411
412
413     public Vector JavaDoc getSuspects() {
414         return suspects;
415     }
416
417
418     public boolean isDone() {
419         return done;
420     }
421
422
423
424     /* --------------------------------- Private Methods -------------------------------------*/
425
426     protected int determineMajority(int i) {
427         return i < 2? i : (i / 2) + 1;
428     }
429
430     /** Generates a new unique request ID */
431     private static synchronized long getRequestId() {
432         long result=System.currentTimeMillis();
433         if(result <= last_req_id) {
434             result=last_req_id + 1;
435         }
436         last_req_id=result;
437         return result;
438     }
439
440     /** This method runs with rsp_mutex locked (called by <code>execute()</code>). */
441     protected boolean doExecute(long timeout) {
442         long start_time=0;
443         Address mbr, suspect;
444         req_id=getRequestId();
445         reset(null); // clear 'responses' array
446
if(suspects != null) { // mark all suspects in 'received' array
447
for(int i=0; i < suspects.size(); i++) {
448                 suspect=(Address)suspects.elementAt(i);
449                 for(int j=0; j < membership.length; j++) {
450                     mbr=membership[j];
451                     if(mbr.equals(suspect)) {
452                         received[j]=SUSPECTED;
453                         break; // we can break here because we ensure there are no duplicate members
454
}
455                 }
456             }
457         }
458
459         try {
460            if(log.isTraceEnabled()) log.trace("sending request (id=" + req_id + ')');
461             if(corr != null) {
462                 java.util.List JavaDoc tmp=members != null? members : null;
463                 corr.sendRequest(req_id, tmp, request_msg, rsp_mode == GET_NONE? null : this);
464             }
465             else {
466                 transport.send(request_msg);
467             }
468         }
469         catch(Throwable JavaDoc e) {
470             log.error("exception=" + e);
471             if(corr != null) {
472                 corr.done(req_id);
473             }
474             return false;
475         }
476
477         if(timeout <= 0) {
478             while(true) { /* Wait for responses: */
479                 adjustMembership(); // may not be necessary, just to make sure...
480
if(getResponses()) {
481                     if(corr != null) {
482                         corr.done(req_id);
483                     }
484                     if(log.isTraceEnabled()) {
485                         log.trace("received all responses: " + toString());
486                     }
487                     return true;
488                 }
489                 try {
490                     rsp_mutex.wait();
491                 }
492                 catch(Exception JavaDoc e) {
493                 }
494             }
495         }
496         else {
497             start_time=System.currentTimeMillis();
498             while(timeout > 0) { /* Wait for responses: */
499                 if(getResponses()) {
500                     if(corr != null)
501                         corr.done(req_id);
502                    if(log.isTraceEnabled()) log.trace("received all responses: " + toString());
503                     return true;
504                 }
505                 timeout=timeout - (System.currentTimeMillis() - start_time);
506                 if(timeout > 0) {
507                     try {
508                         rsp_mutex.wait(timeout);
509                     }
510                     catch(Exception JavaDoc e) {
511                         //e.printStackTrace();
512
}
513                 }
514             }
515             if(corr != null) {
516                 corr.done(req_id);
517             }
518             return false;
519         }
520     }
521
522     protected boolean getResponses() {
523         int num_not_received=getNum(NOT_RECEIVED);
524         int num_received=getNum(RECEIVED);
525         int num_suspected=getNum(SUSPECTED);
526         int num_total=membership.length;
527         int majority=determineMajority(num_total);
528         switch(rsp_mode) {
529             case GET_FIRST:
530                 if(num_received > 0)
531                     return true;
532                 if(num_suspected >= num_total)
533                 // e.g. 2 members, and both suspected
534
return true;
535                 break;
536             case GET_ALL:
537                 if(num_not_received > 0)
538                     return false;
539                 return true;
540             case GET_MAJORITY:
541                 if(num_received + num_suspected >= majority)
542                     return true;
543                 break;
544             case GET_ABS_MAJORITY:
545                 if(num_received >= majority)
546                     return true;
547                 break;
548             case GET_N:
549                 if(expected_mbrs >= num_total) {
550                     rsp_mode=GET_ALL;
551                     return getResponses();
552                 }
553                 if(num_received >= expected_mbrs) {
554                     return true;
555                 }
556                 if(num_received + num_not_received < expected_mbrs) {
557                     if(num_received + num_suspected >= expected_mbrs) {
558                         return true;
559                     }
560                     return false;
561                 }
562                 return false;
563             case GET_NONE:
564                 return true;
565             default :
566                 if(log.isErrorEnabled()) log.error("rsp_mode " + rsp_mode + " unknown !");
567                 break;
568         }
569         return false;
570     }
571
572     /** Return number of elements of a certain type in array 'received'. Type can be RECEIVED,
573      NOT_RECEIVED or SUSPECTED */

574     int getNum(int type) {
575         int retval=0;
576         for(int i=0; i < received.length; i++)
577             if(received[i] == type)
578                 retval++;
579         return retval;
580     }
581
582
583
584    private String JavaDoc receivedToString(int r) {
585        switch(r) {
586        case RECEIVED:
587            return "RECEIVED";
588        case NOT_RECEIVED:
589            return "NOR_RECEIVED";
590        case SUSPECTED:
591            return "SUSPECTED";
592        default:
593            return "n/a";
594        }
595    }
596
597
598     /**
599      * Adjusts the 'received' array in the following way:
600      * <ul>
601      * <li>if a member P in 'membership' is not in 'members', P's entry in the 'received' array
602      * will be marked as SUSPECTED
603      * <li>if P is 'suspected_mbr', then P's entry in the 'received' array will be marked
604      * as SUSPECTED
605      * </ul>
606      * This call requires exclusive access to rsp_mutex (called by getResponses() which has
607      * a the rsp_mutex locked, so this should not be a problem).
608      */

609     void adjustMembership() {
610         Address mbr;
611         if(membership == null || membership.length == 0) {
612             // if(log.isWarnEnabled()) log.warn("GroupRequest.adjustMembership()", "membership is null");
613
return;
614         }
615         for(int i=0; i < membership.length; i++) {
616             mbr=membership[i];
617             if((this.members != null && !this.members.contains(mbr))
618                     || suspects.contains(mbr)) {
619                 addSuspect(mbr);
620                 responses[i]=null;
621                 received[i]=SUSPECTED;
622             }
623         }
624     }
625
626     /**
627      * Adds a member to the 'suspects' list. Removes oldest elements from 'suspects' list
628      * to keep the list bounded ('max_suspects' number of elements)
629      */

630     void addSuspect(Address suspected_mbr) {
631         if(!suspects.contains(suspected_mbr)) {
632             suspects.addElement(suspected_mbr);
633             while(suspects.size() >= max_suspects && suspects.size() > 0)
634                 suspects.remove(0); // keeps queue bounded
635
}
636     }
637 }
638
Popular Tags