package com.tc.objectserver.impl;

import com.tc.async.api.Sink;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.management.beans.object.ObjectManagementMonitor;
import com.tc.management.beans.object.ObjectManagementMonitorMBean;
import com.tc.management.beans.object.ObjectManagementMonitor.GCComptroller;
import com.tc.net.protocol.tcm.ChannelID;
import com.tc.object.ObjectID;
import com.tc.object.cache.CacheStats;
import com.tc.object.cache.Evictable;
import com.tc.object.cache.EvictionPolicy;
import com.tc.objectserver.api.GCStats;
import com.tc.objectserver.api.NoSuchObjectException;
import com.tc.objectserver.api.ObjectManager;
import com.tc.objectserver.api.ObjectManagerEventListener;
import com.tc.objectserver.api.ObjectManagerLookupResults;
import com.tc.objectserver.api.ObjectManagerMBean;
import com.tc.objectserver.api.ObjectManagerStatsListener;
import com.tc.objectserver.api.ShutdownError;
import com.tc.objectserver.context.ManagedObjectFaultingContext;
import com.tc.objectserver.context.ManagedObjectFlushingContext;
import com.tc.objectserver.context.ObjectManagerResultsContext;
import com.tc.objectserver.core.api.GarbageCollector;
import com.tc.objectserver.core.api.ManagedObject;
import com.tc.objectserver.core.impl.NullGarbageCollector;
import com.tc.objectserver.l1.api.ClientStateManager;
import com.tc.objectserver.managedobject.ManagedObjectChangeListener;
import com.tc.objectserver.managedobject.ManagedObjectImpl;
import com.tc.objectserver.managedobject.ManagedObjectTraverser;
import com.tc.objectserver.mgmt.ManagedObjectFacade;
import com.tc.objectserver.persistence.api.ManagedObjectStore;
import com.tc.objectserver.persistence.api.PersistenceTransaction;
import com.tc.objectserver.persistence.api.PersistenceTransactionProvider;
import com.tc.objectserver.tx.NullTransactionalObjectManager;
import com.tc.objectserver.tx.TransactionalObjectManager;
import com.tc.objectserver.tx.TransactionalObjectManagerImpl;
import com.tc.properties.TCPropertiesImpl;
import com.tc.text.PrettyPrinter;
import com.tc.util.Assert;
import com.tc.util.Counter;
import com.tc.util.SyncObjectIdSet;
import com.tc.util.concurrent.StoppableThread;

import gnu.trove.THashSet;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Object manager that keeps an in-memory map of {@link ObjectID} to {@code ManagedObjectReference}, hands managed
 * objects out to callers ("check out"), takes them back ("release"), faults missing objects in through
 * {@code faultSink}, flushes dirty objects out through {@code flushSink}, and coordinates with a
 * {@link GarbageCollector}.
 * <p>
 * Most state ({@code references}, {@code checkedOutCount}, {@code pending}) is guarded by this object's monitor;
 * private helpers without {@code synchronized} are called from code that already holds it, except where noted.
 */
public class ObjectManagerImpl implements ObjectManager, ManagedObjectChangeListener, ObjectManagerMBean, Evictable {

  private static final TCLogger                logger                   = TCLogging.getLogger(ObjectManager.class);

  // Bit flags passed to getOrLookupReference().
  private static final byte                    DEFAULT_FLAG             = 0x00;
  private static final byte                    MISSING_OK               = 0x01;
  private static final byte                    NEW_REQUEST              = 0x02;
  private static final byte                    REMOVE_ON_RELEASE        = 0x04;

  // Upper bound on the number of objects placed in a single flushing context (see initateFlushRequest()).
  private static final int                     MAX_COMMIT_SIZE          = TCPropertiesImpl
                                                                            .getProperties()
                                                                            .getInt(
                                                                                    "l2.objectmanager.maxObjectsToCommit");
  // Initial capacity and load factor for the sets built by createNewSet().
  private static final int                     INITIAL_SET_SIZE         = 1;
  private static final float                   LOAD_FACTOR              = 0.75f;
  // Reachable-object traversal stops growing the result once it holds this many references.
  private static final int                     MAX_LOOKUP_OBJECTS_COUNT = 5000;
  // Store removals slower than this many milliseconds are logged.
  private static final long                    REMOVE_THRESHOLD         = 300;

  private final ManagedObjectStore             objectStore;
  private final Map                            references;
  private final EvictionPolicy                 evictionPolicy;

  private GarbageCollector                     collector                = new NullGarbageCollector();
  private int                                  checkedOutCount          = 0;
  private PendingList                          pending                  = new PendingList();
  private Counter                              flushCount               = new Counter();

  private volatile boolean                     inShutdown               = false;

  private ClientStateManager                   stateManager;
  private final ObjectManagerConfig            config;
  private final ThreadGroup                    gcThreadGroup;
  private ObjectManagerStatsListener           stats                    = new NullObjectManagerStatsListener();
  private final PersistenceTransactionProvider persistenceTransactionProvider;
  private final Sink                           faultSink;
  private final Sink                           flushSink;
  private final ObjectManagementMonitorMBean   objectManagementMonitor;
  private TransactionalObjectManager           txnObjectMgr             = new NullTransactionalObjectManager();

