KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > l2 > state > StateManagerImpl


1 /*
2  * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.l2.state;
6
7 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
8
9 import com.tc.async.api.Sink;
10 import com.tc.l2.context.StateChangedEvent;
11 import com.tc.l2.msg.L2StateMessage;
12 import com.tc.l2.msg.L2StateMessageFactory;
13 import com.tc.logging.TCLogger;
14 import com.tc.logging.TCLogging;
15 import com.tc.net.groups.GroupEventsListener;
16 import com.tc.net.groups.GroupException;
17 import com.tc.net.groups.GroupManager;
18 import com.tc.net.groups.GroupMessage;
19 import com.tc.net.groups.GroupMessageListener;
20 import com.tc.net.groups.NodeID;
21 import com.tc.util.Assert;
22 import com.tc.util.State;
23
24 import java.util.Iterator JavaDoc;
25
26 public class StateManagerImpl implements StateManager, GroupMessageListener, GroupEventsListener {
27
28   private static final TCLogger logger = TCLogging.getLogger(StateManagerImpl.class);
29
30   private final TCLogger consoleLogger;
31   private final GroupManager groupManager;
32   private final ElectionManager electionMgr;
33   private final Sink stateChangeSink;
34
35   private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
36   private final Object JavaDoc electionLock = new Object JavaDoc();
37
38   private NodeID activeNode = NodeID.NULL_ID;
39   private volatile State state = START_STATE;
40
41   public StateManagerImpl(TCLogger consoleLogger, GroupManager groupManager, Sink stateChangeSink) {
42     this.consoleLogger = consoleLogger;
43     this.groupManager = groupManager;
44     this.stateChangeSink = stateChangeSink;
45     this.electionMgr = new ElectionManagerImpl(groupManager);
46     this.groupManager.registerForMessages(L2StateMessage.class, this);
47     this.groupManager.registerForGroupEvents(this);
48   }
49
50   /*
51    * XXX:: If ACTIVE went dead before any passive moved to STANDBY state, then the cluster is hung and there is no going
52    * around it. If ACTIVE in persistent mode, it can come back and recover the cluster
53    */

54   public void startElection() {
55     synchronized (electionLock) {
56       if (state == START_STATE) {
57         runElection(true);
58       } else if (state == PASSIVE_STANDBY) {
59         runElection(false);
60       } else {
61         info("Ignoring Election request since not in right state");
62       }
63     }
64   }
65
66   private void runElection(boolean isNew) {
67     NodeID myNodeID = getLocalNodeID();
68     NodeID winner = electionMgr.runElection(myNodeID, isNew);
69     if (winner == myNodeID) {
70       moveToActiveState();
71     }
72   }
73
74   private NodeID getLocalNodeID() {
75     try {
76       return groupManager.getLocalNodeID();
77     } catch (GroupException e) {
78       throw new AssertionError JavaDoc(e);
79     }
80   }
81
82   public void registerForStateChangeEvents(StateChangeListener listener) {
83     listeners.add(listener);
84   }
85
86   public void fireStateChangedEvent(StateChangedEvent sce) {
87     for (Iterator JavaDoc i = listeners.iterator(); i.hasNext();) {
88       StateChangeListener listener = (StateChangeListener) i.next();
89       listener.l2StateChanged(sce);
90     }
91   }
92
93   private synchronized void moveToPassiveState(boolean initialized) {
94     electionMgr.reset();
95     if (state == START_STATE) {
96       state = initialized ? PASSIVE_STANDBY : PASSIVE_UNINTIALIZED;
97       info("Moved to " + state, true);
98       stateChangeSink.add(new StateChangedEvent(START_STATE, state));
99     } else if (state == ACTIVE_COORDINATOR) {
100       // TODO:: Support this later
101
throw new AssertionError JavaDoc("Cant move to " + PASSIVE_UNINTIALIZED + " from " + ACTIVE_COORDINATOR
102                                + " at least for now");
103     }
104   }
105
106   private synchronized void moveToPassiveStandbyState() {
107     if (state == ACTIVE_COORDINATOR) {
108       // TODO:: Support this later
109
throw new AssertionError JavaDoc("Cant move to " + PASSIVE_STANDBY + " from " + ACTIVE_COORDINATOR + " at least for now");
110     } else if (state != PASSIVE_STANDBY) {
111       stateChangeSink.add(new StateChangedEvent(state, PASSIVE_STANDBY));
112       state = PASSIVE_STANDBY;
113       info("Moved to " + state, true);
114     } else {
115       info("Already in " + state);
116     }
117   }
118
119   private synchronized void moveToActiveState() {
120     if (state == START_STATE || state == PASSIVE_STANDBY) {
121       // TODO :: If state == START_STATE publish cluster ID
122
StateChangedEvent event = new StateChangedEvent(state, ACTIVE_COORDINATOR);
123       state = ACTIVE_COORDINATOR;
124       this.activeNode = getLocalNodeID();
125       info("Becoming " + state, true);
126       electionMgr.declareWinner(this.activeNode);
127       stateChangeSink.add(event);
128     } else {
129       throw new AssertionError JavaDoc("Cant move to " + ACTIVE_COORDINATOR + " from " + state);
130     }
131   }
132
133   public synchronized boolean isActiveCoordinator() {
134     return (state == ACTIVE_COORDINATOR);
135   }
136
137   public void moveNodeToPassiveStandby(NodeID nodeID) {
138     logger.info("Requesting node " + nodeID + " to move to " + PASSIVE_STANDBY);
139     GroupMessage msg = L2StateMessageFactory.createMoveToPassiveStandbyMessage(EnrollmentFactory
140         .createTrumpEnrollment(getLocalNodeID()));
141     try {
142       this.groupManager.sendTo(nodeID, msg);
143     } catch (GroupException e) {
144       logger.error(e);
145     }
146   }
147
148   /**
149    * Message Listener Interface, TODO::move to a stage
150    */

