KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > tx > TransactionalObjectManagerImpl


/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
 * notice. All rights reserved.
 */

package com.tc.objectserver.tx;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;

import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.net.protocol.tcm.ChannelID;
import com.tc.object.ObjectID;
import com.tc.object.tx.ServerTransactionID;
import com.tc.objectserver.api.ObjectManager;
import com.tc.objectserver.api.ObjectManagerLookupResults;
import com.tc.objectserver.context.ApplyTransactionContext;
import com.tc.objectserver.context.CommitTransactionContext;
import com.tc.objectserver.context.ObjectManagerResultsContext;
import com.tc.objectserver.context.RecallObjectsContext;
import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
import com.tc.properties.TCPropertiesImpl;
import com.tc.text.PrettyPrintable;
import com.tc.text.PrettyPrinter;
import com.tc.util.Assert;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
39
40 /**
41  * This class keeps track of locally checked out objects for applys and maintain the objects to txnid mapping in the
42  * server. It wraps calls going to object manager from lookup, apply, commit stages
43  */

44 public class TransactionalObjectManagerImpl implements TransactionalObjectManager, PrettyPrintable {
45
46   private static final TCLogger logger = TCLogging
47                                                                            .getLogger(TransactionalObjectManagerImpl.class);
48   private static final int MAX_COMMIT_SIZE = TCPropertiesImpl
49                                                                            .getProperties()
50                                                                            .getInt(
51                                                                                    "l2.objectmanager.maxObjectsToCommit");
52   private final ObjectManager objectManager;
53   private final TransactionSequencer sequencer;
54   private final ServerGlobalTransactionManager gtxm;
55
56   private final Object JavaDoc completedTxnIdsLock = new Object JavaDoc();
57   private Set JavaDoc completedTxnIDs = new HashSet JavaDoc();
58
59   /*
60    * This map contains ObjectIDs to TxnObjectGrouping that contains these objects
61    */

62   private final Map JavaDoc checkedOutObjects = new HashMap JavaDoc();
63   private final Map JavaDoc applyPendingTxns = new HashMap JavaDoc();
64   private final LinkedHashMap JavaDoc commitPendingTxns = new LinkedHashMap JavaDoc();
65
66   private final Set JavaDoc pendingObjectRequest = new HashSet JavaDoc();
67   private final PendingList pendingTxnList = new PendingList();
68   private final LinkedQueue processedPendingLookups = new LinkedQueue();
69   private final LinkedQueue processedApplys = new LinkedQueue();
70
71   private final TransactionalStageCoordinator txnStageCoordinator;
72
73   public TransactionalObjectManagerImpl(ObjectManager objectManager, TransactionSequencer sequencer,
74                                         ServerGlobalTransactionManager gtxm,
75                                         TransactionalStageCoordinator txnStageCoordinator) {
76     this.objectManager = objectManager;
77     this.sequencer = sequencer;
78     this.gtxm = gtxm;
79     this.txnStageCoordinator = txnStageCoordinator;
80   }
81
82   // ProcessTransactionHandler Method
83
public void addTransactions(ChannelID channelID, List JavaDoc txns, Collection JavaDoc completedTxnIds) {
84     sequencer.addTransactions(txns);
85     addCompletedTxnIds(completedTxnIds);
86     txnStageCoordinator.initiateLookup();
87   }
88
89   private void addCompletedTxnIds(Collection JavaDoc txnIds) {
90     synchronized (completedTxnIdsLock) {
91       completedTxnIDs.addAll(txnIds);
92     }
93   }
94
95   private Set JavaDoc getCompletedTxnIds() {
96     synchronized (completedTxnIdsLock) {
97       Set JavaDoc toRet = completedTxnIDs;
98       completedTxnIDs = new HashSet JavaDoc();
99       return toRet;
100     }
101   }
102
103   // LookupHandler Method
104
public void lookupObjectsForTransactions() {
105     processPendingIfNecessary();
106     while (true) {
107       ServerTransaction txn = sequencer.getNextTxnToProcess();
108       if (txn == null) break;
109       ServerTransactionID stxID = txn.getServerTransactionID();
110       if (gtxm.needsApply(stxID)) {
111         lookupObjectsForApplyAndAddToSink(txn, true);
112       } else {
113         // These txns are already applied, hence just sending it to the next stage.
114
txnStageCoordinator.addToApplyStage(new ApplyTransactionContext(txn, Collections.EMPTY_MAP));
115       }
116     }
117   }
118
119   private synchronized void processPendingIfNecessary() {
120     if (addProcessedPendingLookups()) {
121       processPendingTransactions();
122     }
123   }
124
125   public synchronized void lookupObjectsForApplyAndAddToSink(ServerTransaction txn, boolean newTxn) {
126     Collection JavaDoc oids = txn.getObjectIDs();
127     // log("lookupObjectsForApplyAndAddToSink(): START : " + txn.getServerTransactionID() + " : " + oids);
128
Set JavaDoc newRequests = new HashSet JavaDoc();
129     boolean makePending = false;
130     for (Iterator JavaDoc i = oids.iterator(); i.hasNext();) {
131       ObjectID oid = (ObjectID) i.next();
132       TxnObjectGrouping tog;
133       if (pendingObjectRequest.contains(oid)) {
134         makePending = true;
135       } else if ((tog = (TxnObjectGrouping) checkedOutObjects.get(oid)) == null) {
136         // 1) Object is not already checked out or
137
newRequests.add(oid);
138       } else if (tog.limitReached()) {
139         // 2) the object is available, but we dont use it to prevent huge commits, large txn acks etc
140
newRequests.add(oid);
141         // log(shortDescription());
142
// log("Limit Reached. " + oid + " - " + tog.shortDescription());
143
}
144     }
145     // TODO:: make cache and stats right
146
LookupContext lookupContext = null;
147     if (!newRequests.isEmpty()) {
148       lookupContext = new LookupContext(newRequests, (newTxn ? txn.getNewObjectIDs() : Collections.EMPTY_SET));
149       if (objectManager.lookupObjectsForCreateIfNecessary(txn.getChannelID(), lookupContext)) {
150         addLookedupObjects(lookupContext);
151       } else {
152         // New request went pending in object manager
153
// log("lookupObjectsForApplyAndAddToSink(): New Request went pending : " + newRequests);
154
makePending = true;
155         pendingObjectRequest.addAll(newRequests);
156       }
157     }
158     if (makePending) {
159       // log("lookupObjectsForApplyAndAddToSink(): Make Pending : " + txn.getServerTransactionID());
160
makePending(txn);
161       if (lookupContext != null) lookupContext.makePending();
162     } else {
163       ServerTransactionID txnID = txn.getServerTransactionID();
164       TxnObjectGrouping newGrouping = new TxnObjectGrouping(txnID, txn.getNewRoots());
165       mergeTransactionGroupings(oids, newGrouping);
166       applyPendingTxns.put(txnID, newGrouping);
167       txnStageCoordinator.addToApplyStage(new ApplyTransactionContext(txn, getRequiredObjectsMap(oids, newGrouping
168           .getObjects())));
169       makeUnpending(txn);
170       // log("lookupObjectsForApplyAndAddToSink(): Success: " + txn.getServerTransactionID());
171
}
172   }
173
174   public String JavaDoc shortDescription() {
175     return "TxnObjectManager : checked Out count = " + checkedOutObjects.size() + " apply pending txn = "
176            + applyPendingTxns.size() + " commit pending = " + commitPendingTxns.size() + " pending txns = "
177            + pendingTxnList.size() + " pending object requests = " + pendingObjectRequest.size();
178   }
179
180   private Map JavaDoc getRequiredObjectsMap(Collection JavaDoc oids, Map JavaDoc objects) {
181     HashMap JavaDoc map = new HashMap JavaDoc(oids.size());
182     for (Iterator JavaDoc i = oids.iterator(); i.hasNext();) {
183       Object JavaDoc oid = i.next();
184       Object JavaDoc mo = objects.get(oid);
185       if (mo == null) {
186         dump();
187         log("NULL !! " + oid + " not found ! " + oids);
188         log("Map contains " + objects);
189         throw new AssertionError JavaDoc("Object is NULL !! : " + oid);
190       }
191       map.put(oid, mo);
192     }
193     return map;
194   }
195
196   private void log(String JavaDoc message) {
197     logger.info(message);
198   }
199
200   // This method written to be optimized to perform large merges fast. Hence the code flow might not
201
// look natural.
202
private void mergeTransactionGroupings(Collection JavaDoc oids, TxnObjectGrouping newGrouping) {
203     long start = System.currentTimeMillis();
204     for (Iterator JavaDoc i = oids.iterator(); i.hasNext();) {
205       ObjectID oid = (ObjectID) i.next();
206       TxnObjectGrouping oldGrouping = (TxnObjectGrouping) checkedOutObjects.get(oid);
207       if (oldGrouping == null) {
208         throw new AssertionError JavaDoc("Transaction Grouping for lookedup objects is Null !! " + oid);
209       } else if (oldGrouping != newGrouping && oldGrouping.isActive()) {
210         ServerTransactionID oldTxnId = oldGrouping.getServerTransactionID();
211         // This merge has a sideeffect of setting all reference contained in oldGrouping to null.
212
newGrouping.merge(oldGrouping);
213         commitPendingTxns.remove(oldTxnId);
214       }
215     }
216     for (Iterator JavaDoc j = newGrouping.getObjects().keySet().iterator(); j.hasNext();) {
217       checkedOutObjects.put(j.next(), newGrouping);
218     }
219     for (Iterator JavaDoc j = newGrouping.getApplyPendingTxnsIterator(); j.hasNext();) {
220       ServerTransactionID oldTxnId = (ServerTransactionID) j.next();
221       if (applyPendingTxns.containsKey(oldTxnId)) {
222         applyPendingTxns.put(oldTxnId, newGrouping);
223       }
224     }
225     long timeTaken = System.currentTimeMillis() - start;
226     if (timeTaken > 500) {
227       log("Merged " + oids.size() + " object into " + newGrouping.shortDescription() + " in " + timeTaken + " ms");
228     }
229   }
230
231   private synchronized void addLookedupObjects(LookupContext context) {
232     Map JavaDoc lookedUpObjects = context.getLookedUpObjects();
233     Assert.assertTrue(lookedUpObjects != null && lookedUpObjects.size() > 0);
234     TxnObjectGrouping tg = new TxnObjectGrouping(lookedUpObjects);
235     for (Iterator JavaDoc i = lookedUpObjects.keySet().iterator(); i.hasNext();) {
236       Object JavaDoc oid = i.next();
237       pendingObjectRequest.remove(oid);
238       checkedOutObjects.put(oid, tg);
239     }
240   }
241
242   private void makePending(ServerTransaction txn) {
243     if (pendingTxnList.add(txn)) {
244       sequencer.makePending(txn);
245     }
246   }
247
248   private void makeUnpending(ServerTransaction txn) {
249     if (pendingTxnList.remove(txn)) {
250       sequencer.makeUnpending(txn);
251     }
252   }
253
254   private boolean addProcessedPendingLookups() {
255     LookupContext c;
256     boolean processedPending = false;
257     try {
258       while ((c = (LookupContext) processedPendingLookups.poll(0)) != null) {
259         addLookedupObjects(c);
260         processedPending = true;
261       }
262     } catch (InterruptedException JavaDoc e) {
263       throw new AssertionError JavaDoc(e);
264     }
265     return processedPending;
266   }
267
268   private void addProcessedPending(LookupContext context) {
269     try {
270       processedPendingLookups.put(context);
271     } catch (InterruptedException JavaDoc e) {
272       throw new AssertionError JavaDoc(e);
273     }
274     txnStageCoordinator.initiateLookup();
275   }
276
277   private void processPendingTransactions() {
278     List JavaDoc copy = pendingTxnList.copy();
279     for (Iterator JavaDoc i = copy.iterator(); i.hasNext();) {
280       ServerTransaction txn = (ServerTransaction) i.next();
281       lookupObjectsForApplyAndAddToSink(txn, false);
282     }
283   }
284
285   // ApplyTransaction stage method
286
public boolean applyTransactionComplete(ServerTransactionID stxnID) {
287     try {
288       processedApplys.put(stxnID);
289     } catch (InterruptedException JavaDoc e) {
290       throw new AssertionError JavaDoc(e);
291     }
292     txnStageCoordinator.initiateApplyComplete();
293     return true;
294   }
295
296   // Apply Complete stage method
297
public void processApplyComplete() {
298     try {
299       ServerTransactionID txnID;
300       ArrayList JavaDoc txnIDs = new ArrayList JavaDoc();
301       while ((txnID = (ServerTransactionID) processedApplys.poll(0)) != null) {
302         txnIDs.add(txnID);
303       }
304       if (txnIDs.size() > 0) {
305         processApplyTxnComplete(txnIDs);
306       }
307     } catch (InterruptedException JavaDoc e) {
308       throw new AssertionError JavaDoc(e);
309     }
310   }
311
312   private synchronized void processApplyTxnComplete(ArrayList JavaDoc txnIDs) {
313     for (Iterator JavaDoc i = txnIDs.iterator(); i.hasNext();) {
314       ServerTransactionID stxnID = (ServerTransactionID) i.next();
315       processApplyTxnComplete(stxnID);
316     }
317   }
318
319   private void processApplyTxnComplete(ServerTransactionID stxnID) {
320     TxnObjectGrouping grouping = (TxnObjectGrouping) applyPendingTxns.remove(stxnID);
321     Assert.assertNotNull(grouping);
322     if (grouping.applyComplete(stxnID)) {
323       // Since verifying against all txns is costly, only the prime one (the one that created this grouping) is verfied
324
// against
325
ServerTransactionID pTxnID = grouping.getServerTransactionID();
326       Assert.assertNull(applyPendingTxns.get(pTxnID));
327       Object JavaDoc old = commitPendingTxns.put(pTxnID, grouping);
328       Assert.assertNull(old);
329       txnStageCoordinator.initiateCommit();
330     }
331   }
332
333   // Commit Transaction stage method
334
public synchronized void commitTransactionsComplete(CommitTransactionContext ctc) {
335
336     if (commitPendingTxns.isEmpty()) return;
337
338     Map JavaDoc newRoots = new HashMap JavaDoc();
339     Map JavaDoc objects = new HashMap JavaDoc();
340     Collection JavaDoc txnIDs = new ArrayList JavaDoc();
341     for (Iterator JavaDoc i = commitPendingTxns.values().iterator(); i.hasNext();) {
342       TxnObjectGrouping tog = (TxnObjectGrouping) i.next();
343       newRoots.putAll(tog.getNewRoots());
344       txnIDs.addAll(tog.getTxnIDs());
345       objects.putAll(tog.getObjects());
346       i.remove();
347       if (objects.size() > MAX_COMMIT_SIZE) {
348         break;
349       }
350     }
351
352     ctc.initialize(txnIDs, objects.values(), newRoots, getCompletedTxnIds());
353
354     for (Iterator JavaDoc j = objects.keySet().iterator(); j.hasNext();) {
355       Object JavaDoc old = checkedOutObjects.remove(j.next());
356       Assert.assertNotNull(old);
357     }
358
359     if (!commitPendingTxns.isEmpty()) {
360       // More commits needed
361
txnStageCoordinator.initiateCommit();
362     }
363   }
364
365   // recall from ObjectManager on GC start
366
public void recallAllCheckedoutObject() {
367     txnStageCoordinator.initiateRecallAll();
368   }
369
370   // Recall Stage method
371
public synchronized void recallCheckedoutObject(RecallObjectsContext roc) {
372     processPendingIfNecessary();
373     if (roc.recallAll()) {
374       IdentityHashMap JavaDoc recalled = new IdentityHashMap JavaDoc();
375       HashMap JavaDoc recalledObjects = new HashMap JavaDoc();
376       for (Iterator JavaDoc i = checkedOutObjects.entrySet().iterator(); i.hasNext();) {
377         Entry e = (Entry) i.next();
378         TxnObjectGrouping tog = (TxnObjectGrouping) e.getValue();
379         if (tog.getServerTransactionID().isNull()) {
380           i.remove();
381           if (!recalled.containsKey(tog)) {
382             recalled.put(tog, null);
383             recalledObjects.putAll(tog.getObjects());
384           }
385         }
386       }
387       if (!recalledObjects.isEmpty()) {
388         logger.info("Recalling " + recalledObjects.size() + " Objects to ObjectManager");
389         objectManager.releaseAll(recalledObjects.values());
390       }
391     }
392   }
393
394   public void dump() {
395     PrintWriter JavaDoc pw = new PrintWriter JavaDoc(System.err);
396     new PrettyPrinter(pw).visit(this);
397     pw.flush();
398   }
399
400   public synchronized PrettyPrinter prettyPrint(PrettyPrinter out) {
401     out.println(getClass().getName());
402     out.indent().print("checkedOutObjects: ").visit(checkedOutObjects).println();
403     out.indent().print("applyPendingTxns: ").visit(applyPendingTxns).println();
404     out.indent().print("commitPendingTxns: ").visit(commitPendingTxns).println();
405     out.indent().print("pendingTxnList: ").visit(pendingTxnList).println();
406     out.indent().print("pendingObjectRequest: ").visit(pendingObjectRequest).println();
407     return out;
408   }
409
410   private class LookupContext implements ObjectManagerResultsContext {
411
412     private boolean pending = false;
413     private boolean resultsSet = false;
414     private Map JavaDoc lookedUpObjects;
415     private final Set JavaDoc oids;
416     private final Set JavaDoc newOids;
417
418     public LookupContext(Set JavaDoc oids, Set JavaDoc newOids) {
419       this.oids = oids;
420       this.newOids = newOids;
421     }
422
423     public synchronized void makePending() {
424       pending = true;
425       if (resultsSet) {
426         TransactionalObjectManagerImpl.this.addProcessedPending(this);
427       }
428     }
429
430     public synchronized void setResults(ObjectManagerLookupResults results) {
431       lookedUpObjects = results.getObjects();
432       resultsSet = true;
433       if (pending) {
434         TransactionalObjectManagerImpl.this.addProcessedPending(this);
435       }
436     }
437
438     public synchronized Map JavaDoc getLookedUpObjects() {
439       return lookedUpObjects;
440     }
441
442     public String JavaDoc toString() {
443       return "LookupContext [ " + oids + "] = { pending = " + pending + ", lookedupObjects = "
444              + lookedUpObjects.keySet() + "}";
445     }
446
447     public Set JavaDoc getLookupIDs() {
448       return oids;
449     }
450
451     public Set JavaDoc getNewObjectIDs() {
452       return newOids;
453     }
454
455   }
456
457   private static final class PendingList {
458     LinkedHashMap JavaDoc pending = new LinkedHashMap JavaDoc();
459
460     public boolean add(ServerTransaction txn) {
461       ServerTransactionID sTxID = txn.getServerTransactionID();
462       // Doing two lookups to avoid reordering
463
if (pending.containsKey(sTxID)) {
464         return false;
465       } else {
466         pending.put(sTxID, txn);
467         return true;
468       }
469     }
470
471     public List JavaDoc copy() {
472       return new ArrayList JavaDoc(pending.values());
473     }
474
475     public boolean remove(ServerTransaction txn) {
476       return (pending.remove(txn.getServerTransactionID()) != null);
477     }
478
479     public String JavaDoc toString() {
480       return "PendingList : pending Txns = " + pending;
481     }
482
483     public int size() {
484       return pending.size();
485     }
486   }
487 }
488
Popular Tags