KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > impl > ObjectManagerImpl


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.objectserver.impl;
6
7 import com.tc.async.api.Sink;
8 import com.tc.exception.TCRuntimeException;
9 import com.tc.logging.TCLogger;
10 import com.tc.logging.TCLogging;
11 import com.tc.management.beans.object.ObjectManagementMonitor;
12 import com.tc.management.beans.object.ObjectManagementMonitorMBean;
13 import com.tc.management.beans.object.ObjectManagementMonitor.GCComptroller;
14 import com.tc.net.protocol.tcm.ChannelID;
15 import com.tc.object.ObjectID;
16 import com.tc.object.cache.CacheStats;
17 import com.tc.object.cache.Evictable;
18 import com.tc.object.cache.EvictionPolicy;
19 import com.tc.objectserver.api.GCStats;
20 import com.tc.objectserver.api.NoSuchObjectException;
21 import com.tc.objectserver.api.ObjectManager;
22 import com.tc.objectserver.api.ObjectManagerEventListener;
23 import com.tc.objectserver.api.ObjectManagerLookupResults;
24 import com.tc.objectserver.api.ObjectManagerMBean;
25 import com.tc.objectserver.api.ObjectManagerStatsListener;
26 import com.tc.objectserver.api.ShutdownError;
27 import com.tc.objectserver.context.ManagedObjectFaultingContext;
28 import com.tc.objectserver.context.ManagedObjectFlushingContext;
29 import com.tc.objectserver.context.ObjectManagerResultsContext;
30 import com.tc.objectserver.core.api.GarbageCollector;
31 import com.tc.objectserver.core.api.ManagedObject;
32 import com.tc.objectserver.core.impl.NullGarbageCollector;
33 import com.tc.objectserver.l1.api.ClientStateManager;
34 import com.tc.objectserver.managedobject.ManagedObjectChangeListener;
35 import com.tc.objectserver.managedobject.ManagedObjectImpl;
36 import com.tc.objectserver.managedobject.ManagedObjectTraverser;
37 import com.tc.objectserver.mgmt.ManagedObjectFacade;
38 import com.tc.objectserver.persistence.api.ManagedObjectStore;
39 import com.tc.objectserver.persistence.api.PersistenceTransaction;
40 import com.tc.objectserver.persistence.api.PersistenceTransactionProvider;
41 import com.tc.objectserver.tx.NullTransactionalObjectManager;
42 import com.tc.objectserver.tx.TransactionalObjectManager;
43 import com.tc.objectserver.tx.TransactionalObjectManagerImpl;
44 import com.tc.properties.TCPropertiesImpl;
45 import com.tc.text.PrettyPrinter;
46 import com.tc.util.Assert;
47 import com.tc.util.Counter;
48 import com.tc.util.SyncObjectIdSet;
49 import com.tc.util.concurrent.StoppableThread;
50
51 import gnu.trove.THashSet;
52
53 import java.io.PrintWriter JavaDoc;
54 import java.util.ArrayList JavaDoc;
55 import java.util.Collection JavaDoc;
56 import java.util.Collections JavaDoc;
57 import java.util.HashMap JavaDoc;
58 import java.util.HashSet JavaDoc;
59 import java.util.Iterator JavaDoc;
60 import java.util.LinkedList JavaDoc;
61 import java.util.List JavaDoc;
62 import java.util.Map JavaDoc;
63 import java.util.Set JavaDoc;
64
65 /**
66  * Manages access to all the Managed objects in the system.
67  *
68  * @author steve
69  */

