KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > tx > RemoteTransactionManagerImpl


/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
 * notice. All rights reserved.
 */

5 package com.tc.object.tx;
6
7 import com.tc.logging.TCLogger;
8 import com.tc.object.lockmanager.api.LockFlushCallback;
9 import com.tc.object.lockmanager.api.LockID;
10 import com.tc.object.net.DSOClientMessageChannel;
11 import com.tc.object.session.SessionID;
12 import com.tc.object.session.SessionManager;
13 import com.tc.properties.TCPropertiesImpl;
14 import com.tc.util.Assert;
15 import com.tc.util.SequenceID;
16 import com.tc.util.State;
17 import com.tc.util.TCAssertionError;
18 import com.tc.util.Util;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.Arrays JavaDoc;
22 import java.util.Collection JavaDoc;
23 import java.util.Collections JavaDoc;
24 import java.util.HashMap JavaDoc;
25 import java.util.HashSet JavaDoc;
26 import java.util.Iterator JavaDoc;
27 import java.util.LinkedHashSet JavaDoc;
28 import java.util.List JavaDoc;
29 import java.util.Map JavaDoc;
30 import java.util.Set JavaDoc;
31 import java.util.Map.Entry;
32
33 /**
34  * Sends off committed transactions
35  *
36  * @author steve
37  */