  /**
   * Creates the manager and, when the supplied monitor is an {@link ObjectManagementMonitor}, registers a GC
   * controller on it that triggers {@link #gc()} and reports whether GC is enabled in {@code config}.
   *
   * @param objectStore must not be null (asserted)
   */
  public ObjectManagerImpl(ObjectManagerConfig config, ThreadGroup gcThreadGroup, ClientStateManager stateManager,
                           ManagedObjectStore objectStore, EvictionPolicy cache,
                           PersistenceTransactionProvider persistenceTransactionProvider, Sink faultSink,
                           Sink flushSink, ObjectManagementMonitorMBean objectManagementMonitor) {
    this.faultSink = faultSink;
    this.flushSink = flushSink;
    Assert.assertNotNull(objectStore);
    this.config = config;
    this.gcThreadGroup = gcThreadGroup;
    this.stateManager = stateManager;
    this.objectStore = objectStore;
    this.evictionPolicy = cache;
    this.persistenceTransactionProvider = persistenceTransactionProvider;
    this.references = new HashMap(10000);
    this.objectManagementMonitor = objectManagementMonitor;

    final boolean doGC = config.doGC();
    if (this.objectManagementMonitor instanceof ObjectManagementMonitor) {
      ((ObjectManagementMonitor) this.objectManagementMonitor).registerGCController(new GCComptroller() {
        public void startGC() {
          gc();
        }

        public boolean gcEnabledInConfig() {
          return doGC;
        }
      });
    }
  }

  /** Replaces the transactional object manager that is asked to recall checked-out objects before a GC. */
  public void setTransactionalObjectManager(TransactionalObjectManagerImpl txnObjectManager) {
    this.txnObjectMgr = txnObjectManager;
  }

  /** Replaces the listener that receives cache hit/miss and object-creation notifications. */
  public void setStatsListener(ObjectManagerStatsListener statsListener) {
    this.stats = statsListener;
  }

  /** Starts the current garbage collector. */
  public void start() {
    collector.start();
  }

  /**
   * Marks the manager as shut down, stops the collector, and commits every cached object that is not new to the
   * object store in a single transaction.
   */
  public synchronized void stop() {
    this.inShutdown = true;

    collector.stop();

    Set toFlush = new HashSet();
    for (Iterator i = references.values().iterator(); i.hasNext();) {
      ManagedObject obj = ((ManagedObjectReference) i.next()).getObject();
      if (!obj.isNew()) toFlush.add(obj);
    }
    PersistenceTransaction tx = newTransaction();
    flushAll(tx, toFlush);
    tx.commit();
  }

  /** Writes a structured dump of this manager's state to the given printer and returns that printer. */
  public synchronized PrettyPrinter prettyPrint(PrettyPrinter out) {
    out.println(getClass().getName());
    out.indent().print("roots: ").println(getRoots());
    out.indent().print("collector: ").visit(collector).println();
    out.indent().print("references: ").visit(references).println();

    out.indent().println("checkedOutCount: " + checkedOutCount);
    out.indent().print("pending: ").visit(pending).println();

    out.indent().print("objectStore: ").duplicateAndIndent().visit(objectStore).println();
    out.indent().print("stateManager: ").duplicateAndIndent().visit(stateManager).println();
    return out;
  }

  /**
   * Registers an event listener with the collector.
   *
   * @throws NullPointerException if {@code listener} is null
   */
  public void addListener(ObjectManagerEventListener listener) {
    if (listener == null) { throw new NullPointerException("cannot add a null event listener"); }
    collector.addListener(listener);
  }

  /** Returns the statistics reported by the current collector. */
  public GCStats[] getGarbageCollectorStats() {
    return collector.getGarbageCollectorStats();
  }

  /**
   * Returns the ID the object store has recorded for the named root.
   *
   * @throws ShutdownError if the manager has been stopped
   */
  public ObjectID lookupRootID(String name) {
    syncAssertNotInShutdown();
    return objectStore.getRootID(name);
  }

  /**
   * Looks up the requested objects and, additionally, objects reachable from them.
   *
   * @param maxReachableObjects traversal limit; values of zero or less are raised to 1 so that the reachable-object
   *        step is not skipped (addReachableObjectsIfNecessary() returns early for limits &lt;= 0)
   * @return true if results were delivered to {@code responseContext}; false if the request was queued as pending
   */
  public boolean lookupObjectsAndSubObjectsFor(ChannelID channelID, ObjectManagerResultsContext responseContext,
                                               int maxReachableObjects) {
    return lookupObjectsForOptionallyCreate(channelID, responseContext, maxReachableObjects <= 0 ? 1
        : maxReachableObjects);
  }

  /**
   * Looks up the requested objects without adding reachable objects (the traversal limit passed on is -1).
   *
   * @return true if results were delivered to {@code responseContext}; false if the request was queued as pending
   */
  public boolean lookupObjectsForCreateIfNecessary(ChannelID channelID, ObjectManagerResultsContext responseContext) {
    return lookupObjectsForOptionallyCreate(channelID, responseContext, -1);
  }

