package com.tc.object.handshakemanager;

import com.tc.async.api.Sink;
import com.tc.async.api.Stage;
import com.tc.cluster.Cluster;
import com.tc.logging.TCLogger;
import com.tc.net.protocol.tcm.ChannelEvent;
import com.tc.net.protocol.tcm.ChannelEventListener;
import com.tc.net.protocol.tcm.ChannelEventType;
import com.tc.net.protocol.tcm.ChannelIDProvider;
import com.tc.object.ClientObjectManager;
import com.tc.object.ObjectID;
import com.tc.object.PauseListener;
import com.tc.object.RemoteObjectManager;
import com.tc.object.context.PauseContext;
import com.tc.object.gtx.ClientGlobalTransactionManager;
import com.tc.object.lockmanager.api.ClientLockManager;
import com.tc.object.lockmanager.api.LockContext;
import com.tc.object.lockmanager.api.LockRequest;
import com.tc.object.lockmanager.api.WaitContext;
import com.tc.object.lockmanager.api.WaitLockRequest;
import com.tc.object.msg.ClientHandshakeMessage;
import com.tc.object.msg.ClientHandshakeMessageFactory;
import com.tc.object.session.SessionManager;
import com.tc.object.tx.RemoteTransactionManager;
import com.tc.util.State;
import com.tc.util.Util;
import com.tc.util.sequence.BatchSequenceReceiver;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;

/**
 * Coordinates the client side of the handshake with the server.
 * <p>
 * The manager moves between three states: {@code PAUSED} (initial), {@code STARTING} (a handshake message is being
 * built / has been sent) and {@code RUNNING} (the handshake has been acknowledged). Transport events received through
 * {@link #notifyChannelEvent(ChannelEvent)} are turned into {@link PauseContext} items queued on the pause sink; this
 * class itself only changes state when {@link #pause()}, {@link #unpause()} or
 * {@link #acknowledgeHandshake(long, long, boolean, String, String[])} is invoked.
 * <p>
 * Only the {@code state} field is guarded by this object's monitor. The {@code stagesPaused} and
 * {@code serverIsPersistent} fields are read and written without synchronization.
 */
public class ClientHandshakeManager implements ChannelEventListener {
  private static final State PAUSED = new State("PAUSED");
  private static final State STARTING = new State("STARTING");
  private static final State RUNNING = new State("RUNNING");

  private final TCLogger logger;
  private final Cluster cluster;
  private final ChannelIDProvider cidp;
  private final ClientHandshakeMessageFactory chmf;
  private final SessionManager sessionManager;
  private final BatchSequenceReceiver sequenceReceiver;

  // Managers that are paused / notified / unpaused as a group.
  private final ClientLockManager lockManager;
  private final ClientObjectManager objectManager;
  private final RemoteObjectManager remoteObjectManager;
  private final ClientGlobalTransactionManager gtxManager;
  private final PauseListener pauseListener;

  // Stage handling around disconnects.
  private final Collection stagesToPauseOnDisconnect;
  private final Sink pauseSink;

  private State state = PAUSED;
  private boolean stagesPaused = false;
  private boolean serverIsPersistent = false;

  /**
   * Stores the collaborators and immediately pauses all managers, matching the initial {@code PAUSED} state.
   * <p>
   * {@code remoteTransactionManager} is accepted but not used by this class.
   * {@code stagesToPauseOnDisconnect} is iterated with each element cast to {@link Stage}.
   */
  public ClientHandshakeManager(TCLogger logger, ChannelIDProvider cidp, ClientHandshakeMessageFactory chmf,
                                ClientObjectManager objectManager, RemoteObjectManager remoteObjectManager,
                                ClientLockManager lockManager, RemoteTransactionManager remoteTransactionManager,
                                ClientGlobalTransactionManager gtxManager, Collection stagesToPauseOnDisconnect,
                                Sink pauseSink, SessionManager sessionManager, PauseListener pauseListener,
                                BatchSequenceReceiver sequenceReceiver, Cluster cluster) {
    this.logger = logger;
    this.cluster = cluster;
    this.cidp = cidp;
    this.chmf = chmf;
    this.sessionManager = sessionManager;
    this.sequenceReceiver = sequenceReceiver;

    this.lockManager = lockManager;
    this.objectManager = objectManager;
    this.remoteObjectManager = remoteObjectManager;
    this.gtxManager = gtxManager;
    this.pauseListener = pauseListener;

    this.stagesToPauseOnDisconnect = stagesToPauseOnDisconnect;
    this.pauseSink = pauseSink;

    // Every field read by pauseManagers() has been assigned above.
    pauseManagers();
  }

