KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > VotingAdapter


1 package org.jgroups.blocks;
2
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
13
14 /**
15  * Voting adapter provides a voting functionality for an application. There
16  * should be at most one {@link VotingAdapter} listening on one {@link Channel}
17  * instance. Each adapter can have zero or more registered {@link VotingListener}
18  * instances that will be called during voting process.
19  * <p>
20  * Decree is an object that has some semantic meaning within the application.
21  * Each voting listener receives a decree and can respond with either
22  * <code>true</code> or false. If the decree has no meaning for the voting
23  * listener, it is required to throw {@link VoteException}. In this case
24  * this specific listener will be excluded from the voting on the specified
25  * decree. After performing local voting, this voting adapter sends the request
26  * back to the originator of the voting process. Originator receives results
27  * from each node and decides if all voting process succeeded or not depending
28  * on the consensus type specified during voting.
29  *
30  * @author Roman Rokytskyy (rrokytskyy@acm.org)
31  */

32 public class VotingAdapter implements MessageListener, MembershipListener {
33     
34     /**
35      * This consensus type means that at least one positive vote is required
36      * for the voting to succeed.
37      */

38     public static final int VOTE_ANY = 0;
39     
40     /**
41      * This consensus type means that at least one positive vote and no negative
42      * votes are required for the voting to succeed.
43      */

44     public static final int VOTE_ALL = 1;
45     
46     /**
47      * This consensus type means that number of positive votes should be greater
48      * than number of negative votes.
49      */

50     public static final int VOTE_MAJORITY = 2;
51     
52     
53     private static final int PROCESS_CONTINUE = 0;
54     private static final int PROCESS_SKIP = 1;
55     private static final int PROCESS_BREAK = 2;
56     
57
58     private final RpcDispatcher rpcDispatcher;
59
60     protected final Log log=LogFactory.getLog(getClass());
61
62     private final HashSet JavaDoc suspectedNodes = new HashSet JavaDoc();
63     private boolean blocked = false;
64     private boolean closed;
65
66     /**
67      * Creates an instance of the VoteChannel that uses JGroups
68      * for communication between group members.
69      * @param channel JGroups channel.
70      */

71     public VotingAdapter(Channel channel) {
72         rpcDispatcher = new RpcDispatcher(channel, this, this, this);
73     }
74
75     public VotingAdapter(PullPushAdapter adapter, Serializable JavaDoc id) {
76         rpcDispatcher = new RpcDispatcher(adapter, id, this, this, this);
77     }
78
79
80     /**
81      * Performs actual voting on the VoteChannel using the JGroups
82      * facilities for communication.
83      */

84     public boolean vote(Object JavaDoc decree, int consensusType, long timeout)
85     throws ChannelException
86     {
87         if (closed)
88             throw new ChannelException("Channel was closed.");
89             
90
91             if(log.isDebugEnabled()) log.debug("Conducting voting on decree " + decree + ", consensus type " +
92             getConsensusStr(consensusType) + ", timeout " + timeout);
93
94         int mode = GroupRequest.GET_ALL;
95
96         // perform the consensus mapping
97
switch (consensusType) {
98     case VotingAdapter.VOTE_ALL : mode = GroupRequest.GET_ALL; break;
99     case VotingAdapter.VOTE_ANY : mode = GroupRequest.GET_FIRST; break;
100     case VotingAdapter.VOTE_MAJORITY : mode = GroupRequest.GET_MAJORITY; break;
101     default : mode = GroupRequest.GET_ALL;
102         }
103
104         try {
105             java.lang.reflect.Method JavaDoc method = this.getClass().getMethod(
106                                     "localVote", new Class JavaDoc[] { Object JavaDoc.class });
107
108             MethodCall methodCall = new MethodCall(method, new Object JavaDoc[] {decree});
109
110
111                 if(log.isDebugEnabled()) log.debug("Calling remote methods...");
112     
113             // vote
114
RspList responses = rpcDispatcher.callRemoteMethods(
115                                      null, methodCall, mode, timeout);
116                 
117
118                 if(log.isDebugEnabled()) log.debug("Checking responses.");
119
120
121             return processResponses(responses, consensusType);
122             
123         } catch(NoSuchMethodException JavaDoc nsmex) {
124             
125             // UPS!!! How can this happen?!
126

127             if(log.isErrorEnabled()) log.error("Could not find method localVote(Object). " +
128             nsmex.toString());
129
130             throw new UnsupportedOperationException JavaDoc(
131                             "Cannot execute voting because of absence of " +
132                             this.getClass().getName() + ".localVote(Object) method.");
133         }
134     }
135
136     /**
137      * Processes the response list and makes a decision according to the
138      * type of the consensus for current voting.
139      * <p>
140      * Note: we do not support voting in case of Byzantine failures, i.e.
141      * when the node responds with the fault message.
142      */

143     private boolean processResponses(RspList responses, int consensusType)
144     throws ChannelException
145     {
146         if (responses == null) {
147             return false;
148         }
149
150         boolean voteResult = false;
151         int totalPositiveVotes = 0;
152         int totalNegativeVotes = 0;
153
154         for(int i = 0; i < responses.size(); i++) {
155             Rsp response = (Rsp)responses.elementAt(i);
156
157             switch(checkResponse(response)) {
158         case PROCESS_SKIP : continue;
159         case PROCESS_BREAK : return false;
160             }
161
162             VoteResult result = (VoteResult)response.getValue();
163             
164             totalPositiveVotes += result.getPositiveVotes();
165             totalNegativeVotes += result.getNegativeVotes();
166         }
167
168         switch(consensusType) {
169     case VotingAdapter.VOTE_ALL :
170         voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0);
171         break;
172     case VotingAdapter.VOTE_ANY :
173         voteResult = (totalPositiveVotes > 0);
174         break;
175     case VotingAdapter.VOTE_MAJORITY :
176         voteResult = (totalPositiveVotes > totalNegativeVotes);
177         }
178
179         return voteResult;
180     }
181
182     /**
183      * This method checks the response and says the processResponses() method
184      * what to do.
185      * @return PROCESS_CONTINUE to continue calculating votes,
186      * PROCESS_BREAK to stop calculating votes from the nodes,
187      * PROCESS_SKIP to skip current response.
188      * @throws ChannelException when the response is fatal to the
189      * current voting process.
190      */