  /**
   * Common entry point for lookups. While the collector is pausing or paused the request is queued and the pending
   * list is flagged for processing; otherwise the lookup is attempted immediately.
   */
  private synchronized boolean lookupObjectsForOptionallyCreate(ChannelID channelID,
                                                                ObjectManagerResultsContext responseContext,
                                                                int maxReachableObjects) {
    syncAssertNotInShutdown();

    if (collector.isPausingOrPaused()) {
      makePending(channelID, new ObjectManagerLookupContext(responseContext), maxReachableObjects)
          .setProcessPending(true);
      return false;
    }
    return basicLookupObjectsFor(channelID, new ObjectManagerLookupContext(responseContext), maxReachableObjects);
  }

  /** Returns an iterator over the root IDs held by the object store. */
  public Iterator getRoots() {
    syncAssertNotInShutdown();
    return objectStore.getRoots().iterator();
  }

  /** Returns an iterator over the root names held by the object store. */
  public Iterator getRootNames() {
    syncAssertNotInShutdown();
    return objectStore.getRootNames().iterator();
  }

  /**
   * Checks the object out, builds a facade for it, and always releases it again using the provider's null
   * transaction.
   *
   * @throws NoSuchObjectException if the lookup yields no object for {@code id}
   */
  public ManagedObjectFacade lookupFacade(ObjectID id, int limit) throws NoSuchObjectException {
    final ManagedObject object = lookup(id, true);
    if (object == null) { throw new NoSuchObjectException(id); }

    try {
      return object.createFacade(limit);
    } finally {
      release(persistenceTransactionProvider.nullTransaction(), object);
    }
  }

  /**
   * Blocks until the object with the given ID can be checked out by the caller, then marks it referenced and
   * returns it. The reference is requested with REMOVE_ON_RELEASE.
   *
   * @param missingOk when true, null is returned if the reference lookup reports the object as missing
   */
  private synchronized ManagedObject lookup(ObjectID id, boolean missingOk) {
    syncAssertNotInShutdown();

    ManagedObjectReference mor = null;
    try {
      while (true) {
        mor = getOrLookupReference(id, (missingOk ? REMOVE_ON_RELEASE | MISSING_OK : REMOVE_ON_RELEASE));
        if (mor == null) {
          Assert.assertTrue(missingOk);
          return null;
        }

        if (mor.isReferenced()) {
          // Someone else holds it (or it is still being faulted in); postRelease() calls notifyAll().
          wait();
        } else {
          markReferenced(mor);
          break;
        }
      }
    } catch (InterruptedException ie) {
      // NOTE(review): presumably Assert.eval(false) throws; if it does not, mor may be null below - confirm.
      Assert.eval(false);
    }
    return mor.getObject();
  }

  /** Checks out and returns the object with the given ID; a missing object is treated as an error. */
  public ManagedObject getObjectByID(ObjectID id) {
    return lookup(id, false);
  }

  /** Marks the reference as checked out and bumps the checked-out counter; fails if it is already referenced. */
  private void markReferenced(ManagedObjectReference reference) {
    if (reference.isReferenced()) { throw new AssertionError("Attempt to mark an already referenced object: "
                                                             + reference); }
    reference.markReference();
    checkedOutCount++;
  }

  /** Clears the checked-out mark and decrements the checked-out counter; fails if it is not referenced. */
  private void unmarkReferenced(ManagedObjectReference reference) {
    if (!reference.isReferenced()) { throw new AssertionError("Attempt to unmark an unreferenced object: " + reference); }
    reference.unmarkReference();
    checkedOutCount--;
  }

  /** Returns the cached reference for the ID, or null when none is cached. Never initiates a fault. */
  private ManagedObjectReference getReference(ObjectID id) {
    return (ManagedObjectReference) references.get(id);
  }

  /**
   * Returns the cached reference for the ID, initiating a fault when nothing is cached.
   * <ul>
   * <li>Not cached: a faulting context is queued on {@code faultSink}, a cache miss is recorded, and a
   * {@code FaultingManagedObjectReference} placeholder is cached and returned.</li>
   * <li>Placeholder whose faulting is no longer in progress: the placeholder is dropped; null is returned if
   * MISSING_OK is set, otherwise an AssertionError is thrown.</li>
   * <li>Placeholder still faulting: returned as is (a miss is recorded for NEW_REQUEST).</li>
   * <li>Real reference: a hit is recorded for NEW_REQUEST; unless REMOVE_ON_RELEASE is requested, the reference is
   * either handed to the eviction policy (clearing its remove-on-release flag) or marked as recently used.</li>
   * </ul>
   */
  private ManagedObjectReference getOrLookupReference(ObjectID id, byte flags) {
    ManagedObjectReference rv = getReference(id);

    if (rv == null) {
      ManagedObjectFaultingContext mofc = new ManagedObjectFaultingContext(id, isRemoveOnRelease(flags),
                                                                           isMissingOkay(flags));
      faultSink.add(mofc);

      stats.cacheMiss();
      rv = addNewReference(new FaultingManagedObjectReference(id));
    } else if (rv instanceof FaultingManagedObjectReference) {
      FaultingManagedObjectReference fmr = (FaultingManagedObjectReference) rv;
      if (!fmr.isFaultingInProgress()) {
        references.remove(id);
        if (isMissingOkay(flags)) { return null; }
        throw new AssertionError("Request for a non-existent object: " + id);
      }
      if (isNewRequest(flags)) stats.cacheMiss();
    } else {
      if (isNewRequest(flags)) stats.cacheHit();
      if (!isRemoveOnRelease(flags)) {
        if (rv.isRemoveOnRelease()) {
          rv.setRemoveOnRelease(false);
          evictionPolicy.add(rv);
        } else {
          evictionPolicy.markReferenced(rv);
        }
      }
    }

    return rv;
  }

