KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > nightlabs > ipanema > jdo > cache > Cache


1 /*
2  * Created on Jul 24, 2005
3  */

4 package com.nightlabs.ipanema.jdo.cache;
5
6 import java.util.Collection JavaDoc;
7 import java.util.HashMap JavaDoc;
8 import java.util.HashSet JavaDoc;
9 import java.util.Iterator JavaDoc;
10 import java.util.LinkedList JavaDoc;
11 import java.util.Map JavaDoc;
12 import java.util.Set JavaDoc;
13
14 import javax.jdo.JDOHelper;
15
16 import org.apache.log4j.Logger;
17
18 import com.nightlabs.ModuleException;
19 import com.nightlabs.config.Config;
20 import com.nightlabs.config.ConfigException;
21 import com.nightlabs.ipanema.base.login.Login;
22 import com.nightlabs.ipanema.jdo.JDOManager;
23 import com.nightlabs.ipanema.jdo.JDOManagerUtil;
24 import com.nightlabs.ipanema.jdo.JDOObjectID2PCClassMap;
25 import com.nightlabs.jdo.ObjectID;
26 import com.nightlabs.math.Base62Coder;
27 import com.nightlabs.notification.NotificationEvent;
28 import com.nightlabs.rcp.notification.ChangeManager;
29
30 /**
31  * A singleton of this class caches <b>all</b> JDO objects
32  * in the client. To
33  * use it, you should implement your own
34  *
35  * @author Marco Schulze - marco at nightlabs dot de
36  */

