1 4 package com.nightlabs.ipanema.jdo.cache; 5 6 import java.util.Collection ; 7 import java.util.HashMap ; 8 import java.util.HashSet ; 9 import java.util.Iterator ; 10 import java.util.LinkedList ; 11 import java.util.Map ; 12 import java.util.Set ; 13 14 import javax.jdo.JDOHelper; 15 16 import org.apache.log4j.Logger; 17 18 import com.nightlabs.ModuleException; 19 import com.nightlabs.config.Config; 20 import com.nightlabs.config.ConfigException; 21 import com.nightlabs.ipanema.base.login.Login; 22 import com.nightlabs.ipanema.jdo.JDOManager; 23 import com.nightlabs.ipanema.jdo.JDOManagerUtil; 24 import com.nightlabs.ipanema.jdo.JDOObjectID2PCClassMap; 25 import com.nightlabs.jdo.ObjectID; 26 import com.nightlabs.math.Base62Coder; 27 import com.nightlabs.notification.NotificationEvent; 28 import com.nightlabs.rcp.notification.ChangeManager; 29 30 37 public class Cache 38 { 39 public static Logger LOGGER = Logger.getLogger(Cache.class); 40 41 private static Cache _sharedInstance = null; 42 43 private NotificationThread notificationThread = new NotificationThread(this); 44 45 protected static class NotificationThread extends Thread 46 { 47 private static volatile int nextID = 0; 48 49 private Cache cache; 50 51 public NotificationThread(Cache cache) 52 { 53 this.cache = cache; 54 setName("Cache.NotificationThread-" + (nextID++)); 55 } 56 57 60 public void run() 61 { 62 long lastErrorDT = 0; 63 JDOManager jdoManager = null; 64 65 while (!isInterrupted()) { 66 try { 67 if (jdoManager == null) 68 jdoManager = JDOManagerUtil.getHome(Login.getLogin().getInitialContextProperties()).create(); 69 70 Collection changeObjectIDs = jdoManager.waitForChanges( 71 cache.getCacheSessionID(), 72 cache.getCacheCfMod().getWaitForChangesTimeoutMSec()); 73 74 if (changeObjectIDs != null) { 75 LOGGER.info("Received change notification with " + changeObjectIDs.size() + " objectIDs."); 76 int removedCarrierCount = 0; 77 for (Iterator it = changeObjectIDs.iterator(); it.hasNext(); ) { 78 ObjectID objectID = (ObjectID)it.next(); 79 removedCarrierCount += cache.remove(objectID); 80 } 81 LOGGER.info("Removed " + removedCarrierCount + " carriers from the cache."); 82 83 cache.unsubscribeObjectIDs( 84 changeObjectIDs, 85 cache.getCacheCfMod().getLocalListenerReactionTimeMSec()); 86 87 ChangeManager.sharedInstance().notify( 88 new NotificationEvent(this, changeObjectIDs)); 89 } 91 } catch (Throwable t) { 92 LOGGER.error("Error in NotificationThread!", t); 93 jdoManager = null; 94 long lastErrorTimeDiff = System.currentTimeMillis() - lastErrorDT; 95 long threadErrorWaitMSec = cache.getCacheCfMod().getThreadErrorWaitMSec(); 96 if (lastErrorTimeDiff < threadErrorWaitMSec) { 97 try { 98 sleep(threadErrorWaitMSec - lastErrorTimeDiff); 99 } catch (InterruptedException e) { 100 } 102 } 103 lastErrorDT = System.currentTimeMillis(); 104 } 105 } 106 } 107 } 108 109 private CacheManagerThread cacheManagerThread = new CacheManagerThread(this); 110 111 protected static class CacheManagerThread extends Thread 112 { 113 private static volatile int nextID = 0; 114 115 private Cache cache; 116 private long lastResyncDT = System.currentTimeMillis(); 117 118 public CacheManagerThread(Cache cache) 119 { 120 this.cache = cache; 121 setName("Cache.CacheManagerThread-" + (nextID++)); 122 } 123 124 private Set currentlySubscribedObjectIDs = new HashSet (); 125 126 129 public void run() 130 { 131 long lastErrorDT = 0; 132 JDOManager jdoManager = null; 133 boolean resync; 134 135 while (!isInterrupted()) { 136 try { 137 try { 138 sleep(cache.getCacheCfMod().getCacheManagerThreadIntervalMSec()); 139 } catch (InterruptedException x) { 140 } 142 143 CarrierContainer activeCarrierContainer = cache.getActiveCarrierContainer(); 146 if (System.currentTimeMillis() - activeCarrierContainer.getCreateDT() > cache.getCacheCfMod().getCarrierContainerActivityMSec()) { 147 cache.rollCarrierContainers(); 148 } 149 151 resync = System.currentTimeMillis() - lastResyncDT > cache.getCacheCfMod().getResyncRemoteListenersIntervalMSec(); 153 154 if (jdoManager == null) 155 jdoManager = JDOManagerUtil.getHome(Login.getLogin().getInitialContextProperties()).create(); 156 157 Map subscriptionChanges = cache.fetchSubscriptionChangeRequests(); 158 if (LOGGER.isDebugEnabled()) 159 LOGGER.debug("Thread found " + subscriptionChanges.size() + " subscription change requests."); 160 161 Map newSubscriptionChanges = null; 162 LinkedList objectIDsToSubscribe = null; 163 LinkedList objectIDsToUnsubscribe = null; 164 boolean restoreCurrentlySubscribedObjectIDs = true; 165 try { 166 long now = System.currentTimeMillis(); 167 for (Iterator it = subscriptionChanges.entrySet().iterator(); it.hasNext(); ) { 168 Map.Entry me = (Map.Entry ) it.next(); 169 Object objectID = me.getKey(); 170 SubscriptionChangeRequest scr = (SubscriptionChangeRequest) me.getValue(); 171 172 if (scr.getScheduledActionDT() > now) { 173 if (LOGGER.isDebugEnabled()) 174 LOGGER.debug("Subscription change request " + scr.toString() + " is delayed and will be processed in about " + (scr.getScheduledActionDT() - now) + " msec."); 175 176 if (newSubscriptionChanges == null) 177 newSubscriptionChanges = new HashMap (); 178 179 newSubscriptionChanges.put(objectID, scr); 180 } 181 else { 182 if (scr.getAction() == SubscriptionChangeRequest.ACTION_REMOVE) { 183 currentlySubscribedObjectIDs.remove(objectID); 185 186 if (objectIDsToUnsubscribe == null) 187 objectIDsToUnsubscribe = new LinkedList (); 188 189 objectIDsToUnsubscribe.add(objectID); 190 } 191 else { 192 if (!currentlySubscribedObjectIDs.contains(objectID)) { 195 if (objectIDsToSubscribe == null) 196 objectIDsToSubscribe = new LinkedList (); 197 198 objectIDsToSubscribe.add(objectID); 199 currentlySubscribedObjectIDs.add(objectID); 200 } 201 else { 202 if (LOGGER.isDebugEnabled()) 203 LOGGER.debug("Subscription change request " + scr.toString() + " is ignored, because there exists already a listener."); 204 } 205 } 206 } 207 } 208 209 if (resync) { 210 LOGGER.info("Synchronizing remote change listeners."); 211 212 jdoManager.resubscribeAllChangeListeners( 213 cache.getCacheSessionID(), 214 currentlySubscribedObjectIDs); 215 216 lastResyncDT = System.currentTimeMillis(); 217 } 218 else { 219 if (objectIDsToUnsubscribe != null || objectIDsToSubscribe != null) { 220 LOGGER.info( 221 "Adding " + 222 (objectIDsToSubscribe == null ? 0 : objectIDsToSubscribe.size()) + 223 " and removing " + 224 (objectIDsToUnsubscribe == null ? 0 : objectIDsToUnsubscribe.size()) + 225 " remote change listeners."); 226 227 if (LOGGER.isDebugEnabled()) { 228 LOGGER.debug("Change listeners for the following ObjectIDs will be removed:"); 229 if (objectIDsToUnsubscribe == null) 230 LOGGER.debug(" NONE!"); 231 else { 232 for (Iterator it = objectIDsToUnsubscribe.iterator(); it.hasNext(); ) 233 LOGGER.debug(" " + it.next()); 234 } 235 236 LOGGER.debug("Change listeners for the following ObjectIDs will be added:"); 237 if (objectIDsToSubscribe == null) 238 LOGGER.debug(" NONE!"); 239 else { 240 for (Iterator it = objectIDsToSubscribe.iterator(); it.hasNext(); ) 241 LOGGER.debug(" " + it.next()); 242 } 243 } 244 245 jdoManager.removeAddChangeListeners( 246 cache.getCacheSessionID(), 247 objectIDsToUnsubscribe, 248 objectIDsToSubscribe); 249 } 250 } 251 252 restoreCurrentlySubscribedObjectIDs = false; } finally { 254 if (restoreCurrentlySubscribedObjectIDs) { 255 LOGGER.warn("An error occured - will restore previous subscription change requests."); 256 257 if (objectIDsToSubscribe != null) 258 currentlySubscribedObjectIDs.removeAll(objectIDsToSubscribe); 259 263 cache.restoreOldSubscriptionChangeRequests(subscriptionChanges); 264 } 265 else 266 cache.restoreOldSubscriptionChangeRequests(newSubscriptionChanges); 267 } 268 270 } catch (Throwable t) { 271 LOGGER.error("Error in NotificationThread!", t); 272 jdoManager = null; 273 long lastErrorTimeDiff = System.currentTimeMillis() - lastErrorDT; 274 long threadErrorWaitMSec = cache.getCacheCfMod().getThreadErrorWaitMSec(); 275 if (lastErrorTimeDiff < threadErrorWaitMSec) { 276 try { 277 sleep(threadErrorWaitMSec - lastErrorTimeDiff); 278 } catch (InterruptedException e) { 279 } 281 } 282 lastErrorDT = System.currentTimeMillis(); 283 } 284 } 285 } 286 } 287 288 300 private Map subscriptionChangeRequests = new HashMap (); 301 private Object subscriptionChangeRequestsMutex = new Object (); 302 303 307 protected Map fetchSubscriptionChangeRequests() 308 { 309 Map res; 310 synchronized (subscriptionChangeRequestsMutex) { 311 res = subscriptionChangeRequests; 312 subscriptionChangeRequests = new HashMap (); 313 } 314 return res; 315 } 316 317 330 protected void restoreOldSubscriptionChangeRequests(Map oldChangeRequests) 331 { 332 if (oldChangeRequests == null || oldChangeRequests.isEmpty()) { 333 LOGGER.debug("There are no old subscription change requests. Won't do anything."); 334 return; 335 } 336 337 if (LOGGER.isDebugEnabled()) { 338 LOGGER.debug("Restoring older subscription change requests:"); 339 for (Iterator it = oldChangeRequests.values().iterator(); it.hasNext(); ) { 340 SubscriptionChangeRequest scr = (SubscriptionChangeRequest) it.next(); 341 LOGGER.debug(" " + scr); 342 } 343 } 344 345 synchronized (subscriptionChangeRequestsMutex) { 346 if (LOGGER.isDebugEnabled()) { 347 if (subscriptionChangeRequests.isEmpty()) 348 LOGGER.debug("There are no new subscription change requests to merge into the old ones. Simply replacing them."); 349 else { 350 LOGGER.debug("There are new subscription change requests which will be merged into the old ones:"); 351 for (Iterator it = subscriptionChangeRequests.values().iterator(); it.hasNext(); ) { 352 SubscriptionChangeRequest scr = (SubscriptionChangeRequest) it.next(); 353 LOGGER.debug(" " + scr); 354 } 355 } 356 } 357 358 oldChangeRequests.putAll(subscriptionChangeRequests); 359 subscriptionChangeRequests = oldChangeRequests; 360 361 if (LOGGER.isDebugEnabled()) { 362 LOGGER.debug("These are the subscription change requests after restore:"); 363 for (Iterator it = subscriptionChangeRequests.values().iterator(); it.hasNext(); ) { 364 SubscriptionChangeRequest scr = (SubscriptionChangeRequest) it.next(); 365 LOGGER.debug(" " + scr); 366 } 367 } 368 } 369 } 370 371 376 protected void subscribeObjectID(Object objectID, long delayMSec) 377 { 378 synchronized (subscriptionChangeRequestsMutex) { 379 SubscriptionChangeRequest scr = (SubscriptionChangeRequest) subscriptionChangeRequests.get(objectID); 380 if (scr != null) { 381 if (scr.getAction() == SubscriptionChangeRequest.ACTION_ADD 382 && 383 scr.getScheduledActionDT() < System.currentTimeMillis() + delayMSec) { 384 if (LOGGER.isDebugEnabled()) 385 LOGGER.debug("Ignoring request to subscribe ObjectID, because there is already a request, which is scheduled earlier. ObjectID: " + objectID); 386 387 return; 388 } 389 } 390 391 subscriptionChangeRequests.put( 392 objectID, 393 new SubscriptionChangeRequest(SubscriptionChangeRequest.ACTION_ADD, 394 objectID, 395 delayMSec)); 396 } 397 } 398 399 404 protected void unsubscribeObjectIDs(Collection objectIDs, long delayMSec) 405 { 406 synchronized (subscriptionChangeRequestsMutex) { 407 for (Iterator it = objectIDs.iterator(); it.hasNext(); ) { 408 Object objectID = it.next(); 409 410 SubscriptionChangeRequest scr = (SubscriptionChangeRequest) subscriptionChangeRequests.get(objectID); 411 if (scr != null) { 412 if (scr.getAction() == SubscriptionChangeRequest.ACTION_REMOVE 413 && 414 scr.getScheduledActionDT() < System.currentTimeMillis() + delayMSec) { 415 if (LOGGER.isDebugEnabled()) 416 LOGGER.debug("Ignoring request to unsubscribe ObjectID, because there is already a request, which is scheduled earlier. ObjectID: " + objectID); 417 418 return; 419 } 420 } 421 422 subscriptionChangeRequests.put( 423 objectID, 424 new SubscriptionChangeRequest(SubscriptionChangeRequest.ACTION_REMOVE, 425 objectID, 426 delayMSec)); 427 } 428 } 429 } 430 431 436 public static Cache sharedInstance() 437 { 438 try { 439 if (_sharedInstance == null) 440 _sharedInstance = new Cache(); 441 442 return _sharedInstance; 443 } catch (ConfigException e) { 444 throw new RuntimeException (e); 445 } 446 } 447 448 private String cacheSessionID = null; 449 450 public synchronized String getCacheSessionID() 451 { 452 if (cacheSessionID == null) { 453 Base62Coder coder = Base62Coder.sharedInstance(); 454 cacheSessionID = coder.encode(System.currentTimeMillis(), 1) + '-' + coder.encode((long)(Math.random() * 10000), 1); 455 LOGGER.info("The Cache doesn't have a cacheSessionID. Assigning \""+getCacheSessionID()+"\""); 456 } 457 458 return cacheSessionID; 459 } 460 461 465 private Map carriersByKey = new HashMap (); 466 467 471 private Map keySetsByObjectID = new HashMap (); 472 473 477 private LinkedList carrierContainers = new LinkedList (); 478 private CarrierContainer activeCarrierContainer; 479 480 protected CarrierContainer getActiveCarrierContainer() 481 { 482 return activeCarrierContainer; 483 } 484 485 protected synchronized void rollCarrierContainers() 486 { 487 LOGGER.info("Creating new activeCarrierContainer."); 488 CarrierContainer newActiveCC = new CarrierContainer(this); 489 carrierContainers.addFirst(newActiveCC); 490 activeCarrierContainer = newActiveCC; 491 492 long carrierContainerCount = getCacheCfMod().getCarrierContainerCount(); 493 494 if (carrierContainerCount < 2) 495 throw new IllegalStateException ("carrierContainerCount = "+carrierContainerCount+" but must be at least 2!!!"); 496 497 while (carrierContainers.size() > carrierContainerCount) { 498 CarrierContainer cc = (CarrierContainer) carrierContainers.removeLast(); 499 LOGGER.info("Dropping carrierContainer (created " + cc.getCreateDT() + ")"); 500 cc.close(); 501 } 502 } 503 504 private CacheCfMod cacheCfMod; 505 506 protected CacheCfMod getCacheCfMod() 507 { 508 return cacheCfMod; 509 } 510 511 516 protected Cache() throws ConfigException 517 { 518 LOGGER.info("Creating new Cache instance."); 519 cacheCfMod = (CacheCfMod) Config.sharedInstance().createConfigModule(CacheCfMod.class); 520 Config.sharedInstance().saveConfFile(); activeCarrierContainer = new CarrierContainer(this); 522 carrierContainers.addFirst(activeCarrierContainer); 523 notificationThread.start(); 524 cacheManagerThread.start(); 525 } 526 527 532 protected static Set stringArray2Set(String [] sa) 533 { 534 if (sa == null) 535 return null; 536 537 Set set = new HashSet (sa.length); 538 for (int i = 0; i < sa.length; ++i) 539 set.add(sa[i]); 540 541 return set; 542 } 543 544 560 public Object get(String scope, Object objectID, String [] fetchGroups) 561 { 562 return get(scope, objectID, stringArray2Set(fetchGroups)); 563 } 564 565 581 public synchronized Object get(String scope, Object objectID, Set fetchGroups) 582 { 583 Key key = new Key(scope, objectID, fetchGroups); 584 Carrier carrier = (Carrier) carriersByKey.get(key); 585 if (carrier == null) { 586 if (LOGGER.isDebugEnabled()) 587 LOGGER.debug("No Carrier found for key: " + key.toString()); 588 589 return null; 590 } 591 592 Object object = carrier.getObject(); 593 if (object == null) { if (LOGGER.isDebugEnabled()) 595 LOGGER.debug("Found Carrier, but object has already been released by the garbage collector! key: " + key.toString()); 596 597 remove(key); 598 return null; 599 } 600 601 if (LOGGER.isDebugEnabled()) 602 LOGGER.debug("Found Carrier - will return object from cache. key: " + key.toString()); 603 604 carrier.setAccessDT(); 605 return object; 606 } 607 608 public void putAll(String scope, Collection objects, String [] fetchGroups) 609 { 610 putAll(scope, objects, stringArray2Set(fetchGroups)); 611 } 612 613 public void putAll(String scope, Collection objects, Set fetchGroups) 614 { 615 if (objects == null) 616 throw new NullPointerException ("objects must not be null!"); 617 618 for (Iterator it = objects.iterator(); it.hasNext(); ) 619 put(scope, it.next(), fetchGroups); 620 } 621 622 public void put(String scope, Object object, String [] fetchGroups) 623 { 624 put(scope, object, stringArray2Set(fetchGroups)); 625 } 626 627 public void put(String scope, Object object, Set fetchGroups) 628 { 629 if (object == null) 630 throw new NullPointerException ("object must not be null!"); 631 632 Object objectID = JDOHelper.getObjectId(object); 634 if (objectID == null) 635 throw new IllegalArgumentException ("Could not obtain a JDO objectID from: "+object); 636 Key key = new Key(scope, objectID, fetchGroups); 637 638 if (LOGGER.isDebugEnabled()) 639 LOGGER.debug("Putting object into cache. key: " + key.toString()); 640 641 synchronized (this) { 642 Carrier oldCarrier = (Carrier) carriersByKey.get(key); 644 if (oldCarrier != null) { 645 if (LOGGER.isDebugEnabled()) 646 LOGGER.debug("There was an old carrier for the same key in the cache; removing it. key: " + key.toString()); 647 648 oldCarrier.setCarrierContainer(null); 649 } 650 651 Carrier carrier = new Carrier(key, object, getActiveCarrierContainer()); 653 654 carriersByKey.put(key, carrier); 656 657 Set keySet = (Set ) keySetsByObjectID.get(objectID); 659 if (keySet == null) { 660 keySet = new HashSet (); 661 keySetsByObjectID.put(objectID, keySet); 662 } 663 keySet.add(key); 664 } 665 666 JDOObjectID2PCClassMap.sharedInstance().initPersistenceCapableClass( 668 objectID, object.getClass()); 669 670 subscribeObjectID(objectID, 0); 674 } 675 676 683 protected synchronized void remove(Key key) 684 { 685 if (LOGGER.isDebugEnabled()) 686 LOGGER.debug("Removing Carrier for key: " + key.toString()); 687 688 Object objectID = key.getObjectID(); 689 690 Set keySet = (Set ) keySetsByObjectID.get(objectID); 691 if (keySet != null) { 692 keySet.remove(key); 693 694 if (keySet.isEmpty()) 695 keySetsByObjectID.remove(objectID); 696 } 697 Carrier oldCarrier = (Carrier) carriersByKey.remove(key); 698 if (oldCarrier != null) 699 oldCarrier.setCarrierContainer(null); 700 } 701 702 713 protected synchronized int remove(ObjectID objectID) 714 { 715 LOGGER.debug("Removing all Carriers for objectID: " + objectID); 716 717 Set keySet = (Set ) keySetsByObjectID.remove(objectID); 718 if (keySet == null) 719 return 0; 720 721 int removedCarrierCount = 0; 722 for (Iterator it = keySet.iterator(); it.hasNext(); ) { 723 Key key = (Key) it.next(); 724 Carrier carrier = (Carrier) carriersByKey.remove(key); 725 if (carrier != null) { 726 if (LOGGER.isDebugEnabled()) 727 LOGGER.debug("Removing Carrier: key=\""+key.toString()+"\""); 728 729 carrier.setCarrierContainer(null); 730 removedCarrierCount++; 731 } 732 else 733 LOGGER.warn("There was a key in the keySetsByObjectID, but no carrier for it in carriersByKey! key=\""+key.toString()+"\""); 734 } 735 return removedCarrierCount; 736 } 737 738 763 public synchronized void closeCacheSession() 764 throws ModuleException 765 { 766 try { 767 JDOManager jm = JDOManagerUtil.getHome(Login.getLogin().getInitialContextProperties()).create(); 768 769 jm.closeCacheSession(cacheSessionID); 771 772 774 carriersByKey.clear(); 776 keySetsByObjectID.clear(); 777 778 cacheSessionID = null; 780 } catch (ModuleException x) { 781 throw x; 782 } catch (Exception x) { 783 throw new ModuleException(x); 784 } 785 } 786 } 787 | Popular Tags |