70 public class ObjectManagerImpl implements ObjectManager, ManagedObjectChangeListener, ObjectManagerMBean, Evictable {
71
72   private static final TCLogger logger = TCLogging.getLogger(ObjectManager.class);
73
74   private static final byte DEFAULT_FLAG = 0x00;
75   private static final byte MISSING_OK = 0x01;
76   private static final byte NEW_REQUEST = 0x02;
77   private static final byte REMOVE_ON_RELEASE = 0x04;
78
79   private static final int MAX_COMMIT_SIZE = TCPropertiesImpl
80                                                                             .getProperties()
81                                                                             .getInt(
82                                                                                     "l2.objectmanager.maxObjectsToCommit");
83   // XXX:: Should go to property file
84
private static final int INITIAL_SET_SIZE = 1;
85   private static final float LOAD_FACTOR = 0.75f;
86   private static final int MAX_LOOKUP_OBJECTS_COUNT = 5000;
87   private static final long REMOVE_THRESHOLD = 300;
88
89   private final ManagedObjectStore objectStore;
90   private final Map JavaDoc references;
91   private final EvictionPolicy evictionPolicy;
92
93   private GarbageCollector collector = new NullGarbageCollector();
94   private int checkedOutCount = 0;
95   private PendingList pending = new PendingList();
96   private Counter flushCount = new Counter();
97
98   private volatile boolean inShutdown = false;
99
100   private ClientStateManager stateManager;
101   private final ObjectManagerConfig config;
102   private final ThreadGroup JavaDoc gcThreadGroup;
103   private ObjectManagerStatsListener stats = new NullObjectManagerStatsListener();
104   private final PersistenceTransactionProvider persistenceTransactionProvider;
105   private final Sink faultSink;
106   private final Sink flushSink;
107   private final ObjectManagementMonitorMBean objectManagementMonitor;
108   private TransactionalObjectManager txnObjectMgr = new NullTransactionalObjectManager();
109
110   public ObjectManagerImpl(ObjectManagerConfig config, ThreadGroup JavaDoc gcThreadGroup, ClientStateManager stateManager,
111                            ManagedObjectStore objectStore, EvictionPolicy cache,
112                            PersistenceTransactionProvider persistenceTransactionProvider, Sink faultSink,
113                            Sink flushSink, ObjectManagementMonitorMBean objectManagementMonitor) {
114     this.faultSink = faultSink;
115     this.flushSink = flushSink;
116     Assert.assertNotNull(objectStore);
117     this.config = config;
118     this.gcThreadGroup = gcThreadGroup;
119     this.stateManager = stateManager;
120     this.objectStore = objectStore;
121     this.evictionPolicy = cache;
122     this.persistenceTransactionProvider = persistenceTransactionProvider;
123     this.references = new HashMap JavaDoc(10000);
124     this.objectManagementMonitor = objectManagementMonitor;
125
126     final boolean doGC = config.doGC();
127     if (this.objectManagementMonitor instanceof ObjectManagementMonitor) {
128       ((ObjectManagementMonitor) this.objectManagementMonitor).registerGCController(new GCComptroller() {
129         public void startGC() {
130           gc();
131         }
132
133         public boolean gcEnabledInConfig() {
134           return doGC;
135         }
136       });
137     }
138   }
139
140   public void setTransactionalObjectManager(TransactionalObjectManagerImpl txnObjectManager) {
141     this.txnObjectMgr = txnObjectManager;
142   }
143
144   public void setStatsListener(ObjectManagerStatsListener statsListener) {
145     this.stats = statsListener;
146   }
147
148   public void start() {
149     collector.start();
150   }
151
152   public synchronized void stop() {
153     this.inShutdown = true;
154
155     collector.stop();
156
157     // flush the cache to stable persistence.
158
Set toFlush = new HashSet();
159     for (Iterator JavaDoc i = references.values().iterator(); i.hasNext();) {
160       ManagedObject obj = ((ManagedObjectReference) i.next()).getObject();
161       if (!obj.isNew()) toFlush.add(obj);
162     }
163     PersistenceTransaction tx = newTransaction();
164     flushAll(tx, toFlush);
165     tx.commit();
166   }
167
168   public synchronized PrettyPrinter prettyPrint(PrettyPrinter out) {
169     out.println(getClass().getName());
170     out.indent().print("roots: ").println(getRoots());
171     out.indent().print("collector: ").visit(collector).println();
172     out.indent().print("references: ").visit(references).println();
173
174     out.indent().println("checkedOutCount: " + checkedOutCount);
175     out.indent().print("pending: ").visit(pending).println();
176
177     out.indent().print("objectStore: ").duplicateAndIndent().visit(objectStore).println();
178     out.indent().print("stateManager: ").duplicateAndIndent().visit(stateManager).println();
179     return out;
180   }
181
182   public void addListener(ObjectManagerEventListener listener) {
183     if (listener == null) { throw new NullPointerException JavaDoc("cannot add a null event listener"); }
184     collector.addListener(listener);
185   }
186
187   public GCStats[] getGarbageCollectorStats() {
188     return collector.getGarbageCollectorStats();
189   }
190
191   public ObjectID lookupRootID(String JavaDoc name) {
192     syncAssertNotInShutdown();
193     return objectStore.getRootID(name);
194   }
195
196   public boolean lookupObjectsAndSubObjectsFor(ChannelID channelID, ObjectManagerResultsContext responseContext,
197                                                int maxReachableObjects) {
198     // maxReachableObjects is atleast 1 so that addReachableObjectsIfNecessary does the right thing
199
return lookupObjectsForOptionallyCreate(channelID, responseContext, maxReachableObjects <= 0 ? 1
200         : maxReachableObjects);
201   }
202
203   public boolean lookupObjectsForCreateIfNecessary(ChannelID channelID, ObjectManagerResultsContext responseContext) {
204     return lookupObjectsForOptionallyCreate(channelID, responseContext, -1);
205   }
206
207   private synchronized boolean lookupObjectsForOptionallyCreate(ChannelID channelID,
208                                                                 ObjectManagerResultsContext responseContext,
209                                                                 int maxReachableObjects) {
210     syncAssertNotInShutdown();
211
212     if (collector.isPausingOrPaused()) {
213       // XXX:: since we are making pending without trying to lookup, cache hit count might be skewed
214
makePending(channelID, new ObjectManagerLookupContext(responseContext), maxReachableObjects)
215           .setProcessPending(true);
216       return false;
217     }
218     return basicLookupObjectsFor(channelID, new ObjectManagerLookupContext(responseContext), maxReachableObjects);
219   }
220
221   public Iterator JavaDoc getRoots() {
222     syncAssertNotInShutdown();
223     return objectStore.getRoots().iterator();
224   }
225
226   public Iterator JavaDoc getRootNames() {
227     syncAssertNotInShutdown();
228     return objectStore.getRootNames().iterator();
229   }
230
231   /**
232    * For management use only (see interface documentation)
233    */

234   public ManagedObjectFacade lookupFacade(ObjectID id, int limit) throws NoSuchObjectException {
235     final ManagedObject object = lookup(id, true);
236     if (object == null) { throw new NoSuchObjectException(id); }
237
238     try {
239       return object.createFacade(limit);
240     } finally {
241       release(persistenceTransactionProvider.nullTransaction(), object);
242     }
243   }
244
245   private synchronized ManagedObject lookup(ObjectID id, boolean missingOk) {
246     syncAssertNotInShutdown();
247
248     ManagedObjectReference mor = null;
249     try {
250       while (true) {
251         mor = getOrLookupReference(id, (missingOk ? REMOVE_ON_RELEASE | MISSING_OK : REMOVE_ON_RELEASE));
252         if (mor == null) {
253           Assert.assertTrue(missingOk);
254           return null;
255         }
256
257         if (mor.isReferenced()) {
258           wait();
259         } else {
260           markReferenced(mor);
261           break;
262         }
263       }
264     } catch (InterruptedException JavaDoc ie) {
265       Assert.eval(false);
266     }
267     return mor.getObject();
268   }
269
270   public ManagedObject getObjectByID(ObjectID id) {
271     return lookup(id, false);
272   }
273
274   private void markReferenced(ManagedObjectReference reference) {
275     if (reference.isReferenced()) { throw new AssertionError JavaDoc("Attempt to mark an already referenced object: "
276                                                              + reference); }
277     reference.markReference();
278     checkedOutCount++;
279   }
280
281   private void unmarkReferenced(ManagedObjectReference reference) {
282     if (!reference.isReferenced()) { throw new AssertionError JavaDoc("Attempt to unmark an unreferenced object: " + reference); }
283     reference.unmarkReference();
284     checkedOutCount--;
285   }
286
287   /**
288    * Retrieves materialized references.
289    */

290   private ManagedObjectReference getReference(ObjectID id) {
291     return (ManagedObjectReference) references.get(id);
292   }
293
294   /**
295    * Retrieves materialized references-- if not materialized, will initiate a request to materialize them from the
296    * object store.
297    */

298   private ManagedObjectReference getOrLookupReference(ObjectID id, byte flags) {
299     ManagedObjectReference rv = getReference(id);
300
301     if (rv == null) {
302       // Request Faulting in a different stage and give back a "Referenced" Proxy
303
ManagedObjectFaultingContext mofc = new ManagedObjectFaultingContext(id, isRemoveOnRelease(flags),
304                                                                            isMissingOkay(flags));
305       faultSink.add(mofc);
306
307       // don't account for a cache "miss" unless this was a real request
308
// originating from a client
309
stats.cacheMiss();
310       rv = addNewReference(new FaultingManagedObjectReference(id));
311     } else if (rv instanceof FaultingManagedObjectReference) {
312       // Check to see if the retrieve was complete and the Object is missing
313
FaultingManagedObjectReference fmr = (FaultingManagedObjectReference) rv;
314       if (!fmr.isFaultingInProgress()) {
315         references.remove(id);
316         if (isMissingOkay(flags)) { return null; }
317         throw new AssertionError JavaDoc("Request for a non-existent object: " + id);
318       }
319       if (isNewRequest(flags)) stats.cacheMiss();
320     } else {
321       if (isNewRequest(flags)) stats.cacheHit();
322       if (!isRemoveOnRelease(flags)) {
323         if (rv.isRemoveOnRelease()) {
324           // This Object is faulted in by GC or Management interface with removeOnRelease = true, but before they got a
325
// chance to grab it, a regular request for object is received. Take corrective action.
326
rv.setRemoveOnRelease(false);
327           evictionPolicy.add(rv);
328         } else {
329           evictionPolicy.markReferenced(rv);
330         }
331       }
332     }
333
334     return rv;
335   }
336
337   private boolean isNewRequest(byte flags) {
338     return ((flags & NEW_REQUEST) != 0x00);
339   }
340
341   private boolean isMissingOkay(byte flags) {
342     return ((flags & MISSING_OK) != 0x00);
343   }
344
345   private boolean isRemoveOnRelease(byte flags) {
346     return ((flags & REMOVE_ON_RELEASE) != 0x00);
347   }
348
349   public synchronized void addFaultedObject(ObjectID oid, ManagedObject mo, boolean removeOnRelease) {
350     FaultingManagedObjectReference fmor;
351     if (mo == null) {
352       ManagedObjectReference mor = (ManagedObjectReference) references.get(oid);
353       if (mor == null || !(mor instanceof FaultingManagedObjectReference) || !oid.equals(mor.getObjectID())) {
354         // Format
355
throw new AssertionError JavaDoc("ManagedObjectReference is not what was expected : " + mor + " oid : " + oid);
356       }
357       fmor = (FaultingManagedObjectReference) mor;
358       fmor.faultingComplete();
359     } else {
360       Assert.assertEquals(oid, mo.getID());
361       ManagedObjectReference mor = (ManagedObjectReference) references.remove(oid);
362       if (mor == null || !(mor instanceof FaultingManagedObjectReference) || !oid.equals(mor.getObjectID())) {
363         // Format
364
throw new AssertionError JavaDoc("ManagedObjectReference is not what was expected : " + mor + " oid : " + oid);
365       }
366       fmor = (FaultingManagedObjectReference) mor;
367       addNewReference(mo, removeOnRelease);
368     }
369     postRelease(fmor.getProcessPendingOnRelease());
370   }
371
372   private ManagedObjectReference addNewReference(ManagedObject obj, boolean isRemoveOnRelease) throws AssertionError JavaDoc {
373     ManagedObjectReference newReference = obj.getReference();
374     newReference.setRemoveOnRelease(isRemoveOnRelease);
375
376     return addNewReference(newReference);
377   }
378
379   private ManagedObjectReference addNewReference(ManagedObjectReference newReference) {
380     Assert.assertNull(references.put(newReference.getObjectID(), newReference));
381     Assert.assertTrue(newReference.getNext() == null && newReference.getPrevious() == null);
382
383     if (!newReference.isRemoveOnRelease()) {
384       evictionPolicy.add(newReference);
385     }
386     return newReference;
387   }
388
389   private synchronized void reapCache(Collection JavaDoc removalCandidates, Collection JavaDoc toFlush, Collection JavaDoc removedObjects) {
390     while (collector.isPausingOrPaused()) {
391       try {
392         this.wait();
393       } catch (InterruptedException JavaDoc e) {
394         logger.error(e);
395       }
396     }
397     for (Iterator JavaDoc i = removalCandidates.iterator(); i.hasNext();) {
398       ManagedObjectReference removalCandidate = (ManagedObjectReference) i.next();
399       if (removalCandidate != null && !removalCandidate.isReferenced() && !removalCandidate.isNew()) {
400         evictionPolicy.remove(removalCandidate);
401         if (removalCandidate.getObject().isDirty()) {
402           markReferenced(removalCandidate);
403           toFlush.add(removalCandidate.getObject());
404         } else {
405           // paranoid mode or the object is not dirty - just remove from reference
406
removedObjects.add(references.remove(removalCandidate.getObjectID()));
407         }
408       }
409     }
410   }
411
412   private void evicted(Collection JavaDoc managedObjects) {
413     boolean processPendingOnRelease = false;
414     synchronized (this) {
415       checkedOutCount -= managedObjects.size();
416       for (Iterator JavaDoc i = managedObjects.iterator(); i.hasNext();) {
417         ManagedObject mo = (ManagedObject) i.next();
418         Object JavaDoc o = references.remove(mo.getID());
419         if (o == null) {
420           logger.warn("Object ID : " + mo.getID()
421                       + " was mapped to null but should have been mapped to a reference of " + mo);
422         } else {
423           ManagedObjectReference ref = (ManagedObjectReference) o;
424           if (ref.getProcessPendingOnRelease()) {
425             processPendingOnRelease = true;
426             ref.unmarkReference();
427             addNewReference(mo, ref.isRemoveOnRelease());
428             i.remove();
429           }
430         }
431       }
432       postRelease(processPendingOnRelease);
433     }
434
435   }
436
437   // Called from within Sync blocks only.
438
private boolean basicLookupObjectsFor(ChannelID channelID, ObjectManagerLookupContext context, int maxReachableObjects) {
439     Set objects = createNewSet();
440
441     createNewObjectsIfNecessary(context);
442
443     boolean processPending = false;
444     boolean available = true;
445     Set ids = context.getLookupIDs();
446     for (Iterator JavaDoc i = ids.iterator(); i.hasNext();) {
447       ObjectID id = (ObjectID) i.next();
448       // We dont check available flag before doing calling getOrLookupReference() for two reasons.
449
// 1) To get the right hit/miss count and
450
// 2) to Fault objects that are not available
451
ManagedObjectReference reference = getOrLookupReference(id, (context.isPendingRequest() ? DEFAULT_FLAG
452           : NEW_REQUEST));
453       if (available && reference.isReferenced()) {
454         available = false;
455         // Setting only the first referenced object to process Pending. If objects are being faulted in, then this
456
// will
457
// ensure that we dont run processPending multiple times unnecessarily.
458
reference.setProcessPendingOnRelease(true);
459       }
460
461       if (reference == null) throw new AssertionError JavaDoc("ManagedObjectReference is null");
462       objects.add(reference);
463     }
464
465     if (available) {
466       Set processLater = addReachableObjectsIfNecessary(channelID, maxReachableObjects, objects);
467       ObjectManagerLookupResults results = new ObjectManagerLookupResultsImpl(processObjectsRequest(objects),
468                                                                               processLater);
469       context.setResults(results);
470       return true;
471     } else {
472       PendingList pendingList = makePending(channelID, context, maxReachableObjects);
473       if (processPending) {
474         pendingList.setProcessPending(processPending);
475       }
476       return false;
477     }
478   }
479
480   private void createNewObjectsIfNecessary(ObjectManagerLookupContext context) {
481     if (!context.isNewObjectsCreated()) {
482       for (Iterator JavaDoc i = context.getNewObjectIDs().iterator(); i.hasNext();) {
483         ObjectID oid = (ObjectID) i.next();
484         ManagedObject mo = new ManagedObjectImpl(oid);
485         createObject(mo);
486       }
487       context.newObjectsCreationComplete();
488     }
489   }
490
491   private Set addReachableObjectsIfNecessary(ChannelID channelID, int maxReachableObjects, Set objects) {
492     if (maxReachableObjects <= 0) { return Collections.EMPTY_SET; }
493     ManagedObjectTraverser traverser = new ManagedObjectTraverser(maxReachableObjects);
494     Set lookedUpObjects = objects;
495     do {
496       traverser.traverse(lookedUpObjects);
497       lookedUpObjects = new HashSet();
498       Set lookupObjectIDs = traverser.getObjectsToLookup();
499       stateManager.removeReferencedFrom(channelID, lookupObjectIDs);
500       for (Iterator JavaDoc j = lookupObjectIDs.iterator(); j.hasNext();) {
501         ObjectID id = (ObjectID) j.next();
502         ManagedObjectReference newRef = getReference(id);
503         // Note : Objects are looked up only if it is in the memory and not referenced
504
if (newRef != null && !newRef.isReferenced()) {
505           if (objects.add(newRef)) {
506             lookedUpObjects.add(newRef);
507           }
508         }
509       }
510     } while (lookedUpObjects.size() > 0 && objects.size() < MAX_LOOKUP_OBJECTS_COUNT);
511     return traverser.getPendingObjectsToLookup(lookedUpObjects);
512   }
513
514   // TODO:: Multiple readonly checkouts, now that there are more than 1 thread faulting objects to the
515
// client
516
public void releaseReadOnly(ManagedObject object) {
517     synchronized (this) {
518       boolean processPending = basicRelease(object);
519       postRelease(processPending);
520     }
521
522   }
523
524   public void release(PersistenceTransaction persistenceTransaction, ManagedObject object) {
525     if (config.paranoid()) flush(persistenceTransaction, object);
526     synchronized (this) {
527       boolean processPending = basicRelease(object);
528       postRelease(processPending);
529     }
530
531   }
532
533   public synchronized void releaseAll(Collection JavaDoc objects) {
534     boolean processPending = false;
535     for (Iterator JavaDoc i = objects.iterator(); i.hasNext();) {
536       ManagedObject mo = (ManagedObject) i.next();
537       if (config.paranoid()) {
538         Assert.assertFalse(mo.isDirty());
539       }
540       processPending |= basicRelease(mo);
541     }
542     postRelease(processPending);
543   }
544
545   public void releaseAll(PersistenceTransaction persistenceTransaction, Collection JavaDoc managedObjects) {
546     if (config.paranoid()) flushAll(persistenceTransaction, managedObjects);
547     synchronized (this) {
548       boolean processPending = false;
549       for (Iterator JavaDoc i = managedObjects.iterator(); i.hasNext();) {
550         processPending |= basicRelease((ManagedObject) i.next());
551       }
552       postRelease(processPending);
553     }
554   }
555
556   private void removeAllObjectsByID(Set toDelete) {
557     for (Iterator JavaDoc i = toDelete.iterator(); i.hasNext();) {
558       ObjectID id = (ObjectID) i.next();
559       ManagedObjectReference ref = (ManagedObjectReference) references.remove(id);
560       if (ref != null) {
561         Assert.assertFalse(ref.isReferenced() || ref.getProcessPendingOnRelease());
562         evictionPolicy.remove(ref);
563       }
564     }
565   }
566
567   public synchronized int getCheckedOutCount() {
568     return checkedOutCount;
569   }
570
571   public Set getRootIDs() {
572     return objectStore.getRoots();
573   }
574
575   public Map JavaDoc getRootNamesToIDsMap() {
576     return objectStore.getRootNamesToIDsMap();
577   }
578
579   public SyncObjectIdSet getAllObjectIDs() {
580     return objectStore.getAllObjectIDs();
581   }
582
583   private void postRelease(boolean processPending) {
584     if (collector.isPausingOrPaused()) {
585       checkAndNotifyGC();
586     } else if (pending.size() > 0 && (processPending || pending.getProcessPending())) {
587       processPendingLookups();
588     }
589     notifyAll();
590   }
591
592   private boolean basicRelease(ManagedObject object) {
593     ManagedObjectReference mor = getReference(object.getID());
594
595     validateManagedObjectReference(mor, object.getID());
596
597     removeReferenceIfNecessary(mor);
598
599     unmarkReferenced(mor);
600     boolean isProcessPendingOnRelease = mor.getProcessPendingOnRelease();
601     mor.setProcessPendingOnRelease(false);
602     return isProcessPendingOnRelease;
603   }
604
605   private void removeReferenceIfNecessary(ManagedObjectReference mor) {
606     if (mor.isRemoveOnRelease()) {
607       if (mor.getObject().isDirty()) {
608         logger.error(mor + " is DIRTY");
609         throw new AssertionError JavaDoc(mor + " is DIRTY");
610       }
611       Object JavaDoc removed = references.remove(mor.getObjectID());
612       Assert.assertNotNull(removed);
613     }
614   }
615
616   private void checkAndNotifyGC() {
617     if (checkedOutCount == 0) {
618       logger.info("Notifying GC : pending = " + pending.size() + " checkedOutCount = " + checkedOutCount);
619       collector.notifyReadyToGC();
620     }
621   }
622
623   public synchronized void waitUntilReadyToGC() {
624     checkAndNotifyGC();
625     txnObjectMgr.recallAllCheckedoutObject();
626     while (!collector.isPaused()) {
627       try {
628         this.wait(10000);
629       } catch (InterruptedException JavaDoc e) {
630         throw new AssertionError JavaDoc(e);
631       }
632     }
633   }
634
635   public void notifyGCComplete(Set toDelete) {
636     synchronized (this) {
637       collector.notifyGCComplete();
638       removeAllObjectsByID(toDelete);
639       // Process pending, since we disabled process pending while GC pause was initiate.
640
processPendingLookups();
641       notifyAll();
642     }
643
644     if (toDelete.size() <= config.getDeleteBatchSize()) {
645       removeFromStore(toDelete);
646     } else {
647       Set split = new HashSet();
648       for (Iterator JavaDoc i = toDelete.iterator(); i.hasNext();) {
649         split.add(i.next());
650         if (split.size() >= config.getDeleteBatchSize()) {
651           removeFromStore(split);
652           split = new HashSet();
653         }
654       }
655       if (split.size() > 0) {
656         removeFromStore(split);
657       }
658     }
659   }
660
661   private void removeFromStore(Set toDelete) {
662     long start = System.currentTimeMillis();
663
664     PersistenceTransaction tx = newTransaction();
665     objectStore.removeAllObjectsByIDNow(tx, toDelete);
666     tx.commit();
667
668     long elapsed = System.currentTimeMillis() - start;
669     if (elapsed > REMOVE_THRESHOLD) {
670       logger.info("Removed " + toDelete.size() + " objects in " + elapsed + "ms.");
671     }
672   }
673
674   private void flush(PersistenceTransaction persistenceTransaction, ManagedObject managedObject) {
675     objectStore.commitObject(persistenceTransaction, managedObject);
676   }
677
678   private void flushAll(PersistenceTransaction persistenceTransaction, Collection JavaDoc managedObjects) {
679     objectStore.commitAllObjects(persistenceTransaction, managedObjects);
680   }
681
682   public void dump() {
683     PrintWriter JavaDoc pw = new PrintWriter JavaDoc(System.err);
684     new PrettyPrinter(pw).visit(this);
685     pw.flush();
686   }
687
688   // This method is fo tests only
689
public synchronized boolean isReferenced(ObjectID id) {
690     ManagedObjectReference reference = getReference(id);
691     return reference != null && reference.isReferenced();
692   }
693
694   // This method is public for testing purpose
695
public synchronized void createObject(ManagedObject object) {
696     syncAssertNotInShutdown();
697     Assert.eval(object.getID().toLong() != -1);
698     objectStore.addNewObject(object);
699     addNewReference(object, false);
700     stats.newObjectCreated();
701   }
702
703   public void createRoot(String JavaDoc rootName, ObjectID id) {
704     syncAssertNotInShutdown();
705     PersistenceTransaction tx = newTransaction();
706     objectStore.addNewRoot(tx, rootName, id);
707     tx.commit();
708     stats.newObjectCreated();
709     // This change needs to be notified so that new roots are not missedout
710
changed(null, null, id);
711   }
712
713   private PersistenceTransaction newTransaction() {
714     return this.persistenceTransactionProvider.newTransaction();
715   }
716
717   public void setGarbageCollector(GarbageCollector collector) {
718     syncAssertNotInShutdown();
719     this.collector = collector;
720
721     if (!config.doGC() || config.gcThreadSleepTime() < 0) return;
722
723     final Object JavaDoc stopLock = new Object JavaDoc();
724
725     StoppableThread st = new StoppableThread(this.gcThreadGroup, "GC") {
726       public void requestStop() {
727         super.requestStop();
728
729         synchronized (stopLock) {
730           stopLock.notifyAll();
731         }
732       }
733
734       public void run() {
735         final long gcSleepTime = config.gcThreadSleepTime();
736
737         while (true) {
738           try {
739             if (isStopRequested()) { return; }
740             synchronized (stopLock) {
741               stopLock.wait(gcSleepTime);
742             }
743             if (isStopRequested()) { return; }
744             gc();
745           } catch (InterruptedException JavaDoc ie) {
746             throw new TCRuntimeException(ie);
747           }
748         }
749       }
750
751     };
752     st.setDaemon(true);
753     collector.setState(st);
754   }
755
756   public void gc() {
757     collector.gc();
758   }
759
760   private Map JavaDoc processObjectsRequest(Collection JavaDoc objects) {
761     Map JavaDoc results = new HashMap JavaDoc();
762     for (Iterator JavaDoc i = objects.iterator(); i.hasNext();) {
763       ManagedObjectReference mor = (ManagedObjectReference) i.next();
764       Assert.assertNotNull(mor);
765       if (!mor.isReferenced()) {
766         markReferenced(mor);
767       }
768       if (mor.getObject() == null) {
769         logger.error("Object is NULL for " + mor);
770         throw new AssertionError JavaDoc("ManagedObject is null.");
771       }
772       results.put(mor.getObjectID(), mor.getObject());
773     }
774     return results;
775   }
776
777   private void processPendingLookups() {
778     List JavaDoc lp = pending;
779     pending = new PendingList();
780
781     // TODO:: Can be optimized to process only requests that becames available.
782
for (Iterator JavaDoc i = lp.iterator(); i.hasNext();) {
783       Pending p = (Pending) i.next();
784       basicLookupObjectsFor(p.getChannelID(), p.getRequestContext(), p.getMaxReachableObjects());
785     }
786   }
787
788   private PendingList makePending(ChannelID channelID, ObjectManagerLookupContext context, int maxReachableObjects) {
789     context.makePending();
790     pending.add(new Pending(channelID, context, maxReachableObjects));
791     return pending;
792   }
793
794   private void syncAssertNotInShutdown() {
795     assertNotInShutdown();
796   }
797
798   private void assertNotInShutdown() {
799     if (this.inShutdown) throw new ShutdownError();
800   }
801
802   public void evictCache(CacheStats stat) {
803     int size = references_size();
804     int toEvict = stat.getObjectCountToEvict(size);
805     if (toEvict <= 0) return;
806
807     // This could be a costly call, so call just once
808
Collection JavaDoc removalCandidates = evictionPolicy.getRemovalCandidates(toEvict);
809
810     HashSet toFlush = new HashSet();
811     ArrayList JavaDoc removed = new ArrayList JavaDoc();
812     reapCache(removalCandidates, toFlush, removed);
813
814     int evicted = (toFlush.size() + removed.size());
815     // Let GC work for us
816
removed = null;
817     removalCandidates = null;
818
819     if (!toFlush.isEmpty()) {
820       initateFlushRequest(toFlush);
821       toFlush = null; // make GC work
822
waitUntilFlushComplete();
823     }
824
825     // TODO:: Send the right objects to the cache manager
826
stat.objectEvicted(evicted, references_size(), Collections.EMPTY_LIST);
827   }
828
829   private void waitUntilFlushComplete() {
830     flushCount.waitUntil(0);
831   }
832
833   private void initateFlushRequest(Collection JavaDoc toFlush) {
834     flushCount.increment(toFlush.size());
835     for (Iterator JavaDoc i = toFlush.iterator(); i.hasNext();) {
836       int count = 0;
837       ManagedObjectFlushingContext mofc = new ManagedObjectFlushingContext();
838       while (count < MAX_COMMIT_SIZE && i.hasNext()) {
839         mofc.addObjectToFlush(i.next());
840         count++;
841         // i.remove();
842
}
843       flushSink.add(mofc);
844     }
845   }
846
847   public void flushAndEvict(List JavaDoc objects2Flush) {
848     PersistenceTransaction tx = newTransaction();
849     int size = objects2Flush.size();
850     flushAll(tx, objects2Flush);
851     tx.commit();
852     evicted(objects2Flush);
853     flushCount.decrement(size);
854   }
855
856   // XXX:: This is not synchronized and might not give us the right number. Performance over accuracy. This is to be
857
// used only in evictCache method.
858
private int references_size() {
859     return references.size();
860   }
861
862   private static class ObjectManagerLookupContext implements ObjectManagerResultsContext {
863
864     private final ObjectManagerResultsContext responseContext;
865     private boolean pending = false;
866     private boolean newObjectsCreated = false;
867
868     public ObjectManagerLookupContext(ObjectManagerResultsContext responseContext) {
869       this.responseContext = responseContext;
870     }
871
872     public boolean isPendingRequest() {
873       return pending;
874     }
875
876     public boolean isNewObjectsCreated() {
877       return newObjectsCreated;
878     }
879
880     public void newObjectsCreationComplete() {
881       newObjectsCreated = true;
882     }
883
884     public void makePending() {
885       this.pending = true;
886     }
887
888     public Set getLookupIDs() {
889       return responseContext.getLookupIDs();
890     }
891
892     public Set getNewObjectIDs() {
893       return responseContext.getNewObjectIDs();
894     }
895
896     public void setResults(ObjectManagerLookupResults results) {
897       responseContext.setResults(results);
898     }
899
900   }
901
902   private static class Pending {
903     private final ObjectManagerLookupContext context;
904     private final ChannelID groupingKey;
905     private final int maxReachableObjects;
906
907     public Pending(ChannelID groupingKey, ObjectManagerLookupContext context, int maxReachableObjects) {
908       this.groupingKey = groupingKey;
909       this.context = context;
910       this.maxReachableObjects = maxReachableObjects;
911     }
912
913     public String JavaDoc toString() {
914       return "ObjectManagerImpl.Pending[groupingKey=" + groupingKey + "]";
915
916     }
917
918     public ChannelID getChannelID() {
919       return this.groupingKey;
920     }
921
922     public ObjectManagerLookupContext getRequestContext() {
923       return context;
924     }
925
926     public int getMaxReachableObjects() {
927       return maxReachableObjects;
928     }
929
930   }
931
932   private static class PendingList extends LinkedList JavaDoc {
933     boolean processPending = false;
934
935     PendingList() {
936       super();
937     }
938
939     public boolean getProcessPending() {
940       return this.processPending;
941     }
942
943     public void setProcessPending(boolean b) {
944       this.processPending = b;
945     }
946   }
947
948   /*********************************************************************************************************************
949    * ManagedObjectChangeListener interface
950    */

951   public void changed(ObjectID changedObject, ObjectID oldReference, ObjectID newReference) {
952     collector.changed(changedObject, oldReference, newReference);
953   }
954
955   // TODO:: INITIAL_SET_SIZE too low and can use a pool
956
private static Set createNewSet() {
957     return new THashSet(INITIAL_SET_SIZE, LOAD_FACTOR);
958   }
959
960   private void validateManagedObjectReference(ManagedObjectReference mor, ObjectID id) {
961     if (mor == null) {
962       dump();
963       throw new AssertionError JavaDoc("ManagedObjectReference " + id + " is not found");
964     }
965
966     if (!mor.isReferenced()) {
967       logger.error("Basic Release is called for a object which is not referenced ! Reference = " + mor);
968       throw new AssertionError JavaDoc("Basic Release called without lookup !");
969     }
970   }
971 }
972
Popular Tags