KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > handler > ProcessTransactionHandler


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.objectserver.handler;
6
7 import com.tc.async.api.AbstractEventHandler;
8 import com.tc.async.api.ConfigurationContext;
9 import com.tc.async.api.EventContext;
10 import com.tc.async.api.Sink;
11 import com.tc.async.api.Stage;
12 import com.tc.l2.context.IncomingTransactionContext;
13 import com.tc.l2.objectserver.ReplicatedObjectManager;
14 import com.tc.logging.TCLogger;
15 import com.tc.logging.TCLogging;
16 import com.tc.net.protocol.tcm.ChannelID;
17 import com.tc.net.protocol.tcm.MessageChannel;
18 import com.tc.object.msg.CommitTransactionMessageImpl;
19 import com.tc.object.msg.MessageRecycler;
20 import com.tc.objectserver.core.api.ServerConfigurationContext;
21 import com.tc.objectserver.tx.ServerTransaction;
22 import com.tc.objectserver.tx.ServerTransactionManager;
23 import com.tc.objectserver.tx.TransactionBatchManager;
24 import com.tc.objectserver.tx.TransactionBatchReader;
25 import com.tc.objectserver.tx.TransactionBatchReaderFactory;
26 import com.tc.objectserver.tx.TransactionalObjectManager;
27 import com.tc.util.SequenceValidator;
28
29 import java.util.ArrayList JavaDoc;
30 import java.util.Collection JavaDoc;
31 import java.util.HashSet JavaDoc;
32 import java.util.List JavaDoc;
33 import java.util.Set JavaDoc;
34
35 public class ProcessTransactionHandler extends AbstractEventHandler {
36   private static final TCLogger logger = TCLogging.getLogger(ProcessTransactionHandler.class);
37
38   private TransactionBatchReaderFactory batchReaderFactory;
39   private ReplicatedObjectManager replicatedObjectMgr;
40
41   private final TransactionBatchManager transactionBatchManager;
42   private final MessageRecycler messageRecycler;
43   private final SequenceValidator sequenceValidator;
44   private final TransactionalObjectManager txnObjectManager;
45
46   private Sink txnRelaySink;
47
48   private ServerTransactionManager transactionManager;
49
50   public ProcessTransactionHandler(TransactionBatchManager transactionBatchManager,
51                                    TransactionalObjectManager txnObjectManager, SequenceValidator sequenceValidator,
52                                    MessageRecycler messageRecycler) {
53     this.transactionBatchManager = transactionBatchManager;
54     this.txnObjectManager = txnObjectManager;
55     this.sequenceValidator = sequenceValidator;
56     this.messageRecycler = messageRecycler;
57   }
58
59   public void handleEvent(EventContext context) {
60     final CommitTransactionMessageImpl ctm = (CommitTransactionMessageImpl) context;
61     try {
62       final TransactionBatchReader reader = batchReaderFactory.newTransactionBatchReader(ctm);
63       transactionBatchManager.defineBatch(reader.getChannelID(), reader.getBatchID(), reader.getNumTxns());
64       Collection JavaDoc completedTxnIds = reader.addAcknowledgedTransactionIDsTo(new HashSet JavaDoc());
65       ServerTransaction txn;
66
67       List JavaDoc txns = new ArrayList JavaDoc(reader.getNumTxns());
68       Set JavaDoc serverTxnIDs = new HashSet JavaDoc(reader.getNumTxns());
69       ChannelID channelID = reader.getChannelID();
70       while ((txn = reader.getNextTransaction()) != null) {
71         sequenceValidator.setCurrent(channelID, txn.getClientSequenceID());
72         txns.add(txn);
73         serverTxnIDs.add(txn.getServerTransactionID());
74       }
75       messageRecycler.addMessage(ctm, serverTxnIDs);
76       if (replicatedObjectMgr.relayTransactions()) {
77         // TODO:: change it to become event based
78
transactionManager.incomingTransactions(channelID, serverTxnIDs, true);
79         txnRelaySink.add(new IncomingTransactionContext(channelID, ctm, txns, serverTxnIDs));
80       } else {
81         transactionManager.incomingTransactions(channelID, serverTxnIDs, false);
82       }
83       txnObjectManager.addTransactions(channelID, txns, completedTxnIds);
84     } catch (Exception JavaDoc e) {
85       logger.error("Error reading transaction batch. : ", e);
86       MessageChannel c = ctm.getChannel();
87       logger.error("Closing channel " + c.getChannelID() + " due to previous errors !");
88       c.close();
89     }
90   }
91
92   public void initialize(ConfigurationContext context) {
93     super.initialize(context);
94     ServerConfigurationContext oscc = (ServerConfigurationContext) context;
95     batchReaderFactory = oscc.getTransactionBatchReaderFactory();
96     transactionManager = oscc.getTransactionManager();
97     replicatedObjectMgr = oscc.getL2Coordinator().getReplicatedObjectManager();
98     Stage relayStage = oscc.getStage(ServerConfigurationContext.TRANSACTION_RELAY_STAGE);
99     if (relayStage != null) {
100       txnRelaySink = relayStage.getSink();
101     }
102   }
103 }
104
Popular Tags