  private boolean isNewRequest(byte flags) {
    return ((flags & NEW_REQUEST) != 0x00);
  }

  private boolean isMissingOkay(byte flags) {
    return ((flags & MISSING_OK) != 0x00);
  }

  private boolean isRemoveOnRelease(byte flags) {
    return ((flags & REMOVE_ON_RELEASE) != 0x00);
  }

  /**
   * Completes a fault started by getOrLookupReference(). A placeholder must be cached for {@code oid}.
   *
   * @param mo the faulted-in object, or null when there is no object; with null the placeholder stays cached and
   *        is marked faulting-complete, otherwise the placeholder is replaced by the object's own reference
   * @param removeOnRelease remove-on-release flag to set on the new reference
   */
  public synchronized void addFaultedObject(ObjectID oid, ManagedObject mo, boolean removeOnRelease) {
    FaultingManagedObjectReference fmor;
    if (mo == null) {
      ManagedObjectReference mor = (ManagedObjectReference) references.get(oid);
      if (mor == null || !(mor instanceof FaultingManagedObjectReference) || !oid.equals(mor.getObjectID())) {
        throw new AssertionError("ManagedObjectReference is not what was expected : " + mor + " oid : " + oid);
      }
      fmor = (FaultingManagedObjectReference) mor;
      fmor.faultingComplete();
    } else {
      Assert.assertEquals(oid, mo.getID());
      ManagedObjectReference mor = (ManagedObjectReference) references.remove(oid);
      if (mor == null || !(mor instanceof FaultingManagedObjectReference) || !oid.equals(mor.getObjectID())) {
        throw new AssertionError("ManagedObjectReference is not what was expected : " + mor + " oid : " + oid);
      }
      fmor = (FaultingManagedObjectReference) mor;
      addNewReference(mo, removeOnRelease);
    }
    // Wake up waiters and, if a lookup was blocked on this placeholder, re-run pending lookups.
    postRelease(fmor.getProcessPendingOnRelease());
  }

  /** Sets the remove-on-release flag on the object's reference and caches that reference. */
  private ManagedObjectReference addNewReference(ManagedObject obj, boolean isRemoveOnRelease) throws AssertionError {
    ManagedObjectReference newReference = obj.getReference();
    newReference.setRemoveOnRelease(isRemoveOnRelease);

    return addNewReference(newReference);
  }

  /**
   * Caches the reference; asserts nothing was cached under that ID and that the reference is not linked into a
   * list. References not flagged remove-on-release are also handed to the eviction policy.
   */
  private ManagedObjectReference addNewReference(ManagedObjectReference newReference) {
    Assert.assertNull(references.put(newReference.getObjectID(), newReference));
    Assert.assertTrue(newReference.getNext() == null && newReference.getPrevious() == null);

    if (!newReference.isRemoveOnRelease()) {
      evictionPolicy.add(newReference);
    }
    return newReference;
  }

  /**
   * Processes eviction candidates once the collector is neither pausing nor paused. Candidates that are
   * referenced or new are skipped. Dirty candidates are checked out and added to {@code toFlush}; clean ones are
   * removed from the cache and added to {@code removedObjects}.
   */
  private synchronized void reapCache(Collection removalCandidates, Collection toFlush, Collection removedObjects) {
    while (collector.isPausingOrPaused()) {
      try {
        this.wait();
      } catch (InterruptedException e) {
        // NOTE(review): the interrupt is only logged and waiting resumes - confirm this is intended.
        logger.error(e);
      }
    }
    for (Iterator i = removalCandidates.iterator(); i.hasNext();) {
      ManagedObjectReference removalCandidate = (ManagedObjectReference) i.next();
      if (removalCandidate != null && !removalCandidate.isReferenced() && !removalCandidate.isNew()) {
        evictionPolicy.remove(removalCandidate);
        if (removalCandidate.getObject().isDirty()) {
          markReferenced(removalCandidate);
          toFlush.add(removalCandidate.getObject());
        } else {
          removedObjects.add(references.remove(removalCandidate.getObjectID()));
        }
      }
    }
  }