191     private int checkResponse(Rsp response) throws ChannelException {
192
193         if (!response.wasReceived()) {
194             
195
196                 if(log.isDebugEnabled()) log.debug("Response from node " + response.getSender() +
197                 " was not received.");
198             
199             // what do we do when one node failed to respond?
200
//throw new ChannelException("Node " + response.GetSender() +
201
// " failed to respond.");
202
return PROCESS_BREAK ;
203         }
204
205         /**@todo check what to do here */
206         if (response.wasSuspected()) {
207             
208
209                 if(log.isDebugEnabled()) log.debug("Node " + response.getSender() + " was suspected.");
210             
211             // wat do we do when one node is suspected?
212
return PROCESS_SKIP ;
213         }
214
215         Object JavaDoc object = response.getValue();
216
217         // we received exception/error, something went wrong
218
// on one of the nodes... and we do not handle such faults
219
if (object instanceof Throwable JavaDoc) {
220             throw new ChannelException("Node " + response.getSender() +
221                        " is faulty.");
222         }
223
224         if (object == null) {
225             return PROCESS_SKIP;
226         }
227
228         // it is always interesting to know the class that caused failure...
229
if (!(object instanceof VoteResult)) {
230             String JavaDoc faultClass = object.getClass().getName();
231
232             // ...but we do not handle byzantine faults
233
throw new ChannelException("Node " + response.getSender() +
234                        " generated fault (class " + faultClass + ')');
235         }
236
237         // what if we received the response from faulty node?
238
if (object instanceof FailureVoteResult) {
239             
240             if(log.isErrorEnabled()) log.error(((FailureVoteResult)object).getReason());
241             
242             return PROCESS_BREAK;
243         }
244
245         // everything is fine :)
246
return PROCESS_CONTINUE;
247     }
248
249     /**
250      * Callback for notification about the new view of the group.
251      */

252     public void viewAccepted(View newView) {
253
254         // clean nodes that were suspected but still exist in new view
255
Iterator JavaDoc iterator = suspectedNodes.iterator();
256         while(iterator.hasNext()) {
257             Address suspectedNode = (Address)iterator.next();
258             if (newView.containsMember(suspectedNode))
259                 iterator.remove();
260         }
261
262         blocked = false;
263     }
264
265     /**
266      * Callback for notification that one node is suspected
267      */

268     public void suspect(Address suspected) {
269         suspectedNodes.add(suspected);
270     }
271
272     /**
273      * Blocks the channel until the ViewAccepted is invoked.
274      */

