KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > l2 > objectserver > ReplicatedObjectManagerImpl


1 /*
2  * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.l2.objectserver;
6
7 import com.tc.async.api.Sink;
8 import com.tc.l2.context.SyncObjectsRequest;
9 import com.tc.l2.msg.ObjectListSyncMessage;
10 import com.tc.l2.msg.ObjectListSyncMessageFactory;
11 import com.tc.l2.state.StateManager;
12 import com.tc.logging.TCLogger;
13 import com.tc.logging.TCLogging;
14 import com.tc.net.groups.GroupException;
15 import com.tc.net.groups.GroupManager;
16 import com.tc.net.groups.GroupMessage;
17 import com.tc.net.groups.GroupMessageListener;
18 import com.tc.net.groups.GroupResponse;
19 import com.tc.net.groups.NodeID;
20 import com.tc.objectserver.api.ObjectManager;
21 import com.tc.util.Assert;
22
23 import java.util.Iterator JavaDoc;
24 import java.util.Set JavaDoc;
25
26 public class ReplicatedObjectManagerImpl implements ReplicatedObjectManager, GroupMessageListener {
27
28   private static final TCLogger logger = TCLogging.getLogger(ReplicatedObjectManagerImpl.class);
29
30   private final ObjectManager objectManager;
31   private final GroupManager groupManager;
32   private final StateManager stateManager;
33   private final L2ObjectStateManager l2ObjectStateManager;
34   private final Sink objectsSyncSink;
35
36   public ReplicatedObjectManagerImpl(GroupManager groupManager, StateManager stateManager,
37                                      L2ObjectStateManager l2ObjectStateManager, ObjectManager objectManager,
38                                      Sink objectsSyncSink) {
39     this.groupManager = groupManager;
40     this.stateManager = stateManager;
41     this.objectManager = objectManager;
42     this.objectsSyncSink = objectsSyncSink;
43     this.l2ObjectStateManager = l2ObjectStateManager;
44     this.groupManager.registerForMessages(ObjectListSyncMessage.class, this);
45   }
46
47   /**
48    * This method is used to sync up all ObjectIDs from the remote ObjectManagers. It is synchronous and after when it
49    * returns nobody is allowed to join the cluster with exisiting objects.
50    */

51   public void sync() {
52     try {
53       GroupResponse gr = groupManager.sendAllAndWaitForResponse(ObjectListSyncMessageFactory
54           .createObjectListSyncRequestMessage());
55       for (Iterator JavaDoc i = gr.getResponses().iterator(); i.hasNext();) {
56         ObjectListSyncMessage msg = (ObjectListSyncMessage) i.next();
57         add2L2StateManager(msg.messageFrom(), msg.getObjectIDs());
58       }
59     } catch (GroupException e) {
60       logger.error(e);
61       throw new AssertionError JavaDoc(e);
62     }
63   }
64
65   // Query current state of the other L2
66
public void query(NodeID nodeID) {
67     try {
68       groupManager.sendTo(nodeID, ObjectListSyncMessageFactory.createObjectListSyncRequestMessage());
69     } catch (GroupException e) {
70       logger.error("Error Writting Msg : ", e);
71     }
72   }
73
74   //TODO::Verify that message order is maintained.
75
public void messageReceived(NodeID fromNode, GroupMessage msg) {
76     if (msg instanceof ObjectListSyncMessage) {
77       ObjectListSyncMessage clusterMsg = (ObjectListSyncMessage) msg;
78       handleClusterObjectMessage(fromNode, clusterMsg);
79     } else {
80       throw new AssertionError JavaDoc("ReplicatedObjectManagerImpl : Received wrong message type :" + msg.getClass().getName()
81                                + " : " + msg);
82
83     }
84   }
85
86   private void handleClusterObjectMessage(NodeID nodeID, ObjectListSyncMessage clusterMsg) {
87     try {
88       switch (clusterMsg.getType()) {
89         case ObjectListSyncMessage.REQUEST:
90           handleObjectListRequest(nodeID, clusterMsg);
91           break;
92         case ObjectListSyncMessage.RESPONSE:
93           handleObjectListResponse(nodeID, clusterMsg);
94           break;
95
96         default:
97           throw new AssertionError JavaDoc("This message shouldn't have been routed here : " + clusterMsg);
98       }
99     } catch (GroupException e) {
100       logger.error("Error handling message : " + clusterMsg, e);
101       throw new AssertionError JavaDoc(e);
102     }
103   }
104
105   private void handleObjectListResponse(NodeID nodeID, ObjectListSyncMessage clusterMsg) {
106     Assert.assertTrue(stateManager.isActiveCoordinator());
107     Set JavaDoc oids = clusterMsg.getObjectIDs();
108     if (!oids.isEmpty()) {
109       logger.error("Nodes joining the cluster after startup shouldnt have any Objects. " + nodeID + " contains "
110                    + oids.size() + " Objects !!!");
111       logger.error("Forcing node to Quit !!");
112       groupManager.zapNode(nodeID);
113     } else {
114       add2L2StateManager(nodeID, oids);
115     }
116   }
117
118   private void add2L2StateManager(NodeID nodeID, Set JavaDoc oids) {
119     int missing = l2ObjectStateManager.addL2WithObjectIDs(nodeID, oids, objectManager);
120     if (missing == 0) {
121       stateManager.moveNodeToPassiveStandby(nodeID);
122     } else {
123       objectsSyncSink.add(new SyncObjectsRequest(nodeID));
124     }
125   }
126
127   private void handleObjectListRequest(NodeID nodeID, ObjectListSyncMessage clusterMsg) throws GroupException {
128     Assert.assertFalse(stateManager.isActiveCoordinator());
129     Set JavaDoc knownIDs = objectManager.getAllObjectIDs();
130     groupManager.sendTo(nodeID, ObjectListSyncMessageFactory.createObjectListSyncResponseMessage(clusterMsg, knownIDs));
131   }
132
133   /**
134    * TODO:: This method could be more intellegent to return true only when there are passives involved
135    */

136   public boolean relayTransactions() {
137     return true;
138   }
139 }
140
Popular Tags