1 52 53 package com.go.trove.util.tq; 54 55 import java.util.*; 56 import com.go.trove.util.*; 57 58 68 public class TransactionQueue { 69 private ThreadPool mThreadPool; 70 private String mName; 71 private int mMaxSize; 72 private int mMaxThreads; 73 private long mIdleTimeout; 74 private long mTransactionTimeout; 75 76 private LinkedList mQueue = new LinkedList(); 77 private int mThreadCount; 78 private int mServicingCount; 79 private int mThreadId; 80 private boolean mSuspended; 81 82 private Worker mWorker = new Worker(); 83 84 private Collection mListeners = new LinkedList(); 85 private Collection mExceptionListeners = new LinkedList(); 86 87 private long mTimeLapseStart; 89 private int mPeakQueueSize; 90 private int mPeakThreadCount; 91 private int mPeakServicingCount; 92 private int mTotalEnqueueAttempts; 93 private int mTotalEnqueued; 94 private int mTotalServiced; 95 private int mTotalExpired; 96 private int mTotalServiceExceptions; 97 private int mTotalUncaughtExceptions; 98 private long mTotalQueueDuration; 99 private long mTotalServiceDuration; 100 101 public TransactionQueue(ThreadPool tp, int maxSize, int maxThreads) { 102 this(tp, "TransactionQueue", maxSize, maxThreads); 103 } 104 105 public TransactionQueue(ThreadPool tp, String name, 106 int maxSize, int maxThreads) { 107 mThreadPool = tp; 108 mName = name; 109 110 setMaximumSize(maxSize); 111 setMaximumThreads(maxThreads); 112 113 setIdleTimeout(tp.getIdleTimeout()); 114 setTransactionTimeout(-1); 115 116 resetStatistics(); 117 } 118 119 127 public synchronized void setIdleTimeout(long timeout) { 128 mIdleTimeout = timeout; 129 } 130 131 139 public synchronized long getIdleTimeout() { 140 return mIdleTimeout; 141 } 142 143 149 public synchronized void setTransactionTimeout(long timeout) { 150 mTransactionTimeout = timeout; 151 } 152 153 158 public synchronized long getTransactionTimeout() { 159 return mTransactionTimeout; 160 } 161 162 165 public String getName() { 166 return mName; 167 } 168 169 172 public synchronized int getMaximumSize() { 173 return mMaxSize; 174 } 175 176 179 public synchronized void setMaximumSize(int max) { 180 if (max < 0) { 181 throw new IllegalArgumentException 182 ("TransactionQueue max size must be positive: " + max); 183 } 184 185 mMaxSize = max; 186 } 187 188 191 public synchronized int getMaximumThreads() { 192 return mMaxThreads; 193 } 194 195 public synchronized void setMaximumThreads(int max) { 196 if (max < 1) { 197 throw new IllegalArgumentException 198 ("TransactionQueue must have at least one thread: " + max); 199 } 200 201 mMaxThreads = max; 202 } 203 204 212 public synchronized boolean enqueue(Transaction transaction) { 213 mTotalEnqueueAttempts++; 214 215 if (transaction == null || mThreadPool.isClosed()) { 216 return false; 217 } 218 219 int queueSize; 220 if ((queueSize = mQueue.size()) >= mMaxSize) { 221 if (mListeners.size() > 0) { 222 TransactionQueueEvent event = 223 new TransactionQueueEvent(this, transaction); 224 225 Iterator it = mListeners.iterator(); 226 while (it.hasNext()) { 227 ((TransactionQueueListener)it.next()) 228 .transactionQueueFull(event); 229 } 230 } 231 return false; 232 } 233 234 if (!mSuspended) { 235 if (!ensureWaitingThread()) { 236 return false; 237 } 238 } 239 240 mTotalEnqueued++; 241 242 TransactionQueueEvent event = 243 new TransactionQueueEvent(this, transaction); 244 245 mQueue.addLast(event); 246 247 if (++queueSize > mPeakQueueSize) { 248 mPeakQueueSize = queueSize; 249 } 250 251 notify(); 252 253 if (mListeners.size() > 0) { 254 Iterator it = mListeners.iterator(); 255 while (it.hasNext()) { 256 ((TransactionQueueListener)it.next()) 257 .transactionEnqueued(event); 258 } 259 } 260 261 return true; 262 } 263 264 269 public synchronized void suspend() { 270 if (!mSuspended) { 271 mQueue.addFirst(null); 272 notify(); 273 mSuspended = true; 274 } 275 } 276 277 284 public synchronized boolean resume() { 285 if (mSuspended) { 286 mSuspended = false; 287 } 288 return ensureWaitingThread(); 289 } 290 291 302 public synchronized void idle() { 303 mQueue.addLast(null); 304 notify(); 305 } 306 307 public synchronized void addTransactionQueueListener 308 (TransactionQueueListener listener) { 309 310 mListeners.add(listener); 311 } 312 313 public synchronized void removeTransactionQueueListener 314 (TransactionQueueListener listener) { 315 316 mListeners.remove(listener); 317 } 318 319 public synchronized void addUncaughtExceptionListener 320 (UncaughtExceptionListener listener) { 321 322 mExceptionListeners.add(listener); 323 } 324 325 public synchronized void removeUncaughtExceptionListener 326 (UncaughtExceptionListener listener) { 327 328 mExceptionListeners.remove(listener); 329 } 330 331 334 public synchronized int getQueueSize() { 335 return mQueue.size(); 336 } 337 338 341 public synchronized int getThreadCount() { 342 return mThreadCount; 343 } 344 345 348 public synchronized TransactionQueueData getStatistics() { 349 return new TransactionQueueData(this, 350 mTimeLapseStart, 351 System.currentTimeMillis(), 352 mQueue.size(), 353 mThreadCount, 354 mServicingCount, 355 mPeakQueueSize, 356 mPeakThreadCount, 357 mPeakServicingCount, 358 mTotalEnqueueAttempts, 359 mTotalEnqueued, 360 mTotalServiced, 361 mTotalExpired, 362 mTotalServiceExceptions, 363 mTotalUncaughtExceptions, 364 mTotalQueueDuration, 365 mTotalServiceDuration); 366 } 367 368 371 public synchronized void resetStatistics() { 372 mPeakQueueSize = 0; 373 mPeakThreadCount = 0; 374 mPeakServicingCount = 0; 375 mTotalEnqueueAttempts = 0; 376 mTotalEnqueued = 0; 377 mTotalServiced = 0; 378 mTotalExpired = 0; 379 mTotalServiceExceptions = 0; 380 mTotalUncaughtExceptions = 0; 381 mTotalQueueDuration = 0; 382 mTotalServiceDuration = 0; 383 384 mTimeLapseStart = System.currentTimeMillis(); 385 } 386 387 400 public synchronized void applyProperties(PropertyMap properties) { 401 if (properties.containsKey("max.size")) { 402 setMaximumSize(properties.getInt("max.size")); 403 } 404 405 if (properties.containsKey("max.threads")) { 406 setMaximumThreads(properties.getInt("max.threads")); 407 } 408 409 if (properties.containsKey("timeout.idle")) { 410 setIdleTimeout(properties.getNumber("timeout.idle").longValue()); 411 } 412 413 if (properties.containsKey("timeout.transaction")) { 414 setTransactionTimeout 415 (properties.getNumber("timeout.transaction").longValue()); 416 } 417 418 if ("true".equalsIgnoreCase(properties.getString("tune.size"))) { 419 addTransactionQueueListener(new TransactionQueueSizeTuner()); 420 } 421 422 if ("true".equalsIgnoreCase(properties.getString("tune.threads"))) { 423 addTransactionQueueListener(new TransactionQueueThreadTuner()); 424 } 425 } 426 427 synchronized void startThread(boolean canwait) 428 throws InterruptedException { 429 430 if (mThreadCount < mMaxThreads) { 431 String threadName = getName() + ' ' + (mThreadId++); 432 if (canwait) { 433 mThreadPool.start(mWorker, threadName); 434 } 435 else { 436 mThreadPool.start(mWorker, 0, threadName); 437 } 438 439 if (++mThreadCount > mPeakThreadCount) { 440 mPeakThreadCount = mThreadCount; 441 } 442 } 443 } 444 445 448 synchronized TransactionQueueEvent nextTransactionEvent() 449 throws InterruptedException { 450 451 if (mQueue.isEmpty()) { 452 if (mIdleTimeout != 0) { 453 if (mIdleTimeout < 0) { 454 wait(); 455 } 456 else { 457 wait(mIdleTimeout); 458 } 459 } 460 } 461 462 if (mQueue.isEmpty()) { 463 return null; 464 } 465 466 return (TransactionQueueEvent)mQueue.removeFirst(); 467 } 468 469 synchronized TransactionQueueEvent transactionDequeued 470 (TransactionQueueEvent event) { 471 472 if (++mServicingCount > mPeakServicingCount) { 473 mPeakServicingCount = mServicingCount; 474 } 475 476 TransactionQueueEvent deqEvent = new TransactionQueueEvent(event); 477 478 mTotalQueueDuration += 479 (deqEvent.getTimestampMillis() - event.getTimestampMillis()); 480 481 if (mListeners.size() > 0) { 482 Iterator it = mListeners.iterator(); 483 while (it.hasNext()) { 484 ((TransactionQueueListener)it.next()) 485 .transactionDequeued(deqEvent); 486 } 487 } 488 489 return deqEvent; 490 } 491 492 synchronized void transactionServiced(TransactionQueueEvent event) { 493 TransactionQueueEvent svcEvent = new TransactionQueueEvent(event); 494 495 mTotalServiceDuration += 496 (svcEvent.getTimestampMillis() - event.getTimestampMillis()); 497 498 if (mListeners.size() > 0) { 499 Iterator it = mListeners.iterator(); 500 while (it.hasNext()) { 501 ((TransactionQueueListener)it.next()) 502 .transactionServiced(svcEvent); 503 } 504 } 505 506 mServicingCount--; 509 mTotalServiced++; 510 } 511 512 synchronized void transactionExpired(TransactionQueueEvent event) { 513 mServicingCount--; 514 mTotalExpired++; 515 516 if (mListeners.size() > 0) { 517 event = new TransactionQueueEvent(event); 518 519 Iterator it = mListeners.iterator(); 520 while (it.hasNext()) { 521 ((TransactionQueueListener)it.next()) 522 .transactionExpired(event); 523 } 524 } 525 } 526 527 synchronized void transactionException(TransactionQueueEvent event, 528 Throwable e) { 529 mServicingCount--; 530 mTotalServiceExceptions++; 531 532 if (mListeners.size() > 0) { 533 event = new TransactionQueueEvent(event, e); 534 535 Iterator it = mListeners.iterator(); 536 while (it.hasNext()) { 537 ((TransactionQueueListener)it.next()) 538 .transactionException(event); 539 } 540 } 541 } 542 543 synchronized void uncaughtException(Throwable e) { 544 mTotalUncaughtExceptions++; 545 546 if (mExceptionListeners.size() > 0) { 547 UncaughtExceptionEvent event = 548 new UncaughtExceptionEvent(this, e); 549 550 Iterator it = mExceptionListeners.iterator(); 551 while (it.hasNext()) { 552 ((UncaughtExceptionListener)it.next()) 553 .uncaughtException(event); 554 } 555 } 556 else { 557 Thread current = Thread.currentThread(); 558 current.getThreadGroup().uncaughtException(current, e); 559 } 560 } 561 562 synchronized boolean exitThread(boolean force) { 563 if (!force && (mThreadCount - mServicingCount) <= 1 && 564 mQueue.size() > 0 && !mSuspended) { 565 566 return false; 569 } 570 else { 571 mThreadCount--; 572 return true; 573 } 574 } 575 576 private synchronized boolean ensureWaitingThread() { 577 if (mThreadCount <= mServicingCount) { 578 try { 579 startThread(mThreadCount == 0); 582 } 583 catch (NoThreadException e) { 584 if (!e.isThreadPoolClosed()) { 585 if (mThreadCount == 0) { 586 uncaughtException(e); 587 return false; 588 } 589 } 590 } 591 catch (InterruptedException e) { 592 return false; 593 } 594 catch (Throwable e) { 595 uncaughtException(e); 596 return false; 597 } 598 } 599 return true; 600 } 601 602 private class Worker implements Runnable { 603 public void run() { 604 boolean forceExit = false; 605 TransactionQueueEvent event; 606 607 while (true) { 608 try { 609 try { 611 if ((event = nextTransactionEvent()) == null) { 612 continue; 614 } 615 } 616 catch (InterruptedException e) { 617 forceExit = true; 618 continue; 619 } 620 621 long enqueueTimestamp = event.getTimestampMillis(); 622 623 try { 625 startThread(false); 626 } 627 catch (NoThreadException e) { 628 if (e.isThreadPoolClosed()) { 629 forceExit = true; 630 } 633 } 634 catch (InterruptedException e) { 635 forceExit = true; 636 } 639 catch (Throwable e) { 640 uncaughtException(e); 641 } 642 finally { 643 try { 649 event = transactionDequeued(event); 650 } 651 catch (Throwable e) { 652 uncaughtException(e); 653 } 654 } 655 656 long serviceTimestamp = event.getTimestampMillis(); 657 658 long timeout = getTransactionTimeout(); 660 if (timeout >= 0 && 661 (serviceTimestamp - enqueueTimestamp) >= timeout) { 662 try { 663 event.getTransaction().cancel(); 664 } 665 finally { 666 transactionExpired(event); 667 } 668 } 669 else { 670 try { 671 event.getTransaction().service(); 672 transactionServiced(event); 673 } 674 catch (Throwable e) { 675 uncaughtException(e); 676 677 try { 678 event.getTransaction().cancel(); 679 } 680 catch (Throwable e2) { 681 uncaughtException(e2); 682 } 683 684 transactionException(event, e); 685 } 686 } 687 } 688 catch (Throwable e) { 689 uncaughtException(e); 690 } 691 finally { 692 if (exitThread(forceExit)) { 693 break; 694 } 695 } 696 } 697 } 698 } 699 } 700 | Popular Tags |