1 5 package com.tc.l2.objectserver; 6 7 import com.tc.async.api.Sink; 8 import com.tc.l2.context.SyncObjectsRequest; 9 import com.tc.l2.msg.ObjectListSyncMessage; 10 import com.tc.l2.msg.ObjectListSyncMessageFactory; 11 import com.tc.l2.state.StateManager; 12 import com.tc.logging.TCLogger; 13 import com.tc.logging.TCLogging; 14 import com.tc.net.groups.GroupException; 15 import com.tc.net.groups.GroupManager; 16 import com.tc.net.groups.GroupMessage; 17 import com.tc.net.groups.GroupMessageListener; 18 import com.tc.net.groups.GroupResponse; 19 import com.tc.net.groups.NodeID; 20 import com.tc.objectserver.api.ObjectManager; 21 import com.tc.util.Assert; 22 23 import java.util.Iterator ; 24 import java.util.Set ; 25 26 public class ReplicatedObjectManagerImpl implements ReplicatedObjectManager, GroupMessageListener { 27 28 private static final TCLogger logger = TCLogging.getLogger(ReplicatedObjectManagerImpl.class); 29 30 private final ObjectManager objectManager; 31 private final GroupManager groupManager; 32 private final StateManager stateManager; 33 private final L2ObjectStateManager l2ObjectStateManager; 34 private final Sink objectsSyncSink; 35 36 public ReplicatedObjectManagerImpl(GroupManager groupManager, StateManager stateManager, 37 L2ObjectStateManager l2ObjectStateManager, ObjectManager objectManager, 38 Sink objectsSyncSink) { 39 this.groupManager = groupManager; 40 this.stateManager = stateManager; 41 this.objectManager = objectManager; 42 this.objectsSyncSink = objectsSyncSink; 43 this.l2ObjectStateManager = l2ObjectStateManager; 44 this.groupManager.registerForMessages(ObjectListSyncMessage.class, this); 45 } 46 47 51 public void sync() { 52 try { 53 GroupResponse gr = groupManager.sendAllAndWaitForResponse(ObjectListSyncMessageFactory 54 .createObjectListSyncRequestMessage()); 55 for (Iterator i = gr.getResponses().iterator(); i.hasNext();) { 56 ObjectListSyncMessage msg = (ObjectListSyncMessage) i.next(); 57 add2L2StateManager(msg.messageFrom(), msg.getObjectIDs()); 58 } 59 } catch (GroupException e) { 60 logger.error(e); 61 throw new AssertionError (e); 62 } 63 } 64 65 public void query(NodeID nodeID) { 67 try { 68 groupManager.sendTo(nodeID, ObjectListSyncMessageFactory.createObjectListSyncRequestMessage()); 69 } catch (GroupException e) { 70 logger.error("Error Writting Msg : ", e); 71 } 72 } 73 74 public void messageReceived(NodeID fromNode, GroupMessage msg) { 76 if (msg instanceof ObjectListSyncMessage) { 77 ObjectListSyncMessage clusterMsg = (ObjectListSyncMessage) msg; 78 handleClusterObjectMessage(fromNode, clusterMsg); 79 } else { 80 throw new AssertionError ("ReplicatedObjectManagerImpl : Received wrong message type :" + msg.getClass().getName() 81 + " : " + msg); 82 83 } 84 } 85 86 private void handleClusterObjectMessage(NodeID nodeID, ObjectListSyncMessage clusterMsg) { 87 try { 88 switch (clusterMsg.getType()) { 89 case ObjectListSyncMessage.REQUEST: 90 handleObjectListRequest(nodeID, clusterMsg); 91 break; 92 case ObjectListSyncMessage.RESPONSE: 93 handleObjectListResponse(nodeID, clusterMsg); 94 break; 95 96 default: 97 throw new AssertionError ("This message shouldn't have been routed here : " + clusterMsg); 98 } 99 } catch (GroupException e) { 100 logger.error("Error handling message : " + clusterMsg, e); 101 throw new AssertionError (e); 102 } 103 } 104 105 private void handleObjectListResponse(NodeID nodeID, ObjectListSyncMessage clusterMsg) { 106 Assert.assertTrue(stateManager.isActiveCoordinator()); 107 Set oids = clusterMsg.getObjectIDs(); 108 if (!oids.isEmpty()) { 109 logger.error("Nodes joining the cluster after startup shouldnt have any Objects. " + nodeID + " contains " 110 + oids.size() + " Objects !!!"); 111 logger.error("Forcing node to Quit !!"); 112 groupManager.zapNode(nodeID); 113 } else { 114 add2L2StateManager(nodeID, oids); 115 } 116 } 117 118 private void add2L2StateManager(NodeID nodeID, Set oids) { 119 int missing = l2ObjectStateManager.addL2WithObjectIDs(nodeID, oids, objectManager); 120 if (missing == 0) { 121 stateManager.moveNodeToPassiveStandby(nodeID); 122 } else { 123 objectsSyncSink.add(new SyncObjectsRequest(nodeID)); 124 } 125 } 126 127 private void handleObjectListRequest(NodeID nodeID, ObjectListSyncMessage clusterMsg) throws GroupException { 128 Assert.assertFalse(stateManager.isActiveCoordinator()); 129 Set knownIDs = objectManager.getAllObjectIDs(); 130 groupManager.sendTo(nodeID, ObjectListSyncMessageFactory.createObjectListSyncResponseMessage(clusterMsg, knownIDs)); 131 } 132 133 136 public boolean relayTransactions() { 137 return true; 138 } 139 } 140 | Popular Tags |