KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > RemoteObjectManagerImpl


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.object;
6
7 import com.tc.logging.TCLogger;
8 import com.tc.net.protocol.tcm.ChannelID;
9 import com.tc.net.protocol.tcm.ChannelIDProvider;
10 import com.tc.object.dna.api.DNA;
11 import com.tc.object.msg.RequestManagedObjectMessage;
12 import com.tc.object.msg.RequestManagedObjectMessageFactory;
13 import com.tc.object.msg.RequestRootMessage;
14 import com.tc.object.msg.RequestRootMessageFactory;
15 import com.tc.object.session.SessionID;
16 import com.tc.object.session.SessionManager;
17 import com.tc.util.State;
18 import com.tc.util.Util;
19
20 import gnu.trove.THashMap;
21 import gnu.trove.THashSet;
22
23 import java.util.Collection JavaDoc;
24 import java.util.Collections JavaDoc;
25 import java.util.Date JavaDoc;
26 import java.util.HashSet JavaDoc;
27 import java.util.Iterator JavaDoc;
28 import java.util.LinkedHashMap JavaDoc;
29 import java.util.Map JavaDoc;
30 import java.util.Set JavaDoc;
31 import java.util.Map.Entry;
32
33 /**
34  * This class is a kludge but I think it will do the trick for now. It is responsible for any communications to the
35  * server for object retrieval and removal
36  *
37  * @author steve
38  */

