1 5 package com.tc.l2.handler; 6 7 import com.tc.async.api.AbstractEventHandler; 8 import com.tc.async.api.ConfigurationContext; 9 import com.tc.async.api.EventContext; 10 import com.tc.async.api.Sink; 11 import com.tc.l2.context.ManagedObjectSyncContext; 12 import com.tc.l2.context.SyncObjectsRequest; 13 import com.tc.l2.msg.ObjectSyncMessage; 14 import com.tc.l2.msg.RelayedCommitTransactionMessage; 15 import com.tc.l2.msg.ServerTxnAckMessage; 16 import com.tc.l2.msg.ServerTxnAckMessageFactory; 17 import com.tc.l2.objectserver.L2ObjectStateManager; 18 import com.tc.l2.objectserver.ServerTransactionFactory; 19 import com.tc.net.groups.NodeID; 20 import com.tc.net.protocol.tcm.ChannelID; 21 import com.tc.objectserver.api.ObjectManager; 22 import com.tc.objectserver.core.api.ServerConfigurationContext; 23 import com.tc.objectserver.tx.ServerTransaction; 24 import com.tc.objectserver.tx.TransactionBatchReader; 25 import com.tc.objectserver.tx.TransactionBatchReaderFactory; 26 import com.tc.objectserver.tx.TransactionalObjectManager; 27 28 import java.io.IOException ; 29 import java.util.ArrayList ; 30 import java.util.Collections ; 31 import java.util.HashSet ; 32 import java.util.List ; 33 import java.util.Set ; 34 35 public class L2ObjectSyncHandler extends AbstractEventHandler { 36 37 private final L2ObjectStateManager l2ObjectStateMgr; 38 private ObjectManager objectManager; 39 private TransactionalObjectManager txnObjectMgr; 40 private TransactionBatchReaderFactory batchReaderFactory; 41 42 private Sink dehydrateSink; 43 private Sink sendSink; 44 45 public L2ObjectSyncHandler(L2ObjectStateManager l2StateManager) { 46 l2ObjectStateMgr = l2StateManager; 47 } 48 49 public void handleEvent(EventContext context) { 50 if (context instanceof SyncObjectsRequest) { 51 SyncObjectsRequest request = (SyncObjectsRequest) context; 52 doSyncObjectsRequest(request); 53 } else if (context instanceof ObjectSyncMessage) { 54 ObjectSyncMessage syncMsg = (ObjectSyncMessage) context; 55 doSyncObjectsResponse(syncMsg); 56 } else if (context instanceof RelayedCommitTransactionMessage) { 57 RelayedCommitTransactionMessage commitMessage = (RelayedCommitTransactionMessage) context; 58 Set serverTxnIDs = processCommitTransactionMessage(commitMessage); 59 ackTransactions(commitMessage, serverTxnIDs); 60 } else { 61 throw new AssertionError ("Unknown context type : " + context.getClass().getName() + " : " + context); 62 } 63 } 64 65 private void ackTransactions(RelayedCommitTransactionMessage commitMessage, Set serverTxnIDs) { 67 ServerTxnAckMessage msg = ServerTxnAckMessageFactory.createServerTxnAckMessage(commitMessage, serverTxnIDs); 68 sendSink.add(msg); 69 } 70 71 private Set processCommitTransactionMessage(RelayedCommitTransactionMessage commitMessage) { 73 try { 74 final TransactionBatchReader reader = batchReaderFactory.newTransactionBatchReader(commitMessage); 75 ServerTransaction txn; 76 List txns = new ArrayList (reader.getNumTxns()); 77 Set serverTxnIDs = new HashSet (reader.getNumTxns()); 78 while ((txn = reader.getNextTransaction()) != null) { 79 txns.add(txn); 80 serverTxnIDs.add(txn.getServerTransactionID()); 81 } 82 txnObjectMgr.addTransactions(ChannelID.NULL_ID, txns, Collections.EMPTY_LIST); 84 return serverTxnIDs; 85 } catch (IOException e) { 86 throw new AssertionError (e); 87 } 88 } 89 90 private void doSyncObjectsResponse(ObjectSyncMessage syncMsg) { 91 ArrayList txns = new ArrayList (1); 92 ServerTransaction txn = ServerTransactionFactory.createTxnFrom(syncMsg); 93 txns.add(txn); 94 txnObjectMgr.addTransactions(ChannelID.NULL_ID, txns, Collections.EMPTY_LIST); 96 } 97 98 private void doSyncObjectsRequest(SyncObjectsRequest request) { 100 NodeID nodeID = request.getNodeID(); 101 ManagedObjectSyncContext lookupContext = l2ObjectStateMgr.getSomeObjectsToSyncContext(nodeID, 500, dehydrateSink); 102 if (lookupContext != null) { 104 objectManager.lookupObjectsAndSubObjectsFor(ChannelID.NULL_ID, lookupContext, -1); 105 } 106 } 107 108 public void initialize(ConfigurationContext context) { 109 super.initialize(context); 110 ServerConfigurationContext oscc = (ServerConfigurationContext) context; 111 this.batchReaderFactory = oscc.getTransactionBatchReaderFactory(); 112 this.objectManager = oscc.getObjectManager(); 113 this.txnObjectMgr = oscc.getTransactionalObjectManager(); 114 this.dehydrateSink = oscc.getStage(ServerConfigurationContext.OBJECTS_SYNC_DEHYDRATE_STAGE).getSink(); 115 this.sendSink = oscc.getStage(ServerConfigurationContext.OBJECTS_SYNC_SEND_STAGE).getSink(); 116 } 117 118 } 119 | Popular Tags |