KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > l2 > handler > L2ObjectSyncHandler


1 /*
2  * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.l2.handler;
6
7 import com.tc.async.api.AbstractEventHandler;
8 import com.tc.async.api.ConfigurationContext;
9 import com.tc.async.api.EventContext;
10 import com.tc.async.api.Sink;
11 import com.tc.l2.context.ManagedObjectSyncContext;
12 import com.tc.l2.context.SyncObjectsRequest;
13 import com.tc.l2.msg.ObjectSyncMessage;
14 import com.tc.l2.msg.RelayedCommitTransactionMessage;
15 import com.tc.l2.msg.ServerTxnAckMessage;
16 import com.tc.l2.msg.ServerTxnAckMessageFactory;
17 import com.tc.l2.objectserver.L2ObjectStateManager;
18 import com.tc.l2.objectserver.ServerTransactionFactory;
19 import com.tc.net.groups.NodeID;
20 import com.tc.net.protocol.tcm.ChannelID;
21 import com.tc.objectserver.api.ObjectManager;
22 import com.tc.objectserver.core.api.ServerConfigurationContext;
23 import com.tc.objectserver.tx.ServerTransaction;
24 import com.tc.objectserver.tx.TransactionBatchReader;
25 import com.tc.objectserver.tx.TransactionBatchReaderFactory;
26 import com.tc.objectserver.tx.TransactionalObjectManager;
27
28 import java.io.IOException JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.Collections JavaDoc;
31 import java.util.HashSet JavaDoc;
32 import java.util.List JavaDoc;
33 import java.util.Set JavaDoc;
34
35 public class L2ObjectSyncHandler extends AbstractEventHandler {
36
37   private final L2ObjectStateManager l2ObjectStateMgr;
38   private ObjectManager objectManager;
39   private TransactionalObjectManager txnObjectMgr;
40   private TransactionBatchReaderFactory batchReaderFactory;
41
42   private Sink dehydrateSink;
43   private Sink sendSink;
44
45   public L2ObjectSyncHandler(L2ObjectStateManager l2StateManager) {
46     l2ObjectStateMgr = l2StateManager;
47   }
48
49   public void handleEvent(EventContext context) {
50     if (context instanceof SyncObjectsRequest) {
51       SyncObjectsRequest request = (SyncObjectsRequest) context;
52       doSyncObjectsRequest(request);
53     } else if (context instanceof ObjectSyncMessage) {
54       ObjectSyncMessage syncMsg = (ObjectSyncMessage) context;
55       doSyncObjectsResponse(syncMsg);
56     } else if (context instanceof RelayedCommitTransactionMessage) {
57       RelayedCommitTransactionMessage commitMessage = (RelayedCommitTransactionMessage) context;
58       Set JavaDoc serverTxnIDs = processCommitTransactionMessage(commitMessage);
59       ackTransactions(commitMessage, serverTxnIDs);
60     } else {
61       throw new AssertionError JavaDoc("Unknown context type : " + context.getClass().getName() + " : " + context);
62     }
63   }
64
65   //TODO:: Implement throttling between active/passive
66
private void ackTransactions(RelayedCommitTransactionMessage commitMessage, Set JavaDoc serverTxnIDs) {
67     ServerTxnAckMessage msg = ServerTxnAckMessageFactory.createServerTxnAckMessage(commitMessage, serverTxnIDs);
68     sendSink.add(msg);
69   }
70
71   // TODO::recycle msg after use
72
private Set JavaDoc processCommitTransactionMessage(RelayedCommitTransactionMessage commitMessage) {
73     try {
74       final TransactionBatchReader reader = batchReaderFactory.newTransactionBatchReader(commitMessage);
75       ServerTransaction txn;
76       List JavaDoc txns = new ArrayList JavaDoc(reader.getNumTxns());
77       Set JavaDoc serverTxnIDs = new HashSet JavaDoc(reader.getNumTxns());
78       while ((txn = reader.getNextTransaction()) != null) {
79         txns.add(txn);
80         serverTxnIDs.add(txn.getServerTransactionID());
81       }
82       // TODO:: remove channelID.NULL_ID thingy
83
txnObjectMgr.addTransactions(ChannelID.NULL_ID, txns, Collections.EMPTY_LIST);
84       return serverTxnIDs;
85     } catch (IOException JavaDoc e) {
86       throw new AssertionError JavaDoc(e);
87     }
88   }
89
90   private void doSyncObjectsResponse(ObjectSyncMessage syncMsg) {
91     ArrayList JavaDoc txns = new ArrayList JavaDoc(1);
92     ServerTransaction txn = ServerTransactionFactory.createTxnFrom(syncMsg);
93     txns.add(txn);
94     // TODO:: remove channelID.NULL_ID thingy
95
txnObjectMgr.addTransactions(ChannelID.NULL_ID, txns, Collections.EMPTY_LIST);
96   }
97
98   // TODO:: Update stats so that admin console reflects these data
99
private void doSyncObjectsRequest(SyncObjectsRequest request) {
100     NodeID nodeID = request.getNodeID();
101     ManagedObjectSyncContext lookupContext = l2ObjectStateMgr.getSomeObjectsToSyncContext(nodeID, 500, dehydrateSink);
102     // TODO:: Remove ChannelID from ObjectManager interface
103
if (lookupContext != null) {
104       objectManager.lookupObjectsAndSubObjectsFor(ChannelID.NULL_ID, lookupContext, -1);
105     }
106   }
107
108   public void initialize(ConfigurationContext context) {
109     super.initialize(context);
110     ServerConfigurationContext oscc = (ServerConfigurationContext) context;
111     this.batchReaderFactory = oscc.getTransactionBatchReaderFactory();
112     this.objectManager = oscc.getObjectManager();
113     this.txnObjectMgr = oscc.getTransactionalObjectManager();
114     this.dehydrateSink = oscc.getStage(ServerConfigurationContext.OBJECTS_SYNC_DEHYDRATE_STAGE).getSink();
115     this.sendSink = oscc.getStage(ServerConfigurationContext.OBJECTS_SYNC_SEND_STAGE).getSink();
116   }
117
118 }
119
Popular Tags