KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > handshakemanager > ServerClientHandshakeManager


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.objectserver.handshakemanager;
6
7 import com.tc.async.api.Sink;
8 import com.tc.async.impl.NullSink;
9 import com.tc.logging.TCLogger;
10 import com.tc.net.protocol.tcm.ChannelID;
11 import com.tc.net.protocol.transport.ConnectionID;
12 import com.tc.object.lockmanager.api.LockContext;
13 import com.tc.object.lockmanager.api.WaitContext;
14 import com.tc.object.msg.ClientHandshakeMessage;
15 import com.tc.object.net.DSOChannelManager;
16 import com.tc.objectserver.api.ObjectManager;
17 import com.tc.objectserver.l1.api.ClientStateManager;
18 import com.tc.objectserver.lockmanager.api.LockManager;
19 import com.tc.objectserver.tx.ServerTransactionManager;
20 import com.tc.util.SequenceValidator;
21 import com.tc.util.TCTimer;
22 import com.tc.util.sequence.ObjectIDSequence;
23
24 import java.util.HashSet JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.Set JavaDoc;
27 import java.util.TimerTask JavaDoc;
28
29 public class ServerClientHandshakeManager {
30
31   private static final State INIT = new State("INIT");
32   private static final State STARTING = new State("STARTING");
33   private static final State STARTED = new State("STARTED");
34   private static final int BATCH_SEQUENCE_SIZE = 10000;
35
36   public static final Sink NULL_SINK = new NullSink();
37
38   private State state = INIT;
39
40   private final TCTimer timer;
41   private final ReconnectTimerTask reconnectTimerTask;
42   private final ClientStateManager clientStateManager;
43   private final LockManager lockManager;
44   private final Sink lockResponseSink;
45   private final long reconnectTimeout;
46   private final ObjectManager objectManager;
47   private final DSOChannelManager channelManager;
48   private final TCLogger logger;
49   private final SequenceValidator sequenceValidator;
50   private final Set JavaDoc existingUnconnectedClients = new HashSet JavaDoc();
51   private final ServerTransactionManager transactionManager;
52   private final ObjectIDSequence oidSequence;
53   private final Set JavaDoc clientsRequestingObjectIDSequence = new HashSet JavaDoc();
54   private final boolean persistent;
55
56   public ServerClientHandshakeManager(TCLogger logger, DSOChannelManager channelManager, ObjectManager objectManager,
57                                       SequenceValidator sequenceValidator, ClientStateManager clientStateManager,
58                                       LockManager lockManager, ServerTransactionManager transactionManager,
59                                       Sink lockResponseSink, ObjectIDSequence oidSequence, TCTimer timer,
60                                       long reconnectTimeout, boolean persistent) {
61     this.logger = logger;
62     this.channelManager = channelManager;
63     this.objectManager = objectManager;
64     this.sequenceValidator = sequenceValidator;
65     this.clientStateManager = clientStateManager;
66     this.lockManager = lockManager;
67     this.transactionManager = transactionManager;
68     this.lockResponseSink = lockResponseSink;
69     this.oidSequence = oidSequence;
70     this.reconnectTimeout = reconnectTimeout;
71     this.timer = timer;
72     this.persistent = persistent;
73     this.reconnectTimerTask = new ReconnectTimerTask(this, timer);
74   }
75
76   public synchronized boolean isStarting() {
77     return state == STARTING;
78   }
79
80   public synchronized boolean isStarted() {
81     return state == STARTED;
82   }
83
84   public void notifyClientConnect(ClientHandshakeMessage handshake) throws ClientHandshakeException {
85     ChannelID channelID = handshake.getChannelID();
86     logger.info("Client connected " + channelID);
87     synchronized (this) {
88       logger.debug("Handling client handshake...");
89       if (state == STARTED) {
90         if (handshake.getObjectIDs().size() > 0) {
91           //
92
throw new ClientHandshakeException(
93                                              "Clients connected after startup should have no existing object references.");
94         }
95         if (handshake.getWaitContexts().size() > 0) {
96           //
97
throw new ClientHandshakeException("Clients connected after startup should have no existing wait contexts.");
98         }
99         if (handshake.isObjectIDsRequested()) {
100           logger.debug("Client " + channelID + " requested Object ID Sequences ");
101           clientsRequestingObjectIDSequence.add(channelID);
102         }
103         // XXX: It would be better to not have two different code paths that both call sendAckMessageFor(..)
104
sendAckMessageFor(channelID);
105         return;
106       }
107
108       if (state == STARTING) {
109         channelManager.makeChannelActiveNoAck(handshake.getChannel());
110       }
111
112       this.sequenceValidator.initSequence(handshake.getChannelID(), handshake.getTransactionSequenceIDs());
113       this.transactionManager.setResentTransactionIDs(handshake.getChannelID(), handshake.getResentTransactionIDs());
114
115       clientStateManager.addReferences(channelID, handshake.getObjectIDs());
116
117       for (Iterator JavaDoc i = handshake.getLockContexts().iterator(); i.hasNext();) {
118         LockContext ctxt = (LockContext) i.next();
119         lockManager.reestablishLock(ctxt.getLockID(), ctxt.getChannelID(), ctxt.getThreadID(), ctxt.getLockLevel(),
120                                     lockResponseSink);
121       }
122
123       for (Iterator JavaDoc i = handshake.getWaitContexts().iterator(); i.hasNext();) {
124         WaitContext ctxt = (WaitContext) i.next();
125         lockManager.reestablishWait(ctxt.getLockID(), ctxt.getChannelID(), ctxt.getThreadID(), ctxt.getLockLevel(),
126                                     ctxt.getWaitInvocation(), lockResponseSink);
127       }
128
129       for (Iterator JavaDoc i = handshake.getPendingLockContexts().iterator(); i.hasNext();) {
130         LockContext ctxt = (LockContext) i.next();
131         if (ctxt.noBlock()) {
132           lockManager.tryRequestLock(ctxt.getLockID(), ctxt.getChannelID(), ctxt.getThreadID(), ctxt.getLockLevel(),
133                                      lockResponseSink);
134         } else {
135           lockManager.requestLock(ctxt.getLockID(), ctxt.getChannelID(), ctxt.getThreadID(), ctxt.getLockLevel(),
136                                   lockResponseSink);
137         }
138       }
139
140       if (handshake.isObjectIDsRequested()) {
141         logger.debug("Client " + channelID + " requested Object ID Sequences ");
142         clientsRequestingObjectIDSequence.add(channelID);
143       }
144
145       if (state == STARTING) {
146         logger.debug("Removing client " + channelID + " from set of existing unconnected clients.");
147         existingUnconnectedClients.remove(channelID);
148         if (existingUnconnectedClients.isEmpty()) {
149           logger.debug("Last existing unconnected client (" + channelID + ") now connected. Cancelling timer");
150           timer.cancel();
151           start();
152         }
153       } else {
154         sendAckMessageFor(channelID);
155       }
156     }
157   }
158
159   private void sendAckMessageFor(ChannelID channelID) {
160     logger.debug("Sending handshake acknowledgement to " + channelID);
161
162     final long startIDs;
163     final long endIDs;
164     if (clientsRequestingObjectIDSequence.remove(channelID)) {
165       final long ids = oidSequence.nextObjectIDBatch(BATCH_SEQUENCE_SIZE);
166       logger.debug("Giving out Object ID Sequences to " + channelID + " from " + ids + " to "
167                    + (ids + BATCH_SEQUENCE_SIZE));
168
169       startIDs = ids;
170       endIDs = ids + BATCH_SEQUENCE_SIZE;
171     } else {
172       startIDs = endIDs = 0;
173     }
174
175     // NOTE: handshake ack message initialize()/send() must be done atomically with making the channel active
176
// and is thus done inside this channel manager call
177
channelManager.makeChannelActive(channelID, startIDs, endIDs, persistent);
178   }
179
180   public synchronized void notifyTimeout() {
181     assertNotStarted();
182     logger.info("Reconnect window closing. Killing any previously connected clients that failed to connect in time: "
183                 + existingUnconnectedClients);
184     this.channelManager.closeAll(existingUnconnectedClients);
185     for (Iterator JavaDoc i = existingUnconnectedClients.iterator(); i.hasNext();) {
186       ChannelID deadClient = (ChannelID) i.next();
187       this.clientStateManager.shutdownClient(deadClient);
188       i.remove();
189     }
190     logger.info("Reconnect window closed. All dead clients removed.");
191     start();
192   }
193
194   private void start() {
195     logger.info("Starting DSO services...");
196     lockManager.start();
197     objectManager.start();
198     for (Iterator JavaDoc i = channelManager.getRawChannelIDs().iterator(); i.hasNext();) {
199       ChannelID channelID = (ChannelID) i.next();
200       sendAckMessageFor(channelID);
201     }
202     state = STARTED;
203   }
204
205   public synchronized void setStarting(Set JavaDoc existingConnections) {
206     assertInit();
207     state = STARTING;
208     if (existingConnections.isEmpty()) {
209       start();
210     } else {
211       for (Iterator JavaDoc i = existingConnections.iterator(); i.hasNext();) {
212         existingUnconnectedClients.add(new ChannelID(((ConnectionID)i.next()).getChannelID()));
213       }
214       logger.info("Starting reconnect window: " + this.reconnectTimeout + " ms.");
215       timer.schedule(reconnectTimerTask, this.reconnectTimeout);
216     }
217   }
218
219   private void assertInit() {
220     if (state != INIT) throw new AssertionError JavaDoc("Should be in STARTING state: " + state);
221   }
222
223   private void assertNotStarted() {
224     if (state == STARTED) throw new AssertionError JavaDoc("In STARTING state, but shouldn't be.");
225   }
226
227   /**
228    * Notifies handshake manager that the reconnect time has passed.
229    *
230    * @author orion
231    */

232   private static class ReconnectTimerTask extends TimerTask JavaDoc {
233
234     private final TCTimer timer;
235     private final ServerClientHandshakeManager handshakeManager;
236
237     private ReconnectTimerTask(ServerClientHandshakeManager handshakeManager, TCTimer timer) {
238       this.handshakeManager = handshakeManager;
239       this.timer = timer;
240     }
241
242     public void run() {
243       timer.cancel();
244       handshakeManager.notifyTimeout();
245     }
246
247   }
248
249   private static class State {
250     private final String JavaDoc name;
251
252     private State(String JavaDoc name) {
253       this.name = name;
254     }
255
256     public String JavaDoc toString() {
257       return getClass().getName() + "[" + name + "]";
258     }
259   }
260
261 }
262
Popular Tags