  /**
   * Drops flushed objects from the cache. An object whose reference was flagged process-pending-on-release (a
   * lookup is waiting for it) is put back into the cache, unmarked, and removed from {@code managedObjects}.
   */
  private void evicted(Collection managedObjects) {
    boolean processPendingOnRelease = false;
    synchronized (this) {
      // Every object in the collection was checked out by reapCache().
      checkedOutCount -= managedObjects.size();
      for (Iterator i = managedObjects.iterator(); i.hasNext();) {
        ManagedObject mo = (ManagedObject) i.next();
        Object o = references.remove(mo.getID());
        if (o == null) {
          logger.warn("Object ID : " + mo.getID()
                      + " was mapped to null but should have been mapped to a reference of  " + mo);
        } else {
          ManagedObjectReference ref = (ManagedObjectReference) o;
          if (ref.getProcessPendingOnRelease()) {
            processPendingOnRelease = true;
            ref.unmarkReference();
            addNewReference(mo, ref.isRemoveOnRelease());
            i.remove();
          }
        }
      }
      postRelease(processPendingOnRelease);
    }

  }

  /**
   * Attempts to check out every object named by the context. Caller must hold this object's monitor.
   * <p>
   * If every reference is free, the objects (plus reachable objects, when requested) are checked out, the results
   * are set on the context, and true is returned. If any reference is already checked out, the first such
   * reference is flagged so that its release re-runs pending lookups, the request is queued, and false is returned.
   */
  private boolean basicLookupObjectsFor(ChannelID channelID, ObjectManagerLookupContext context,
                                        int maxReachableObjects) {
    Set objects = createNewSet();

    createNewObjectsIfNecessary(context);

    boolean available = true;
    Set ids = context.getLookupIDs();
    for (Iterator i = ids.iterator(); i.hasNext();) {
      ObjectID id = (ObjectID) i.next();
      ManagedObjectReference reference = getOrLookupReference(id, (context.isPendingRequest() ? DEFAULT_FLAG
          : NEW_REQUEST));
      // Validate before first use: the null guard previously ran after reference.isReferenced() and so could
      // never fire.
      if (reference == null) throw new AssertionError("ManagedObjectReference is null");

      if (available && reference.isReferenced()) {
        available = false;
        // Only the first busy reference is flagged, so a single release triggers pending processing.
        reference.setProcessPendingOnRelease(true);
      }

      objects.add(reference);
    }

    if (available) {
      Set processLater = addReachableObjectsIfNecessary(channelID, maxReachableObjects, objects);
      ObjectManagerLookupResults results = new ObjectManagerLookupResultsImpl(processObjectsRequest(objects),
                                                                              processLater);
      context.setResults(results);
      return true;
    } else {
      makePending(channelID, context, maxReachableObjects);
      return false;
    }
  }

  /** Creates the context's new objects exactly once per context (guarded by the context's own flag). */
  private void createNewObjectsIfNecessary(ObjectManagerLookupContext context) {
    if (!context.isNewObjectsCreated()) {
      for (Iterator i = context.getNewObjectIDs().iterator(); i.hasNext();) {
        ObjectID oid = (ObjectID) i.next();
        ManagedObject mo = new ManagedObjectImpl(oid);
        createObject(mo);
      }
      context.newObjectsCreationComplete();
    }
  }

  /**
   * Grows {@code objects} with references reachable from it that are already cached and not checked out, after
   * letting the client state manager filter the candidate IDs for the channel. Never faults objects in.
   *
   * @return the traverser's pending lookups, or an empty set when {@code maxReachableObjects} is zero or less
   */
  private Set addReachableObjectsIfNecessary(ChannelID channelID, int maxReachableObjects, Set objects) {
    if (maxReachableObjects <= 0) { return Collections.EMPTY_SET; }
    ManagedObjectTraverser traverser = new ManagedObjectTraverser(maxReachableObjects);
    Set lookedUpObjects = objects;
    do {
      traverser.traverse(lookedUpObjects);
      lookedUpObjects = new HashSet();
      Set lookupObjectIDs = traverser.getObjectsToLookup();
      stateManager.removeReferencedFrom(channelID, lookupObjectIDs);
      for (Iterator j = lookupObjectIDs.iterator(); j.hasNext();) {
        ObjectID id = (ObjectID) j.next();
        ManagedObjectReference newRef = getReference(id);
        if (newRef != null && !newRef.isReferenced()) {
          if (objects.add(newRef)) {
            lookedUpObjects.add(newRef);
          }
        }
      }
    } while (lookedUpObjects.size() > 0 && objects.size() < MAX_LOOKUP_OBJECTS_COUNT);
    return traverser.getPendingObjectsToLookup(lookedUpObjects);
  }

  /** Checks the object back in without flushing it. */
  public void releaseReadOnly(ManagedObject object) {
    synchronized (this) {
      boolean processPending = basicRelease(object);
      postRelease(processPending);
    }

  }

  /** Checks the object back in; in paranoid mode it is first committed to the store in the given transaction. */
  public void release(PersistenceTransaction persistenceTransaction, ManagedObject object) {
    if (config.paranoid()) flush(persistenceTransaction, object);
    synchronized (this) {
      boolean processPending = basicRelease(object);
      postRelease(processPending);
    }

  }

  /** Checks all objects back in without flushing; in paranoid mode each one is asserted not to be dirty. */
  public synchronized void releaseAll(Collection objects) {
    boolean processPending = false;
    for (Iterator i = objects.iterator(); i.hasNext();) {
      ManagedObject mo = (ManagedObject) i.next();
      if (config.paranoid()) {
        Assert.assertFalse(mo.isDirty());
      }
      processPending |= basicRelease(mo);
    }
    postRelease(processPending);
  }

