1 5 package com.tc.objectserver.handler; 6 7 import com.tc.async.api.AbstractEventHandler; 8 import com.tc.async.api.ConfigurationContext; 9 import com.tc.async.api.EventContext; 10 import com.tc.async.api.Sink; 11 import com.tc.async.api.Stage; 12 import com.tc.l2.context.IncomingTransactionContext; 13 import com.tc.l2.objectserver.ReplicatedObjectManager; 14 import com.tc.logging.TCLogger; 15 import com.tc.logging.TCLogging; 16 import com.tc.net.protocol.tcm.ChannelID; 17 import com.tc.net.protocol.tcm.MessageChannel; 18 import com.tc.object.msg.CommitTransactionMessageImpl; 19 import com.tc.object.msg.MessageRecycler; 20 import com.tc.objectserver.core.api.ServerConfigurationContext; 21 import com.tc.objectserver.tx.ServerTransaction; 22 import com.tc.objectserver.tx.ServerTransactionManager; 23 import com.tc.objectserver.tx.TransactionBatchManager; 24 import com.tc.objectserver.tx.TransactionBatchReader; 25 import com.tc.objectserver.tx.TransactionBatchReaderFactory; 26 import com.tc.objectserver.tx.TransactionalObjectManager; 27 import com.tc.util.SequenceValidator; 28 29 import java.util.ArrayList ; 30 import java.util.Collection ; 31 import java.util.HashSet ; 32 import java.util.List ; 33 import java.util.Set ; 34 35 public class ProcessTransactionHandler extends AbstractEventHandler { 36 private static final TCLogger logger = TCLogging.getLogger(ProcessTransactionHandler.class); 37 38 private TransactionBatchReaderFactory batchReaderFactory; 39 private ReplicatedObjectManager replicatedObjectMgr; 40 41 private final TransactionBatchManager transactionBatchManager; 42 private final MessageRecycler messageRecycler; 43 private final SequenceValidator sequenceValidator; 44 private final TransactionalObjectManager txnObjectManager; 45 46 private Sink txnRelaySink; 47 48 private ServerTransactionManager transactionManager; 49 50 public ProcessTransactionHandler(TransactionBatchManager transactionBatchManager, 51 TransactionalObjectManager txnObjectManager, SequenceValidator sequenceValidator, 52 MessageRecycler messageRecycler) { 53 this.transactionBatchManager = transactionBatchManager; 54 this.txnObjectManager = txnObjectManager; 55 this.sequenceValidator = sequenceValidator; 56 this.messageRecycler = messageRecycler; 57 } 58 59 public void handleEvent(EventContext context) { 60 final CommitTransactionMessageImpl ctm = (CommitTransactionMessageImpl) context; 61 try { 62 final TransactionBatchReader reader = batchReaderFactory.newTransactionBatchReader(ctm); 63 transactionBatchManager.defineBatch(reader.getChannelID(), reader.getBatchID(), reader.getNumTxns()); 64 Collection completedTxnIds = reader.addAcknowledgedTransactionIDsTo(new HashSet ()); 65 ServerTransaction txn; 66 67 List txns = new ArrayList (reader.getNumTxns()); 68 Set serverTxnIDs = new HashSet (reader.getNumTxns()); 69 ChannelID channelID = reader.getChannelID(); 70 while ((txn = reader.getNextTransaction()) != null) { 71 sequenceValidator.setCurrent(channelID, txn.getClientSequenceID()); 72 txns.add(txn); 73 serverTxnIDs.add(txn.getServerTransactionID()); 74 } 75 messageRecycler.addMessage(ctm, serverTxnIDs); 76 if (replicatedObjectMgr.relayTransactions()) { 77 transactionManager.incomingTransactions(channelID, serverTxnIDs, true); 79 txnRelaySink.add(new IncomingTransactionContext(channelID, ctm, txns, serverTxnIDs)); 80 } else { 81 transactionManager.incomingTransactions(channelID, serverTxnIDs, false); 82 } 83 txnObjectManager.addTransactions(channelID, txns, completedTxnIds); 84 } catch (Exception e) { 85 logger.error("Error reading transaction batch. : ", e); 86 MessageChannel c = ctm.getChannel(); 87 logger.error("Closing channel " + c.getChannelID() + " due to previous errors !"); 88 c.close(); 89 } 90 } 91 92 public void initialize(ConfigurationContext context) { 93 super.initialize(context); 94 ServerConfigurationContext oscc = (ServerConfigurationContext) context; 95 batchReaderFactory = oscc.getTransactionBatchReaderFactory(); 96 transactionManager = oscc.getTransactionManager(); 97 replicatedObjectMgr = oscc.getL2Coordinator().getReplicatedObjectManager(); 98 Stage relayStage = oscc.getStage(ServerConfigurationContext.TRANSACTION_RELAY_STAGE); 99 if (relayStage != null) { 100 txnRelaySink = relayStage.getSink(); 101 } 102 } 103 } 104 | Popular Tags |