KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > l2 > ha > ReplicatedClusterStateManagerImpl


1 /*
2  * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.l2.ha;
6
7 import com.tc.l2.api.ReplicatedClusterStateManager;
8 import com.tc.l2.msg.ClusterStateMessage;
9 import com.tc.l2.msg.ClusterStateMessageFactory;
10 import com.tc.l2.state.StateManager;
11 import com.tc.logging.TCLogger;
12 import com.tc.logging.TCLogging;
13 import com.tc.net.groups.GroupException;
14 import com.tc.net.groups.GroupManager;
15 import com.tc.net.groups.GroupMessage;
16 import com.tc.net.groups.GroupMessageListener;
17 import com.tc.net.groups.GroupResponse;
18 import com.tc.net.groups.NodeID;
19 import com.tc.net.protocol.transport.ConnectionID;
20 import com.tc.net.protocol.transport.ConnectionIDFactory;
21 import com.tc.net.protocol.transport.ConnectionIDFactoryListener;
22 import com.tc.util.Assert;
23 import com.tc.util.UUID;
24
25 import java.util.Iterator JavaDoc;
26
27 public class ReplicatedClusterStateManagerImpl implements ReplicatedClusterStateManager, GroupMessageListener,
28     ConnectionIDFactoryListener {
29
30   private static final TCLogger logger = TCLogging.getLogger(ReplicatedClusterStateManagerImpl.class);
31
32   private final GroupManager groupManager;
33   private final ClusterState state;
34
35   private final StateManager stateManager;
36
37   public ReplicatedClusterStateManagerImpl(GroupManager groupManager, StateManager stateManager,
38                                            ClusterState clusterState, ConnectionIDFactory factory) {
39     this.groupManager = groupManager;
40     this.stateManager = stateManager;
41     state = clusterState;
42     groupManager.registerForMessages(ClusterStateMessage.class, this);
43     factory.registerForConnectionIDEvents(this);
44   }
45
46   public synchronized void goActiveAndSyncState() {
47     generateClusterIDIfNeeded();
48
49     // Sync state to internal DB
50
state.syncInternal();
51
52     // Sync state to external passive servers
53
publishToAll(ClusterStateMessageFactory.createClusterStateMessage(state));
54   }
55
56   private void generateClusterIDIfNeeded() {
57     if (state.getClusterID() == null) {
58       // This is the first time an L2 goes active in the cluster of L2s. Generate a new clusterID. this will stick.
59
state.setClusterID(UUID.getUUID().toString());
60     }
61   }
62
63   public synchronized void publishClusterState(NodeID nodeID) {
64
65     try {
66       ClusterStateMessage msg = (ClusterStateMessage) groupManager
67           .sendToAndWaitForResponse(nodeID, ClusterStateMessageFactory.createClusterStateMessage(state));
68       validateResponse(nodeID, msg);
69     } catch (GroupException e) {
70       logger.error("Error plublishing cluster state : " + nodeID + " terminating it");
71       groupManager.zapNode(nodeID);
72     }
73
74   }
75
76   private void validateResponse(NodeID nodeID, ClusterStateMessage msg) {
77     if (msg == null || msg.getType() != ClusterStateMessage.OPERATION_SUCCESS) {
78       logger.error("Recd wrong response from : " + nodeID + " : msg = " + msg
79                    + " while publishing Next Available ObjectID: Killing the node");
80       groupManager.zapNode(nodeID);
81     }
82   }
83
84   // TODO:: Sync only once a while to the passives
85
public synchronized void publishNextAvailableObjectID(long maxID) {
86     state.setNextAvailableObjectID(maxID);
87     publishToAll(ClusterStateMessageFactory.createNextAvailableObjectIDMessage(state));
88   }
89
90   public synchronized void connectionIDCreated(ConnectionID connectionID) {
91     Assert.assertTrue(stateManager.isActiveCoordinator());
92     state.addNewConnection(connectionID);
93     publishToAll(ClusterStateMessageFactory.createNewConnectionCreatedMessage(connectionID));
94   }
95
96   public synchronized void connectionIDDestroyed(ConnectionID connectionID) {
97     Assert.assertTrue(stateManager.isActiveCoordinator());
98     state.removeConnection(connectionID);
99     publishToAll(ClusterStateMessageFactory.createConnectionDestroyedMessage(connectionID));
100   }
101
102   private void publishToAll(GroupMessage message) {
103     try {
104       GroupResponse gr = groupManager.sendAllAndWaitForResponse(message);
105       for (Iterator JavaDoc i = gr.getResponses().iterator(); i.hasNext();) {
106         ClusterStateMessage msg = (ClusterStateMessage) i.next();
107         validateResponse(msg.messageFrom(), msg);
108       }
109     } catch (GroupException e) {
110       // TODO:: Is this extreme ?
111
throw new AssertionError JavaDoc(e);
112     }
113   }
114
115   public void messageReceived(NodeID fromNode, GroupMessage msg) {
116     if (msg instanceof ClusterStateMessage) {
117       ClusterStateMessage clusterMsg = (ClusterStateMessage) msg;
118       handleClusterStateMessage(fromNode, clusterMsg);
119     } else {
120       throw new AssertionError JavaDoc("ReplicatedClusterStateManagerImpl : Received wrong message type :"
121                                + msg.getClass().getName() + " : " + msg);
122     }
123   }
124
125   private void handleClusterStateMessage(NodeID fromNode, ClusterStateMessage msg) {
126     if (stateManager.isActiveCoordinator()) {
127       logger.warn("Recd ClusterStateMessage from " + fromNode
128                   + " while I am the cluster co-ordinator. This is bad. Ignoring the message");
129       return;
130     }
131     msg.initState(state);
132     sendOKResponse(fromNode, msg);
133   }
134
135   private void sendOKResponse(NodeID fromNode, ClusterStateMessage msg) {
136     try {
137       groupManager.sendTo(fromNode, ClusterStateMessageFactory.createOKResponse(msg));
138     } catch (GroupException e) {
139       logger.error("Error handling message : " + msg, e);
140     }
141   }
142
143 }
144
Popular Tags