1 5 package com.tc.objectserver.handshakemanager; 6 7 import com.tc.async.api.Sink; 8 import com.tc.async.impl.NullSink; 9 import com.tc.logging.TCLogger; 10 import com.tc.net.protocol.tcm.ChannelID; 11 import com.tc.net.protocol.transport.ConnectionID; 12 import com.tc.object.lockmanager.api.LockContext; 13 import com.tc.object.lockmanager.api.WaitContext; 14 import com.tc.object.msg.ClientHandshakeMessage; 15 import com.tc.object.net.DSOChannelManager; 16 import com.tc.objectserver.api.ObjectManager; 17 import com.tc.objectserver.l1.api.ClientStateManager; 18 import com.tc.objectserver.lockmanager.api.LockManager; 19 import com.tc.objectserver.tx.ServerTransactionManager; 20 import com.tc.util.SequenceValidator; 21 import com.tc.util.TCTimer; 22 import com.tc.util.sequence.ObjectIDSequence; 23 24 import java.util.HashSet ; 25 import java.util.Iterator ; 26 import java.util.Set ; 27 import java.util.TimerTask ; 28 29 public class ServerClientHandshakeManager { 30 31 private static final State INIT = new State("INIT"); 32 private static final State STARTING = new State("STARTING"); 33 private static final State STARTED = new State("STARTED"); 34 private static final int BATCH_SEQUENCE_SIZE = 10000; 35 36 public static final Sink NULL_SINK = new NullSink(); 37 38 private State state = INIT; 39 40 private final TCTimer timer; 41 private final ReconnectTimerTask reconnectTimerTask; 42 private final ClientStateManager clientStateManager; 43 private final LockManager lockManager; 44 private final Sink lockResponseSink; 45 private final long reconnectTimeout; 46 private final ObjectManager objectManager; 47 private final DSOChannelManager channelManager; 48 private final TCLogger logger; 49 private final SequenceValidator sequenceValidator; 50 private final Set existingUnconnectedClients = new HashSet (); 51 private final ServerTransactionManager transactionManager; 52 private final ObjectIDSequence oidSequence; 53 private final Set clientsRequestingObjectIDSequence = new HashSet (); 54 private final boolean persistent; 55 56 public ServerClientHandshakeManager(TCLogger logger, DSOChannelManager channelManager, ObjectManager objectManager, 57 SequenceValidator sequenceValidator, ClientStateManager clientStateManager, 58 LockManager lockManager, ServerTransactionManager transactionManager, 59 Sink lockResponseSink, ObjectIDSequence oidSequence, TCTimer timer, 60 long reconnectTimeout, boolean persistent) { 61 this.logger = logger; 62 this.channelManager = channelManager; 63 this.objectManager = objectManager; 64 this.sequenceValidator = sequenceValidator; 65 this.clientStateManager = clientStateManager; 66 this.lockManager = lockManager; 67 this.transactionManager = transactionManager; 68 this.lockResponseSink = lockResponseSink; 69 this.oidSequence = oidSequence; 70 this.reconnectTimeout = reconnectTimeout; 71 this.timer = timer; 72 this.persistent = persistent; 73 this.reconnectTimerTask = new ReconnectTimerTask(this, timer); 74 } 75 76 public synchronized boolean isStarting() { 77 return state == STARTING; 78 } 79 80 public synchronized boolean isStarted() { 81 return state == STARTED; 82 } 83 84 public void notifyClientConnect(ClientHandshakeMessage handshake) throws ClientHandshakeException { 85 ChannelID channelID = handshake.getChannelID(); 86 logger.info("Client connected " + channelID); 87 synchronized (this) { 88 logger.debug("Handling client handshake..."); 89 if (state == STARTED) { 90 if (handshake.getObjectIDs().size() > 0) { 91 throw new ClientHandshakeException( 93 "Clients connected after startup should have no existing object references."); 94 } 95 if (handshake.getWaitContexts().size() > 0) { 96 throw new ClientHandshakeException("Clients connected after startup should have no existing wait contexts."); 98 } 99 if (handshake.isObjectIDsRequested()) { 100 logger.debug("Client " + channelID + " requested Object ID Sequences "); 101 clientsRequestingObjectIDSequence.add(channelID); 102 } 103 sendAckMessageFor(channelID); 105 return; 106 } 107 108 if (state == STARTING) { 109 channelManager.makeChannelActiveNoAck(handshake.getChannel()); 110 } 111 112 this.sequenceValidator.initSequence(handshake.getChannelID(), handshake.getTransactionSequenceIDs()); 113 this.transactionManager.setResentTransactionIDs(handshake.getChannelID(), handshake.getResentTransactionIDs()); 114 115 clientStateManager.addReferences(channelID, handshake.getObjectIDs()); 116 117 for (Iterator i = handshake.getLockContexts().iterator(); i.hasNext();) { 118 LockContext ctxt = (LockContext) i.next(); 119 lockManager.reestablishLock(ctxt.getLockID(), ctxt.getChannelID(), ctxt.getThreadID(), ctxt.getLockLevel(), 120 lockResponseSink); 121 } 122 123 for (Iterator i = handshake.getWaitContexts().iterator(); i.hasNext();) { 124 WaitContext ctxt = (WaitContext) i.next(); 125 lockManager.reestablishWait(ctxt.getLockID(), ctxt.getChannelID(), ctxt.getThreadID(), ctxt.getLockLevel(), 126 ctxt.getWaitInvocation(), lockResponseSink); 127 } 128 129 for (Iterator i = handshake.getPendingLockContexts().iterator(); i.hasNext();) { 130 LockContext ctxt = (LockContext) i.next(); 131 if (ctxt.noBlock()) { 132 lockManager.tryRequestLock(ctxt.getLockID(), ctxt.getChannelID(), ctxt.getThreadID(), ctxt.getLockLevel(), 133 lockResponseSink); 134 } else { 135 lockManager.requestLock(ctxt.getLockID(), ctxt.getChannelID(), ctxt.getThreadID(), ctxt.getLockLevel(), 136 lockResponseSink); 137 } 138 } 139 140 if (handshake.isObjectIDsRequested()) { 141 logger.debug("Client " + channelID + " requested Object ID Sequences "); 142 clientsRequestingObjectIDSequence.add(channelID); 143 } 144 145 if (state == STARTING) { 146 logger.debug("Removing client " + channelID + " from set of existing unconnected clients."); 147 existingUnconnectedClients.remove(channelID); 148 if (existingUnconnectedClients.isEmpty()) { 149 logger.debug("Last existing unconnected client (" + channelID + ") now connected. Cancelling timer"); 150 timer.cancel(); 151 start(); 152 } 153 } else { 154 sendAckMessageFor(channelID); 155 } 156 } 157 } 158 159 private void sendAckMessageFor(ChannelID channelID) { 160 logger.debug("Sending handshake acknowledgement to " + channelID); 161 162 final long startIDs; 163 final long endIDs; 164 if (clientsRequestingObjectIDSequence.remove(channelID)) { 165 final long ids = oidSequence.nextObjectIDBatch(BATCH_SEQUENCE_SIZE); 166 logger.debug("Giving out Object ID Sequences to " + channelID + " from " + ids + " to " 167 + (ids + BATCH_SEQUENCE_SIZE)); 168 169 startIDs = ids; 170 endIDs = ids + BATCH_SEQUENCE_SIZE; 171 } else { 172 startIDs = endIDs = 0; 173 } 174 175 channelManager.makeChannelActive(channelID, startIDs, endIDs, persistent); 178 } 179 180 public synchronized void notifyTimeout() { 181 assertNotStarted(); 182 logger.info("Reconnect window closing. Killing any previously connected clients that failed to connect in time: " 183 + existingUnconnectedClients); 184 this.channelManager.closeAll(existingUnconnectedClients); 185 for (Iterator i = existingUnconnectedClients.iterator(); i.hasNext();) { 186 ChannelID deadClient = (ChannelID) i.next(); 187 this.clientStateManager.shutdownClient(deadClient); 188 i.remove(); 189 } 190 logger.info("Reconnect window closed. All dead clients removed."); 191 start(); 192 } 193 194 private void start() { 195 logger.info("Starting DSO services..."); 196 lockManager.start(); 197 objectManager.start(); 198 for (Iterator i = channelManager.getRawChannelIDs().iterator(); i.hasNext();) { 199 ChannelID channelID = (ChannelID) i.next(); 200 sendAckMessageFor(channelID); 201 } 202 state = STARTED; 203 } 204 205 public synchronized void setStarting(Set existingConnections) { 206 assertInit(); 207 state = STARTING; 208 if (existingConnections.isEmpty()) { 209 start(); 210 } else { 211 for (Iterator i = existingConnections.iterator(); i.hasNext();) { 212 existingUnconnectedClients.add(new ChannelID(((ConnectionID)i.next()).getChannelID())); 213 } 214 logger.info("Starting reconnect window: " + this.reconnectTimeout + " ms."); 215 timer.schedule(reconnectTimerTask, this.reconnectTimeout); 216 } 217 } 218 219 private void assertInit() { 220 if (state != INIT) throw new AssertionError ("Should be in STARTING state: " + state); 221 } 222 223 private void assertNotStarted() { 224 if (state == STARTED) throw new AssertionError ("In STARTING state, but shouldn't be."); 225 } 226 227 232 private static class ReconnectTimerTask extends TimerTask { 233 234 private final TCTimer timer; 235 private final ServerClientHandshakeManager handshakeManager; 236 237 private ReconnectTimerTask(ServerClientHandshakeManager handshakeManager, TCTimer timer) { 238 this.handshakeManager = handshakeManager; 239 this.timer = timer; 240 } 241 242 public void run() { 243 timer.cancel(); 244 handshakeManager.notifyTimeout(); 245 } 246 247 } 248 249 private static class State { 250 private final String name; 251 252 private State(String name) { 253 this.name = name; 254 } 255 256 public String toString() { 257 return getClass().getName() + "[" + name + "]"; 258 } 259 } 260 261 } 262 | Popular Tags |