37 public class Cache
38 {
39     public static Logger LOGGER = Logger.getLogger(Cache.class);
40
41     private static Cache _sharedInstance = null;
42
43     private NotificationThread notificationThread = new NotificationThread(this);
44
45     protected static class NotificationThread extends Thread JavaDoc
46     {
47         private static volatile int nextID = 0;
48
49         private Cache cache;
50
51         public NotificationThread(Cache cache)
52         {
53             this.cache = cache;
54             setName("Cache.NotificationThread-" + (nextID++));
55         }
56
57         /**
58          * @see java.lang.Thread#run()
59          */

60         public void run()
61         {
62             long lastErrorDT = 0;
63             JDOManager jdoManager = null;
64
65             while (!isInterrupted()) {
66                 try {
67                     if (jdoManager == null)
68                         jdoManager = JDOManagerUtil.getHome(Login.getLogin().getInitialContextProperties()).create();
69
70                     Collection JavaDoc changeObjectIDs = jdoManager.waitForChanges(
71                             cache.getCacheSessionID(),
72                             cache.getCacheCfMod().getWaitForChangesTimeoutMSec());
73
74                     if (changeObjectIDs != null) {
75                         LOGGER.info("Received change notification with " + changeObjectIDs.size() + " objectIDs.");
76                         int removedCarrierCount = 0;
77                         for (Iterator JavaDoc it = changeObjectIDs.iterator(); it.hasNext(); ) {
78                             ObjectID objectID = (ObjectID)it.next();
79                             removedCarrierCount += cache.remove(objectID);
80                         }
81                         LOGGER.info("Removed " + removedCarrierCount + " carriers from the cache.");
82
83                         cache.unsubscribeObjectIDs(
84                                 changeObjectIDs,
85                                 cache.getCacheCfMod().getLocalListenerReactionTimeMSec());
86
87                         ChangeManager.sharedInstance().notify(
88                                 new NotificationEvent(this, changeObjectIDs));
89                     } // if (changeObjectIDs != null) {
90

91                 } catch (Throwable JavaDoc t) {
92                     LOGGER.error("Error in NotificationThread!", t);
93                     jdoManager = null;
94                     long lastErrorTimeDiff = System.currentTimeMillis() - lastErrorDT;
95                     long threadErrorWaitMSec = cache.getCacheCfMod().getThreadErrorWaitMSec();
96                     if (lastErrorTimeDiff < threadErrorWaitMSec) {
97                         try {
98                             sleep(threadErrorWaitMSec - lastErrorTimeDiff);
99                         } catch (InterruptedException JavaDoc e) {
100                             // ignore
101
}
102                     }
103                     lastErrorDT = System.currentTimeMillis();
104                 }
105             }
106         }
107     }
108
109     private CacheManagerThread cacheManagerThread = new CacheManagerThread(this);
110
111     protected static class CacheManagerThread extends Thread JavaDoc
112     {
113         private static volatile int nextID = 0;
114
115         private Cache cache;
116         private long lastResyncDT = System.currentTimeMillis();
117
118         public CacheManagerThread(Cache cache)
119         {
120             this.cache = cache;
121             setName("Cache.CacheManagerThread-" + (nextID++));
122         }
123
124         private Set JavaDoc currentlySubscribedObjectIDs = new HashSet JavaDoc();
125
126         /**
127          * @see java.lang.Thread#run()
128          */

129         public void run()
130         {
131             long lastErrorDT = 0;
132             JDOManager jdoManager = null;
133             boolean resync;
134
135             while (!isInterrupted()) {
136                 try {
137                     try {
138                         sleep(cache.getCacheCfMod().getCacheManagerThreadIntervalMSec());
139                     } catch (InterruptedException JavaDoc x) {
140                         // ignore
141
}
142
143                     // *** container management below ***
144
// (this might affect the listeners to be removed - NO: We don't drop listeners when we remove objects from Cache! Marco.)
145
CarrierContainer activeCarrierContainer = cache.getActiveCarrierContainer();
146                     if (System.currentTimeMillis() - activeCarrierContainer.getCreateDT() > cache.getCacheCfMod().getCarrierContainerActivityMSec()) {
147                         cache.rollCarrierContainers();
148                     }
149                     // *** container management above ***
150

151                     // *** listener management below ***
152
resync = System.currentTimeMillis() - lastResyncDT > cache.getCacheCfMod().getResyncRemoteListenersIntervalMSec();
153
154                     if (jdoManager == null)
155                         jdoManager = JDOManagerUtil.getHome(Login.getLogin().getInitialContextProperties()).create();
156
157                     Map JavaDoc subscriptionChanges = cache.fetchSubscriptionChangeRequests();
158                     if (LOGGER.isDebugEnabled())
159                         LOGGER.debug("Thread found " + subscriptionChanges.size() + " subscription change requests.");
160
161                     Map JavaDoc newSubscriptionChanges = null;
162                     LinkedList JavaDoc objectIDsToSubscribe = null;
163                     LinkedList JavaDoc objectIDsToUnsubscribe = null;
164                     boolean restoreCurrentlySubscribedObjectIDs = true;
165                     try {
166                         long now = System.currentTimeMillis();
167                         for (Iterator JavaDoc it = subscriptionChanges.entrySet().iterator(); it.hasNext(); ) {
168                             Map.Entry JavaDoc me = (Map.Entry JavaDoc) it.next();
169                             Object JavaDoc objectID = me.getKey();
170                             SubscriptionChangeRequest scr = (SubscriptionChangeRequest) me.getValue();
171
172                             if (scr.getScheduledActionDT() > now) {
173                                 if (LOGGER.isDebugEnabled())
174                                     LOGGER.debug("Subscription change request " + scr.toString() + " is delayed and will be processed in about " + (scr.getScheduledActionDT() - now) + " msec.");
175
176                                 if (newSubscriptionChanges == null)
177                                     newSubscriptionChanges = new HashMap JavaDoc();
178
179                                 newSubscriptionChanges.put(objectID, scr);
180                             }
181                             else {
182                                 if (scr.getAction() == SubscriptionChangeRequest.ACTION_REMOVE) {
183                                 // remove
184
currentlySubscribedObjectIDs.remove(objectID);
185
186                                     if (objectIDsToUnsubscribe == null)
187                                         objectIDsToUnsubscribe = new LinkedList JavaDoc();
188
189                                     objectIDsToUnsubscribe.add(objectID);
190                                 }
191                                 else {
192                                 // add
193
// if there exists already one, we don't register it again => ignore
194
if (!currentlySubscribedObjectIDs.contains(objectID)) {
195                                         if (objectIDsToSubscribe == null)
196                                             objectIDsToSubscribe = new LinkedList JavaDoc();
197
198                                         objectIDsToSubscribe.add(objectID);
199                                         currentlySubscribedObjectIDs.add(objectID);
200                                     }
201                                     else {
202                                         if (LOGGER.isDebugEnabled())
203                                             LOGGER.debug("Subscription change request " + scr.toString() + " is ignored, because there exists already a listener.");
204                                     }
205                                 }
206                             }
207                         }
208
209                         if (resync) {
210                             LOGGER.info("Synchronizing remote change listeners.");
211
212                             jdoManager.resubscribeAllChangeListeners(
213                                     cache.getCacheSessionID(),
214                                     currentlySubscribedObjectIDs);
215
216                             lastResyncDT = System.currentTimeMillis();
217                         }
218                         else {
219                             if (objectIDsToUnsubscribe != null || objectIDsToSubscribe != null) {
220                                 LOGGER.info(
221                                         "Adding " +
222                                         (objectIDsToSubscribe == null ? 0 : objectIDsToSubscribe.size()) +
223                                         " and removing " +
224                                         (objectIDsToUnsubscribe == null ? 0 : objectIDsToUnsubscribe.size()) +
225                                         " remote change listeners.");
226
227                                 if (LOGGER.isDebugEnabled()) {
228                                     LOGGER.debug("Change listeners for the following ObjectIDs will be removed:");
229                                     if (objectIDsToUnsubscribe == null)
230                                         LOGGER.debug(" NONE!");
231                                     else {
232                                         for (Iterator JavaDoc it = objectIDsToUnsubscribe.iterator(); it.hasNext(); )
233                                             LOGGER.debug(" " + it.next());
234                                     }
235
236                                     LOGGER.debug("Change listeners for the following ObjectIDs will be added:");
237                                     if (objectIDsToSubscribe == null)
238                                         LOGGER.debug(" NONE!");
239                                     else {
240                                         for (Iterator JavaDoc it = objectIDsToSubscribe.iterator(); it.hasNext(); )
241                                             LOGGER.debug(" " + it.next());
242                                     }
243                                 }
244
245                                 jdoManager.removeAddChangeListeners(
246                                         cache.getCacheSessionID(),
247                                         objectIDsToUnsubscribe,
248                                         objectIDsToSubscribe);
249                             }
250                         }
251
252                         restoreCurrentlySubscribedObjectIDs = false; // this is only executed, if there is no exception before
253
} finally {
254                         if (restoreCurrentlySubscribedObjectIDs) {
255                             LOGGER.warn("An error occured - will restore previous subscription change requests.");
256
257                             if (objectIDsToSubscribe != null)
258                                 currentlySubscribedObjectIDs.removeAll(objectIDsToSubscribe);
259                             // I think it is better to have a listener too much than one missing,
260
// therefore I don't restore the ones which were unsubscribed
261
// and I don't know whether it really was in the set...
262

263                             cache.restoreOldSubscriptionChangeRequests(subscriptionChanges);
264                         }
265                         else
266                             cache.restoreOldSubscriptionChangeRequests(newSubscriptionChanges);
267                     }
268                     // *** listener management above ***
269

270                 } catch (Throwable JavaDoc t) {
271                     LOGGER.error("Error in NotificationThread!", t);
272                     jdoManager = null;
273                     long lastErrorTimeDiff = System.currentTimeMillis() - lastErrorDT;
274                     long threadErrorWaitMSec = cache.getCacheCfMod().getThreadErrorWaitMSec();
275                     if (lastErrorTimeDiff < threadErrorWaitMSec) {
276                         try {
277                             sleep(threadErrorWaitMSec - lastErrorTimeDiff);
278                         } catch (InterruptedException JavaDoc e) {
279                             // ignore
280
}
281                     }
282                     lastErrorDT = System.currentTimeMillis();
283                 }
284             }
285         }
286     }
287
288     /**
289      * key: Object objectID<br/>
290      * value: SubscriptionChangeRequest
291      * <p>
292      * The
293      * {@link CacheManagerThread} will fetch them (if non-empty) and
294      * register/unregisters listeners for them. When doing this, it will replace
295      * the <tt>Map</tt> by a newly created empty <tt>HashMap</tt>. After it has done
296      * its work, it will merge the not-yet-performed changes (they can be delayed)
297      * with the new changes that have been added here while the <tt>CacheManagerThread</tt>
298      * was busy.
299      */

300     private Map JavaDoc subscriptionChangeRequests = new HashMap JavaDoc();
301     private Object JavaDoc subscriptionChangeRequestsMutex = new Object JavaDoc();
302
303     /**
304      * @return Returns the {@link #objectIDsToSubscribe} and replaces
305      * the field by a new instance of <tt>HashMap</tt>.
306      */

307     protected Map JavaDoc fetchSubscriptionChangeRequests()
308     {
309         Map JavaDoc res;
310         synchronized (subscriptionChangeRequestsMutex) {
311             res = subscriptionChangeRequests;
312             subscriptionChangeRequests = new HashMap JavaDoc();
313         }
314         return res;
315     }
316
317     /**
318      * This method transfers (and by that overwrites older values)
319      * the new data (from {@link #subscriptionChangeRequests}) to <tt>oldChanges</tt>.
320      * Then, it replaces the field <tt>subscriptionChangeRequests</tt> with
321      * <tt>oldChanges</tt>.
322      * <p>
323      * This is called by the {@link CacheManagerThread} in case of failure or if
324      * some of the fetched {@link SubscriptionChangeRequest}s are delayed and could not
325      * yet be done.
326      *
327      * @param oldChangeRequests Either <tt>null</tt> or a <tt>Map</tt> like the one
328      * returned from {@link #fetchSubscriptionChangeRequests()}.
329      */

330     protected void restoreOldSubscriptionChangeRequests(Map JavaDoc oldChangeRequests)
331     {
332         if (oldChangeRequests == null || oldChangeRequests.isEmpty()) {
333             LOGGER.debug("There are no old subscription change requests. Won't do anything.");
334             return;
335         }
336
337         if (LOGGER.isDebugEnabled()) {
338             LOGGER.debug("Restoring older subscription change requests:");
339             for (Iterator JavaDoc it = oldChangeRequests.values().iterator(); it.hasNext(); ) {
340                 SubscriptionChangeRequest scr = (SubscriptionChangeRequest) it.next();
341                 LOGGER.debug(" " + scr);
342             }
343         }
344
345         synchronized (subscriptionChangeRequestsMutex) {
346             if (LOGGER.isDebugEnabled()) {
347                 if (subscriptionChangeRequests.isEmpty())
348                     LOGGER.debug("There are no new subscription change requests to merge into the old ones. Simply replacing them.");
349                 else {
350                     LOGGER.debug("There are new subscription change requests which will be merged into the old ones:");
351                     for (Iterator JavaDoc it = subscriptionChangeRequests.values().iterator(); it.hasNext(); ) {
352                         SubscriptionChangeRequest scr = (SubscriptionChangeRequest) it.next();
353                         LOGGER.debug(" " + scr);
354                     }
355                 }
356             }
357
358             oldChangeRequests.putAll(subscriptionChangeRequests);
359             subscriptionChangeRequests = oldChangeRequests;
360
361             if (LOGGER.isDebugEnabled()) {
362                 LOGGER.debug("These are the subscription change requests after restore:");
363                 for (Iterator JavaDoc it = subscriptionChangeRequests.values().iterator(); it.hasNext(); ) {
364                     SubscriptionChangeRequest scr = (SubscriptionChangeRequest) it.next();
365                     LOGGER.debug(" " + scr);
366                 }
367             }
368         }
369     }
370
371     /**
372      * This method subscribes the given <tt>objectID</tt> asynchronously
373      * via the {@link CacheManagerThread} (which calls
374      * {@link JDOManager#addChangeListeners(java.lang.String, java.util.Collection)}).
375      */

376     protected void subscribeObjectID(Object JavaDoc objectID, long delayMSec)
377     {
378         synchronized (subscriptionChangeRequestsMutex) {
379             SubscriptionChangeRequest scr = (SubscriptionChangeRequest) subscriptionChangeRequests.get(objectID);
380             if (scr != null) {
381                 if (scr.getAction() == SubscriptionChangeRequest.ACTION_ADD
382                         &&
383                         scr.getScheduledActionDT() < System.currentTimeMillis() + delayMSec) {
384                     if (LOGGER.isDebugEnabled())
385                         LOGGER.debug("Ignoring request to subscribe ObjectID, because there is already a request, which is scheduled earlier. ObjectID: " + objectID);
386
387                     return;
388                 }
389             }
390
391             subscriptionChangeRequests.put(
392                     objectID,
393                     new SubscriptionChangeRequest(SubscriptionChangeRequest.ACTION_ADD,
394                     objectID,
395                     delayMSec));
396         }
397     }
398
399     /**
400      * This method unsubscribes the given <tt>objectIDs</tt> asynchronously
401      * via the {@link CacheManagerThread} (which calls
402      * {@link JDOManager#removeChangeListeners(java.lang.String, java.util.Collection)}).
403      */

404     protected void unsubscribeObjectIDs(Collection JavaDoc objectIDs, long delayMSec)
405     {
406         synchronized (subscriptionChangeRequestsMutex) {
407             for (Iterator JavaDoc it = objectIDs.iterator(); it.hasNext(); ) {
408                 Object JavaDoc objectID = it.next();
409
410                 SubscriptionChangeRequest scr = (SubscriptionChangeRequest) subscriptionChangeRequests.get(objectID);
411                 if (scr != null) {
412                     if (scr.getAction() == SubscriptionChangeRequest.ACTION_REMOVE
413                             &&
414                             scr.getScheduledActionDT() < System.currentTimeMillis() + delayMSec) {
415                         if (LOGGER.isDebugEnabled())
416                             LOGGER.debug("Ignoring request to unsubscribe ObjectID, because there is already a request, which is scheduled earlier. ObjectID: " + objectID);
417
418                         return;
419                     }
420                 }
421
422                 subscriptionChangeRequests.put(
423                         objectID,
424                         new SubscriptionChangeRequest(SubscriptionChangeRequest.ACTION_REMOVE,
425                                 objectID,
426                                 delayMSec));
427             }
428         }
429     }
430
431     /**
432      * @return Returns the singleton of this class.
433      *
434      * @throws In case the cache needs to be created and a {@link ConfigException} occurs while obtaining {@link CacheCfMod}.
435      */

436     public static Cache sharedInstance()
437     {
438         try {
439             if (_sharedInstance == null)
440                 _sharedInstance = new Cache();
441     
442             return _sharedInstance;
443         } catch (ConfigException e) {
444             throw new RuntimeException JavaDoc(e);
445         }
446     }
447
448     private String JavaDoc cacheSessionID = null;
449
450     public synchronized String JavaDoc getCacheSessionID()
451     {
452         if (cacheSessionID == null) {
453             Base62Coder coder = Base62Coder.sharedInstance();
454             cacheSessionID = coder.encode(System.currentTimeMillis(), 1) + '-' + coder.encode((long)(Math.random() * 10000), 1);
455             LOGGER.info("The Cache doesn't have a cacheSessionID. Assigning \""+getCacheSessionID()+"\"");
456         }
457
458         return cacheSessionID;
459     }
460
461     /**
462      * key: {@link Key} key<br/>
463      * value: {@link Carrier} carrier
464      */

465     private Map JavaDoc carriersByKey = new HashMap JavaDoc();
466
467     /**
468      * key: Object objectID<br/>
469      * value: Set of Key
470      */

471     private Map JavaDoc keySetsByObjectID = new HashMap JavaDoc();
472
473     /**
474      * This is a rolling carrier-registration with the activeCarrierContainer
475      * being the first entry.
476      */

477     private LinkedList JavaDoc carrierContainers = new LinkedList JavaDoc();
478     private CarrierContainer activeCarrierContainer;
479
480     protected CarrierContainer getActiveCarrierContainer()
481     {
482         return activeCarrierContainer;
483     }
484
485     protected synchronized void rollCarrierContainers()
486     {
487         LOGGER.info("Creating new activeCarrierContainer.");
488         CarrierContainer newActiveCC = new CarrierContainer(this);
489         carrierContainers.addFirst(newActiveCC);
490         activeCarrierContainer = newActiveCC;
491
492         long carrierContainerCount = getCacheCfMod().getCarrierContainerCount();
493
494         if (carrierContainerCount < 2)
495             throw new IllegalStateException JavaDoc("carrierContainerCount = "+carrierContainerCount+" but must be at least 2!!!");
496
497         while (carrierContainers.size() > carrierContainerCount) {
498             CarrierContainer cc = (CarrierContainer) carrierContainers.removeLast();
499             LOGGER.info("Dropping carrierContainer (created " + cc.getCreateDT() + ")");
500             cc.close();
501         }
502     }
503
504     private CacheCfMod cacheCfMod;
505
506     protected CacheCfMod getCacheCfMod()
507     {
508         return cacheCfMod;
509     }
510
511     /**
512      * Should not be used! Use {@link #sharedInstance()} instead!
513      *
514      * @throws ConfigException If it fails to obtain the {@link CacheCfMod} config module.
515      */

516     protected Cache() throws ConfigException
517     {
518         LOGGER.info("Creating new Cache instance.");
519         cacheCfMod = (CacheCfMod) Config.sharedInstance().createConfigModule(CacheCfMod.class);
520         Config.sharedInstance().saveConfFile(); // TODO remove this as soon as we have a thread that periodically saves it.
521
activeCarrierContainer = new CarrierContainer(this);
522         carrierContainers.addFirst(activeCarrierContainer);
523         notificationThread.start();
524         cacheManagerThread.start();
525     }
526
527     /**
528      * @param sa Either <tt>null</tt> or an array of <tt>String</tt>.
529      * @return Returns <tt>null</tt> if <tt>sa</tt> is <tt>null</tt> or a new instance
530      * of <tt>HashSet</tt> with all entries from the String array.
531      */

532     protected static Set JavaDoc stringArray2Set(String JavaDoc[] sa)
533     {
534         if (sa == null)
535             return null;
536
537         Set JavaDoc set = new HashSet JavaDoc(sa.length);
538         for (int i = 0; i < sa.length; ++i)
539             set.add(sa[i]);
540
541         return set;
542     }
543
544     /**
545      * This method calls {@link #get(String, Object, Set)} - look there for more details.
546      *
547      * @param scope Either <tt>null</tt> (default) or a <tt>String</tt> to separate
548      * a special namespace in the cache. This is necessary, if the method with which
549      * the object has been fetched from the server has modified the object and it
550      * therefore differs from the result of the default fetch method, even if the
551      * <tt>fetchGroups</tt> are identical.
552      * @param objectID A non-<tt>null</tt> JDO object ID referencing a persistence-capable
553      * object which may exist in this cache.
554      * @param fetchGroups Either <tt>null</tt> or a String array of those JDO fetch groups,
555      * that should have been used to retrieve the object. This String array is
556      * transformed to a <tt>Set</tt>.
557      *
558      * @return Returns either <tt>null</tt> or the desired JDO object.
559      */

560     public Object JavaDoc get(String JavaDoc scope, Object JavaDoc objectID, String JavaDoc[] fetchGroups)
561     {
562         return get(scope, objectID, stringArray2Set(fetchGroups));
563     }
564
565     /**
566      * Use this method to retrieve an object from the cache. This method updates
567      * the access timestamp, if an object has been found.
568      *
569      * @param scope Either <tt>null</tt> (default) or a <tt>String</tt> to separate
570      * a special namespace in the cache. This is necessary, if the method with which
571      * the object has been fetched from the server has modified the object and it
572      * therefore differs from the result of the default fetch method, even if the
573      * <tt>fetchGroups</tt> are identical.
574      * @param objectID A non-<tt>null</tt> JDO object ID referencing a persistence-capable
575      * object which may exist in this cache.
576      * @param fetchGroups Either <tt>null</tt> or a Set of String with those JDO fetch
577      * groups, that should have been used to retrieve the object.
578      *
579      * @return Returns either <tt>null</tt> or the desired JDO object.
580      */

581     public synchronized Object JavaDoc get(String JavaDoc scope, Object JavaDoc objectID, Set JavaDoc fetchGroups)
582     {
583         Key key = new Key(scope, objectID, fetchGroups);
584         Carrier carrier = (Carrier) carriersByKey.get(key);
585         if (carrier == null) {
586             if (LOGGER.isDebugEnabled())
587                 LOGGER.debug("No Carrier found for key: " + key.toString());
588
589             return null;
590         }
591
592         Object JavaDoc object = carrier.getObject();
593         if (object == null) { // remove the unnecessary keys from all indices.
594
if (LOGGER.isDebugEnabled())
595                 LOGGER.debug("Found Carrier, but object has already been released by the garbage collector! key: " + key.toString());
596
597             remove(key);
598             return null;
599         }
600
601         if (LOGGER.isDebugEnabled())
602             LOGGER.debug("Found Carrier - will return object from cache. key: " + key.toString());
603
604         carrier.setAccessDT();
605         return object;
606     }
607
608     public void putAll(String JavaDoc scope, Collection JavaDoc objects, String JavaDoc[] fetchGroups)
609     {
610         putAll(scope, objects, stringArray2Set(fetchGroups));
611     }
612
613     public void putAll(String JavaDoc scope, Collection JavaDoc objects, Set JavaDoc fetchGroups)
614     {
615         if (objects == null)
616             throw new NullPointerException JavaDoc("objects must not be null!");
617
618         for (Iterator JavaDoc it = objects.iterator(); it.hasNext(); )
619             put(scope, it.next(), fetchGroups);
620     }
621
622     public void put(String JavaDoc scope, Object JavaDoc object, String JavaDoc[] fetchGroups)
623     {
624         put(scope, object, stringArray2Set(fetchGroups));
625     }
626
627     public void put(String JavaDoc scope, Object JavaDoc object, Set JavaDoc fetchGroups)
628     {
629         if (object == null)
630             throw new NullPointerException JavaDoc("object must not be null!");
631
632         // obtain the objectID, create a key and a carrier
633
Object JavaDoc objectID = JDOHelper.getObjectId(object);
634         if (objectID == null)
635             throw new IllegalArgumentException JavaDoc("Could not obtain a JDO objectID from: "+object);
636         Key key = new Key(scope, objectID, fetchGroups);
637
638         if (LOGGER.isDebugEnabled())
639             LOGGER.debug("Putting object into cache. key: " + key.toString());
640
641         synchronized (this) {
642             // remove the old carrier - if existing
643
Carrier oldCarrier = (Carrier) carriersByKey.get(key);
644             if (oldCarrier != null) {
645                 if (LOGGER.isDebugEnabled())
646                     LOGGER.debug("There was an old carrier for the same key in the cache; removing it. key: " + key.toString());
647
648                 oldCarrier.setCarrierContainer(null);
649             }
650
651             // the constructor of Carrier self-registers in the active CarrierContainer
652
Carrier carrier = new Carrier(key, object, getActiveCarrierContainer());
653     
654             // store the new carrier in our main cache map
655
carriersByKey.put(key, carrier);
656     
657             // register the key by its objectID (for fast removal if the object changed)
658
Set JavaDoc keySet = (Set JavaDoc) keySetsByObjectID.get(objectID);
659             if (keySet == null) {
660                 keySet = new HashSet JavaDoc();
661                 keySetsByObjectID.put(objectID, keySet);
662             }
663             keySet.add(key);
664         }
665
666         // register the class of the pc object in the JDOObjectID2PCClassMap to minimize lookups.
667
JDOObjectID2PCClassMap.sharedInstance().initPersistenceCapableClass(
668                 objectID, object.getClass());
669
670         // Subscribe a listener to this object. This will not necessarily cause a remote
671
// method call, because the CacheManagerThread keeps a Set of all registered listeners
672
// and minimizes traffic by that.
673
subscribeObjectID(objectID, 0);
674     }
675
676     /**
677      * This method is called by {@link CarrierContainer#close()} and removes
678      * the <tt>Carrier</tt> specified by the given <tt>Key</tt> from the <tt>Cache</tt>.
679      * Note, that this method DOES NOT unregister server-sided listeners in any case!
680      *
681      * @param key The key referencing the <tt>Carrier</tt> that shall be removed.
682      */

683     protected synchronized void remove(Key key)
684     {
685         if (LOGGER.isDebugEnabled())
686             LOGGER.debug("Removing Carrier for key: " + key.toString());
687
688         Object JavaDoc objectID = key.getObjectID();
689
690         Set JavaDoc keySet = (Set JavaDoc) keySetsByObjectID.get(objectID);
691         if (keySet != null) {
692             keySet.remove(key);
693
694             if (keySet.isEmpty())
695                 keySetsByObjectID.remove(objectID);
696         }
697         Carrier oldCarrier = (Carrier) carriersByKey.remove(key);
698         if (oldCarrier != null)
699             oldCarrier.setCarrierContainer(null);
700     }
701
702     /**
703      * This method is called by the <tt>NotificationThread</tt>, if an object has
704      * been changed. It will
705      * remove all cached instances (all scopes & all fetch-groups) of the
706      * JDO object which is referenced by the ID <tt>objectID</tt>. It will NOT fire
707      * any local notifications! This is done by the <tt>NotificationThread</tt>.
708      * It will NOT deregister any server-sided listeners, either. This is done by
709      * the <tt>NotificationThread</tt>, too.
710      *
711      * @param objectID The JDO object-id of the persistance-capable object.
712      */

713     protected synchronized int remove(ObjectID objectID)
714     {
715         LOGGER.debug("Removing all Carriers for objectID: " + objectID);
716
717         Set JavaDoc keySet = (Set JavaDoc) keySetsByObjectID.remove(objectID);
718         if (keySet == null)
719             return 0;
720
721         int removedCarrierCount = 0;
722         for (Iterator JavaDoc it = keySet.iterator(); it.hasNext(); ) {
723             Key key = (Key) it.next();
724             Carrier carrier = (Carrier) carriersByKey.remove(key);
725             if (carrier != null) {
726                 if (LOGGER.isDebugEnabled())
727                     LOGGER.debug("Removing Carrier: key=\""+key.toString()+"\"");
728
729                 carrier.setCarrierContainer(null);
730                 removedCarrierCount++;
731             }
732             else
733                 LOGGER.warn("There was a key in the keySetsByObjectID, but no carrier for it in carriersByKey! key=\""+key.toString()+"\"");
734         }
735         return removedCarrierCount;
736     }
737
738 // /**
739
// * This method must be called, if an object has been changed. It will
740
// * remove all cached instances (all scopes & all fetch-groups) of the
741
// * JDO object which is referenced by the ID <tt>objectID</tt>. Additionally,
742
// * the {@link ChangeManager} singleton will be notified for the affected
743
// * class.
744
// *
745
// * @param objectID The JDO object-id of the persistance-capable object.
746
// */
747
// public synchronized void notifyChange(Object source, Object objectID)
748
// {
749
// Set keySet = (Set) keySetsByObjectID.get(objectID);
750
// if (keySet != null) {
751
//
752
// for (Iterator it = keySet.iterator(); it.hasNext(); ) {
753
// Key key = (Key) it.next();
754
// carriersByKey.remove(key);
755
// }
756
// }
757
//
758
// Class clazz = JDOObjectID2PCClassMap.sharedInstance().getPersistenceCapableClass(objectID);
759
// ChangeManager.sharedInstance().notify(
760
// new NotificationEvent(source, objectID));
761
// }
762

763     public synchronized void closeCacheSession()
764     throws ModuleException
765     {
766         try {
767             JDOManager jm = JDOManagerUtil.getHome(Login.getLogin().getInitialContextProperties()).create();
768
769             // remove all listeners for this session - done by remote closeCacheSession(...)
770
jm.closeCacheSession(cacheSessionID);
771
772             // stop the listener thread
773

774             // clear the cache
775
carriersByKey.clear();
776             keySetsByObjectID.clear();
777
778             // forget the cacheSessionID - a new one will automatically generated
779
cacheSessionID = null;
780         } catch (ModuleException x) {
781             throw x;
782         } catch (Exception JavaDoc x) {
783             throw new ModuleException(x);
784         }
785     }
786 }
787
Popular Tags