  /**
   * Switches to {@code STARTING}, tells the managers a handshake is starting, then builds and sends the handshake
   * message. The message carries, in this order: transaction sequence ids, resent transaction ids, object ids, held
   * locks, lock waiters, pending lock requests, and a flag requesting object ids when the sequence receiver reports
   * that it has no next value.
   */
  public void initiateHandshake() {
    logger.debug("Initiating handshake...");
    changeState(STARTING);
    notifyManagersStarting();

    ClientHandshakeMessage message = chmf.newClientHandshakeMessage();

    message.setTransactionSequenceIDs(gtxManager.getTransactionSequenceIDs());
    message.setResentTransactionIDs(gtxManager.getResentTransactionIDs());

    logger.debug("Getting object ids...");
    addObjectIDs(message);

    logger.debug("Getting lock holders...");
    addHeldLocks(message);

    logger.debug("Getting lock waiters...");
    addLockWaiters(message);

    logger.debug("Getting pending lock requests...");
    addPendingLockRequests(message);

    logger.debug("Checking to see if is object ids sequence is needed ...");
    message.setIsObjectIDsRequested(!sequenceReceiver.hasNext());

    logger.debug("Sending handshake message...");
    message.send();
  }

  /** Adds every id returned by the object manager's getAllObjectIDsAndClear call to the message. */
  private void addObjectIDs(ClientHandshakeMessage message) {
    Iterator ids = objectManager.getAllObjectIDsAndClear(new HashSet()).iterator();
    while (ids.hasNext()) {
      message.addObjectID((ObjectID) ids.next());
    }
  }

  /** Adds one lock context per held lock reported by the lock manager. */
  private void addHeldLocks(ClientHandshakeMessage message) {
    Iterator held = lockManager.addAllHeldLocksTo(new HashSet()).iterator();
    while (held.hasNext()) {
      LockRequest heldLock = (LockRequest) held.next();
      message.addLockContext(new LockContext(heldLock.lockID(), cidp.getChannelID(), heldLock.threadID(),
                                             heldLock.lockLevel()));
    }
  }

  /** Adds one wait context per waiter reported by the lock manager. */
  private void addLockWaiters(ClientHandshakeMessage message) {
    Iterator waiters = lockManager.addAllWaitersTo(new HashSet()).iterator();
    while (waiters.hasNext()) {
      WaitLockRequest waiter = (WaitLockRequest) waiters.next();
      message.addWaitContext(new WaitContext(waiter.lockID(), cidp.getChannelID(), waiter.threadID(),
                                             waiter.lockLevel(), waiter.getWaitInvocation()));
    }
  }

  /** Adds one pending lock context per pending request reported by the lock manager. */
  private void addPendingLockRequests(ClientHandshakeMessage message) {
    Iterator pending = lockManager.addAllPendingLockRequestsTo(new HashSet()).iterator();
    while (pending.hasNext()) {
      LockRequest pendingLock = (LockRequest) pending.next();
      message.addPendingLockContext(new LockContext(pendingLock.lockID(), cidp.getChannelID(),
                                                    pendingLock.threadID(), pendingLock.lockLevel(),
                                                    pendingLock.noBlock()));
    }
  }

  /**
   * Reacts to transport/channel events.
   * <ul>
   * <li>Transport disconnected: marks this node disconnected, starts a new session, queues a PAUSE.</li>
   * <li>Transport connected: queues an UNPAUSE.</li>
   * <li>Channel closed: marks this node disconnected.</li>
   * </ul>
   * Any other event type is ignored.
   */
  public void notifyChannelEvent(ChannelEvent event) {
    if (event.getType() == ChannelEventType.TRANSPORT_DISCONNECTED_EVENT) {
      cluster.thisNodeDisconnected();
      sessionManager.newSession();
      pauseSink.add(PauseContext.PAUSE);
      return;
    }

    if (event.getType() == ChannelEventType.TRANSPORT_CONNECTED_EVENT) {
      pauseSink.add(PauseContext.UNPAUSE);
      return;
    }

    if (event.getType() == ChannelEventType.CHANNEL_CLOSED_EVENT) {
      cluster.thisNodeDisconnected();
    }
  }

  /**
   * Pauses the stages and the managers and moves to {@code PAUSED}. If the manager is already {@code PAUSED} a
   * warning is logged and nothing else happens.
   */
  public void pause() {
    logger.info("Pause " + getState());
    if (getState() != PAUSED) {
      pauseStages();
      pauseManagers();
      changeState(PAUSED);
    } else {
      logger.warn("pause called while already PAUSED");
    }
  }

  /**
   * Unpauses the stages and starts a new handshake. If the manager is not {@code PAUSED} a warning is logged and
   * nothing else happens.
   */
  public void unpause() {
    logger.info("Unpause " + getState());
    if (getState() == PAUSED) {
      unpauseStages();
      initiateHandshake();
    } else {
      logger.warn("unpause called while not PAUSED: " + getState());
    }
  }