275     public void block() {
276         blocked = true;
277     }
278
279     /**
280      * Get the channel state.
281      *
282      * @return always <code>null</code>, we do not have any group-shared
283      * state.
284      */

285     public byte[] getState() {
286         return null;
287     }
288
289     /**
290      * Receive the message. All messages are ignored.
291      *
292      * @param msg message to check.
293      */

294     public void receive(org.jgroups.Message msg) {
295         // do nothing
296
}
297
298     /**
299      * Set the channel state. We do nothing here.
300      */

301     public void setState(byte[] state) {
302         // ignore the state, we do not have any.
303
}
304             
305     private final Set JavaDoc voteListeners = new HashSet JavaDoc();
306     private VotingListener[] listeners;
307
308     /**
309      * Vote on the specified decree requiring all nodes to vote.
310      *
311      * @param decree decree on which nodes should vote.
312      * @param timeout time during which nodes can vote.
313      *
314      * @return <code>true</code> if nodes agreed on a decree, otherwise
315      * <code>false</code>
316      *
317      * @throws ChannelException if something went wrong.
318      */

319     public boolean vote(Object JavaDoc decree, long timeout) throws ChannelException {
320         return vote(decree, VOTE_ALL, timeout);
321     }
322
323     /**
324      * Adds voting listener.
325      */

326     public void addVoteListener(VotingListener listener) {
327         voteListeners.add(listener);
328         listeners = (VotingListener[])voteListeners.toArray(
329                                 new VotingListener[voteListeners.size()]);
330     }
331
332     /**
333      * Removes voting listener.
334      */

335     public void removeVoteListener(VotingListener listener) {
336         voteListeners.remove(listener);
337
338         listeners = (VotingListener[])voteListeners.toArray(
339                                 new VotingListener[voteListeners.size()]);
340     }
341
342     /**
343      * This method performs voting on the specific decree between all
344      * local voteListeners.
345      */

346     public VoteResult localVote(Object JavaDoc decree) {
347
348         VoteResult voteResult = new VoteResult();
349     
350         for(int i = 0; i < listeners.length; i++) {
351             VotingListener listener = listeners[i];
352
353             try {
354                 voteResult.addVote(listener.vote(decree));
355             } catch (VoteException vex) {
356                 // do nothing here.
357
} catch(RuntimeException JavaDoc ex) {
358                 
359                 if(log.isErrorEnabled()) log.error(ex.toString());
360                 
361                 // if we are here, then listener
362
// had thrown a RuntimeException
363
return new FailureVoteResult(ex.getMessage());
364             }
365         }
366
367
368             if(log.isDebugEnabled()) log.debug("Voting on decree " + decree.toString() + " : " +
369             voteResult.toString());
370
371         return voteResult;
372     }
373
374     /**
375      * Convert consensus type into string representation. This method is
376      * useful for debugginf.
377      *
378      * @param consensusType type of the consensus.
379      *
380      * @return string representation of the consensus type.
381      */

382     public static String JavaDoc getConsensusStr(int consensusType) {
383         switch(consensusType) {
384     case VotingAdapter.VOTE_ALL : return "VOTE_ALL";
385     case VotingAdapter.VOTE_ANY : return "VOTE_ANY";
386     case VotingAdapter.VOTE_MAJORITY : return "VOTE_MAJORITY";
387     default : return "UNKNOWN";
388         }
389     }
390
391
392     /**
393      * This class represents the result of local voting. It contains a
394      * number of positive and negative votes collected during local voting.
395      */

396     public static class VoteResult implements Serializable JavaDoc {
397         private int positiveVotes = 0;
398         private int negativeVotes = 0;
399
400         public void addVote(boolean vote) {
401             if (vote)
402                 positiveVotes++;
403             else
404                 negativeVotes++;
405         }
406
407         public int getPositiveVotes() { return positiveVotes; }
408
409         public int getNegativeVotes() { return negativeVotes; }
410
411         public String JavaDoc toString() {
412             return "VoteResult: up=" + positiveVotes +
413                 ", down=" + negativeVotes;
414         }
415     }
416
417     /**
418      * Class that represents a result of local voting on the failed node.
419      */

420     public static class FailureVoteResult extends VoteResult {
421         private final String JavaDoc reason;
422         
423         public FailureVoteResult(String JavaDoc reason) {
424             this.reason = reason;
425         }
426         
427         public String JavaDoc getReason() {
428             return reason;
429         }
430     }
431     
432 }
433
Popular Tags