39 public class RemoteObjectManagerImpl implements RemoteObjectManager {
40
41   private static final State PAUSED = new State("PAUSED");
42   private static final State STARTING = new State("STARTING");
43   private static final State RUNNING = new State("RUNNING");
44
45   private final LinkedHashMap JavaDoc rootRequests = new LinkedHashMap JavaDoc();
46   private final Map dnaRequests = new THashMap();
47   private final Set removeObjects = new THashSet(100, 0.8f);
48   private final Map outstandingObjectRequests = new THashMap();
49   private final Map outstandingRootRequests = new THashMap();
50   private long objectRequestIDCounter = 0;
51   private final ObjectRequestMonitor requestMonitor;
52   private final ChannelIDProvider cip;
53   private final RequestRootMessageFactory rrmFactory;
54   private final RequestManagedObjectMessageFactory rmomFactory;
55   private final DNALRU lruDNA = new DNALRU();
56   private final static int MAX_LRU = 60;
57   private final int defaultDepth;
58   private State state = RUNNING;
59   private final SessionManager sessionManager;
60   private final TCLogger logger;
61   private static final int REMOVE_OBJECTS_THRESHOLD = 10000;
62
63   public RemoteObjectManagerImpl(TCLogger logger, ChannelIDProvider cip, RequestRootMessageFactory rrmFactory,
64                                  RequestManagedObjectMessageFactory rmomFactory, ObjectRequestMonitor requestMonitor,
65                                  int defaultDepth, SessionManager sessionManager) {
66     this.logger = logger;
67     this.cip = cip;
68     this.rrmFactory = rrmFactory;
69     this.rmomFactory = rmomFactory;
70     this.requestMonitor = requestMonitor;
71     this.defaultDepth = defaultDepth;
72     this.sessionManager = sessionManager;
73   }
74
75   public synchronized void pause() {
76     assertNotPaused("Attempt to pause while PAUSED");
77     state = PAUSED;
78     notifyAll();
79   }
80
81   public synchronized void starting() {
82     assertPaused("Attempt to start while not PAUSED");
83     state = STARTING;
84     notifyAll();
85   }
86
87   public synchronized void unpause() {
88     assertStarting("Attempt to unpause while not STARTING");
89     state = RUNNING;
90     notifyAll();
91   }
92
93   public synchronized void clear() {
94     if (state != STARTING) throw new AssertionError JavaDoc("Attempt to clear while not STARTING: " + state);
95     lruDNA.clear();
96     for (Iterator JavaDoc i = dnaRequests.entrySet().iterator(); i.hasNext();) {
97       Entry e = (Entry) i.next();
98       if (e.getValue() != null) {
99         i.remove();
100       }
101     }
102     removeObjects.clear();
103   }
104
105   private void waitUntilRunning() {
106     boolean isInterrupted = false;
107     while (state != RUNNING) {
108       try {
109         wait();
110       } catch (InterruptedException JavaDoc e) {
111         isInterrupted = true;
112       }
113     }
114     Util.selfInterruptIfNeeded(isInterrupted);
115   }
116
117   private void assertPaused(Object JavaDoc message) {
118     if (state != PAUSED) throw new AssertionError JavaDoc(message + ": " + state);
119   }
120
121   private void assertStarting(Object JavaDoc message) {
122     if (state != STARTING) throw new AssertionError JavaDoc(message + ": " + state);
123   }
124
125   private void assertNotPaused(Object JavaDoc message) {
126     if (state == PAUSED) throw new AssertionError JavaDoc(message + ": " + state);
127   }
128
129   public synchronized void requestOutstanding() {
130     assertStarting("Attempt to request outstanding object requests while not STARTING");
131     for (Iterator JavaDoc i = outstandingObjectRequests.values().iterator(); i.hasNext();) {
132       RequestManagedObjectMessage rmom = createRequestManagedObjectMessage((ObjectRequestContext) i.next(),
133                                                                            Collections.EMPTY_SET);
134       rmom.send();
135     }
136     for (Iterator JavaDoc i = outstandingRootRequests.values().iterator(); i.hasNext();) {
137       RequestRootMessage rrm = createRootMessage((String JavaDoc) i.next());
138       rrm.send();
139     }
140   }
141
142   public DNA retrieve(ObjectID id) {
143     return retrieve(id, defaultDepth);
144   }
145
146   public synchronized DNA retrieve(ObjectID id, int depth) {
147     boolean isInterrupted = false;
148     
149     ObjectRequestContext ctxt = new ObjectRequestContextImpl(this.cip.getChannelID(),
150                                                              new ObjectRequestID(objectRequestIDCounter++), id, depth);
151     while (!dnaRequests.containsKey(id) || dnaRequests.get(id) == null) {
152       waitUntilRunning();
153       if (!dnaRequests.containsKey(id)) {
154         sendRequest(ctxt);
155       } else if (!outstandingObjectRequests.containsKey(id)) {
156         outstandingObjectRequests.put(id, ctxt);
157       }
158
159       if (dnaRequests.get(id) == null) {
160         try {
161           wait();
162         } catch (InterruptedException JavaDoc e) {
163           isInterrupted = true;
164         }
165       }
166     }
167     Util.selfInterruptIfNeeded(isInterrupted);
168     lruDNA.remove(id);
169     return (DNA) dnaRequests.remove(id);
170   }
171
172   private void sendRequest(ObjectRequestContext ctxt) {
173     Set tr = new HashSet(removeObjects);
174     RequestManagedObjectMessage rmom = createRequestManagedObjectMessage(ctxt, tr);
175     removeObjects.clear();
176     ObjectID id = null;
177     for (Iterator JavaDoc i = ctxt.getObjectIDs().iterator(); i.hasNext();) {
178       id = (ObjectID) i.next();
179       dnaRequests.put(id, null);
180     }
181     // XXX:: This is a little weird that we add only the last ObjectID to the outstandingObjectRequests map
182
// when we add all the list of ObjectIDs to dnaRequests. This is done so that we only send the request once
183
// on resend. Since the only way we request for more than one ObjectID in 1 message is when someone initiate
184
// non-blocking lookups. So if we loose those requests on restart it is still ok.
185
this.outstandingObjectRequests.put(id, ctxt);
186     rmom.send();
187     requestMonitor.notifyObjectRequest(ctxt);
188   }
189
190   private RequestManagedObjectMessage createRequestManagedObjectMessage(ObjectRequestContext ctxt, Set removed) {
191     RequestManagedObjectMessage rmom = rmomFactory.newRequestManagedObjectMessage();
192     Set requestedObjectIDs = ctxt.getObjectIDs();
193     rmom.initialize(ctxt, requestedObjectIDs, removed);
194     return rmom;
195   }
196
197   public synchronized ObjectID retrieveRootID(String JavaDoc name) {
198
199     if (!rootRequests.containsKey(name)) {
200       RequestRootMessage rrm = createRootMessage(name);
201       rootRequests.put(name, ObjectID.NULL_ID);
202       outstandingRootRequests.put(name, name);
203       rrm.send();
204     }
205
206     boolean isInterrupted = false;
207     while (ObjectID.NULL_ID.equals(rootRequests.get(name))) {
208       waitUntilRunning();
209       try {
210         if (ObjectID.NULL_ID.equals(rootRequests.get(name))) {
211           wait();
212         }
213       } catch (InterruptedException JavaDoc e) {
214         isInterrupted = true;
215       }
216     }
217     Util.selfInterruptIfNeeded(isInterrupted);
218
219     return (ObjectID) (rootRequests.containsKey(name) ? rootRequests.get(name) : ObjectID.NULL_ID);
220   }
221
222   private RequestRootMessage createRootMessage(String JavaDoc name) {
223     RequestRootMessage rrm = rrmFactory.newRequestRootMessage();
224     rrm.initialize(name);
225     return rrm;
226   }
227
228   public synchronized void addRoot(String JavaDoc name, ObjectID id) {
229     waitUntilRunning();
230     if (id.isNull()) {
231       rootRequests.remove(name);
232     } else {
233       rootRequests.put(name, id);
234     }
235     Object JavaDoc rootName = outstandingRootRequests.remove(name);
236     if (rootName == null) {
237       // This is possible in some restart scenario
238
logger.warn("A root was added that was not found in the outstanding requests. root name = " + name + " " + id);
239     }
240     notifyAll();
241   }
242
243   public synchronized void addAllObjects(SessionID sessionID, long batchID, Collection JavaDoc dnas) {
244     waitUntilRunning();
245     if (!sessionManager.isCurrentSession(sessionID)) {
246       logger.warn("Ignoring DNA added from a different session: " + sessionID + ", " + sessionManager);
247       return;
248     }
249     lruDNA.clearUnrequestedDNA();
250     lruDNA.add(batchID, dnas);
251     for (Iterator JavaDoc i = dnas.iterator(); i.hasNext();) {
252       DNA dna = (DNA) i.next();
253       // The server should not send us any objects that the server thinks we still have.
254
if (removeObjects.contains(dna.getObjectID())) {
255         // formatting
256
throw new AssertionError JavaDoc("Server sent us an object that is present in the removed set - " + dna.getObjectID()
257                                  + " , removed set = " + removeObjects);
258       }
259       basicAddObject(dna);
260     }
261     notifyAll();
262   }
263
264   // Used only for testing
265
synchronized void addObject(DNA dna) {
266     if (!removeObjects.contains(dna.getObjectID())) basicAddObject(dna);
267     notifyAll();
268   }
269
270   private void basicAddObject(DNA dna) {
271     dnaRequests.put(dna.getObjectID(), dna);
272     outstandingObjectRequests.remove(dna.getObjectID());
273   }
274
275   public synchronized void removed(ObjectID id) {
276     dnaRequests.remove(id);
277     removeObjects.add(id);
278     if (removeObjects.size() > REMOVE_OBJECTS_THRESHOLD) {
279       ObjectRequestContext ctxt = new ObjectRequestContextImpl(this.cip.getChannelID(),
280                                                                new ObjectRequestID(objectRequestIDCounter++),
281                                                                Collections.EMPTY_SET, -1);
282       Set tr = new HashSet(removeObjects);
283       RequestManagedObjectMessage rmom = createRequestManagedObjectMessage(ctxt, tr);
284       removeObjects.clear();
285       rmom.send();
286     }
287   }
288
289   public class ObjectRequestContextImpl implements ObjectRequestContext {
290
291     private final long timestamp;
292
293     private final Set objectIDs;
294
295     private final ObjectRequestID requestID;
296
297     private final ChannelID channelID;
298
299     private final int depth;
300
301     private ObjectRequestContextImpl(ChannelID channelID, ObjectRequestID requestID, ObjectID objectID, int depth) {
302       this(channelID, requestID, new HashSet(), depth);
303       this.objectIDs.add(objectID);
304     }
305
306     private ObjectRequestContextImpl(ChannelID channelID, ObjectRequestID requestID, Set objectIDs, int depth) {
307       this.timestamp = System.currentTimeMillis();
308       this.channelID = channelID;
309       this.requestID = requestID;
310       this.objectIDs = objectIDs;
311       this.depth = depth;
312     }
313
314     public ChannelID getChannelID() {
315       return this.channelID;
316     }
317
318     public ObjectRequestID getRequestID() {
319       return this.requestID;
320     }
321
322     public Set getObjectIDs() {
323       return this.objectIDs;
324     }
325
326     public int getRequestDepth() {
327       return this.depth;
328     }
329
330     public String JavaDoc toString() {
331       return getClass().getName() + "[" + new Date JavaDoc(timestamp) + ", requestID =" + requestID + ", objectIDs ="
332              + objectIDs + ", depth = " + depth + "]";
333     }
334   }
335
336   private class DNALRU {
337     private LinkedHashMap JavaDoc dnas = new LinkedHashMap JavaDoc();
338
339     public synchronized int size() {
340       return dnas.size();
341     }
342
343     public synchronized void clear() {
344       dnas.clear();
345     }
346
347     public synchronized void add(long batchID, Collection JavaDoc objs) {
348       Long JavaDoc key = new Long JavaDoc(batchID);
349       Map m = (Map) dnas.get(key);
350       if (m == null) {
351         // XXX:: We are creating a Map with initial size equals objs.size() but there could be more to come !
352
// Revisit !!
353
m = new THashMap(objs.size(), 0.8f);
354         dnas.put(key, m);
355       }
356       for (Iterator JavaDoc i = objs.iterator(); i.hasNext();) {
357         DNA dna = (DNA) i.next();
358         m.put(dna.getObjectID(), dna);
359       }
360     }
361
362     public synchronized void remove(ObjectID id) {
363       for (Iterator JavaDoc i = dnas.values().iterator(); i.hasNext();) {
364         Map m = (Map) i.next();
365         if (m.remove(id) != null) {
366           // found !!!
367
break;
368         }
369       }
370     }
371
372     public synchronized void clearUnrequestedDNA() {
373       if (dnas.size() > MAX_LRU) {
374         Iterator JavaDoc dnaMapIterator = dnas.values().iterator();
375         Map dnaMap = (Map) dnaMapIterator.next();
376         for (Iterator JavaDoc i = dnaMap.keySet().iterator(); i.hasNext();) {
377           ObjectID id = (ObjectID) i.next();
378           if (!outstandingObjectRequests.containsKey(id)) {
379             // only include this ID in the removed set if this DNA has never left the request map.
380
// If it has left the map, this client is actually be referencing this object
381
if (dnaRequests.containsKey(id)) {
382               removed(id);
383             }
384           }
385         }
386         dnaMapIterator.remove();
387       }
388     }
389   }
390
391 }
392
Popular Tags