1 5 package com.tc.l2.state; 6 7 import com.tc.l2.msg.L2StateMessage; 8 import com.tc.l2.msg.L2StateMessageFactory; 9 import com.tc.logging.TCLogger; 10 import com.tc.logging.TCLogging; 11 import com.tc.net.groups.GroupException; 12 import com.tc.net.groups.GroupManager; 13 import com.tc.net.groups.GroupMessage; 14 import com.tc.net.groups.GroupResponse; 15 import com.tc.net.groups.NodeID; 16 import com.tc.properties.TCPropertiesImpl; 17 import com.tc.util.Assert; 18 import com.tc.util.State; 19 20 import java.util.HashSet ; 21 import java.util.Iterator ; 22 23 public class ElectionManagerImpl implements ElectionManager { 24 25 private static final TCLogger logger = TCLogging.getLogger(ElectionManagerImpl.class); 26 27 private static final State INIT = new State("Initial-State"); 28 private static final State ELECTION_COMPLETE = new State("Election-Complete"); 29 private static final State ELECTION_IN_PROGRESS = new State("Election-In-Progress"); 30 31 private static final long ELECTION_TIME = TCPropertiesImpl.getProperties() 32 .getLong("l2.ha.electionmanager.electionTimePeriod"); 33 34 private final GroupManager groupManager; 35 private State state = INIT; 36 37 private HashSet votes = new HashSet (); 39 private Enrollment myVote = null; 40 private Enrollment winner; 41 42 public ElectionManagerImpl(GroupManager groupManager) { 43 this.groupManager = groupManager; 44 } 45 46 public synchronized void handleStartElectionRequest(L2StateMessage msg) { 47 Assert.assertEquals(L2StateMessage.START_ELECTION, msg.getType()); 48 if (state == ELECTION_IN_PROGRESS) { 49 Assert.assertNotNull(myVote); 52 votes.add(msg.getEnrollment()); 53 if (msg.inResponseTo().isNull()) { 54 GroupMessage response = createElectionStartedMessage(msg, myVote); 56 logger.info("Casted vote from " + msg + " My Response : " + response); 57 try { 58 groupManager.sendTo(msg.messageFrom(), response); 59 } catch (GroupException e) { 60 throw new AssertionError (e); 61 } 62 } else { 63 logger.info("Casted vote from " + msg); 64 } 65 } else { 66 logger.info("Ignoring Start Election Request : " + msg + " My state = " + state); 67 } 68 } 69 70 public synchronized void handleElectionAbort(L2StateMessage msg) { 71 Assert.assertEquals(L2StateMessage.ABORT_ELECTION, msg.getType()); 72 if (state == ELECTION_IN_PROGRESS) { 73 Assert.assertNotNull(myVote); 75 basicAbort(msg); 76 } else { 77 logger.warn("Ignoring Abort Election Request : " + msg + " My state = " + state); 78 } 79 } 80 81 public synchronized void handleElectionResultMessage(L2StateMessage msg) { 82 Assert.assertEquals(L2StateMessage.ELECTION_RESULT, msg.getType()); 83 if (state == ELECTION_COMPLETE && !this.winner.equals(msg.getEnrollment())) { 84 GroupMessage resultConflict = L2StateMessageFactory.createResultConflictMessage(msg, this.winner); 86 logger.warn("WARNING :: Election result conflict : Winner local = " + this.winner + " : remote winner = " 87 + msg.getEnrollment()); 88 try { 89 groupManager.sendTo(msg.messageFrom(), resultConflict); 90 } catch (GroupException e) { 91 throw new AssertionError (e); 92 } 93 } 94 if (state == ELECTION_IN_PROGRESS) { 95 basicAbort(msg); 97 } 98 GroupMessage resultAgreed = L2StateMessageFactory.createResultAgreedMessage(msg, msg.getEnrollment()); 99 logger.info("Agreed with Election Result from " + msg.messageFrom() + " : " + resultAgreed); 100 try { 101 groupManager.sendTo(msg.messageFrom(), resultAgreed); 102 } catch (GroupException e) { 103 throw new AssertionError (e); 104 } 105 } 106 107 private void basicAbort(L2StateMessage msg) { 108 this.winner = msg.getEnrollment(); 109 reset(); 110 logger.info("Aborted Election : Winner is : " + this.winner); 111 notifyAll(); 112 } 113 114 117 public synchronized void declareWinner(NodeID myNodeId) { 118 Assert.assertEquals(winner.getNodeID(), myNodeId); 119 GroupMessage msg = createElectionWonMessage(this.winner); 120 try { 121 this.groupManager.sendAll(msg); 122 } catch (GroupException e) { 123 logger.error(e); 124 throw new AssertionError (e); 125 } 126 logger.info("Declared as Winner: Winner is : " + this.winner); 127 reset(); 128 } 129 130 public synchronized void reset() { 131 this.state = INIT; 132 this.votes.clear(); 133 this.myVote = null; 134 } 135 136 public NodeID runElection(NodeID myNodeId, boolean isNew) { 137 NodeID winnerID; 138 try { 139 while ((winnerID = doElection(myNodeId, isNew)).isNull()) { 140 logger.info("Requesting Re-election !!!"); 142 } 143 } catch (GroupException e1) { 144 logger.error("Error during election : ", e1); 145 throw new AssertionError (e1); 146 } 147 return winnerID; 148 } 149 150 private synchronized void electionStarted(Enrollment e) { 151 if (this.state == ELECTION_IN_PROGRESS) { throw new AssertionError ("Election Already in Progress"); } 152 this.state = ELECTION_IN_PROGRESS; 153 this.myVote = e; 154 this.winner = null; 155 this.votes.clear(); 156 this.votes.add(e); logger.info("Election Started : " + e); 158 } 159 160 private NodeID doElection(NodeID myNodeId, boolean isNew) throws GroupException { 161 162 Enrollment e = EnrollmentFactory.createEnrollment(myNodeId, isNew); 164 electionStarted(e); 165 166 GroupMessage msg = createElectionStartedMessage(e); 167 groupManager.sendAll(msg); 168 169 waitTillElectionComplete(); 171 172 Enrollment lWinner = computeResult(); 174 if (lWinner != e) { 175 logger.info("Election lost : Winner is : " + lWinner); 176 Assert.assertNotNull(lWinner); 177 return lWinner.getNodeID(); 178 } 179 msg = createElectionResultMessage(e); 181 GroupResponse responses = groupManager.sendAllAndWaitForResponse(msg); 182 for (Iterator i = responses.getResponses().iterator(); i.hasNext();) { 183 L2StateMessage response = (L2StateMessage) i.next(); 184 Assert.assertEquals(msg.getMessageID(), response.inResponseTo()); 185 if (response.getType() == L2StateMessage.RESULT_AGREED) { 186 Assert.assertEquals(e, response.getEnrollment()); 187 } else if (response.getType() == L2StateMessage.RESULT_CONFLICT) { 188 logger.info("Result Conflict: Local Result : " + e + " From : " + response.messageFrom() + " Result : " 189 + response.getEnrollment()); 190 return NodeID.NULL_ID; 191 } else { 192 throw new AssertionError ("Node : " + response.messageFrom() 193 + " responded neither with RESULT_AGREED or RESULT_CONFLICT :" + response); 194 } 195 } 196 197 return myNodeId; 199 } 200 201 private synchronized Enrollment computeResult() { 202 if (state == ELECTION_IN_PROGRESS) { 203 state = ELECTION_COMPLETE; 204 logger.info("Election Complete : " + votes + " : " + state); 205 winner = countVotes(); 206 } 207 return winner; 208 } 209 210 private Enrollment countVotes() { 211 Enrollment computedWinner = null; 212 for (Iterator i = votes.iterator(); i.hasNext();) { 213 Enrollment e = (Enrollment) i.next(); 214 if (computedWinner == null) { 215 computedWinner = e; 216 } else if (e.wins(computedWinner)) { 217 computedWinner = e; 218 } 219 } 220 Assert.assertNotNull(computedWinner); 221 return computedWinner; 222 } 223 224 private synchronized void waitTillElectionComplete() { 225 long start = System.currentTimeMillis(); 226 long diff = ELECTION_TIME; 227 while (state == ELECTION_IN_PROGRESS && diff > 0) { 228 try { 229 wait(diff); 230 } catch (InterruptedException e) { 231 throw new AssertionError (e); 232 } 233 diff = diff - (System.currentTimeMillis() - start); 234 } 235 } 236 237 private GroupMessage createElectionStartedMessage(Enrollment e) { 238 return L2StateMessageFactory.createElectionStartedMessage(e); 239 } 240 241 private GroupMessage createElectionWonMessage(Enrollment e) { 242 return L2StateMessageFactory.createElectionWonMessage(e); 243 } 244 245 private GroupMessage createElectionResultMessage(Enrollment e) { 246 return L2StateMessageFactory.createElectionResultMessage(e); 247 } 248 249 private GroupMessage createElectionStartedMessage(L2StateMessage msg, Enrollment e) { 250 return L2StateMessageFactory.createElectionStartedMessage(msg, e); 251 } 252 253 } 254 | Popular Tags |