1 5 package com.tc.object; 6 7 import com.tc.logging.TCLogger; 8 import com.tc.net.protocol.tcm.ChannelID; 9 import com.tc.net.protocol.tcm.ChannelIDProvider; 10 import com.tc.object.dna.api.DNA; 11 import com.tc.object.msg.RequestManagedObjectMessage; 12 import com.tc.object.msg.RequestManagedObjectMessageFactory; 13 import com.tc.object.msg.RequestRootMessage; 14 import com.tc.object.msg.RequestRootMessageFactory; 15 import com.tc.object.session.SessionID; 16 import com.tc.object.session.SessionManager; 17 import com.tc.util.State; 18 import com.tc.util.Util; 19 20 import gnu.trove.THashMap; 21 import gnu.trove.THashSet; 22 23 import java.util.Collection ; 24 import java.util.Collections ; 25 import java.util.Date ; 26 import java.util.HashSet ; 27 import java.util.Iterator ; 28 import java.util.LinkedHashMap ; 29 import java.util.Map ; 30 import java.util.Set ; 31 import java.util.Map.Entry; 32 33 39 public class RemoteObjectManagerImpl implements RemoteObjectManager { 40 41 private static final State PAUSED = new State("PAUSED"); 42 private static final State STARTING = new State("STARTING"); 43 private static final State RUNNING = new State("RUNNING"); 44 45 private final LinkedHashMap rootRequests = new LinkedHashMap (); 46 private final Map dnaRequests = new THashMap(); 47 private final Set removeObjects = new THashSet(100, 0.8f); 48 private final Map outstandingObjectRequests = new THashMap(); 49 private final Map outstandingRootRequests = new THashMap(); 50 private long objectRequestIDCounter = 0; 51 private final ObjectRequestMonitor requestMonitor; 52 private final ChannelIDProvider cip; 53 private final RequestRootMessageFactory rrmFactory; 54 private final RequestManagedObjectMessageFactory rmomFactory; 55 private final DNALRU lruDNA = new DNALRU(); 56 private final static int MAX_LRU = 60; 57 private final int defaultDepth; 58 private State state = RUNNING; 59 private final SessionManager sessionManager; 60 private final TCLogger logger; 61 private static final int REMOVE_OBJECTS_THRESHOLD = 10000; 62 63 public RemoteObjectManagerImpl(TCLogger logger, ChannelIDProvider cip, RequestRootMessageFactory rrmFactory, 64 RequestManagedObjectMessageFactory rmomFactory, ObjectRequestMonitor requestMonitor, 65 int defaultDepth, SessionManager sessionManager) { 66 this.logger = logger; 67 this.cip = cip; 68 this.rrmFactory = rrmFactory; 69 this.rmomFactory = rmomFactory; 70 this.requestMonitor = requestMonitor; 71 this.defaultDepth = defaultDepth; 72 this.sessionManager = sessionManager; 73 } 74 75 public synchronized void pause() { 76 assertNotPaused("Attempt to pause while PAUSED"); 77 state = PAUSED; 78 notifyAll(); 79 } 80 81 public synchronized void starting() { 82 assertPaused("Attempt to start while not PAUSED"); 83 state = STARTING; 84 notifyAll(); 85 } 86 87 public synchronized void unpause() { 88 assertStarting("Attempt to unpause while not STARTING"); 89 state = RUNNING; 90 notifyAll(); 91 } 92 93 public synchronized void clear() { 94 if (state != STARTING) throw new AssertionError ("Attempt to clear while not STARTING: " + state); 95 lruDNA.clear(); 96 for (Iterator i = dnaRequests.entrySet().iterator(); i.hasNext();) { 97 Entry e = (Entry) i.next(); 98 if (e.getValue() != null) { 99 i.remove(); 100 } 101 } 102 removeObjects.clear(); 103 } 104 105 private void waitUntilRunning() { 106 boolean isInterrupted = false; 107 while (state != RUNNING) { 108 try { 109 wait(); 110 } catch (InterruptedException e) { 111 isInterrupted = true; 112 } 113 } 114 Util.selfInterruptIfNeeded(isInterrupted); 115 } 116 117 private void assertPaused(Object message) { 118 if (state != PAUSED) throw new AssertionError (message + ": " + state); 119 } 120 121 private void assertStarting(Object message) { 122 if (state != STARTING) throw new AssertionError (message + ": " + state); 123 } 124 125 private void assertNotPaused(Object message) { 126 if (state == PAUSED) throw new AssertionError (message + ": " + state); 127 } 128 129 public synchronized void requestOutstanding() { 130 assertStarting("Attempt to request outstanding object requests while not STARTING"); 131 for (Iterator i = outstandingObjectRequests.values().iterator(); i.hasNext();) { 132 RequestManagedObjectMessage rmom = createRequestManagedObjectMessage((ObjectRequestContext) i.next(), 133 Collections.EMPTY_SET); 134 rmom.send(); 135 } 136 for (Iterator i = outstandingRootRequests.values().iterator(); i.hasNext();) { 137 RequestRootMessage rrm = createRootMessage((String ) i.next()); 138 rrm.send(); 139 } 140 } 141 142 public DNA retrieve(ObjectID id) { 143 return retrieve(id, defaultDepth); 144 } 145 146 public synchronized DNA retrieve(ObjectID id, int depth) { 147 boolean isInterrupted = false; 148 149 ObjectRequestContext ctxt = new ObjectRequestContextImpl(this.cip.getChannelID(), 150 new ObjectRequestID(objectRequestIDCounter++), id, depth); 151 while (!dnaRequests.containsKey(id) || dnaRequests.get(id) == null) { 152 waitUntilRunning(); 153 if (!dnaRequests.containsKey(id)) { 154 sendRequest(ctxt); 155 } else if (!outstandingObjectRequests.containsKey(id)) { 156 outstandingObjectRequests.put(id, ctxt); 157 } 158 159 if (dnaRequests.get(id) == null) { 160 try { 161 wait(); 162 } catch (InterruptedException e) { 163 isInterrupted = true; 164 } 165 } 166 } 167 Util.selfInterruptIfNeeded(isInterrupted); 168 lruDNA.remove(id); 169 return (DNA) dnaRequests.remove(id); 170 } 171 172 private void sendRequest(ObjectRequestContext ctxt) { 173 Set tr = new HashSet(removeObjects); 174 RequestManagedObjectMessage rmom = createRequestManagedObjectMessage(ctxt, tr); 175 removeObjects.clear(); 176 ObjectID id = null; 177 for (Iterator i = ctxt.getObjectIDs().iterator(); i.hasNext();) { 178 id = (ObjectID) i.next(); 179 dnaRequests.put(id, null); 180 } 181 this.outstandingObjectRequests.put(id, ctxt); 186 rmom.send(); 187 requestMonitor.notifyObjectRequest(ctxt); 188 } 189 190 private RequestManagedObjectMessage createRequestManagedObjectMessage(ObjectRequestContext ctxt, Set removed) { 191 RequestManagedObjectMessage rmom = rmomFactory.newRequestManagedObjectMessage(); 192 Set requestedObjectIDs = ctxt.getObjectIDs(); 193 rmom.initialize(ctxt, requestedObjectIDs, removed); 194 return rmom; 195 } 196 197 public synchronized ObjectID retrieveRootID(String name) { 198 199 if (!rootRequests.containsKey(name)) { 200 RequestRootMessage rrm = createRootMessage(name); 201 rootRequests.put(name, ObjectID.NULL_ID); 202 outstandingRootRequests.put(name, name); 203 rrm.send(); 204 } 205 206 boolean isInterrupted = false; 207 while (ObjectID.NULL_ID.equals(rootRequests.get(name))) { 208 waitUntilRunning(); 209 try { 210 if (ObjectID.NULL_ID.equals(rootRequests.get(name))) { 211 wait(); 212 } 213 } catch (InterruptedException e) { 214 isInterrupted = true; 215 } 216 } 217 Util.selfInterruptIfNeeded(isInterrupted); 218 219 return (ObjectID) (rootRequests.containsKey(name) ? rootRequests.get(name) : ObjectID.NULL_ID); 220 } 221 222 private RequestRootMessage createRootMessage(String name) { 223 RequestRootMessage rrm = rrmFactory.newRequestRootMessage(); 224 rrm.initialize(name); 225 return rrm; 226 } 227 228 public synchronized void addRoot(String name, ObjectID id) { 229 waitUntilRunning(); 230 if (id.isNull()) { 231 rootRequests.remove(name); 232 } else { 233 rootRequests.put(name, id); 234 } 235 Object rootName = outstandingRootRequests.remove(name); 236 if (rootName == null) { 237 logger.warn("A root was added that was not found in the outstanding requests. root name = " + name + " " + id); 239 } 240 notifyAll(); 241 } 242 243 public synchronized void addAllObjects(SessionID sessionID, long batchID, Collection dnas) { 244 waitUntilRunning(); 245 if (!sessionManager.isCurrentSession(sessionID)) { 246 logger.warn("Ignoring DNA added from a different session: " + sessionID + ", " + sessionManager); 247 return; 248 } 249 lruDNA.clearUnrequestedDNA(); 250 lruDNA.add(batchID, dnas); 251 for (Iterator i = dnas.iterator(); i.hasNext();) { 252 DNA dna = (DNA) i.next(); 253 if (removeObjects.contains(dna.getObjectID())) { 255 throw new AssertionError ("Server sent us an object that is present in the removed set - " + dna.getObjectID() 257 + " , removed set = " + removeObjects); 258 } 259 basicAddObject(dna); 260 } 261 notifyAll(); 262 } 263 264 synchronized void addObject(DNA dna) { 266 if (!removeObjects.contains(dna.getObjectID())) basicAddObject(dna); 267 notifyAll(); 268 } 269 270 private void basicAddObject(DNA dna) { 271 dnaRequests.put(dna.getObjectID(), dna); 272 outstandingObjectRequests.remove(dna.getObjectID()); 273 } 274 275 public synchronized void removed(ObjectID id) { 276 dnaRequests.remove(id); 277 removeObjects.add(id); 278 if (removeObjects.size() > REMOVE_OBJECTS_THRESHOLD) { 279 ObjectRequestContext ctxt = new ObjectRequestContextImpl(this.cip.getChannelID(), 280 new ObjectRequestID(objectRequestIDCounter++), 281 Collections.EMPTY_SET, -1); 282 Set tr = new HashSet(removeObjects); 283 RequestManagedObjectMessage rmom = createRequestManagedObjectMessage(ctxt, tr); 284 removeObjects.clear(); 285 rmom.send(); 286 } 287 } 288 289 public class ObjectRequestContextImpl implements ObjectRequestContext { 290 291 private final long timestamp; 292 293 private final Set objectIDs; 294 295 private final ObjectRequestID requestID; 296 297 private final ChannelID channelID; 298 299 private final int depth; 300 301 private ObjectRequestContextImpl(ChannelID channelID, ObjectRequestID requestID, ObjectID objectID, int depth) { 302 this(channelID, requestID, new HashSet(), depth); 303 this.objectIDs.add(objectID); 304 } 305 306 private ObjectRequestContextImpl(ChannelID channelID, ObjectRequestID requestID, Set objectIDs, int depth) { 307 this.timestamp = System.currentTimeMillis(); 308 this.channelID = channelID; 309 this.requestID = requestID; 310 this.objectIDs = objectIDs; 311 this.depth = depth; 312 } 313 314 public ChannelID getChannelID() { 315 return this.channelID; 316 } 317 318 public ObjectRequestID getRequestID() { 319 return this.requestID; 320 } 321 322 public Set getObjectIDs() { 323 return this.objectIDs; 324 } 325 326 public int getRequestDepth() { 327 return this.depth; 328 } 329 330 public String toString() { 331 return getClass().getName() + "[" + new Date (timestamp) + ", requestID =" + requestID + ", objectIDs =" 332 + objectIDs + ", depth = " + depth + "]"; 333 } 334 } 335 336 private class DNALRU { 337 private LinkedHashMap dnas = new LinkedHashMap (); 338 339 public synchronized int size() { 340 return dnas.size(); 341 } 342 343 public synchronized void clear() { 344 dnas.clear(); 345 } 346 347 public synchronized void add(long batchID, Collection objs) { 348 Long key = new Long (batchID); 349 Map m = (Map) dnas.get(key); 350 if (m == null) { 351 m = new THashMap(objs.size(), 0.8f); 354 dnas.put(key, m); 355 } 356 for (Iterator i = objs.iterator(); i.hasNext();) { 357 DNA dna = (DNA) i.next(); 358 m.put(dna.getObjectID(), dna); 359 } 360 } 361 362 public synchronized void remove(ObjectID id) { 363 for (Iterator i = dnas.values().iterator(); i.hasNext();) { 364 Map m = (Map) i.next(); 365 if (m.remove(id) != null) { 366 break; 368 } 369 } 370 } 371 372 public synchronized void clearUnrequestedDNA() { 373 if (dnas.size() > MAX_LRU) { 374 Iterator dnaMapIterator = dnas.values().iterator(); 375 Map dnaMap = (Map) dnaMapIterator.next(); 376 for (Iterator i = dnaMap.keySet().iterator(); i.hasNext();) { 377 ObjectID id = (ObjectID) i.next(); 378 if (!outstandingObjectRequests.containsKey(id)) { 379 if (dnaRequests.containsKey(id)) { 382 removed(id); 383 } 384 } 385 } 386 dnaMapIterator.remove(); 387 } 388 } 389 } 390 391 } 392 | Popular Tags |