KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > handshakemanager > ClientHandshakeManager


/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
 * notice. All rights reserved.
 */

5 package com.tc.object.handshakemanager;
6
7 import com.tc.async.api.Sink;
8 import com.tc.async.api.Stage;
9 import com.tc.cluster.Cluster;
10 import com.tc.logging.TCLogger;
11 import com.tc.net.protocol.tcm.ChannelEvent;
12 import com.tc.net.protocol.tcm.ChannelEventListener;
13 import com.tc.net.protocol.tcm.ChannelEventType;
14 import com.tc.net.protocol.tcm.ChannelIDProvider;
15 import com.tc.object.ClientObjectManager;
16 import com.tc.object.ObjectID;
17 import com.tc.object.PauseListener;
18 import com.tc.object.RemoteObjectManager;
19 import com.tc.object.context.PauseContext;
20 import com.tc.object.gtx.ClientGlobalTransactionManager;
21 import com.tc.object.lockmanager.api.ClientLockManager;
22 import com.tc.object.lockmanager.api.LockContext;
23 import com.tc.object.lockmanager.api.LockRequest;
24 import com.tc.object.lockmanager.api.WaitContext;
25 import com.tc.object.lockmanager.api.WaitLockRequest;
26 import com.tc.object.msg.ClientHandshakeMessage;
27 import com.tc.object.msg.ClientHandshakeMessageFactory;
28 import com.tc.object.session.SessionManager;
29 import com.tc.object.tx.RemoteTransactionManager;
30 import com.tc.util.State;
31 import com.tc.util.Util;
32 import com.tc.util.sequence.BatchSequenceReceiver;
33
34 import java.util.Collection JavaDoc;
35 import java.util.HashSet JavaDoc;
36 import java.util.Iterator JavaDoc;
37
38 public class ClientHandshakeManager implements ChannelEventListener {
39   private static final State PAUSED = new State("PAUSED");
40   private static final State STARTING = new State("STARTING");
41   private static final State RUNNING = new State("RUNNING");
42
43   private final ClientObjectManager objectManager;
44   private final ClientLockManager lockManager;
45   private final ChannelIDProvider cidp;
46   private final ClientHandshakeMessageFactory chmf;
47   private final RemoteObjectManager remoteObjectManager;
48   private final ClientGlobalTransactionManager gtxManager;
49   private final TCLogger logger;
50   private final Collection JavaDoc stagesToPauseOnDisconnect;
51   private final Sink pauseSink;
52   private final SessionManager sessionManager;
53   private final PauseListener pauseListener;
54   private final BatchSequenceReceiver sequenceReceiver;
55   private final Cluster cluster;
56
57   private State state = PAUSED;
58   private boolean stagesPaused = false;
59   private boolean serverIsPersistent = false;
60
61   public ClientHandshakeManager(TCLogger logger, ChannelIDProvider cidp, ClientHandshakeMessageFactory chmf,
62                                 ClientObjectManager objectManager, RemoteObjectManager remoteObjectManager,
63                                 ClientLockManager lockManager, RemoteTransactionManager remoteTransactionManager,
64                                 ClientGlobalTransactionManager gtxManager, Collection JavaDoc stagesToPauseOnDisconnect,
65                                 Sink pauseSink, SessionManager sessionManager, PauseListener pauseListener,
66                                 BatchSequenceReceiver sequenceReceiver, Cluster cluster) {
67     this.logger = logger;
68     this.cidp = cidp;
69     this.chmf = chmf;
70     this.objectManager = objectManager;
71     this.remoteObjectManager = remoteObjectManager;
72     this.lockManager = lockManager;
73     this.gtxManager = gtxManager;
74     this.stagesToPauseOnDisconnect = stagesToPauseOnDisconnect;
75     this.pauseSink = pauseSink;
76     this.sessionManager = sessionManager;
77     this.pauseListener = pauseListener;
78     this.sequenceReceiver = sequenceReceiver;
79     this.cluster = cluster;
80     pauseManagers();
81   }
82
83   public void initiateHandshake() {
84     logger.debug("Initiating handshake...");
85     changeState(STARTING);
86     notifyManagersStarting();
87
88     ClientHandshakeMessage handshakeMessage = chmf.newClientHandshakeMessage();
89
90     handshakeMessage.setTransactionSequenceIDs(gtxManager.getTransactionSequenceIDs());
91     handshakeMessage.setResentTransactionIDs(gtxManager.getResentTransactionIDs());
92
93     logger.debug("Getting object ids...");
94     for (Iterator JavaDoc i = objectManager.getAllObjectIDsAndClear(new HashSet JavaDoc()).iterator(); i.hasNext();) {
95       handshakeMessage.addObjectID((ObjectID) i.next());
96     }
97
98     logger.debug("Getting lock holders...");
99     for (Iterator JavaDoc i = lockManager.addAllHeldLocksTo(new HashSet JavaDoc()).iterator(); i.hasNext();) {
100       LockRequest request = (LockRequest) i.next();
101       LockContext ctxt = new LockContext(request.lockID(), cidp.getChannelID(), request.threadID(), request.lockLevel());
102       handshakeMessage.addLockContext(ctxt);
103     }
104
105     logger.debug("Getting lock waiters...");
106     for (Iterator JavaDoc i = lockManager.addAllWaitersTo(new HashSet JavaDoc()).iterator(); i.hasNext();) {
107       WaitLockRequest request = (WaitLockRequest) i.next();
108       WaitContext ctxt = new WaitContext(request.lockID(), cidp.getChannelID(), request.threadID(),
109                                          request.lockLevel(), request.getWaitInvocation());
110       handshakeMessage.addWaitContext(ctxt);
111     }
112
113     logger.debug("Getting pending lock requests...");
114     for (Iterator JavaDoc i = lockManager.addAllPendingLockRequestsTo(new HashSet JavaDoc()).iterator(); i.hasNext();) {
115       LockRequest request = (LockRequest) i.next();
116       LockContext ctxt = new LockContext(request.lockID(), cidp.getChannelID(), request.threadID(), request.lockLevel(), request.noBlock());
117       handshakeMessage.addPendingLockContext(ctxt);
118     }
119
120     logger.debug("Checking to see if is object ids sequence is needed ...");
121     handshakeMessage.setIsObjectIDsRequested(!sequenceReceiver.hasNext());
122
123     logger.debug("Sending handshake message...");
124     handshakeMessage.send();
125   }
126
127   public void notifyChannelEvent(ChannelEvent event) {
128     if (event.getType() == ChannelEventType.TRANSPORT_DISCONNECTED_EVENT) {
129       cluster.thisNodeDisconnected();
130       sessionManager.newSession();
131       pauseSink.add(PauseContext.PAUSE);
132     } else if (event.getType() == ChannelEventType.TRANSPORT_CONNECTED_EVENT) {
133       pauseSink.add(PauseContext.UNPAUSE);
134     } else if (event.getType() == ChannelEventType.CHANNEL_CLOSED_EVENT) {
135       cluster.thisNodeDisconnected();
136     }
137   }
138
139   public void pause() {
140     logger.info("Pause " + getState());
141     if (getState() == PAUSED) {
142       logger.warn("pause called while already PAUSED");
143       return;
144     }
145     pauseStages();
146     pauseManagers();
147     changeState(PAUSED);
148   }
149
150   public void unpause() {
151     logger.info("Unpause " + getState());
152     if (getState() != PAUSED) {
153       logger.warn("unpause called while not PAUSED: " + getState());
154       return;
155     }
156     unpauseStages();
157     initiateHandshake();
158   }
159
160   public void acknowledgeHandshake(long objectIDStart, long objectIDEnd, boolean persistentServer, String JavaDoc thisNodeId,
161                                    String JavaDoc[] clusterMembers) {
162     if (getState() != STARTING) {
163       logger.warn("Handshake acknowledged while not STARTING: " + getState());
164       return;
165     }
166
167     this.serverIsPersistent = persistentServer;
168
169     cluster.thisNodeConnected(thisNodeId, clusterMembers);
170
171     if (objectIDStart < objectIDEnd) {
172       logger.debug("Setting the ObjectID sequence to: " + objectIDStart + " , " + objectIDEnd);
173       sequenceReceiver.setNextBatch(objectIDStart, objectIDEnd);
174     }
175
176     logger.debug("Re-requesting outstanding object requests...");
177     remoteObjectManager.requestOutstanding();
178
179     logger.debug("Handshake acknowledged. Resending incomplete transactions...");
180     gtxManager.resendOutstandingAndUnpause();
181     unpauseManagers();
182
183     changeState(RUNNING);
184   }
185
186   private void pauseManagers() {
187     lockManager.pause();
188     objectManager.pause();
189     remoteObjectManager.pause();
190     gtxManager.pause();
191     pauseListener.notifyPause();
192   }
193
194   private void notifyManagersStarting() {
195     lockManager.starting();
196     objectManager.starting();
197     remoteObjectManager.starting();
198     gtxManager.starting();
199   }
200
201   // XXX:: Note that gtxmanager is actually unpaused outside this method as it
202
// has to resend transactions and unpause in a single step.
203
private void unpauseManagers() {
204     lockManager.unpause();
205     objectManager.unpause();
206     remoteObjectManager.unpause();
207     pauseListener.notifyUnpause();
208   }
209
210   private void pauseStages() {
211     if (!stagesPaused) {
212       logger.debug("Pausing stages...");
213       for (Iterator JavaDoc i = stagesToPauseOnDisconnect.iterator(); i.hasNext();) {
214         ((Stage) i.next()).pause();
215       }
216       stagesPaused = true;
217     } else {
218       logger.debug("pauseStages(): Stages are paused; not pausing stages.");
219     }
220   }
221
222   private void unpauseStages() {
223     if (stagesPaused) {
224       logger.debug("Unpausing stages...");
225       for (Iterator JavaDoc i = stagesToPauseOnDisconnect.iterator(); i.hasNext();) {
226         ((Stage) i.next()).unpause();
227       }
228       stagesPaused = false;
229     } else {
230       logger.debug("unpauseStages(): Stages not paused; not unpausing stages.");
231     }
232   }
233
234   /**
235    *
236    */

237   public boolean serverIsPersistent() {
238     return this.serverIsPersistent;
239   }
240
241   public synchronized void waitForHandshake() {
242     boolean isInterrupted = false;
243     while (state != RUNNING) {
244       try {
245         wait();
246       } catch (InterruptedException JavaDoc e) {
247         logger.error("Interrupted while waiting for handshake");
248         isInterrupted = true;
249       }
250     }
251     Util.selfInterruptIfNeeded(isInterrupted);
252   }
253
254   private synchronized void changeState(State newState) {
255     state = newState;
256     notifyAll();
257   }
258
259   private synchronized State getState() {
260     return state;
261   }
262
263 }
264
Popular Tags