  /** Checks all objects back in; in paranoid mode they are first committed in the given transaction. */
  public void releaseAll(PersistenceTransaction persistenceTransaction, Collection managedObjects) {
    if (config.paranoid()) flushAll(persistenceTransaction, managedObjects);
    synchronized (this) {
      boolean processPending = false;
      for (Iterator i = managedObjects.iterator(); i.hasNext();) {
        processPending |= basicRelease((ManagedObject) i.next());
      }
      postRelease(processPending);
    }
  }

  /** Drops the given IDs from the cache and the eviction policy; none may be checked out or awaited. */
  private void removeAllObjectsByID(Set toDelete) {
    for (Iterator i = toDelete.iterator(); i.hasNext();) {
      ObjectID id = (ObjectID) i.next();
      ManagedObjectReference ref = (ManagedObjectReference) references.remove(id);
      if (ref != null) {
        Assert.assertFalse(ref.isReferenced() || ref.getProcessPendingOnRelease());
        evictionPolicy.remove(ref);
      }
    }
  }

  /** Returns the number of references currently checked out. */
  public synchronized int getCheckedOutCount() {
    return checkedOutCount;
  }

  public Set getRootIDs() {
    return objectStore.getRoots();
  }

  public Map getRootNamesToIDsMap() {
    return objectStore.getRootNamesToIDsMap();
  }

  public SyncObjectIdSet getAllObjectIDs() {
    return objectStore.getAllObjectIDs();
  }

  /**
   * Follow-up after any release. While the collector is pausing or paused it is told when nothing is checked out;
   * otherwise queued lookups are re-run if requested by the caller or by the pending list. Waiters are always
   * notified. Caller must hold this object's monitor.
   */
  private void postRelease(boolean processPending) {
    if (collector.isPausingOrPaused()) {
      checkAndNotifyGC();
    } else if (pending.size() > 0 && (processPending || pending.getProcessPending())) {
      processPendingLookups();
    }
    notifyAll();
  }

  /**
   * Checks one object back in: validates its reference, removes it from the cache if flagged remove-on-release,
   * and clears the checked-out mark.
   *
   * @return whether the reference was flagged to trigger pending-lookup processing (the flag is reset)
   */
  private boolean basicRelease(ManagedObject object) {
    ManagedObjectReference mor = getReference(object.getID());

    validateManagedObjectReference(mor, object.getID());

    removeReferenceIfNecessary(mor);

    unmarkReferenced(mor);
    boolean isProcessPendingOnRelease = mor.getProcessPendingOnRelease();
    mor.setProcessPendingOnRelease(false);
    return isProcessPendingOnRelease;
  }

  /** Removes a remove-on-release reference from the cache; such an object must not be dirty. */
  private void removeReferenceIfNecessary(ManagedObjectReference mor) {
    if (mor.isRemoveOnRelease()) {
      if (mor.getObject().isDirty()) {
        logger.error(mor + " is DIRTY");
        throw new AssertionError(mor + " is DIRTY");
      }
      Object removed = references.remove(mor.getObjectID());
      Assert.assertNotNull(removed);
    }
  }

  /** Tells the collector it may proceed once no object is checked out. */
  private void checkAndNotifyGC() {
    if (checkedOutCount == 0) {
      logger.info("Notifying GC : pending = " + pending.size() + " checkedOutCount = " + checkedOutCount);
      collector.notifyReadyToGC();
    }
  }

  /**
   * Blocks until the collector reports that it is paused. Asks the transactional object manager to recall
   * checked-out objects and re-checks every 10 seconds or whenever notified.
   */
  public synchronized void waitUntilReadyToGC() {
    checkAndNotifyGC();
    txnObjectMgr.recallAllCheckedoutObject();
    while (!collector.isPaused()) {
      try {
        this.wait(10000);
      } catch (InterruptedException e) {
        throw new AssertionError(e);
      }
    }
  }

  /**
   * Finishes a GC cycle: under the monitor, informs the collector, drops the deleted IDs from the cache, and
   * re-runs pending lookups. The deletes are then applied to the store outside the monitor, in batches of at most
   * {@code config.getDeleteBatchSize()} IDs.
   */
  public void notifyGCComplete(Set toDelete) {
    synchronized (this) {
      collector.notifyGCComplete();
      removeAllObjectsByID(toDelete);
      processPendingLookups();
      notifyAll();
    }

    if (toDelete.size() <= config.getDeleteBatchSize()) {
      removeFromStore(toDelete);
    } else {
      Set split = new HashSet();
      for (Iterator i = toDelete.iterator(); i.hasNext();) {
        split.add(i.next());
        if (split.size() >= config.getDeleteBatchSize()) {
          removeFromStore(split);
          split = new HashSet();
        }
      }
      if (split.size() > 0) {
        removeFromStore(split);
      }
    }
  }

  /** Deletes the IDs from the store in one transaction and logs the duration if it exceeds REMOVE_THRESHOLD ms. */
  private void removeFromStore(Set toDelete) {
    long start = System.currentTimeMillis();

    PersistenceTransaction tx = newTransaction();
    objectStore.removeAllObjectsByIDNow(tx, toDelete);
    tx.commit();

    long elapsed = System.currentTimeMillis() - start;
    if (elapsed > REMOVE_THRESHOLD) {
      logger.info("Removed " + toDelete.size() + " objects in " + elapsed + "ms.");
    }
  }

