1 18 package org.apache.activemq.memory; 19 20 import java.util.Iterator ; 21 import java.util.LinkedList ; 22 import java.util.List ; 23 import java.util.concurrent.CopyOnWriteArrayList ; 24 25 import org.apache.activemq.Service; 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 29 30 40 public class UsageManager implements Service{ 41 42 private static final Log log = LogFactory.getLog(UsageManager.class); 43 44 private final UsageManager parent; 45 private long limit; 46 private long usage; 47 48 private int percentUsage; 49 private int percentUsageMinDelta=1; 50 51 private final Object usageMutex = new Object (); 52 53 private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList (); 54 55 private boolean sendFailIfNoSpace; 56 57 58 private boolean sendFailIfNoSpaceExplicitySet; 59 private final boolean debug = log.isDebugEnabled(); 60 private String name = ""; 61 private float usagePortion = 1.0f; 62 private List <UsageManager> children = new CopyOnWriteArrayList <UsageManager>(); 63 private final LinkedList <Runnable > callbacks = new LinkedList <Runnable >(); 64 65 public UsageManager() { 66 this(null,"default"); 67 } 68 69 76 public UsageManager(UsageManager parent) { 77 this(parent,"default"); 78 } 79 80 public UsageManager(String name) { 81 this(null,name); 82 } 83 84 public UsageManager(UsageManager parent,String name) { 85 this(parent,name,1.0f); 86 } 87 88 public UsageManager(UsageManager parent, String name, float portion) { 89 this.parent = parent; 90 this.usagePortion=portion; 91 if (parent != null) { 92 this.limit=(long)(parent.limit * portion); 93 this.name= parent.name + ":"; 94 } 95 this.name += name; 96 } 97 98 103 public void enqueueUsage(long value) throws InterruptedException { 104 waitForSpace(); 105 increaseUsage(value); 106 } 107 108 111 public void waitForSpace() throws InterruptedException { 112 if(parent!=null) 113 parent.waitForSpace(); 114 synchronized (usageMutex) { 115 for( int i=0; percentUsage >= 100 ; i++) { 116 usageMutex.wait(); 117 } 118 } 119 } 120 121 126 public boolean waitForSpace(long timeout) throws InterruptedException { 127 if(parent!=null) { 128 if( !parent.waitForSpace(timeout) ) 129 return false; 130 } 131 synchronized (usageMutex) { 132 if( percentUsage >= 100 ) { 133 usageMutex.wait(timeout); 134 } 135 return percentUsage < 100; 136 } 137 } 138 139 144 public void increaseUsage(long value) { 145 if( value == 0 ) 146 return; 147 if(parent!=null) 148 parent.increaseUsage(value); 149 int percentUsage; 150 synchronized(usageMutex) { 151 usage+=value; 152 percentUsage = caclPercentUsage(); 153 } 154 setPercentUsage(percentUsage); 155 } 156 157 162 public void decreaseUsage(long value) { 163 if( value == 0 ) 164 return; 165 if(parent!=null) 166 parent.decreaseUsage(value); 167 int percentUsage; 168 synchronized(usageMutex) { 169 usage-=value; 170 percentUsage = caclPercentUsage(); 171 } 172 setPercentUsage(percentUsage); 173 } 174 175 public boolean isFull() { 176 if(parent!=null && parent.isFull()) 177 return true; 178 synchronized (usageMutex) { 179 return percentUsage >= 100; 180 } 181 } 182 183 public void addUsageListener(UsageListener listener) { 184 listeners.add(listener); 185 } 186 public void removeUsageListener(UsageListener listener) { 187 listeners.remove(listener); 188 } 189 190 public long getLimit() { 191 synchronized (usageMutex) { 192 return limit; 193 } 194 } 195 196 204 public void setLimit(long limit) { 205 if(percentUsageMinDelta < 0 ) { 206 throw new IllegalArgumentException ("percentUsageMinDelta must be greater or equal to 0"); 207 } 208 synchronized(usageMutex){ 209 this.limit=limit; 210 this.usagePortion=0; 211 } 212 onLimitChange(); 213 } 214 215 private void onLimitChange() { 216 217 if( usagePortion > 0 && parent!=null ) { 219 synchronized(usageMutex){ 220 limit = (long)(parent.getLimit()*usagePortion); 221 } 222 } 223 224 int percentUsage; 226 synchronized(usageMutex){ 227 percentUsage=caclPercentUsage(); 228 } 229 setPercentUsage(percentUsage); 230 231 for (UsageManager child:children) { 234 child.onLimitChange(); 235 } 236 } 237 238 public float getUsagePortion() { 239 synchronized(usageMutex){ 240 return usagePortion; 241 } 242 } 243 244 public void setUsagePortion(float usagePortion) { 245 synchronized(usageMutex){ 246 this.usagePortion = usagePortion; 247 } 248 onLimitChange(); 249 } 250 251 255 public int getPercentUsage() { 256 synchronized (usageMutex) { 257 return percentUsage; 258 } 259 } 260 261 public int getPercentUsageMinDelta() { 262 synchronized (usageMutex) { 263 return percentUsageMinDelta; 264 } 265 } 266 267 273 public void setPercentUsageMinDelta(int percentUsageMinDelta) { 274 if(percentUsageMinDelta < 1) { 275 throw new IllegalArgumentException ("percentUsageMinDelta must be greater than 0"); 276 } 277 int percentUsage; 278 synchronized (usageMutex) { 279 this.percentUsageMinDelta = percentUsageMinDelta; 280 percentUsage = caclPercentUsage(); 281 } 282 setPercentUsage(percentUsage); 283 } 284 285 public long getUsage() { 286 synchronized (usageMutex) { 287 return usage; 288 } 289 } 290 291 295 public void setSendFailIfNoSpace(boolean failProducerIfNoSpace) { 296 sendFailIfNoSpaceExplicitySet = true; 297 this.sendFailIfNoSpace = failProducerIfNoSpace; 298 } 299 300 public boolean isSendFailIfNoSpace() { 301 if (sendFailIfNoSpaceExplicitySet || parent == null) { 302 return sendFailIfNoSpace; 303 } else { 304 return parent.isSendFailIfNoSpace(); 305 } 306 } 307 308 private void setPercentUsage(int value) { 309 int oldValue = percentUsage; 310 percentUsage = value; 311 if( oldValue!=value ) { 312 fireEvent(oldValue, value); 313 } 314 } 315 316 private int caclPercentUsage() { 317 if( limit==0 ) return 0; 318 return (int)((((usage*100)/limit)/percentUsageMinDelta)*percentUsageMinDelta); 319 } 320 321 private void fireEvent(int oldPercentUsage,int newPercentUsage){ 322 if (debug) { 323 log.debug("Memory usage change. from: "+oldPercentUsage+", to: "+newPercentUsage); 324 } 325 if(oldPercentUsage>=100&&newPercentUsage<100){ 327 synchronized(usageMutex){ 328 usageMutex.notifyAll(); 329 for (Iterator iter = callbacks.iterator(); iter.hasNext();) { 330 Runnable callback = (Runnable ) iter.next(); 331 callback.run(); 332 } 333 callbacks.clear(); 334 } 335 } 336 for(Iterator iter=listeners.iterator();iter.hasNext();){ 338 UsageListener l=(UsageListener)iter.next(); 339 l.onMemoryUseChanged(this,oldPercentUsage,newPercentUsage); 340 } 341 } 342 343 public String getName() { 344 return name; 345 } 346 347 public String toString(){ 348 349 350 return "UsageManager("+ getName() +") percentUsage="+percentUsage+"%, usage="+usage+" limit="+limit+" percentUsageMinDelta=" 351 +percentUsageMinDelta+"%"; 352 } 353 354 public void start(){ 355 if(parent!=null){ 356 parent.addChild(this); 357 } 358 } 359 360 public void stop(){ 361 if(parent!=null){ 362 parent.removeChild(this); 363 } 364 } 365 366 private void addChild(UsageManager child){ 367 children.add(child); 368 } 369 370 private void removeChild(UsageManager child){ 371 children.remove(child); 372 } 373 374 378 public boolean notifyCallbackWhenNotFull( final Runnable callback ) { 379 380 if(parent!=null) { 381 Runnable r = new Runnable (){ 382 public void run() { 383 synchronized (usageMutex) { 384 if( percentUsage >= 100 ) { 385 callbacks.add(callback); 386 } else { 387 callback.run(); 388 } 389 } 390 } 391 }; 392 if( parent.notifyCallbackWhenNotFull(r) ) { 393 return true; 394 } 395 } 396 synchronized (usageMutex) { 397 if( percentUsage >= 100 ) { 398 callbacks.add(callback); 399 return true; 400 } else { 401 return false; 402 } 403 } 404 } 405 406 407 } 408 | Popular Tags |