38 public class RemoteTransactionManagerImpl implements RemoteTransactionManager {
39
40   private static final long TIMEOUT = 30000L;
41
42   private static final int MAX_OUTSTANDING_BATCHES = TCPropertiesImpl
43                                                                        .getProperties()
44                                                                        .getInt(
45                                                                                "l1.transactionmanager.maxOutstandingBatchSize");
46
47   private static final State STARTING = new State("STARTING");
48   private static final State RUNNING = new State("RUNNING");
49   private static final State PAUSED = new State("PAUSED");
50   private static final State STOP_INITIATED = new State("STOP-INITIATED");
51   private static final State STOPPED = new State("STOPPED");
52
53   private final Object JavaDoc lock = new Object JavaDoc();
54   private final Map JavaDoc incompleteBatches = new HashMap JavaDoc();
55   private final HashMap JavaDoc lockFlushCallbacks = new HashMap JavaDoc();
56
57   private int outStandingBatches = 0;
58   private final TCLogger logger;
59   private final TransactionBatchAccounting batchAccounting;
60   private final LockAccounting lockAccounting;
61
62   private State status;
63   private final SessionManager sessionManager;
64   private final TransactionSequencer sequencer;
65   private final DSOClientMessageChannel channel;
66
67   public RemoteTransactionManagerImpl(TCLogger logger, final TransactionBatchFactory batchFactory,
68                                       TransactionBatchAccounting batchAccounting, LockAccounting lockAccounting,
69                                       SessionManager sessionManager, DSOClientMessageChannel channel) {
70     this.logger = logger;
71     this.batchAccounting = batchAccounting;
72     this.lockAccounting = lockAccounting;
73     this.sessionManager = sessionManager;
74     this.channel = channel;
75     this.status = RUNNING;
76     this.sequencer = new TransactionSequencer(batchFactory);
77   }
78
79   public void pause() {
80     synchronized (lock) {
81       if (isStoppingOrStopped()) return;
82       if (this.status == PAUSED) throw new AssertionError JavaDoc("Attempt to pause while already paused.");
83       this.status = PAUSED;
84     }
85   }
86
87   public void starting() {
88     synchronized (lock) {
89       if (isStoppingOrStopped()) return;
90       if (this.status != PAUSED) throw new AssertionError JavaDoc("Attempt to start while not paused.");
91       this.status = STARTING;
92     }
93   }
94
95   public void unpause() {
96     synchronized (lock) {
97       if (isStoppingOrStopped()) return;
98       if (this.status != STARTING) throw new AssertionError JavaDoc("Attempt to unpause while not in starting.");
99       this.status = RUNNING;
100       lock.notifyAll();
101     }
102   }
103
104   /**
105    * This is for testing only.
106    */

107   public void clear() {
108     synchronized (lock) {
109       sequencer.clear();
110       incompleteBatches.clear();
111     }
112   }
113
114   /**
115    * This is for testing only.
116    */

117   public int getMaxOutStandingBatches() {
118     return MAX_OUTSTANDING_BATCHES;
119   }
120
121   public void stopProcessing() {
122     sequencer.shutdown();
123     channel.close();
124   }
125
126   public void stop() {
127     final long start = System.currentTimeMillis();
128     logger.debug("stop() is called on " + System.identityHashCode(this));
129     synchronized (lock) {
130       this.status = STOP_INITIATED;
131
132       sendBatches(true, "stop()");
133
134       int count = 10;
135       long t0 = System.currentTimeMillis();
136       if (incompleteBatches.size() != 0) {
137         try {
138           int incompleteBatchesCount = 0;
139           while (status != STOPPED && (t0 + TIMEOUT * count) > System.currentTimeMillis()) {
140             if (incompleteBatchesCount != incompleteBatches.size()) {
141               logger.debug("stop(): incompleteBatches.size() = " + (incompleteBatchesCount = incompleteBatches.size()));
142             }
143             lock.wait(TIMEOUT);
144           }
145         } catch (InterruptedException JavaDoc e) {
146           logger.warn("stop(): Interrupted " + e);
147         }
148         if (status != STOPPED) {
149           logger.error("stop() : There are still UNACKed Transactions! incompleteBatches.size() = "
150                        + incompleteBatches.size());
151         }
152       }
153       this.status = STOPPED;
154     }
155     logger.info("stop(): took " + (System.currentTimeMillis() - start) + " millis to complete");
156   }
157
158   public void flush(LockID lockID) {
159     boolean isInterrupted = false;
160     Collection JavaDoc c;
161     synchronized (lock) {
162       while ((!(c = lockAccounting.getTransactionsFor(lockID)).isEmpty())) {
163         try {
164           long waitTime = 15 * 1000;
165           long t0 = System.currentTimeMillis();
166           lock.wait(waitTime);
167           if ((System.currentTimeMillis() - t0) > waitTime) {
168             logger.info("Flush for " + lockID + " took longer than: " + waitTime
169                         + "ms. # Transactions not yet Acked = " + c.size() + "\n");
170           }
171         } catch (InterruptedException JavaDoc e) {
172           isInterrupted = true;
173         }
174       }
175     }
176     Util.selfInterruptIfNeeded(isInterrupted);
177   }
178
179   /* This does not block unlike flush() */
180   public boolean isTransactionsForLockFlushed(LockID lockID, LockFlushCallback callback) {
181     synchronized (lock) {
182
183       if ((lockAccounting.getTransactionsFor(lockID)).isEmpty()) {
184         // All transactions are flushed !
185
return true;
186       } else {
187         // register for call back
188
Object JavaDoc prev = lockFlushCallbacks.put(lockID, callback);
189         if (prev != null) {
190           // Will this scenario comeup in server restart scenario ? It should as we check for greediness in the Lock
191
// Manager before making this call
192
throw new TCAssertionError("There is already a registered call back on Lock Flush for this lock ID - "
193                                      + lockID);
194         }
195         return false;
196       }
197     }
198   }
199
200   public void commit(ClientTransaction txn) {
201     TransactionID txID = txn.getTransactionID();
202
203     if (!txn.hasChangesOrNotifies()) throw new AssertionError JavaDoc("Attempt to commit an empty transaction.");
204     if (!txn.isConcurrent()) {
205       lockAccounting.add(txID, Arrays.asList(txn.getAllLockIDs()));
206     }
207
208     long start = System.currentTimeMillis();
209     sequencer.addTransaction(txn);
210     long diff = System.currentTimeMillis() - start;
211     if (diff > 1000) {
212       logger.info("WARNING ! Took more than 1000ms to add to sequencer : " + diff + " ms");
213     }
214
215     synchronized (lock) {
216       if (isStoppingOrStopped()) {
217         // Send now if stop is requested
218
sendBatches(true, "commit() : Stop initiated.");
219       }
220       waitUntilRunning();
221       sendBatches(false);
222     }
223   }
224
225   private void sendBatches(boolean ignoreMax) {
226     sendBatches(ignoreMax, null);
227   }
228
229   private void sendBatches(boolean ignoreMax, String JavaDoc message) {
230     ClientTransactionBatch batch;
231     while ((ignoreMax || canSendBatch()) && (batch = sequencer.getNextBatch()) != null) {
232       if (message != null) {
233         logger.debug(message + " : Sending batch containing " + batch.numberOfTxns() + " Txns.");
234       }
235       sendBatch(batch, true);
236     }
237   }
238
239   private boolean canSendBatch() {
240     return (outStandingBatches < MAX_OUTSTANDING_BATCHES);
241   }
242
243   public void resendOutstanding() {
244     synchronized (lock) {
245       if (status != STARTING && !isStoppingOrStopped()) {
246         // formatting
247
throw new AssertionError JavaDoc(this + ": Attempt to resend incomplete batches while not starting. Status=" + status);
248       }
249       logger.debug("resendOutstanding()...");
250       outStandingBatches = 0;
251       List JavaDoc toSend = batchAccounting.addIncompleteBatchIDsTo(new ArrayList JavaDoc());
252       if (toSend.size() == 0) {
253         sendBatches(false, " resendOutstanding()");
254       } else {
255         for (Iterator JavaDoc i = toSend.iterator(); i.hasNext();) {
256           TxnBatchID id = (TxnBatchID) i.next();
257           ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches.get(id);
258           if (batch == null) throw new AssertionError JavaDoc("Unknown batch: " + id);
259           logger.debug("Resending outstanding batch: " + id + ", " + batch.addTransactionIDsTo(new LinkedHashSet JavaDoc()));
260           sendBatch(batch, false);
261         }
262       }
263     }
264   }
265
266   public Collection JavaDoc getTransactionSequenceIDs() {
267     synchronized (lock) {
268       HashSet JavaDoc sequenceIDs = new HashSet JavaDoc();
269       if (!isStoppingOrStopped() && (status != STARTING)) {
270         throw new AssertionError JavaDoc("Attempt to get current transaction sequence while not starting: " + status);
271       } else {
272         // Add list of SequenceIDs that are going to be resent
273
List JavaDoc toSend = batchAccounting.addIncompleteBatchIDsTo(new ArrayList JavaDoc());
274         for (Iterator JavaDoc i = toSend.iterator(); i.hasNext();) {
275           TxnBatchID id = (TxnBatchID) i.next();
276           ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches.get(id);
277           if (batch == null) throw new AssertionError JavaDoc("Unknown batch: " + id);
278           batch.addTransactionSequenceIDsTo(sequenceIDs);
279         }
280         // Add Last next
281
SequenceID currentBatchMinSeq = sequencer.getNextSequenceID();
282         Assert.assertFalse(SequenceID.NULL_ID.equals(currentBatchMinSeq));
283         sequenceIDs.add(currentBatchMinSeq);
284       }
285       return sequenceIDs;
286     }
287   }
288
289   public Collection JavaDoc getResentTransactionIDs() {
290     synchronized (lock) {
291       HashSet JavaDoc txIDs = new HashSet JavaDoc();
292       if (!isStoppingOrStopped() && (status != STARTING)) {
293         throw new AssertionError JavaDoc("Attempt to get resent transaction IDs while not starting: " + status);
294       } else {
295         // Add list of TransactionIDs that are going to be resent
296
List JavaDoc toSend = batchAccounting.addIncompleteBatchIDsTo(new ArrayList JavaDoc());
297         for (Iterator JavaDoc i = toSend.iterator(); i.hasNext();) {
298           TxnBatchID id = (TxnBatchID) i.next();
299           ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches.get(id);
300           if (batch == null) throw new AssertionError JavaDoc("Unknown batch: " + id);
301           batch.addTransactionIDsTo(txIDs);
302         }
303       }
304       return txIDs;
305     }
306   }
307
308   private boolean isStoppingOrStopped() {
309     return status == STOP_INITIATED || status == STOPPED;
310   }
311
312   private void sendBatch(ClientTransactionBatch batchToSend, boolean account) {
313     synchronized (lock) {
314       if (account) {
315         if (incompleteBatches.put(batchToSend.getTransactionBatchID(), batchToSend) != null) {
316           // formatting
317
throw new AssertionError JavaDoc("Batch has already been sent!");
318         }
319         Collection JavaDoc txnIds = batchToSend.addTransactionIDsTo(new HashSet JavaDoc());
320         batchAccounting.addBatch(batchToSend.getTransactionBatchID(), txnIds);
321         batchToSend.addAcknowledgedTransactionIDs(batchAccounting.addCompletedTransactionIDsTo(new HashSet JavaDoc()));
322         batchAccounting.clearCompletedTransactionIds();
323       }
324       batchToSend.send();
325       outStandingBatches++;
326     }
327   }
328
329   public void receivedBatchAcknowledgement(TxnBatchID txnBatchID) {
330     synchronized (lock) {
331       if (status == STOP_INITIATED) {
332         logger.warn(status + " : Received ACK for batch = " + txnBatchID);
333         lock.notifyAll();
334         return;
335       }
336
337       waitUntilRunning();
338       outStandingBatches--;
339       sendBatches(false);
340       lock.notifyAll();
341     }
342   }
343
344   public void receivedAcknowledgement(SessionID sessionID, TransactionID txID) {
345     Map JavaDoc callbacks;
346     synchronized (lock) {
347       // waitUntilRunning();
348
if (!sessionManager.isCurrentSession(sessionID)) {
349         logger.warn("Ignoring Transaction ACK for " + txID + " from previous session = " + sessionID);
350         return;
351       }
352
353       Set JavaDoc completedLocks = lockAccounting.acknowledge(txID);
354
355       TxnBatchID container = batchAccounting.getBatchByTransactionID(txID);
356       if (!container.isNull()) {
357         ClientTransactionBatch containingBatch = (ClientTransactionBatch) incompleteBatches.get(container);
358         containingBatch.removeTransaction(txID);
359         TxnBatchID completed = batchAccounting.acknowledge(txID);
360         if (!completed.isNull()) {
361           incompleteBatches.remove(completed);
362           if (status == STOP_INITIATED && incompleteBatches.size() == 0) {
363             logger.debug("Received ACK for the last Transaction. Moving to STOPPED state.");
364             status = STOPPED;
365           }
366         }
367       } else {
368         logger.fatal("No batch found for acknowledgement: " + txID + " The batch accounting is " + batchAccounting);
369         throw new AssertionError JavaDoc("No batch found for acknowledgement: " + txID);
370       }
371       lock.notifyAll();
372       callbacks = getLockFlushCallbacks(completedLocks);
373     }
374     fireLockFlushCallbacks(callbacks);
375   }
376
377   /*
378    * Never fire callbacks while holding lock
379    */

380   private void fireLockFlushCallbacks(Map JavaDoc callbacks) {
381     if (callbacks.isEmpty()) return;
382     for (Iterator JavaDoc i = callbacks.entrySet().iterator(); i.hasNext();) {
383       Entry e = (Entry) i.next();
384       LockID lid = (LockID) e.getKey();
385       LockFlushCallback callback = (LockFlushCallback) e.getValue();
386       callback.transactionsForLockFlushed(lid);
387     }
388   }
389
390   private Map JavaDoc getLockFlushCallbacks(Set JavaDoc completedLocks) {
391     Map JavaDoc callbacks = Collections.EMPTY_MAP;
392     if (!completedLocks.isEmpty() && !lockFlushCallbacks.isEmpty()) {
393       for (Iterator JavaDoc i = completedLocks.iterator(); i.hasNext();) {
394         Object JavaDoc lid = i.next();
395         Object JavaDoc callback = lockFlushCallbacks.remove(lid);
396         if (callback != null) {
397           if (callbacks == Collections.EMPTY_MAP) {
398             callbacks = new HashMap JavaDoc();
399           }
400           callbacks.put(lid, callback);
401         }
402       }
403     }
404     return callbacks;
405   }
406
407   private void waitUntilRunning() {
408     boolean isInterrupted = false;
409     while (status != RUNNING) {
410       try {
411         lock.wait();
412       } catch (InterruptedException JavaDoc e) {
413         isInterrupted = true;
414       }
415     }
416     Util.selfInterruptIfNeeded(isInterrupted);
417   }
418
419   // This method exists so that both these (resending and unpausing) should happen in
420
// atomically or else there exists a race condition.
421
public void resendOutstandingAndUnpause() {
422     synchronized (lock) {
423       resendOutstanding();
424       unpause();
425     }
426   }
427 }
428
Popular Tags