151   public synchronized void messageReceived(NodeID fromNode, GroupMessage msg) {
152     if (!(msg instanceof L2StateMessage)) { throw new AssertionError JavaDoc("StateManagerImpl : Received wrong message type :"
153                                                                      + msg); }
154     L2StateMessage clusterMsg = (L2StateMessage) msg;
155     handleClusterStateMessage(clusterMsg);
156   }
157
158   private void handleClusterStateMessage(L2StateMessage clusterMsg) {
159     try {
160       switch (clusterMsg.getType()) {
161         case L2StateMessage.START_ELECTION:
162           handleStartElectionRequest(clusterMsg);
163           break;
164         case L2StateMessage.ABORT_ELECTION:
165           handleElectionAbort(clusterMsg);
166           break;
167         case L2StateMessage.ELECTION_RESULT:
168           handleElectionResultMessage(clusterMsg);
169           break;
170         case L2StateMessage.ELECTION_WON:
171           handleElectionWonMessage(clusterMsg);
172           break;
173         case L2StateMessage.MOVE_TO_PASSIVE_STANDBY:
174           handleMoveToPassiveStandbyMessage(clusterMsg);
175           break;
176         default:
177           throw new AssertionError JavaDoc("This message shouldn't have been routed here : " + clusterMsg);
178       }
179     } catch (GroupException ge) {
180       logger.error("Caught Exception while handling Message : " + clusterMsg, ge);
181       throw new AssertionError JavaDoc(ge);
182
183     }
184   }
185
186   private void handleMoveToPassiveStandbyMessage(L2StateMessage clusterMsg) {
187     moveToPassiveStandbyState();
188   }
189
190   private void handleElectionWonMessage(L2StateMessage clusterMsg) {
191     if (state == ACTIVE_COORDINATOR) {
192       // Cant get Election Won from another node : Split brain
193
// TODO:: Add some reconcile path
194
logger.error(state + " Received Election Won Msg : " + clusterMsg + ". Possible split brain detected ");
195       throw new AssertionError JavaDoc(state + " Received Election Won Msg : " + clusterMsg
196                                + ". Possible split brain detected ");
197     }
198     Enrollment winningEnrollment = clusterMsg.getEnrollment();
199     this.activeNode = winningEnrollment.getNodeID();
200     moveToPassiveState(winningEnrollment.isANewCandidate());
201   }
202
203   private void handleElectionResultMessage(L2StateMessage msg) throws GroupException {
204     if (activeNode.equals(msg.getEnrollment().getNodeID())) {
205       Assert.assertFalse(NodeID.NULL_ID.equals(activeNode));
206       // This wouldnt normally happen, but we agree - so ack
207
GroupMessage resultAgreed = L2StateMessageFactory.createResultAgreedMessage(msg, msg.getEnrollment());
208       logger.info("Agreed with Election Result from " + msg.messageFrom() + " : " + resultAgreed);
209       groupManager.sendTo(msg.messageFrom(), resultAgreed);
210     } else if (state == ACTIVE_COORDINATOR || !activeNode.isNull()) {
211       // This shouldn't happen normally, but is possible when there is some weird network error where A sees B,
212
// B sees A/C and C sees B and A is active and C is trying to run election
213
// Force other node to rerun election so that we can abort
214
GroupMessage resultConflict = L2StateMessageFactory.createResultConflictMessage(msg, EnrollmentFactory
215           .createTrumpEnrollment(getLocalNodeID()));
216       warn("WARNING :: Active Node = " + activeNode + " , " + state
217            + " received ELECTION_RESULT message from another node : " + msg + " : Forcing re-election "
218            + resultConflict);
219       groupManager.sendTo(msg.messageFrom(), resultConflict);
220     } else {
221       electionMgr.handleElectionResultMessage(msg);
222     }
223   }
224
225   private void handleElectionAbort(L2StateMessage clusterMsg) {
226     if (state == ACTIVE_COORDINATOR) {
227       // Cant get Abort back to ACTIVE, if so then there is a split brain
228
logger.error(state + " Received Abort Election Msg : Possible split brain detected ");
229       throw new AssertionError JavaDoc(state + " Received Abort Election Msg : Possible split brain detected ");
230     }
231     electionMgr.handleElectionAbort(clusterMsg);
232   }
233
234   private void handleStartElectionRequest(L2StateMessage msg) throws GroupException {
235     if (state == ACTIVE_COORDINATOR) {
236       // This is either a new L2 joining a cluster or a renegade L2. Force it to abort
237
GroupMessage abortMsg = L2StateMessageFactory.createAbortElectionMessage(msg, EnrollmentFactory
238           .createTrumpEnrollment(getLocalNodeID()));
239       info("Forcing Abort Election for " + msg + " with " + abortMsg);
240       groupManager.sendTo(msg.messageFrom(), abortMsg);
241     } else {
242       electionMgr.handleStartElectionRequest(msg);
243     }
244   }
245
246   // TODO:: Make it a handler on a stage
247
public synchronized void nodeJoined(NodeID nodeID) {
248     info("Node : " + nodeID + " joined the cluster", true);
249     if (state == ACTIVE_COORDINATOR) {
250       // notify new node
251
GroupMessage msg = L2StateMessageFactory.createElectionWonMessage(EnrollmentFactory
252           .createTrumpEnrollment(getLocalNodeID()));
253       try {
254         groupManager.sendTo(nodeID, msg);
255       } catch (GroupException e) {
256         throw new AssertionError JavaDoc(e);
257       }
258     }
259   }
260
261   // TODO:: Make it a handler on a stage
262
public void nodeLeft(NodeID nodeID) {
263     warn("Node : " + nodeID + " left the cluster", true);
264     boolean elect = false;
265     synchronized (this) {
266       if (state != PASSIVE_UNINTIALIZED && state != ACTIVE_COORDINATOR
267           && (activeNode.isNull() || activeNode.equals(nodeID))) {
268         elect = true;
269         activeNode = NodeID.NULL_ID;
270       }
271     }
272     if (elect) {
273       info("Starting Election to determine cluser wide ACTIVE L2");
274       startElection();
275     }
276   }
277
278   private void info(String JavaDoc message) {
279     info(message, false);
280   }
281
282   private void info(String JavaDoc message, boolean console) {
283     logger.info(message);
284     if (console) {
285       consoleLogger.info(message);
286     }
287   }
288
289   private void warn(String JavaDoc message) {
290     warn(message, false);
291   }
292
293   private void warn(String JavaDoc message, boolean console) {
294     logger.warn(message);
295     if (console) {
296       consoleLogger.warn(message);
297     }
298   }
299 }
300
Popular Tags