1 5 package com.tc.l2.state; 6 7 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 8 9 import com.tc.async.api.Sink; 10 import com.tc.l2.context.StateChangedEvent; 11 import com.tc.l2.msg.L2StateMessage; 12 import com.tc.l2.msg.L2StateMessageFactory; 13 import com.tc.logging.TCLogger; 14 import com.tc.logging.TCLogging; 15 import com.tc.net.groups.GroupEventsListener; 16 import com.tc.net.groups.GroupException; 17 import com.tc.net.groups.GroupManager; 18 import com.tc.net.groups.GroupMessage; 19 import com.tc.net.groups.GroupMessageListener; 20 import com.tc.net.groups.NodeID; 21 import com.tc.util.Assert; 22 import com.tc.util.State; 23 24 import java.util.Iterator ; 25 26 public class StateManagerImpl implements StateManager, GroupMessageListener, GroupEventsListener { 27 28 private static final TCLogger logger = TCLogging.getLogger(StateManagerImpl.class); 29 30 private final TCLogger consoleLogger; 31 private final GroupManager groupManager; 32 private final ElectionManager electionMgr; 33 private final Sink stateChangeSink; 34 35 private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); 36 private final Object electionLock = new Object (); 37 38 private NodeID activeNode = NodeID.NULL_ID; 39 private volatile State state = START_STATE; 40 41 public StateManagerImpl(TCLogger consoleLogger, GroupManager groupManager, Sink stateChangeSink) { 42 this.consoleLogger = consoleLogger; 43 this.groupManager = groupManager; 44 this.stateChangeSink = stateChangeSink; 45 this.electionMgr = new ElectionManagerImpl(groupManager); 46 this.groupManager.registerForMessages(L2StateMessage.class, this); 47 this.groupManager.registerForGroupEvents(this); 48 } 49 50 54 public void startElection() { 55 synchronized (electionLock) { 56 if (state == START_STATE) { 57 runElection(true); 58 } else if (state == PASSIVE_STANDBY) { 59 runElection(false); 60 } else { 61 info("Ignoring Election request since not in right state"); 62 } 63 } 64 } 65 66 private void runElection(boolean isNew) { 67 NodeID myNodeID = getLocalNodeID(); 68 NodeID winner = electionMgr.runElection(myNodeID, isNew); 69 if (winner == myNodeID) { 70 moveToActiveState(); 71 } 72 } 73 74 private NodeID getLocalNodeID() { 75 try { 76 return groupManager.getLocalNodeID(); 77 } catch (GroupException e) { 78 throw new AssertionError (e); 79 } 80 } 81 82 public void registerForStateChangeEvents(StateChangeListener listener) { 83 listeners.add(listener); 84 } 85 86 public void fireStateChangedEvent(StateChangedEvent sce) { 87 for (Iterator i = listeners.iterator(); i.hasNext();) { 88 StateChangeListener listener = (StateChangeListener) i.next(); 89 listener.l2StateChanged(sce); 90 } 91 } 92 93 private synchronized void moveToPassiveState(boolean initialized) { 94 electionMgr.reset(); 95 if (state == START_STATE) { 96 state = initialized ? PASSIVE_STANDBY : PASSIVE_UNINTIALIZED; 97 info("Moved to " + state, true); 98 stateChangeSink.add(new StateChangedEvent(START_STATE, state)); 99 } else if (state == ACTIVE_COORDINATOR) { 100 throw new AssertionError ("Cant move to " + PASSIVE_UNINTIALIZED + " from " + ACTIVE_COORDINATOR 102 + " at least for now"); 103 } 104 } 105 106 private synchronized void moveToPassiveStandbyState() { 107 if (state == ACTIVE_COORDINATOR) { 108 throw new AssertionError ("Cant move to " + PASSIVE_STANDBY + " from " + ACTIVE_COORDINATOR + " at least for now"); 110 } else if (state != PASSIVE_STANDBY) { 111 stateChangeSink.add(new StateChangedEvent(state, PASSIVE_STANDBY)); 112 state = PASSIVE_STANDBY; 113 info("Moved to " + state, true); 114 } else { 115 info("Already in " + state); 116 } 117 } 118 119 private synchronized void moveToActiveState() { 120 if (state == START_STATE || state == PASSIVE_STANDBY) { 121 StateChangedEvent event = new StateChangedEvent(state, ACTIVE_COORDINATOR); 123 state = ACTIVE_COORDINATOR; 124 this.activeNode = getLocalNodeID(); 125 info("Becoming " + state, true); 126 electionMgr.declareWinner(this.activeNode); 127 stateChangeSink.add(event); 128 } else { 129 throw new AssertionError ("Cant move to " + ACTIVE_COORDINATOR + " from " + state); 130 } 131 } 132 133 public synchronized boolean isActiveCoordinator() { 134 return (state == ACTIVE_COORDINATOR); 135 } 136 137 public void moveNodeToPassiveStandby(NodeID nodeID) { 138 logger.info("Requesting node " + nodeID + " to move to " + PASSIVE_STANDBY); 139 GroupMessage msg = L2StateMessageFactory.createMoveToPassiveStandbyMessage(EnrollmentFactory 140 .createTrumpEnrollment(getLocalNodeID())); 141 try { 142 this.groupManager.sendTo(nodeID, msg); 143 } catch (GroupException e) { 144 logger.error(e); 145 } 146 } 147 148 151 public synchronized void messageReceived(NodeID fromNode, GroupMessage msg) { 152 if (!(msg instanceof L2StateMessage)) { throw new AssertionError ("StateManagerImpl : Received wrong message type :" 153 + msg); } 154 L2StateMessage clusterMsg = (L2StateMessage) msg; 155 handleClusterStateMessage(clusterMsg); 156 } 157 158 private void handleClusterStateMessage(L2StateMessage clusterMsg) { 159 try { 160 switch (clusterMsg.getType()) { 161 case L2StateMessage.START_ELECTION: 162 handleStartElectionRequest(clusterMsg); 163 break; 164 case L2StateMessage.ABORT_ELECTION: 165 handleElectionAbort(clusterMsg); 166 break; 167 case L2StateMessage.ELECTION_RESULT: 168 handleElectionResultMessage(clusterMsg); 169 break; 170 case L2StateMessage.ELECTION_WON: 171 handleElectionWonMessage(clusterMsg); 172 break; 173 case L2StateMessage.MOVE_TO_PASSIVE_STANDBY: 174 handleMoveToPassiveStandbyMessage(clusterMsg); 175 break; 176 default: 177 throw new AssertionError ("This message shouldn't have been routed here : " + clusterMsg); 178 } 179 } catch (GroupException ge) { 180 logger.error("Caught Exception while handling Message : " + clusterMsg, ge); 181 throw new AssertionError (ge); 182 183 } 184 } 185 186 private void handleMoveToPassiveStandbyMessage(L2StateMessage clusterMsg) { 187 moveToPassiveStandbyState(); 188 } 189 190 private void handleElectionWonMessage(L2StateMessage clusterMsg) { 191 if (state == ACTIVE_COORDINATOR) { 192 logger.error(state + " Received Election Won Msg : " + clusterMsg + ". Possible split brain detected "); 195 throw new AssertionError (state + " Received Election Won Msg : " + clusterMsg 196 + ". Possible split brain detected "); 197 } 198 Enrollment winningEnrollment = clusterMsg.getEnrollment(); 199 this.activeNode = winningEnrollment.getNodeID(); 200 moveToPassiveState(winningEnrollment.isANewCandidate()); 201 } 202 203 private void handleElectionResultMessage(L2StateMessage msg) throws GroupException { 204 if (activeNode.equals(msg.getEnrollment().getNodeID())) { 205 Assert.assertFalse(NodeID.NULL_ID.equals(activeNode)); 206 GroupMessage resultAgreed = L2StateMessageFactory.createResultAgreedMessage(msg, msg.getEnrollment()); 208 logger.info("Agreed with Election Result from " + msg.messageFrom() + " : " + resultAgreed); 209 groupManager.sendTo(msg.messageFrom(), resultAgreed); 210 } else if (state == ACTIVE_COORDINATOR || !activeNode.isNull()) { 211 GroupMessage resultConflict = L2StateMessageFactory.createResultConflictMessage(msg, EnrollmentFactory 215 .createTrumpEnrollment(getLocalNodeID())); 216 warn("WARNING :: Active Node = " + activeNode + " , " + state 217 + " received ELECTION_RESULT message from another node : " + msg + " : Forcing re-election " 218 + resultConflict); 219 groupManager.sendTo(msg.messageFrom(), resultConflict); 220 } else { 221 electionMgr.handleElectionResultMessage(msg); 222 } 223 } 224 225 private void handleElectionAbort(L2StateMessage clusterMsg) { 226 if (state == ACTIVE_COORDINATOR) { 227 logger.error(state + " Received Abort Election Msg : Possible split brain detected "); 229 throw new AssertionError (state + " Received Abort Election Msg : Possible split brain detected "); 230 } 231 electionMgr.handleElectionAbort(clusterMsg); 232 } 233 234 private void handleStartElectionRequest(L2StateMessage msg) throws GroupException { 235 if (state == ACTIVE_COORDINATOR) { 236 GroupMessage abortMsg = L2StateMessageFactory.createAbortElectionMessage(msg, EnrollmentFactory 238 .createTrumpEnrollment(getLocalNodeID())); 239 info("Forcing Abort Election for " + msg + " with " + abortMsg); 240 groupManager.sendTo(msg.messageFrom(), abortMsg); 241 } else { 242 electionMgr.handleStartElectionRequest(msg); 243 } 244 } 245 246 public synchronized void nodeJoined(NodeID nodeID) { 248 info("Node : " + nodeID + " joined the cluster", true); 249 if (state == ACTIVE_COORDINATOR) { 250 GroupMessage msg = L2StateMessageFactory.createElectionWonMessage(EnrollmentFactory 252 .createTrumpEnrollment(getLocalNodeID())); 253 try { 254 groupManager.sendTo(nodeID, msg); 255 } catch (GroupException e) { 256 throw new AssertionError (e); 257 } 258 } 259 } 260 261 public void nodeLeft(NodeID nodeID) { 263 warn("Node : " + nodeID + " left the cluster", true); 264 boolean elect = false; 265 synchronized (this) { 266 if (state != PASSIVE_UNINTIALIZED && state != ACTIVE_COORDINATOR 267 && (activeNode.isNull() || activeNode.equals(nodeID))) { 268 elect = true; 269 activeNode = NodeID.NULL_ID; 270 } 271 } 272 if (elect) { 273 info("Starting Election to determine cluser wide ACTIVE L2"); 274 startElection(); 275 } 276 } 277 278 private void info(String message) { 279 info(message, false); 280 } 281 282 private void info(String message, boolean console) { 283 logger.info(message); 284 if (console) { 285 consoleLogger.info(message); 286 } 287 } 288 289 private void warn(String message) { 290 warn(message, false); 291 } 292 293 private void warn(String message, boolean console) { 294 logger.warn(message); 295 if (console) { 296 consoleLogger.warn(message); 297 } 298 } 299 } 300 | Popular Tags |