1 22 package org.jboss.mq.server; 23 24 import java.lang.ref.Reference ; 25 import java.lang.ref.ReferenceQueue ; 26 import java.util.HashMap ; 27 import javax.jms.JMSException ; 28 import javax.management.MBeanRegistration ; 29 import javax.management.ObjectName ; 30 import org.jboss.mq.DurableSubscriptionID; 31 import org.jboss.mq.SpyMessage; 32 import org.jboss.mq.pm.CacheStore; 33 import org.jboss.system.ServiceMBeanSupport; 34 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 35 36 55 public class MessageCache extends ServiceMBeanSupport implements MessageCacheMBean, MBeanRegistration , Runnable 56 { 57 public static final long ONE_MEGABYTE = 1024L * 1000; 58 public static final long DEFAULT_HIGH_MEMORY_MARK = ONE_MEGABYTE * 50; 59 public static final long DEFAULT_MAX_MEMORY_MARK = ONE_MEGABYTE * 60; 60 61 private LRUCache lruCache = new LRUCache(); 63 64 private SynchronizedLong messageCounter = new SynchronizedLong(0); 66 long cacheHits = 0; 67 long cacheMisses = 0; 68 69 protected CacheStore cacheStore; 70 ObjectName cacheStoreName; 71 72 private Thread referenceSoftner; 73 74 private long highMemoryMark = DEFAULT_HIGH_MEMORY_MARK; 75 private long maxMemoryMark = DEFAULT_MAX_MEMORY_MARK; 76 77 78 private boolean makeSoftReferences = true; 79 80 81 private long lastSoften = 0L; 82 83 84 private long softenNoMoreOftenThanMillis = 0L; 85 86 87 private long softenAtLeastEveryMillis = 0L; 88 89 90 private long softenWaitMillis = 1000L; 91 92 93 private int minimumHard = 1; 94 95 96 private int maximumHard = 0; 97 98 int softRefCacheSize = 0; 99 int totalCacheSize = 0; 100 101 ReferenceQueue referenceQueue = new ReferenceQueue (); 103 104 long softenedSize = 0; 106 107 boolean checkSoftReferenceDepth = false; 109 110 117 public MessageCache getInstance() 118 { 119 return this; 120 } 121 122 125 public MessageReference add(SpyMessage message, BasicQueue queue, int stored) throws javax.jms.JMSException 126 { 127 DurableSubscriptionID id = message.header.durableSubscriberID; 128 return addInternal(message, queue, stored, id); 129 } 130 131 134 public MessageReference add(SpyMessage message, BasicQueue queue, int stored, DurableSubscriptionID id) throws javax.jms.JMSException 135 { 136 return addInternal(message, queue, stored, id); 137 } 138 139 142 public MessageReference addInternal(SpyMessage message, BasicQueue queue, int stored, DurableSubscriptionID id) throws javax.jms.JMSException 143 { 144 MessageReference mh = new MessageReference(); 146 mh.init(this, messageCounter.increment(), message, queue, id); 147 mh.setStored(stored); 148 149 synchronized (mh) 151 { 152 synchronized (lruCache) 153 { 154 lruCache.addMostRecent(mh); 155 totalCacheSize++; 156 } 157 } 158 validateSoftReferenceDepth(); 159 160 return mh; 161 } 162 163 166 public void remove(MessageReference mr) throws JMSException 167 { 168 removeInternal(mr, true, true); 170 } 171 172 176 public void removeDelayed(MessageReference mr) throws JMSException 177 { 178 removeInternal(mr, true, false); 180 } 181 182 186 void soften(MessageReference mr) throws JMSException 187 { 188 removeInternal(mr, false, false); 190 191 if (makeSoftReferences) 192 softRefCacheSize++; 193 } 194 195 198 protected void removeInternal(MessageReference mr, boolean clear, boolean reset) throws JMSException 199 { 200 synchronized (mr) 201 { 202 if (mr.stored != MessageReference.REMOVED) 203 { 204 synchronized (lruCache) 205 { 206 if (mr.hardReference != null) lruCache.remove(mr); 208 if (clear) 209 totalCacheSize--; 210 } 211 if (clear) 212 mr.clear(); 213 } 215 216 if (reset) 217 mr.reset(); 218 } 220 } 221 222 234 public void run() 235 { 236 try 237 { 238 while (true) 239 { 240 Reference r = null; 242 if (checkSoftReferenceDepth) 243 r = referenceQueue.poll(); 244 else 245 r = referenceQueue.remove(softenWaitMillis); 246 if (r != null) 247 { 248 softRefCacheSize--; 249 while ((r = referenceQueue.poll()) != null) 252 { 253 softRefCacheSize--; 254 } 255 if (log.isTraceEnabled()) 256 log.trace("soft reference cache size is now: " + softRefCacheSize); 257 258 checkSoftReferenceDepth = true; 259 } 260 261 long now = System.currentTimeMillis(); 262 263 if (softenNoMoreOftenThanMillis > 0 && (now - lastSoften < softenNoMoreOftenThanMillis)) 265 checkSoftReferenceDepth = false; 266 267 else if (softenAtLeastEveryMillis > 0 && (now - lastSoften > softenAtLeastEveryMillis)) 269 checkSoftReferenceDepth = true; 270 271 if (checkSoftReferenceDepth) 273 { 274 checkSoftReferenceDepth = validateSoftReferenceDepth(); 275 276 if (checkSoftReferenceDepth == false) 278 lastSoften = now; 279 } 280 } 281 } 282 catch (InterruptedException e) 283 { 284 } 286 catch (Throwable t) 287 { 288 log.error("Message Cache Thread Stopped: ", t); 289 } 290 log.debug("Thread exiting."); 291 } 292 293 297 boolean validateSoftReferenceDepth() throws JMSException 298 { 299 boolean trace = log.isTraceEnabled(); 300 301 while (getState() == ServiceMBeanSupport.STARTED) 303 { 304 MessageReference messageToSoften = null; 305 306 synchronized (lruCache) 307 { 308 int softenCount = 0; 310 int hardCount = getHardRefCacheSize(); 311 int softCount = getSoftRefCacheSize(); 312 313 if (hardCount <= minimumHard) 315 return false; 316 317 long currentMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); 318 if (currentMem > highMemoryMark) 319 { 320 float severity = ((float) (currentMem - highMemoryMark)) / (maxMemoryMark - highMemoryMark); 323 severity = Math.min(severity, 1.0F); 324 if (trace) 325 log.trace("Memory usage serverity=" + severity); 326 int totalMessageInMem = hardCount + softCount; 327 int howManyShouldBeSoft = (int) ((totalMessageInMem) * severity); 328 softenCount = howManyShouldBeSoft - softCount; 329 } 330 331 if (maximumHard > 0) 333 { 334 int removeCount = hardCount - maximumHard; 335 if (removeCount > 0 && removeCount > softenCount) 336 softenCount = removeCount; 337 } 338 339 if (softenCount > hardCount) 341 { 342 if (trace) 343 log.trace("Soften count " + softenCount + " greater than hard references " + hardCount); 344 softenCount = hardCount; 345 } 346 347 if (softenCount > 1 || (maximumHard > 0 && hardCount > maximumHard)) 350 { 351 if (trace) 352 log.trace("Need to soften " + softenCount + " messages"); 353 Node node = lruCache.getLeastRecent(); 354 messageToSoften = (MessageReference) node.data; 355 } 356 } 357 358 if (messageToSoften == null) 360 return false; 361 362 synchronized (messageToSoften) 363 { 364 if (messageToSoften.messageCache != null && messageToSoften.stored != MessageReference.REMOVED) 366 { 367 messageToSoften.makeSoft(); 368 if (messageToSoften.stored == MessageReference.STORED) 369 { 370 softenedSize++; 371 return true; 372 } 373 else if (messageToSoften.isPersistent()) 374 { 375 return false; 378 } 379 } 380 else if (trace) 381 log.trace("not softening removed message " + messageToSoften); 382 } 383 } 384 return false; 385 } 386 387 391 void messageReferenceUsedEvent(MessageReference mh, boolean wasHard) throws JMSException 392 { 393 synchronized (mh) 394 { 395 synchronized (lruCache) 396 { 397 if (wasHard) 398 lruCache.makeMostRecent(mh); 399 else 400 { 401 lruCache.addMostRecent(mh); 402 } 403 } 404 } 405 406 if (wasHard == false) 407 checkSoftReferenceDepth = true; 408 } 409 410 SpyMessage loadFromStorage(MessageReference mh) throws JMSException 414 { 415 return cacheStore.loadFromStorage(mh); 416 } 417 418 void saveToStorage(MessageReference mh, SpyMessage message) throws JMSException 419 { 420 cacheStore.saveToStorage(mh, message); 421 } 422 423 void removeFromStorage(MessageReference mh) throws JMSException 424 { 425 cacheStore.removeFromStorage(mh); 426 } 427 428 434 437 protected void startService() throws Exception 438 { 439 setupCacheStore(); 440 441 referenceSoftner = new Thread (this, "JBossMQ Cache Reference Softner"); 442 referenceSoftner.setDaemon(true); 443 referenceSoftner.start(); 444 } 445 446 protected void setupCacheStore() throws Exception 447 { 448 cacheStore = (CacheStore) getServer().getAttribute(cacheStoreName, "Instance"); 449 } 450 451 454 protected void stopService() 455 { 456 synchronized (lruCache) 457 { 458 referenceSoftner.interrupt(); 459 referenceSoftner = null; 460 } 461 cacheStore = null; 462 } 463 464 470 public int getHardRefCacheSize() 471 { 472 synchronized (lruCache) 473 { 474 return lruCache.size(); 475 } 476 } 477 478 485 public long getSoftenedSize() 486 { 487 return softenedSize; 488 } 489 490 496 public int getSoftRefCacheSize() 497 { 498 return softRefCacheSize; 499 } 500 501 507 public int getTotalCacheSize() 508 { 509 return totalCacheSize; 510 } 511 512 518 public long getCacheMisses() 519 { 520 return cacheMisses; 521 } 522 523 529 public long getCacheHits() 530 { 531 return cacheHits; 532 } 533 534 540 public boolean getMakeSoftReferences() 541 { 542 return makeSoftReferences; 543 } 544 545 551 public void setMakeSoftReferences(boolean makeSoftReferences) 552 { 553 this.makeSoftReferences = makeSoftReferences; 554 } 555 556 562 public int getMinimumHard() 563 { 564 return minimumHard; 565 } 566 567 573 public void setMinimumHard(int minimumHard) 574 { 575 if (minimumHard < 1) 576 this.minimumHard = 1; 577 else 578 this.minimumHard = minimumHard; 579 } 580 581 587 public int getMaximumHard() 588 { 589 return maximumHard; 590 } 591 592 598 public void setMaximumHard(int maximumHard) 599 { 600 if (maximumHard < 0) 601 this.maximumHard = 0; 602 else 603 this.maximumHard = maximumHard; 604 } 605 606 612 public long getSoftenWaitMillis() 613 { 614 return softenWaitMillis; 615 } 616 617 623 public void setSoftenWaitMillis(long millis) 624 { 625 if (millis < 1000) 626 softenWaitMillis = 1000; 627 else 628 softenWaitMillis = millis; 629 } 630 631 637 public long getSoftenNoMoreOftenThanMillis() 638 { 639 return softenNoMoreOftenThanMillis; 640 } 641 642 648 public void setSoftenNoMoreOftenThanMillis(long millis) 649 { 650 if (millis < 0) 651 softenNoMoreOftenThanMillis = 0; 652 else 653 softenNoMoreOftenThanMillis = millis; 654 } 655 656 662 public long getSoftenAtLeastEveryMillis() 663 { 664 return softenAtLeastEveryMillis; 665 } 666 667 673 public void setSoftenAtLeastEveryMillis(long millis) 674 { 675 if (millis < 0) 676 softenAtLeastEveryMillis = 0; 677 else 678 softenAtLeastEveryMillis = millis; 679 } 680 681 687 public long getHighMemoryMark() 688 { 689 return highMemoryMark / ONE_MEGABYTE; 690 } 691 697 public void setHighMemoryMark(long highMemoryMark) 698 { 699 if (highMemoryMark > 0) 700 this.highMemoryMark = highMemoryMark * ONE_MEGABYTE; 701 else 702 this.highMemoryMark = 0; 703 } 704 705 711 public long getMaxMemoryMark() 712 { 713 return maxMemoryMark / ONE_MEGABYTE; 714 } 715 716 722 public void setMaxMemoryMark(long maxMemoryMark) 723 { 724 if (maxMemoryMark > 0) 725 this.maxMemoryMark = maxMemoryMark * ONE_MEGABYTE; 726 else 727 this.maxMemoryMark = 0; 728 } 729 730 736 public long getCurrentMemoryUsage() 737 { 738 return (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / ONE_MEGABYTE; 739 } 740 741 744 public String getName() 745 { 746 return "MessageCache"; 747 } 748 749 754 public void setCacheStore(ObjectName cacheStoreName) 755 { 756 this.cacheStoreName = cacheStoreName; 757 } 758 759 766 public ObjectName getCacheStore() 767 { 768 return cacheStoreName; 769 } 770 771 775 class LRUCache 776 { 777 int currentSize = 0; 778 HashMap map = new HashMap (); 780 Node mostRecent = null; 781 Node leastRecent = null; 782 public void addMostRecent(Object o) 783 { 784 Node newNode = new Node(); 785 newNode.data = o; 786 Object oldNode = map.put(o, newNode); 788 if (oldNode != null) 789 { 790 map.put(o, oldNode); 791 throw new RuntimeException ("Can't add object '" + o + "' to LRUCache that is already in cache."); 792 } 793 if (mostRecent == null) 795 { 796 mostRecent = newNode; 798 leastRecent = newNode; 799 } 800 else 801 { 802 newNode.lessRecent = mostRecent; 803 mostRecent.moreRecent = newNode; 804 mostRecent = newNode; 805 } 806 ++currentSize; 807 } 808 public void addLeastRecent(Object o) 810 { 811 Node newNode = new Node(); 812 newNode.data = o; 813 Object oldNode = map.put(o, newNode); 815 if (oldNode != null) 816 { 817 map.put(o, oldNode); 818 throw new RuntimeException ("Can't add object '" + o + "' to LRUCache that is already in cache."); 819 } 820 if (leastRecent == null) 822 { 823 mostRecent = newNode; 825 leastRecent = newNode; 826 } 827 else 828 { 829 newNode.moreRecent = leastRecent; 830 leastRecent.lessRecent = newNode; 831 leastRecent = newNode; 832 } 833 ++currentSize; 834 } 835 public void remove(Object o) 836 { 837 Node node = (Node) map.remove(o); 839 if (node == null) 840 throw new RuntimeException ("Can't remove object '" + o + "' that is not in cache."); 841 Node more = node.moreRecent; 843 Node less = node.lessRecent; 844 if (more == null) 845 { mostRecent = less; 847 if (mostRecent != null) 848 { 849 mostRecent.moreRecent = null; } 851 } 852 else 853 { 854 more.lessRecent = less; 855 } 856 if (less == null) 857 { leastRecent = more; 859 if (leastRecent != null) 860 { 861 leastRecent.lessRecent = null; } 863 } 864 else 865 { 866 less.moreRecent = more; 867 } 868 --currentSize; 869 } 870 public void makeMostRecent(Object o) 871 { 872 Node node = (Node) map.get(o); 874 if (node == null) 875 throw new RuntimeException ("Can't make most recent object '" + o + "' that is not in cache."); 876 Node more = node.moreRecent; 878 Node less = node.lessRecent; 879 if (more == null) return; 881 else 882 more.lessRecent = less; 883 if (less == null) leastRecent = more; 885 else 886 less.moreRecent = more; 887 node.lessRecent = mostRecent; 889 node.moreRecent = null; mostRecent.moreRecent = node; 891 mostRecent = node; 892 } 893 public int size() 894 { 895 return currentSize; 896 } 897 public Node getMostRecent() 898 { 899 return mostRecent; 900 } 901 public Node getLeastRecent() 902 { 903 return leastRecent; 904 } 905 } 906 907 static class Node 908 { 909 Node moreRecent = null; 910 Node lessRecent = null; 911 Object data = null; 912 } 913 } 914 | Popular Tags |