1 5 package com.tc.l2.handler; 6 7 import com.tc.async.api.AbstractEventHandler; 8 import com.tc.async.api.ConfigurationContext; 9 import com.tc.async.api.EventContext; 10 import com.tc.async.api.Sink; 11 import com.tc.l2.api.L2Coordinator; 12 import com.tc.l2.context.ManagedObjectSyncContext; 13 import com.tc.l2.context.SyncObjectsRequest; 14 import com.tc.l2.msg.ObjectSyncMessage; 15 import com.tc.l2.msg.ObjectSyncMessageFactory; 16 import com.tc.l2.msg.ServerTxnAckMessage; 17 import com.tc.l2.objectserver.L2ObjectStateManager; 18 import com.tc.l2.state.StateManager; 19 import com.tc.logging.TCLogger; 20 import com.tc.logging.TCLogging; 21 import com.tc.net.groups.GroupException; 22 import com.tc.net.groups.GroupManager; 23 import com.tc.objectserver.core.api.ServerConfigurationContext; 24 25 public class L2ObjectSyncSendHandler extends AbstractEventHandler { 26 27 private static final TCLogger logger = TCLogging.getLogger(L2ObjectSyncSendHandler.class); 28 29 private final L2ObjectStateManager objectStateManager; 30 private GroupManager groupManager; 31 private StateManager stateManager; 32 33 private Sink syncRequestSink; 34 35 public L2ObjectSyncSendHandler(L2ObjectStateManager objectStateManager) { 36 this.objectStateManager = objectStateManager; 37 } 38 39 public void handleEvent(EventContext context) { 40 if (context instanceof ManagedObjectSyncContext) { 41 ManagedObjectSyncContext mosc = (ManagedObjectSyncContext) context; 42 if (sendObjects(mosc)) { 43 if (mosc.hasMore()) { 44 syncRequestSink.add(new SyncObjectsRequest(mosc.getNodeID())); 45 } else { 46 stateManager.moveNodeToPassiveStandby(mosc.getNodeID()); 47 } 48 } 49 } else if(context instanceof ServerTxnAckMessage) { 50 ServerTxnAckMessage txnMsg = (ServerTxnAckMessage) context; 51 sendAcks(txnMsg); 52 } else { 53 throw new AssertionError ("Unknown context type : " + context.getClass().getName() + " : " + context); 54 } 55 } 56 57 private void sendAcks(ServerTxnAckMessage ackMsg) { 58 try { 59 this.groupManager.sendTo(ackMsg.getDestinationID(), ackMsg); 60 } catch (GroupException e) { 61 logger.error("ERROR sending ACKS: Caught exception while sending message to ACTIVE", e); 62 } 64 } 65 66 private boolean sendObjects(ManagedObjectSyncContext mosc) { 67 objectStateManager.close(mosc); 68 ObjectSyncMessage msg = ObjectSyncMessageFactory.createObjectSyncMessageFrom(mosc); 69 try { 70 this.groupManager.sendTo(mosc.getNodeID(), msg); 71 logger.info("Sent " + mosc.getDNACount() + " objects to " + mosc.getNodeID() + " roots = " 72 + mosc.getRootsMap().size()); 73 return true; 74 } catch (GroupException e) { 75 logger.error("Removing " + mosc.getNodeID() + " from group because of Exception :", e); 76 groupManager.zapNode(mosc.getNodeID()); 77 return false; 78 } 79 } 80 81 public void initialize(ConfigurationContext context) { 82 super.initialize(context); 83 ServerConfigurationContext oscc = (ServerConfigurationContext) context; 84 L2Coordinator l2Coordinator = oscc.getL2Coordinator(); 85 this.groupManager = l2Coordinator.getGroupManager(); 86 this.stateManager = l2Coordinator.getStateManager(); 87 this.syncRequestSink = oscc.getStage(ServerConfigurationContext.OBJECTS_SYNC_STAGE).getSink(); 88 } 89 90 } 91 | Popular Tags |