  private void flush(PersistenceTransaction persistenceTransaction, ManagedObject managedObject) {
    objectStore.commitObject(persistenceTransaction, managedObject);
  }

  private void flushAll(PersistenceTransaction persistenceTransaction, Collection managedObjects) {
    objectStore.commitAllObjects(persistenceTransaction, managedObjects);
  }

  /** Pretty-prints this manager to {@code System.err}. */
  public void dump() {
    PrintWriter pw = new PrintWriter(System.err);
    new PrettyPrinter(pw).visit(this);
    pw.flush();
  }

  /** Returns true if a reference for the ID is cached and currently checked out. */
  public synchronized boolean isReferenced(ObjectID id) {
    ManagedObjectReference reference = getReference(id);
    return reference != null && reference.isReferenced();
  }

  /** Adds a new object to the store and to the cache (not remove-on-release) and records the creation. */
  public synchronized void createObject(ManagedObject object) {
    syncAssertNotInShutdown();
    Assert.eval(object.getID().toLong() != -1);
    objectStore.addNewObject(object);
    addNewReference(object, false);
    stats.newObjectCreated();
  }

  /** Persists a new root name to ID mapping, records the creation, and reports the new reference to the collector. */
  public void createRoot(String rootName, ObjectID id) {
    syncAssertNotInShutdown();
    PersistenceTransaction tx = newTransaction();
    objectStore.addNewRoot(tx, rootName, id);
    tx.commit();
    stats.newObjectCreated();
    changed(null, null, id);
  }

  private PersistenceTransaction newTransaction() {
    return this.persistenceTransactionProvider.newTransaction();
  }

  /**
   * Installs the collector. When GC is enabled in the config and the sleep time is not negative, a daemon "GC"
   * thread is created that calls {@link #gc()} after each sleep interval until a stop is requested, and is handed
   * to the collector via {@code setState}. The thread is not started here.
   */
  public void setGarbageCollector(GarbageCollector collector) {
    syncAssertNotInShutdown();
    this.collector = collector;

    if (!config.doGC() || config.gcThreadSleepTime() < 0) return;

    final Object stopLock = new Object();

    StoppableThread st = new StoppableThread(this.gcThreadGroup, "GC") {
      public void requestStop() {
        super.requestStop();

        // Cut the current sleep short so the stop request is seen promptly.
        synchronized (stopLock) {
          stopLock.notifyAll();
        }
      }

      public void run() {
        final long gcSleepTime = config.gcThreadSleepTime();

        while (true) {
          try {
            if (isStopRequested()) { return; }
            synchronized (stopLock) {
              stopLock.wait(gcSleepTime);
            }
            if (isStopRequested()) { return; }
            gc();
          } catch (InterruptedException ie) {
            throw new TCRuntimeException(ie);
          }
        }
      }

    };
    st.setDaemon(true);
    collector.setState(st);
  }

  /** Asks the collector to run a GC cycle. */
  public void gc() {
    collector.gc();
  }

  /** Checks out every reference that is not already checked out and returns a map of ObjectID to ManagedObject. */
  private Map processObjectsRequest(Collection objects) {
    Map results = new HashMap();
    for (Iterator i = objects.iterator(); i.hasNext();) {
      ManagedObjectReference mor = (ManagedObjectReference) i.next();
      Assert.assertNotNull(mor);
      if (!mor.isReferenced()) {
        markReferenced(mor);
      }
      if (mor.getObject() == null) {
        logger.error("Object is NULL for " + mor);
        throw new AssertionError("ManagedObject is null.");
      }
      results.put(mor.getObjectID(), mor.getObject());
    }
    return results;
  }

  /**
   * Re-runs every queued lookup. The queue is swapped for a fresh one first, so lookups that still cannot be
   * satisfied re-queue themselves on the new list.
   */
  private void processPendingLookups() {
    List lp = pending;
    pending = new PendingList();

    for (Iterator i = lp.iterator(); i.hasNext();) {
      Pending p = (Pending) i.next();
      basicLookupObjectsFor(p.getChannelID(), p.getRequestContext(), p.getMaxReachableObjects());
    }
  }

  /** Marks the context as pending, queues the request, and returns the current pending list. */
  private PendingList makePending(ChannelID channelID, ObjectManagerLookupContext context, int maxReachableObjects) {
    context.makePending();
    pending.add(new Pending(channelID, context, maxReachableObjects));
    return pending;
  }

  private void syncAssertNotInShutdown() {
    assertNotInShutdown();
  }

  private void assertNotInShutdown() {
    if (this.inShutdown) throw new ShutdownError();
  }

