1 5 package com.tc.l2.ha; 6 7 import com.tc.l2.api.ReplicatedClusterStateManager; 8 import com.tc.l2.msg.ClusterStateMessage; 9 import com.tc.l2.msg.ClusterStateMessageFactory; 10 import com.tc.l2.state.StateManager; 11 import com.tc.logging.TCLogger; 12 import com.tc.logging.TCLogging; 13 import com.tc.net.groups.GroupException; 14 import com.tc.net.groups.GroupManager; 15 import com.tc.net.groups.GroupMessage; 16 import com.tc.net.groups.GroupMessageListener; 17 import com.tc.net.groups.GroupResponse; 18 import com.tc.net.groups.NodeID; 19 import com.tc.net.protocol.transport.ConnectionID; 20 import com.tc.net.protocol.transport.ConnectionIDFactory; 21 import com.tc.net.protocol.transport.ConnectionIDFactoryListener; 22 import com.tc.util.Assert; 23 import com.tc.util.UUID; 24 25 import java.util.Iterator ; 26 27 public class ReplicatedClusterStateManagerImpl implements ReplicatedClusterStateManager, GroupMessageListener, 28 ConnectionIDFactoryListener { 29 30 private static final TCLogger logger = TCLogging.getLogger(ReplicatedClusterStateManagerImpl.class); 31 32 private final GroupManager groupManager; 33 private final ClusterState state; 34 35 private final StateManager stateManager; 36 37 public ReplicatedClusterStateManagerImpl(GroupManager groupManager, StateManager stateManager, 38 ClusterState clusterState, ConnectionIDFactory factory) { 39 this.groupManager = groupManager; 40 this.stateManager = stateManager; 41 state = clusterState; 42 groupManager.registerForMessages(ClusterStateMessage.class, this); 43 factory.registerForConnectionIDEvents(this); 44 } 45 46 public synchronized void goActiveAndSyncState() { 47 generateClusterIDIfNeeded(); 48 49 state.syncInternal(); 51 52 publishToAll(ClusterStateMessageFactory.createClusterStateMessage(state)); 54 } 55 56 private void generateClusterIDIfNeeded() { 57 if (state.getClusterID() == null) { 58 state.setClusterID(UUID.getUUID().toString()); 60 } 61 } 62 63 public synchronized void publishClusterState(NodeID nodeID) { 64 65 try { 66 ClusterStateMessage msg = (ClusterStateMessage) groupManager 67 .sendToAndWaitForResponse(nodeID, ClusterStateMessageFactory.createClusterStateMessage(state)); 68 validateResponse(nodeID, msg); 69 } catch (GroupException e) { 70 logger.error("Error plublishing cluster state : " + nodeID + " terminating it"); 71 groupManager.zapNode(nodeID); 72 } 73 74 } 75 76 private void validateResponse(NodeID nodeID, ClusterStateMessage msg) { 77 if (msg == null || msg.getType() != ClusterStateMessage.OPERATION_SUCCESS) { 78 logger.error("Recd wrong response from : " + nodeID + " : msg = " + msg 79 + " while publishing Next Available ObjectID: Killing the node"); 80 groupManager.zapNode(nodeID); 81 } 82 } 83 84 public synchronized void publishNextAvailableObjectID(long maxID) { 86 state.setNextAvailableObjectID(maxID); 87 publishToAll(ClusterStateMessageFactory.createNextAvailableObjectIDMessage(state)); 88 } 89 90 public synchronized void connectionIDCreated(ConnectionID connectionID) { 91 Assert.assertTrue(stateManager.isActiveCoordinator()); 92 state.addNewConnection(connectionID); 93 publishToAll(ClusterStateMessageFactory.createNewConnectionCreatedMessage(connectionID)); 94 } 95 96 public synchronized void connectionIDDestroyed(ConnectionID connectionID) { 97 Assert.assertTrue(stateManager.isActiveCoordinator()); 98 state.removeConnection(connectionID); 99 publishToAll(ClusterStateMessageFactory.createConnectionDestroyedMessage(connectionID)); 100 } 101 102 private void publishToAll(GroupMessage message) { 103 try { 104 GroupResponse gr = groupManager.sendAllAndWaitForResponse(message); 105 for (Iterator i = gr.getResponses().iterator(); i.hasNext();) { 106 ClusterStateMessage msg = (ClusterStateMessage) i.next(); 107 validateResponse(msg.messageFrom(), msg); 108 } 109 } catch (GroupException e) { 110 throw new AssertionError (e); 112 } 113 } 114 115 public void messageReceived(NodeID fromNode, GroupMessage msg) { 116 if (msg instanceof ClusterStateMessage) { 117 ClusterStateMessage clusterMsg = (ClusterStateMessage) msg; 118 handleClusterStateMessage(fromNode, clusterMsg); 119 } else { 120 throw new AssertionError ("ReplicatedClusterStateManagerImpl : Received wrong message type :" 121 + msg.getClass().getName() + " : " + msg); 122 } 123 } 124 125 private void handleClusterStateMessage(NodeID fromNode, ClusterStateMessage msg) { 126 if (stateManager.isActiveCoordinator()) { 127 logger.warn("Recd ClusterStateMessage from " + fromNode 128 + " while I am the cluster co-ordinator. This is bad. Ignoring the message"); 129 return; 130 } 131 msg.initState(state); 132 sendOKResponse(fromNode, msg); 133 } 134 135 private void sendOKResponse(NodeID fromNode, ClusterStateMessage msg) { 136 try { 137 groupManager.sendTo(fromNode, ClusterStateMessageFactory.createOKResponse(msg)); 138 } catch (GroupException e) { 139 logger.error("Error handling message : " + msg, e); 140 } 141 } 142 143 } 144 | Popular Tags |