1 5 package com.tc.object.tx; 6 7 import com.tc.logging.TCLogger; 8 import com.tc.object.lockmanager.api.LockFlushCallback; 9 import com.tc.object.lockmanager.api.LockID; 10 import com.tc.object.net.DSOClientMessageChannel; 11 import com.tc.object.session.SessionID; 12 import com.tc.object.session.SessionManager; 13 import com.tc.properties.TCPropertiesImpl; 14 import com.tc.util.Assert; 15 import com.tc.util.SequenceID; 16 import com.tc.util.State; 17 import com.tc.util.TCAssertionError; 18 import com.tc.util.Util; 19 20 import java.util.ArrayList ; 21 import java.util.Arrays ; 22 import java.util.Collection ; 23 import java.util.Collections ; 24 import java.util.HashMap ; 25 import java.util.HashSet ; 26 import java.util.Iterator ; 27 import java.util.LinkedHashSet ; 28 import java.util.List ; 29 import java.util.Map ; 30 import java.util.Set ; 31 import java.util.Map.Entry; 32 33 38 public class RemoteTransactionManagerImpl implements RemoteTransactionManager { 39 40 private static final long TIMEOUT = 30000L; 41 42 private static final int MAX_OUTSTANDING_BATCHES = TCPropertiesImpl 43 .getProperties() 44 .getInt( 45 "l1.transactionmanager.maxOutstandingBatchSize"); 46 47 private static final State STARTING = new State("STARTING"); 48 private static final State RUNNING = new State("RUNNING"); 49 private static final State PAUSED = new State("PAUSED"); 50 private static final State STOP_INITIATED = new State("STOP-INITIATED"); 51 private static final State STOPPED = new State("STOPPED"); 52 53 private final Object lock = new Object (); 54 private final Map incompleteBatches = new HashMap (); 55 private final HashMap lockFlushCallbacks = new HashMap (); 56 57 private int outStandingBatches = 0; 58 private final TCLogger logger; 59 private final TransactionBatchAccounting batchAccounting; 60 private final LockAccounting lockAccounting; 61 62 private State status; 63 private final SessionManager sessionManager; 64 private final TransactionSequencer sequencer; 65 private final DSOClientMessageChannel channel; 66 67 public RemoteTransactionManagerImpl(TCLogger logger, final TransactionBatchFactory batchFactory, 68 TransactionBatchAccounting batchAccounting, LockAccounting lockAccounting, 69 SessionManager sessionManager, DSOClientMessageChannel channel) { 70 this.logger = logger; 71 this.batchAccounting = batchAccounting; 72 this.lockAccounting = lockAccounting; 73 this.sessionManager = sessionManager; 74 this.channel = channel; 75 this.status = RUNNING; 76 this.sequencer = new TransactionSequencer(batchFactory); 77 } 78 79 public void pause() { 80 synchronized (lock) { 81 if (isStoppingOrStopped()) return; 82 if (this.status == PAUSED) throw new AssertionError ("Attempt to pause while already paused."); 83 this.status = PAUSED; 84 } 85 } 86 87 public void starting() { 88 synchronized (lock) { 89 if (isStoppingOrStopped()) return; 90 if (this.status != PAUSED) throw new AssertionError ("Attempt to start while not paused."); 91 this.status = STARTING; 92 } 93 } 94 95 public void unpause() { 96 synchronized (lock) { 97 if (isStoppingOrStopped()) return; 98 if (this.status != STARTING) throw new AssertionError ("Attempt to unpause while not in starting."); 99 this.status = RUNNING; 100 lock.notifyAll(); 101 } 102 } 103 104 107 public void clear() { 108 synchronized (lock) { 109 sequencer.clear(); 110 incompleteBatches.clear(); 111 } 112 } 113 114 117 public int getMaxOutStandingBatches() { 118 return MAX_OUTSTANDING_BATCHES; 119 } 120 121 public void stopProcessing() { 122 sequencer.shutdown(); 123 channel.close(); 124 } 125 126 public void stop() { 127 final long start = System.currentTimeMillis(); 128 logger.debug("stop() is called on " + System.identityHashCode(this)); 129 synchronized (lock) { 130 this.status = STOP_INITIATED; 131 132 sendBatches(true, "stop()"); 133 134 int count = 10; 135 long t0 = System.currentTimeMillis(); 136 if (incompleteBatches.size() != 0) { 137 try { 138 int incompleteBatchesCount = 0; 139 while (status != STOPPED && (t0 + TIMEOUT * count) > System.currentTimeMillis()) { 140 if (incompleteBatchesCount != incompleteBatches.size()) { 141 logger.debug("stop(): incompleteBatches.size() = " + (incompleteBatchesCount = incompleteBatches.size())); 142 } 143 lock.wait(TIMEOUT); 144 } 145 } catch (InterruptedException e) { 146 logger.warn("stop(): Interrupted " + e); 147 } 148 if (status != STOPPED) { 149 logger.error("stop() : There are still UNACKed Transactions! incompleteBatches.size() = " 150 + incompleteBatches.size()); 151 } 152 } 153 this.status = STOPPED; 154 } 155 logger.info("stop(): took " + (System.currentTimeMillis() - start) + " millis to complete"); 156 } 157 158 public void flush(LockID lockID) { 159 boolean isInterrupted = false; 160 Collection c; 161 synchronized (lock) { 162 while ((!(c = lockAccounting.getTransactionsFor(lockID)).isEmpty())) { 163 try { 164 long waitTime = 15 * 1000; 165 long t0 = System.currentTimeMillis(); 166 lock.wait(waitTime); 167 if ((System.currentTimeMillis() - t0) > waitTime) { 168 logger.info("Flush for " + lockID + " took longer than: " + waitTime 169 + "ms. # Transactions not yet Acked = " + c.size() + "\n"); 170 } 171 } catch (InterruptedException e) { 172 isInterrupted = true; 173 } 174 } 175 } 176 Util.selfInterruptIfNeeded(isInterrupted); 177 } 178 179 180 public boolean isTransactionsForLockFlushed(LockID lockID, LockFlushCallback callback) { 181 synchronized (lock) { 182 183 if ((lockAccounting.getTransactionsFor(lockID)).isEmpty()) { 184 return true; 186 } else { 187 Object prev = lockFlushCallbacks.put(lockID, callback); 189 if (prev != null) { 190 throw new TCAssertionError("There is already a registered call back on Lock Flush for this lock ID - " 193 + lockID); 194 } 195 return false; 196 } 197 } 198 } 199 200 public void commit(ClientTransaction txn) { 201 TransactionID txID = txn.getTransactionID(); 202 203 if (!txn.hasChangesOrNotifies()) throw new AssertionError ("Attempt to commit an empty transaction."); 204 if (!txn.isConcurrent()) { 205 lockAccounting.add(txID, Arrays.asList(txn.getAllLockIDs())); 206 } 207 208 long start = System.currentTimeMillis(); 209 sequencer.addTransaction(txn); 210 long diff = System.currentTimeMillis() - start; 211 if (diff > 1000) { 212 logger.info("WARNING ! Took more than 1000ms to add to sequencer : " + diff + " ms"); 213 } 214 215 synchronized (lock) { 216 if (isStoppingOrStopped()) { 217 sendBatches(true, "commit() : Stop initiated."); 219 } 220 waitUntilRunning(); 221 sendBatches(false); 222 } 223 } 224 225 private void sendBatches(boolean ignoreMax) { 226 sendBatches(ignoreMax, null); 227 } 228 229 private void sendBatches(boolean ignoreMax, String message) { 230 ClientTransactionBatch batch; 231 while ((ignoreMax || canSendBatch()) && (batch = sequencer.getNextBatch()) != null) { 232 if (message != null) { 233 logger.debug(message + " : Sending batch containing " + batch.numberOfTxns() + " Txns."); 234 } 235 sendBatch(batch, true); 236 } 237 } 238 239 private boolean canSendBatch() { 240 return (outStandingBatches < MAX_OUTSTANDING_BATCHES); 241 } 242 243 public void resendOutstanding() { 244 synchronized (lock) { 245 if (status != STARTING && !isStoppingOrStopped()) { 246 throw new AssertionError (this + ": Attempt to resend incomplete batches while not starting. Status=" + status); 248 } 249 logger.debug("resendOutstanding()..."); 250 outStandingBatches = 0; 251 List toSend = batchAccounting.addIncompleteBatchIDsTo(new ArrayList ()); 252 if (toSend.size() == 0) { 253 sendBatches(false, " resendOutstanding()"); 254 } else { 255 for (Iterator i = toSend.iterator(); i.hasNext();) { 256 TxnBatchID id = (TxnBatchID) i.next(); 257 ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches.get(id); 258 if (batch == null) throw new AssertionError ("Unknown batch: " + id); 259 logger.debug("Resending outstanding batch: " + id + ", " + batch.addTransactionIDsTo(new LinkedHashSet ())); 260 sendBatch(batch, false); 261 } 262 } 263 } 264 } 265 266 public Collection getTransactionSequenceIDs() { 267 synchronized (lock) { 268 HashSet sequenceIDs = new HashSet (); 269 if (!isStoppingOrStopped() && (status != STARTING)) { 270 throw new AssertionError ("Attempt to get current transaction sequence while not starting: " + status); 271 } else { 272 List toSend = batchAccounting.addIncompleteBatchIDsTo(new ArrayList ()); 274 for (Iterator i = toSend.iterator(); i.hasNext();) { 275 TxnBatchID id = (TxnBatchID) i.next(); 276 ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches.get(id); 277 if (batch == null) throw new AssertionError ("Unknown batch: " + id); 278 batch.addTransactionSequenceIDsTo(sequenceIDs); 279 } 280 SequenceID currentBatchMinSeq = sequencer.getNextSequenceID(); 282 Assert.assertFalse(SequenceID.NULL_ID.equals(currentBatchMinSeq)); 283 sequenceIDs.add(currentBatchMinSeq); 284 } 285 return sequenceIDs; 286 } 287 } 288 289 public Collection getResentTransactionIDs() { 290 synchronized (lock) { 291 HashSet txIDs = new HashSet (); 292 if (!isStoppingOrStopped() && (status != STARTING)) { 293 throw new AssertionError ("Attempt to get resent transaction IDs while not starting: " + status); 294 } else { 295 List toSend = batchAccounting.addIncompleteBatchIDsTo(new ArrayList ()); 297 for (Iterator i = toSend.iterator(); i.hasNext();) { 298 TxnBatchID id = (TxnBatchID) i.next(); 299 ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches.get(id); 300 if (batch == null) throw new AssertionError ("Unknown batch: " + id); 301 batch.addTransactionIDsTo(txIDs); 302 } 303 } 304 return txIDs; 305 } 306 } 307 308 private boolean isStoppingOrStopped() { 309 return status == STOP_INITIATED || status == STOPPED; 310 } 311 312 private void sendBatch(ClientTransactionBatch batchToSend, boolean account) { 313 synchronized (lock) { 314 if (account) { 315 if (incompleteBatches.put(batchToSend.getTransactionBatchID(), batchToSend) != null) { 316 throw new AssertionError ("Batch has already been sent!"); 318 } 319 Collection txnIds = batchToSend.addTransactionIDsTo(new HashSet ()); 320 batchAccounting.addBatch(batchToSend.getTransactionBatchID(), txnIds); 321 batchToSend.addAcknowledgedTransactionIDs(batchAccounting.addCompletedTransactionIDsTo(new HashSet ())); 322 batchAccounting.clearCompletedTransactionIds(); 323 } 324 batchToSend.send(); 325 outStandingBatches++; 326 } 327 } 328 329 public void receivedBatchAcknowledgement(TxnBatchID txnBatchID) { 330 synchronized (lock) { 331 if (status == STOP_INITIATED) { 332 logger.warn(status + " : Received ACK for batch = " + txnBatchID); 333 lock.notifyAll(); 334 return; 335 } 336 337 waitUntilRunning(); 338 outStandingBatches--; 339 sendBatches(false); 340 lock.notifyAll(); 341 } 342 } 343 344 public void receivedAcknowledgement(SessionID sessionID, TransactionID txID) { 345 Map callbacks; 346 synchronized (lock) { 347 if (!sessionManager.isCurrentSession(sessionID)) { 349 logger.warn("Ignoring Transaction ACK for " + txID + " from previous session = " + sessionID); 350 return; 351 } 352 353 Set completedLocks = lockAccounting.acknowledge(txID); 354 355 TxnBatchID container = batchAccounting.getBatchByTransactionID(txID); 356 if (!container.isNull()) { 357 ClientTransactionBatch containingBatch = (ClientTransactionBatch) incompleteBatches.get(container); 358 containingBatch.removeTransaction(txID); 359 TxnBatchID completed = batchAccounting.acknowledge(txID); 360 if (!completed.isNull()) { 361 incompleteBatches.remove(completed); 362 if (status == STOP_INITIATED && incompleteBatches.size() == 0) { 363 logger.debug("Received ACK for the last Transaction. Moving to STOPPED state."); 364 status = STOPPED; 365 } 366 } 367 } else { 368 logger.fatal("No batch found for acknowledgement: " + txID + " The batch accounting is " + batchAccounting); 369 throw new AssertionError ("No batch found for acknowledgement: " + txID); 370 } 371 lock.notifyAll(); 372 callbacks = getLockFlushCallbacks(completedLocks); 373 } 374 fireLockFlushCallbacks(callbacks); 375 } 376 377 380 private void fireLockFlushCallbacks(Map callbacks) { 381 if (callbacks.isEmpty()) return; 382 for (Iterator i = callbacks.entrySet().iterator(); i.hasNext();) { 383 Entry e = (Entry) i.next(); 384 LockID lid = (LockID) e.getKey(); 385 LockFlushCallback callback = (LockFlushCallback) e.getValue(); 386 callback.transactionsForLockFlushed(lid); 387 } 388 } 389 390 private Map getLockFlushCallbacks(Set completedLocks) { 391 Map callbacks = Collections.EMPTY_MAP; 392 if (!completedLocks.isEmpty() && !lockFlushCallbacks.isEmpty()) { 393 for (Iterator i = completedLocks.iterator(); i.hasNext();) { 394 Object lid = i.next(); 395 Object callback = lockFlushCallbacks.remove(lid); 396 if (callback != null) { 397 if (callbacks == Collections.EMPTY_MAP) { 398 callbacks = new HashMap (); 399 } 400 callbacks.put(lid, callback); 401 } 402 } 403 } 404 return callbacks; 405 } 406 407 private void waitUntilRunning() { 408 boolean isInterrupted = false; 409 while (status != RUNNING) { 410 try { 411 lock.wait(); 412 } catch (InterruptedException e) { 413 isInterrupted = true; 414 } 415 } 416 Util.selfInterruptIfNeeded(isInterrupted); 417 } 418 419 public void resendOutstandingAndUnpause() { 422 synchronized (lock) { 423 resendOutstanding(); 424 unpause(); 425 } 426 } 427 } 428 | Popular Tags |