1 16 17 package org.apache.catalina.cluster.util; 18 19 28 34 public class FastQueue implements IQueue { 35 36 private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 37 .getLog(FastQueue.class); 38 39 42 private SingleRemoveSynchronizedAddLock lock = null; 43 44 47 private LinkObject first = null; 48 49 52 private LinkObject last = null; 53 54 57 private int size = 0; 58 59 62 private boolean checkLock = false; 63 64 67 private boolean timeWait = false; 68 69 72 private boolean doStats = false; 73 74 77 private boolean inAdd = false; 78 79 82 private boolean inRemove = false; 83 84 87 private boolean inMutex = false; 88 89 92 private int maxQueueLength = 0; 93 94 97 private long addWaitTimeout = 10000L; 98 99 100 103 private long removeWaitTimeout = 30000L; 104 105 108 private boolean enabled = true; 109 110 113 private long addCounter = 0; 114 115 118 private long addErrorCounter = 0; 119 120 123 private long removeCounter = 0; 124 125 128 private long removeErrorCounter = 0; 129 130 133 private long addWait = 0; 134 135 138 private long removeWait = 0; 139 140 143 private int maxSize = 0; 144 145 148 private long avgSize = 0; 149 150 153 private int maxSizeSample = 0; 154 155 158 private long avgSizeSample = 0; 159 160 163 private int sampleInterval = 100; 164 165 169 public FastQueue() { 170 lock = new SingleRemoveSynchronizedAddLock(); 171 lock.setAddWaitTimeout(addWaitTimeout); 172 lock.setRemoveWaitTimeout(removeWaitTimeout); 173 } 174 175 180 public long getAddWaitTimeout() { 181 addWaitTimeout = lock.getAddWaitTimeout(); 182 return addWaitTimeout; 183 } 184 185 190 public void setAddWaitTimeout(long timeout) { 191 addWaitTimeout = timeout; 192 lock.setAddWaitTimeout(addWaitTimeout); 193 } 194 195 200 public long getRemoveWaitTimeout() { 201 removeWaitTimeout = lock.getRemoveWaitTimeout(); 202 return removeWaitTimeout; 203 } 204 205 210 public void setRemoveWaitTimeout(long timeout) { 211 removeWaitTimeout = timeout; 212 lock.setRemoveWaitTimeout(removeWaitTimeout); 213 } 214 215 220 public int getMaxQueueLength() { 221 return maxQueueLength; 222 } 223 224 public void setMaxQueueLength(int length) { 225 maxQueueLength = length; 226 } 227 228 public boolean isEnabled() { 229 return enabled; 230 } 231 232 public void setEnabled(boolean enable) { 233 enabled = enable; 234 if (!enabled) { 235 lock.abortRemove(); 236 } 237 } 238 239 242 public boolean isCheckLock() { 243 return checkLock; 244 } 245 246 249 public void setCheckLock(boolean checkLock) { 250 this.checkLock = checkLock; 251 } 252 253 256 public boolean isDoStats() { 257 return doStats; 258 } 259 260 263 public void setDoStats(boolean doStats) { 264 this.doStats = doStats; 265 } 266 267 270 public boolean isTimeWait() { 271 return timeWait; 272 } 273 274 277 public void setTimeWait(boolean timeWait) { 278 this.timeWait = timeWait; 279 } 280 281 public int getSampleInterval() { 282 return sampleInterval; 283 } 284 285 public void setSampleInterval(int interval) { 286 sampleInterval = interval; 287 } 288 289 public long getAddCounter() { 290 return addCounter; 291 } 292 293 public void setAddCounter(long counter) { 294 addCounter = counter; 295 } 296 297 public long getAddErrorCounter() { 298 return addErrorCounter; 299 } 300 301 public void setAddErrorCounter(long counter) { 302 addErrorCounter = counter; 303 } 304 305 public long getRemoveCounter() { 306 return removeCounter; 307 } 308 309 public void setRemoveCounter(long counter) { 310 removeCounter = counter; 311 } 312 313 public long getRemoveErrorCounter() { 314 return removeErrorCounter; 315 } 316 317 public void setRemoveErrorCounter(long counter) { 318 removeErrorCounter = counter; 319 } 320 321 public long getAddWait() { 322 return addWait; 323 } 324 325 public void setAddWait(long wait) { 326 addWait = wait; 327 } 328 329 public long getRemoveWait() { 330 return removeWait; 331 } 332 333 public void setRemoveWait(long wait) { 334 removeWait = wait; 335 } 336 337 340 public int getMaxSize() { 341 return maxSize; 342 } 343 344 347 public void setMaxSize(int size) { 348 maxSize = size; 349 } 350 351 352 356 public long getAvgSize() { 357 if (addCounter > 0) { 358 return avgSize / addCounter; 359 } else { 360 return 0; 361 } 362 } 363 364 367 public void resetStatistics() { 368 addCounter = 0; 369 addErrorCounter = 0; 370 removeCounter = 0; 371 removeErrorCounter = 0; 372 avgSize = 0; 373 maxSize = 0; 374 addWait = 0; 375 removeWait = 0; 376 } 377 378 381 public void unlockAdd() { 382 lock.unlockAdd(size > 0 ? true : false); 383 } 384 385 388 public void unlockRemove() { 389 lock.unlockRemove(); 390 } 391 392 395 public void start() { 396 setEnabled(true); 397 } 398 399 402 public void stop() { 403 setEnabled(false); 404 } 405 406 public long getSample() { 407 return addCounter % sampleInterval; 408 } 409 410 public int getMaxSizeSample() { 411 return maxSizeSample; 412 } 413 414 public void setMaxSizeSample(int size) { 415 maxSizeSample = size; 416 } 417 418 public long getAvgSizeSample() { 419 long sample = addCounter % sampleInterval; 420 if (sample > 0) { 421 return avgSizeSample / sample; 422 } else if (addCounter > 0) { 423 return avgSizeSample / sampleInterval; 424 } else { 425 return 0; 426 } 427 } 428 429 public int getSize() { 430 int sz; 431 sz = size; 432 return sz; 433 } 434 435 439 public boolean add(String key, Object data) { 440 boolean ok = true; 441 long time = 0; 442 443 if (!enabled) { 444 if (log.isInfoEnabled()) 445 log.info("FastQueue: queue disabled, add aborted"); 446 return false; 447 } 448 449 if (timeWait) { 450 time = System.currentTimeMillis(); 451 } 452 lock.lockAdd(); 453 try { 454 if (timeWait) { 455 addWait += (System.currentTimeMillis() - time); 456 } 457 458 if (log.isTraceEnabled()) { 459 log.trace("FastQueue: add starting with size " + size); 460 } 461 if (checkLock) { 462 if (inAdd) 463 log.warn("FastQueue.add: Detected other add"); 464 inAdd = true; 465 if (inMutex) 466 log.warn("FastQueue.add: Detected other mutex in add"); 467 inMutex = true; 468 } 469 470 if ((maxQueueLength > 0) && (size >= maxQueueLength)) { 471 ok = false; 472 if (log.isTraceEnabled()) { 473 log.trace("FastQueue: Could not add, since queue is full (" 474 + size + ">=" + maxQueueLength + ")"); 475 } 476 477 } else { 478 LinkObject element = new LinkObject(key, data); 479 if (size == 0) { 480 first = last = element; 481 size = 1; 482 } else { 483 if (last == null) { 484 ok = false; 485 log 486 .error("FastQueue: Could not add, since last is null although size is " 487 + size + " (>0)"); 488 } else { 489 last.append(element); 490 last = element; 491 size++; 492 } 493 } 494 495 } 496 497 if (doStats) { 498 if (ok) { 499 if (addCounter % sampleInterval == 0) { 500 maxSizeSample = 0; 501 avgSizeSample = 0; 502 } 503 addCounter++; 504 if (size > maxSize) { 505 maxSize = size; 506 } 507 if (size > maxSizeSample) { 508 maxSizeSample = size; 509 } 510 avgSize += size; 511 avgSizeSample += size; 512 } else { 513 addErrorCounter++; 514 } 515 } 516 517 if (first == null) { 518 log.error("FastQueue: first is null, size is " + size 519 + " at end of add"); 520 } 521 if (last == null) { 522 log.error("FastQueue: last is null, size is " + size 523 + " at end of add"); 524 } 525 526 if (checkLock) { 527 if (!inMutex) 528 log.warn("FastQueue: Cancelled by other mutex in add"); 529 inMutex = false; 530 if (!inAdd) 531 log.warn("FastQueue: Cancelled by other add"); 532 inAdd = false; 533 } 534 if (log.isTraceEnabled()) { 535 log.trace("FastQueue: add ending with size " + size); 536 } 537 538 if (timeWait) { 539 time = System.currentTimeMillis(); 540 } 541 } finally { 542 lock.unlockAdd(true); 543 } 544 if (timeWait) { 545 addWait += (System.currentTimeMillis() - time); 546 } 547 return ok; 548 } 549 550 554 public LinkObject remove() { 555 LinkObject element; 556 boolean gotLock; 557 long time = 0; 558 559 if (!enabled) { 560 if (log.isInfoEnabled()) 561 log.info("FastQueue: queue disabled, remove aborted"); 562 return null; 563 } 564 565 if (timeWait) { 566 time = System.currentTimeMillis(); 567 } 568 gotLock = lock.lockRemove(); 569 try { 570 571 if (!gotLock) { 572 if (enabled) { 573 if (timeWait) { 574 removeWait += (System.currentTimeMillis() - time); 575 } 576 if (doStats) { 577 removeErrorCounter++; 578 } 579 if (log.isInfoEnabled()) 580 log 581 .info("FastQueue: Remove aborted although queue enabled"); 582 } else { 583 if (log.isInfoEnabled()) 584 log.info("FastQueue: queue disabled, remove aborted"); 585 } 586 return null; 587 } 588 589 if (timeWait) { 590 removeWait += (System.currentTimeMillis() - time); 591 } 592 593 if (log.isTraceEnabled()) { 594 log.trace("FastQueue: remove starting with size " + size); 595 } 596 if (checkLock) { 597 if (inRemove) 598 log.warn("FastQueue: Detected other remove"); 599 inRemove = true; 600 if (inMutex) 601 log.warn("FastQueue: Detected other mutex in remove"); 602 inMutex = true; 603 } 604 605 element = first; 606 607 if (doStats) { 608 if (element != null) { 609 removeCounter++; 610 } else { 611 removeErrorCounter++; 612 log 613 .error("FastQueue: Could not remove, since first is null although size is " 614 + size + " (>0)"); 615 } 616 } 617 618 first = last = null; 619 size = 0; 620 621 if (checkLock) { 622 if (!inMutex) 623 log.warn("FastQueue: Cancelled by other mutex in remove"); 624 inMutex = false; 625 if (!inRemove) 626 log.warn("FastQueue: Cancelled by other remove"); 627 inRemove = false; 628 } 629 if (log.isTraceEnabled()) { 630 log.trace("FastQueue: remove ending with size " + size); 631 } 632 633 if (timeWait) { 634 time = System.currentTimeMillis(); 635 } 636 } finally { 637 lock.unlockRemove(); 638 } 639 if (timeWait) { 640 removeWait += (System.currentTimeMillis() - time); 641 } 642 return element; 643 } 644 645 } | Popular Tags |