1 5 package com.tc.l2.ha; 6 7 import com.tc.async.api.Sink; 8 import com.tc.async.api.StageManager; 9 import com.tc.l2.api.L2Coordinator; 10 import com.tc.l2.api.ReplicatedClusterStateManager; 11 import com.tc.l2.context.StateChangedEvent; 12 import com.tc.l2.handler.L2ObjectSyncDehydrateHandler; 13 import com.tc.l2.handler.L2ObjectSyncHandler; 14 import com.tc.l2.handler.L2ObjectSyncSendHandler; 15 import com.tc.l2.handler.L2StateChangeHandler; 16 import com.tc.l2.handler.ServerTransactionAckHandler; 17 import com.tc.l2.handler.TransactionRelayHandler; 18 import com.tc.l2.msg.ObjectSyncMessage; 19 import com.tc.l2.msg.RelayedCommitTransactionMessage; 20 import com.tc.l2.msg.ServerTxnAckMessage; 21 import com.tc.l2.objectserver.L2ObjectStateManager; 22 import com.tc.l2.objectserver.L2ObjectStateManagerImpl; 23 import com.tc.l2.objectserver.ReplicatedObjectManager; 24 import com.tc.l2.objectserver.ReplicatedObjectManagerImpl; 25 import com.tc.l2.state.StateChangeListener; 26 import com.tc.l2.state.StateManager; 27 import com.tc.l2.state.StateManagerImpl; 28 import com.tc.logging.TCLogger; 29 import com.tc.logging.TCLogging; 30 import com.tc.net.groups.GroupEventsListener; 31 import com.tc.net.groups.GroupException; 32 import com.tc.net.groups.GroupManager; 33 import com.tc.net.groups.GroupManagerFactory; 34 import com.tc.net.groups.NodeID; 35 import com.tc.objectserver.api.ObjectManager; 36 import com.tc.objectserver.core.api.ServerConfigurationContext; 37 import com.tc.objectserver.impl.DistributedObjectServer; 38 import com.tc.objectserver.persistence.api.PersistentMapStore; 39 40 import java.io.IOException ; 41 42 public class L2HACoordinator implements L2Coordinator, StateChangeListener, GroupEventsListener { 43 44 private static final TCLogger logger = TCLogging.getLogger(L2HACoordinator.class); 45 46 private final TCLogger consoleLogger; 47 private final DistributedObjectServer server; 48 49 private GroupManager groupManager; 50 private StateManager stateManager; 51 private ReplicatedObjectManager rObjectManager; 52 private L2ObjectStateManager l2ObjectStateManager; 53 private ReplicatedClusterStateManager rClusterStateMgr; 54 55 private ClusterState clusterState; 56 57 public L2HACoordinator(TCLogger consoleLogger, DistributedObjectServer server, StageManager stageManager, 58 PersistentMapStore clusterStateStore, ObjectManager objectManager) { 59 this.consoleLogger = consoleLogger; 60 this.server = server; 61 init(stageManager, clusterStateStore, objectManager); 62 } 63 64 private void init(StageManager stageManager, PersistentMapStore clusterStateStore, ObjectManager objectManager) { 65 try { 66 basicInit(stageManager, clusterStateStore, objectManager); 67 } catch (GroupException e) { 68 logger.error(e); 69 throw new AssertionError (e); 70 } 71 } 72 73 private void basicInit(StageManager stageManager, PersistentMapStore clusterStateStore, ObjectManager objectManager) 74 throws GroupException { 75 76 this.clusterState = new ClusterState(clusterStateStore, server.getManagedObjectStore(), server 77 .getConnectionIdFactory()); 78 79 final Sink stateChangeSink = stageManager.createStage(ServerConfigurationContext.L2_STATE_CHANGE_STAGE, 80 new L2StateChangeHandler(), 1, Integer.MAX_VALUE).getSink(); 81 this.groupManager = GroupManagerFactory.createGroupManager(); 82 this.stateManager = new StateManagerImpl(consoleLogger, groupManager, stateChangeSink); 83 this.stateManager.registerForStateChangeEvents(this); 84 85 this.l2ObjectStateManager = new L2ObjectStateManagerImpl(); 86 87 final Sink objectsSyncSink = stageManager.createStage(ServerConfigurationContext.OBJECTS_SYNC_STAGE, 88 new L2ObjectSyncHandler(this.l2ObjectStateManager), 1, 89 Integer.MAX_VALUE).getSink(); 90 stageManager.createStage(ServerConfigurationContext.OBJECTS_SYNC_DEHYDRATE_STAGE, 91 new L2ObjectSyncDehydrateHandler(), 1, Integer.MAX_VALUE); 92 stageManager.createStage(ServerConfigurationContext.OBJECTS_SYNC_SEND_STAGE, 93 new L2ObjectSyncSendHandler(this.l2ObjectStateManager), 1, Integer.MAX_VALUE); 94 stageManager.createStage(ServerConfigurationContext.TRANSACTION_RELAY_STAGE, 95 new TransactionRelayHandler(this.l2ObjectStateManager), 1, Integer.MAX_VALUE); 96 final Sink ackProcessingStage = stageManager 97 .createStage(ServerConfigurationContext.SERVER_TRANSACTION_ACK_PROCESSING_STAGE, 98 new ServerTransactionAckHandler(), 1, Integer.MAX_VALUE).getSink(); 99 this.rObjectManager = new ReplicatedObjectManagerImpl(groupManager, stateManager, l2ObjectStateManager, 100 objectManager, objectsSyncSink); 101 102 this.rClusterStateMgr = new ReplicatedClusterStateManagerImpl(groupManager, stateManager, clusterState, server 103 .getConnectionIdFactory()); 104 105 this.groupManager.routeMessages(ObjectSyncMessage.class, objectsSyncSink); 106 this.groupManager.routeMessages(RelayedCommitTransactionMessage.class, objectsSyncSink); 107 this.groupManager.routeMessages(ServerTxnAckMessage.class, ackProcessingStage); 108 109 this.groupManager.registerForGroupEvents(this); 110 } 111 112 public void start() { 113 NodeID myNodeId; 114 try { 115 myNodeId = groupManager.join(); 116 } catch (GroupException e) { 117 logger.error("Caught Exception :", e); 118 throw new AssertionError (e); 119 } 120 logger.info("This L2 Node ID = " + myNodeId); 121 stateManager.startElection(); 122 } 123 124 public StateManager getStateManager() { 125 return stateManager; 126 } 127 128 public ReplicatedClusterStateManager getReplicatedClusterStateManager() { 129 return rClusterStateMgr; 130 } 131 132 public ReplicatedObjectManager getReplicatedObjectManager() { 133 return rObjectManager; 134 } 135 136 public GroupManager getGroupManager() { 137 return groupManager; 138 } 139 140 public void l2StateChanged(StateChangedEvent sce) { 141 clusterState.setCurrentState(sce.getCurrentState()); 142 if (sce.movedToActive()) { 143 rClusterStateMgr.goActiveAndSyncState(); 144 rObjectManager.sync(); 145 try { 146 server.startActiveMode(); 147 } catch (IOException e) { 148 throw new AssertionError (e); 149 } 150 } else { 151 logger.info("Recd. " + sce + " ! Ignoring for now !!!!"); 153 } 154 } 155 156 public void nodeJoined(NodeID nodeID) { 157 if (stateManager.isActiveCoordinator()) { 158 rClusterStateMgr.publishClusterState(nodeID); 159 rObjectManager.query(nodeID); 160 } 161 } 162 163 public void nodeLeft(NodeID nodeID) { 164 if (stateManager.isActiveCoordinator()) { 165 l2ObjectStateManager.removeL2(nodeID); 166 } 167 } 168 169 } 170 | Popular Tags |