  /**
   * Completes the handshake. Ignored (with a warning) unless the manager is {@code STARTING}.
   * <p>
   * Records whether the server is persistent, marks this node connected in the cluster, installs the new object id
   * batch when {@code objectIDStart < objectIDEnd}, re-requests outstanding objects, asks the global transaction
   * manager to resend outstanding transactions and unpause, unpauses the remaining managers and finally moves to
   * {@code RUNNING}.
   */
  public void acknowledgeHandshake(long objectIDStart, long objectIDEnd, boolean persistentServer, String thisNodeId,
                                   String[] clusterMembers) {
    if (getState() != STARTING) {
      logger.warn("Handshake acknowledged while not STARTING: " + getState());
      return;
    }

    this.serverIsPersistent = persistentServer;

    cluster.thisNodeConnected(thisNodeId, clusterMembers);

    boolean batchSupplied = objectIDStart < objectIDEnd;
    if (batchSupplied) {
      logger.debug("Setting the ObjectID sequence to: " + objectIDStart + " , " + objectIDEnd);
      sequenceReceiver.setNextBatch(objectIDStart, objectIDEnd);
    }

    logger.debug("Re-requesting outstanding object requests...");
    remoteObjectManager.requestOutstanding();

    logger.debug("Handshake acknowledged. Resending incomplete transactions...");
    gtxManager.resendOutstandingAndUnpause();
    unpauseManagers();

    changeState(RUNNING);
  }

  /** Pauses the lock, object, remote object and global transaction managers, then notifies the pause listener. */
  private void pauseManagers() {
    lockManager.pause();
    objectManager.pause();
    remoteObjectManager.pause();
    gtxManager.pause();
    pauseListener.notifyPause();
  }

  /** Tells the lock, object, remote object and global transaction managers that a handshake is starting. */
  private void notifyManagersStarting() {
    lockManager.starting();
    objectManager.starting();
    remoteObjectManager.starting();
    gtxManager.starting();
  }

  /**
   * Unpauses the lock, object and remote object managers, then notifies the pause listener.
   * <p>
   * The global transaction manager is not touched here. NOTE(review): presumably it has already been unpaused by
   * {@code gtxManager.resendOutstandingAndUnpause()}, which acknowledgeHandshake() calls immediately before this
   * method -- confirm.
   */
  private void unpauseManagers() {
    lockManager.unpause();
    objectManager.unpause();
    remoteObjectManager.unpause();
    pauseListener.notifyUnpause();
  }

  /** Pauses every configured stage once; a repeated call while the stages are already paused only logs. */
  private void pauseStages() {
    if (stagesPaused) {
      logger.debug("pauseStages(): Stages are paused; not pausing stages.");
      return;
    }

    logger.debug("Pausing stages...");
    Iterator stages = stagesToPauseOnDisconnect.iterator();
    while (stages.hasNext()) {
      ((Stage) stages.next()).pause();
    }
    stagesPaused = true;
  }

  /** Unpauses every configured stage once; a call while the stages are not paused only logs. */
  private void unpauseStages() {
    if (!stagesPaused) {
      logger.debug("unpauseStages(): Stages not paused; not unpausing stages.");
      return;
    }

    logger.debug("Unpausing stages...");
    Iterator stages = stagesToPauseOnDisconnect.iterator();
    while (stages.hasNext()) {
      ((Stage) stages.next()).unpause();
    }
    stagesPaused = false;
  }

  /**
   * Returns the persistence flag recorded by the most recent accepted handshake acknowledgement; {@code false} until
   * one has been accepted.
   */
  public boolean serverIsPersistent() {
    return this.serverIsPersistent;
  }

  /**
   * Blocks the caller until the state is {@code RUNNING}. An {@link InterruptedException} raised while waiting is
   * logged and the wait continues; whether one occurred is handed to {@code Util.selfInterruptIfNeeded} once the
   * wait is over.
   */
  public synchronized void waitForHandshake() {
    boolean interruptedWhileWaiting = false;
    while (state != RUNNING) {
      try {
        wait();
      } catch (InterruptedException e) {
        logger.error("Interrupted while waiting for handshake");
        interruptedWhileWaiting = true;
      }
    }
    Util.selfInterruptIfNeeded(interruptedWhileWaiting);
  }

  /** Records the new state and wakes every thread waiting on this object's monitor. */
  private synchronized void changeState(State newState) {
    state = newState;
    notifyAll();
  }

  /** Returns the current state, read under this object's monitor. */
  private synchronized State getState() {
    return state;
  }

}