  /**
   * Evicts as many cached objects as {@code stat} asks for. Clean candidates are dropped immediately; dirty ones
   * are sent to {@code flushSink} and this call blocks until all of them have been flushed. Local collections are
   * nulled as soon as they are no longer needed, before the potentially long wait.
   */
  public void evictCache(CacheStats stat) {
    int size = references_size();
    int toEvict = stat.getObjectCountToEvict(size);
    if (toEvict <= 0) return;

    Collection removalCandidates = evictionPolicy.getRemovalCandidates(toEvict);

    HashSet toFlush = new HashSet();
    ArrayList removed = new ArrayList();
    reapCache(removalCandidates, toFlush, removed);

    int evicted = (toFlush.size() + removed.size());
    removed = null;
    removalCandidates = null;

    if (!toFlush.isEmpty()) {
      initateFlushRequest(toFlush);
      toFlush = null;
      waitUntilFlushComplete();
    }

    stat.objectEvicted(evicted, references_size(), Collections.EMPTY_LIST);
  }

  private void waitUntilFlushComplete() {
    flushCount.waitUntil(0);
  }

  /**
   * Raises the outstanding-flush counter by the number of objects and queues them on {@code flushSink} in
   * contexts holding at most MAX_COMMIT_SIZE objects each.
   */
  private void initateFlushRequest(Collection toFlush) {
    flushCount.increment(toFlush.size());
    for (Iterator i = toFlush.iterator(); i.hasNext();) {
      int count = 0;
      ManagedObjectFlushingContext mofc = new ManagedObjectFlushingContext();
      while (count < MAX_COMMIT_SIZE && i.hasNext()) {
        mofc.addObjectToFlush(i.next());
        count++;
      }
      flushSink.add(mofc);
    }
  }

  /**
   * Commits the objects to the store, evicts them from the cache, and lowers the outstanding-flush counter.
   * The size is captured up front because evicted() may remove entries from the list.
   */
  public void flushAndEvict(List objects2Flush) {
    PersistenceTransaction tx = newTransaction();
    int size = objects2Flush.size();
    flushAll(tx, objects2Flush);
    tx.commit();
    evicted(objects2Flush);
    flushCount.decrement(size);
  }

  /**
   * Returns the current size of the reference map.
   * NOTE(review): read without holding the monitor, so the value may be stale - presumably acceptable for eviction
   * sizing; confirm.
   */
  private int references_size() {
    return references.size();
  }

  /**
   * Wraps a caller's results context and adds two pieces of per-request state: whether the request has been
   * queued as pending, and whether its new objects have already been created.
   */
  private static class ObjectManagerLookupContext implements ObjectManagerResultsContext {

    private final ObjectManagerResultsContext responseContext;
    private boolean                           pending           = false;
    private boolean                           newObjectsCreated = false;

    public ObjectManagerLookupContext(ObjectManagerResultsContext responseContext) {
      this.responseContext = responseContext;
    }

    public boolean isPendingRequest() {
      return pending;
    }

    public boolean isNewObjectsCreated() {
      return newObjectsCreated;
    }

    public void newObjectsCreationComplete() {
      newObjectsCreated = true;
    }

    public void makePending() {
      this.pending = true;
    }

    public Set getLookupIDs() {
      return responseContext.getLookupIDs();
    }

    public Set getNewObjectIDs() {
      return responseContext.getNewObjectIDs();
    }

    public void setResults(ObjectManagerLookupResults results) {
      responseContext.setResults(results);
    }

  }

  /** A queued lookup request: the requesting channel, its context, and its reachable-object limit. */
  private static class Pending {
    private final ObjectManagerLookupContext context;
    private final ChannelID                  groupingKey;
    private final int                        maxReachableObjects;

    public Pending(ChannelID groupingKey, ObjectManagerLookupContext context, int maxReachableObjects) {
      this.groupingKey = groupingKey;
      this.context = context;
      this.maxReachableObjects = maxReachableObjects;
    }

    public String toString() {
      return "ObjectManagerImpl.Pending[groupingKey=" + groupingKey + "]";

    }

    public ChannelID getChannelID() {
      return this.groupingKey;
    }

    public ObjectManagerLookupContext getRequestContext() {
      return context;
    }

    public int getMaxReachableObjects() {
      return maxReachableObjects;
    }

  }

  /** Queue of {@link Pending} requests plus a flag asking that the queue be processed on the next release. */
  private static class PendingList extends LinkedList {
    private boolean processPending = false;

    PendingList() {
      super();
    }

    public boolean getProcessPending() {
      return this.processPending;
    }

    public void setProcessPending(boolean b) {
      this.processPending = b;
    }
  }

  /** Forwards a reference change on a managed object to the collector. */
  public void changed(ObjectID changedObject, ObjectID oldReference, ObjectID newReference) {
    collector.changed(changedObject, oldReference, newReference);
  }

  private static Set createNewSet() {
    return new THashSet(INITIAL_SET_SIZE, LOAD_FACTOR);
  }

  /** Fails with an AssertionError if the reference is missing (after dumping state) or is not checked out. */
  private void validateManagedObjectReference(ManagedObjectReference mor, ObjectID id) {
    if (mor == null) {
      dump();
      throw new AssertionError("ManagedObjectReference " + id + " is not found");
    }

    if (!mor.isReferenced()) {
      logger.error("Basic Release is called for a object which is not referenced ! Reference = " + mor);
      throw new AssertionError("Basic Release called without lookup !");
    }
  }
}