1 package org.jgroups.blocks; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jgroups.*; 6 import org.jgroups.util.Rsp; 7 import org.jgroups.util.RspList; 8 9 import java.io.Serializable ; 10 import java.util.HashSet ; 11 import java.util.Iterator ; 12 import java.util.Set ; 13 14 32 public class VotingAdapter implements MessageListener, MembershipListener { 33 34 38 public static final int VOTE_ANY = 0; 39 40 44 public static final int VOTE_ALL = 1; 45 46 50 public static final int VOTE_MAJORITY = 2; 51 52 53 private static final int PROCESS_CONTINUE = 0; 54 private static final int PROCESS_SKIP = 1; 55 private static final int PROCESS_BREAK = 2; 56 57 58 private final RpcDispatcher rpcDispatcher; 59 60 protected final Log log=LogFactory.getLog(getClass()); 61 62 private final HashSet suspectedNodes = new HashSet (); 63 private boolean blocked = false; 64 private boolean closed; 65 66 71 public VotingAdapter(Channel channel) { 72 rpcDispatcher = new RpcDispatcher(channel, this, this, this); 73 } 74 75 public VotingAdapter(PullPushAdapter adapter, Serializable id) { 76 rpcDispatcher = new RpcDispatcher(adapter, id, this, this, this); 77 } 78 79 80 84 public boolean vote(Object decree, int consensusType, long timeout) 85 throws ChannelException 86 { 87 if (closed) 88 throw new ChannelException("Channel was closed."); 89 90 91 if(log.isDebugEnabled()) log.debug("Conducting voting on decree " + decree + ", consensus type " + 92 getConsensusStr(consensusType) + ", timeout " + timeout); 93 94 int mode = GroupRequest.GET_ALL; 95 96 switch (consensusType) { 98 case VotingAdapter.VOTE_ALL : mode = GroupRequest.GET_ALL; break; 99 case VotingAdapter.VOTE_ANY : mode = GroupRequest.GET_FIRST; break; 100 case VotingAdapter.VOTE_MAJORITY : mode = GroupRequest.GET_MAJORITY; break; 101 default : mode = GroupRequest.GET_ALL; 102 } 103 104 try { 105 java.lang.reflect.Method method = this.getClass().getMethod( 106 "localVote", new Class [] { Object .class }); 107 108 MethodCall methodCall = new MethodCall(method, new Object [] {decree}); 109 110 111 if(log.isDebugEnabled()) log.debug("Calling remote methods..."); 112 113 RspList responses = rpcDispatcher.callRemoteMethods( 115 null, methodCall, mode, timeout); 116 117 118 if(log.isDebugEnabled()) log.debug("Checking responses."); 119 120 121 return processResponses(responses, consensusType); 122 123 } catch(NoSuchMethodException nsmex) { 124 125 127 if(log.isErrorEnabled()) log.error("Could not find method localVote(Object). " + 128 nsmex.toString()); 129 130 throw new UnsupportedOperationException ( 131 "Cannot execute voting because of absence of " + 132 this.getClass().getName() + ".localVote(Object) method."); 133 } 134 } 135 136 143 private boolean processResponses(RspList responses, int consensusType) 144 throws ChannelException 145 { 146 if (responses == null) { 147 return false; 148 } 149 150 boolean voteResult = false; 151 int totalPositiveVotes = 0; 152 int totalNegativeVotes = 0; 153 154 for(int i = 0; i < responses.size(); i++) { 155 Rsp response = (Rsp)responses.elementAt(i); 156 157 switch(checkResponse(response)) { 158 case PROCESS_SKIP : continue; 159 case PROCESS_BREAK : return false; 160 } 161 162 VoteResult result = (VoteResult)response.getValue(); 163 164 totalPositiveVotes += result.getPositiveVotes(); 165 totalNegativeVotes += result.getNegativeVotes(); 166 } 167 168 switch(consensusType) { 169 case VotingAdapter.VOTE_ALL : 170 voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0); 171 break; 172 case VotingAdapter.VOTE_ANY : 173 voteResult = (totalPositiveVotes > 0); 174 break; 175 case VotingAdapter.VOTE_MAJORITY : 176 voteResult = (totalPositiveVotes > totalNegativeVotes); 177 } 178 179 return voteResult; 180 } 181 182 191 private int checkResponse(Rsp response) throws ChannelException { 192 193 if (!response.wasReceived()) { 194 195 196 if(log.isDebugEnabled()) log.debug("Response from node " + response.getSender() + 197 " was not received."); 198 199 return PROCESS_BREAK ; 203 } 204 205 206 if (response.wasSuspected()) { 207 208 209 if(log.isDebugEnabled()) log.debug("Node " + response.getSender() + " was suspected."); 210 211 return PROCESS_SKIP ; 213 } 214 215 Object object = response.getValue(); 216 217 if (object instanceof Throwable ) { 220 throw new ChannelException("Node " + response.getSender() + 221 " is faulty."); 222 } 223 224 if (object == null) { 225 return PROCESS_SKIP; 226 } 227 228 if (!(object instanceof VoteResult)) { 230 String faultClass = object.getClass().getName(); 231 232 throw new ChannelException("Node " + response.getSender() + 234 " generated fault (class " + faultClass + ')'); 235 } 236 237 if (object instanceof FailureVoteResult) { 239 240 if(log.isErrorEnabled()) log.error(((FailureVoteResult)object).getReason()); 241 242 return PROCESS_BREAK; 243 } 244 245 return PROCESS_CONTINUE; 247 } 248 249 252 public void viewAccepted(View newView) { 253 254 Iterator iterator = suspectedNodes.iterator(); 256 while(iterator.hasNext()) { 257 Address suspectedNode = (Address)iterator.next(); 258 if (newView.containsMember(suspectedNode)) 259 iterator.remove(); 260 } 261 262 blocked = false; 263 } 264 265 268 public void suspect(Address suspected) { 269 suspectedNodes.add(suspected); 270 } 271 272 275 public void block() { 276 blocked = true; 277 } 278 279 285 public byte[] getState() { 286 return null; 287 } 288 289 294 public void receive(org.jgroups.Message msg) { 295 } 297 298 301 public void setState(byte[] state) { 302 } 304 305 private final Set voteListeners = new HashSet (); 306 private VotingListener[] listeners; 307 308 319 public boolean vote(Object decree, long timeout) throws ChannelException { 320 return vote(decree, VOTE_ALL, timeout); 321 } 322 323 326 public void addVoteListener(VotingListener listener) { 327 voteListeners.add(listener); 328 listeners = (VotingListener[])voteListeners.toArray( 329 new VotingListener[voteListeners.size()]); 330 } 331 332 335 public void removeVoteListener(VotingListener listener) { 336 voteListeners.remove(listener); 337 338 listeners = (VotingListener[])voteListeners.toArray( 339 new VotingListener[voteListeners.size()]); 340 } 341 342 346 public VoteResult localVote(Object decree) { 347 348 VoteResult voteResult = new VoteResult(); 349 350 for(int i = 0; i < listeners.length; i++) { 351 VotingListener listener = listeners[i]; 352 353 try { 354 voteResult.addVote(listener.vote(decree)); 355 } catch (VoteException vex) { 356 } catch(RuntimeException ex) { 358 359 if(log.isErrorEnabled()) log.error(ex.toString()); 360 361 return new FailureVoteResult(ex.getMessage()); 364 } 365 } 366 367 368 if(log.isDebugEnabled()) log.debug("Voting on decree " + decree.toString() + " : " + 369 voteResult.toString()); 370 371 return voteResult; 372 } 373 374 382 public static String getConsensusStr(int consensusType) { 383 switch(consensusType) { 384 case VotingAdapter.VOTE_ALL : return "VOTE_ALL"; 385 case VotingAdapter.VOTE_ANY : return "VOTE_ANY"; 386 case VotingAdapter.VOTE_MAJORITY : return "VOTE_MAJORITY"; 387 default : return "UNKNOWN"; 388 } 389 } 390 391 392 396 public static class VoteResult implements Serializable { 397 private int positiveVotes = 0; 398 private int negativeVotes = 0; 399 400 public void addVote(boolean vote) { 401 if (vote) 402 positiveVotes++; 403 else 404 negativeVotes++; 405 } 406 407 public int getPositiveVotes() { return positiveVotes; } 408 409 public int getNegativeVotes() { return negativeVotes; } 410 411 public String toString() { 412 return "VoteResult: up=" + positiveVotes + 413 ", down=" + negativeVotes; 414 } 415 } 416 417 420 public static class FailureVoteResult extends VoteResult { 421 private final String reason; 422 423 public FailureVoteResult(String reason) { 424 this.reason = reason; 425 } 426 427 public String getReason() { 428 return reason; 429 } 430 } 431 432 } 433 | Popular Tags |