KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > go > trove > util > tq > TransactionQueue


1 /* ====================================================================
2  * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group
3  * ====================================================================
4  * The Tea Software License, Version 1.1
5  *
6  * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Walt Disney Internet Group (http://opensource.go.com/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must
28  * not be used to endorse or promote products derived from this
29  * software without prior written permission. For written
30  * permission, please contact opensource@dig.com.
31  *
32  * 5. Products derived from this software may not be called "Tea",
33  * "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet",
34  * "Kettle", "Trove" or "BeanDoc" appear in their name, without prior
35  * written permission of the Walt Disney Internet Group.
36  *
37  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
38  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
39  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
40  * DISCLAIMED. IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS
41  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
44  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
45  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
46  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
47  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48  * ====================================================================
49  *
50  * For more information about Tea, please see http://opensource.go.com/.
51  */

52
53 package com.go.trove.util.tq;
54
55 import java.util.*;
56 import com.go.trove.util.*;
57
58 /******************************************************************************
59  * TransactionQueue processes {@link Transaction Transactions} concurrently
60  * using threads obtained from a {@link ThreadPool}. When a transaction is
61  * enqueued, it goes into a waiting queue, and it is serviced as soon as a
62  * thread is available.
63  *
64  * @author Brian S O'Neill
65  * @version
66  * <!--$$Revision:--> 31 <!-- $-->, <!--$$JustDate:--> 01/02/15 <!-- $-->
67  */

68 public class TransactionQueue {
69     private ThreadPool mThreadPool;
70     private String JavaDoc mName;
71     private int mMaxSize;
72     private int mMaxThreads;
73     private long mIdleTimeout;
74     private long mTransactionTimeout;
75
76     private LinkedList mQueue = new LinkedList();
77     private int mThreadCount;
78     private int mServicingCount;
79     private int mThreadId;
80     private boolean mSuspended;
81
82     private Worker mWorker = new Worker();
83
84     private Collection mListeners = new LinkedList();
85     private Collection mExceptionListeners = new LinkedList();
86
87     // Used to gather time lapse statistics.
88
private long mTimeLapseStart;
89     private int mPeakQueueSize;
90     private int mPeakThreadCount;
91     private int mPeakServicingCount;
92     private int mTotalEnqueueAttempts;
93     private int mTotalEnqueued;
94     private int mTotalServiced;
95     private int mTotalExpired;
96     private int mTotalServiceExceptions;
97     private int mTotalUncaughtExceptions;
98     private long mTotalQueueDuration;
99     private long mTotalServiceDuration;
100
101     public TransactionQueue(ThreadPool tp, int maxSize, int maxThreads) {
102         this(tp, "TransactionQueue", maxSize, maxThreads);
103     }
104
105     public TransactionQueue(ThreadPool tp, String JavaDoc name,
106                             int maxSize, int maxThreads) {
107         mThreadPool = tp;
108         mName = name;
109
110         setMaximumSize(maxSize);
111         setMaximumThreads(maxThreads);
112
113         setIdleTimeout(tp.getIdleTimeout());
114         setTransactionTimeout(-1);
115
116         resetStatistics();
117     }
118
119     /**
120      * Sets the timeout (in milliseconds) for the TransactionQueue to wait
121      * inactive before going into an idle state. When the TransactionQueue is
122      * idle, there are no internal worker threads. A negative value specifies
123      * that the TransactionQueue should never automatically go into idle mode.
124      *
125      * @see #idle()
126      */

127     public synchronized void setIdleTimeout(long timeout) {
128         mIdleTimeout = timeout;
129     }
130
131     /**
132      * Returns the timeout (in milliseconds) that the TransactionQueue will
133      * wait inactive before going into an idle state. The default value is
134      * the same as the ThreadPool's idle timeout.
135      *
136      * @see #idle()
137      * @see ThreadPool#getIdleTimeout
138      */

139     public synchronized long getIdleTimeout() {
140         return mIdleTimeout;
141     }
142
143     /**
144      * Sets the timeout (in milliseconds) to wait before an enqueued
145      * transaction expires. If a worker receives an expired transaction, it
146      * is cancelled. A negative timeout specifies that enqueued transactions
147      * never expire.
148      */

149     public synchronized void setTransactionTimeout(long timeout) {
150         mTransactionTimeout = timeout;
151     }
152
153     /**
154      * Returns the timeout (in milliseconds) to wait before an enqueued
155      * transaction expires. The default value is -1, indicating that enqueued
156      * transactions never expire.
157      */

158     public synchronized long getTransactionTimeout() {
159         return mTransactionTimeout;
160     }
161
162     /**
163      * Returns the name of this TransactionQueue.
164      */

165     public String JavaDoc getName() {
166         return mName;
167     }
168
169     /**
170      * Returns the maximum allowed number of queued transactions.
171      */

172     public synchronized int getMaximumSize() {
173         return mMaxSize;
174     }
175
176     /**
177      * Setting the max size to zero disables enqueueing.
178      */

179     public synchronized void setMaximumSize(int max) {
180         if (max < 0) {
181             throw new IllegalArgumentException JavaDoc
182                 ("TransactionQueue max size must be positive: " + max);
183         }
184
185         mMaxSize = max;
186     }
187
188     /**
189      * Returns the maximum allowed number of worker threads.
190      */

191     public synchronized int getMaximumThreads() {
192         return mMaxThreads;
193     }
194
195     public synchronized void setMaximumThreads(int max) {
196         if (max < 1) {
197             throw new IllegalArgumentException JavaDoc
198                 ("TransactionQueue must have at least one thread: " + max);
199         }
200
201         mMaxThreads = max;
202     }
203
204     /**
205      * Enqueues a transaction that will be serviced when a worker is
206      * available. If the queue is full or cannot accept new transactions, the
207      * transaction is not enqueued, and false is returned.
208      *
209      * @return true if enqueued, false if queue is full or cannot accept new
210      * transactions.
211      */

212     public synchronized boolean enqueue(Transaction transaction) {
213         mTotalEnqueueAttempts++;
214
215         if (transaction == null || mThreadPool.isClosed()) {
216             return false;
217         }
218
219         int queueSize;
220         if ((queueSize = mQueue.size()) >= mMaxSize) {
221             if (mListeners.size() > 0) {
222                 TransactionQueueEvent event =
223                     new TransactionQueueEvent(this, transaction);
224
225                 Iterator it = mListeners.iterator();
226                 while (it.hasNext()) {
227                     ((TransactionQueueListener)it.next())
228                         .transactionQueueFull(event);
229                 }
230             }
231             return false;
232         }
233
234         if (!mSuspended) {
235             if (!ensureWaitingThread()) {
236                 return false;
237             }
238         }
239
240         mTotalEnqueued++;
241
242         TransactionQueueEvent event =
243             new TransactionQueueEvent(this, transaction);
244
245         mQueue.addLast(event);
246
247         if (++queueSize > mPeakQueueSize) {
248             mPeakQueueSize = queueSize;
249         }
250
251         notify();
252
253         if (mListeners.size() > 0) {
254             Iterator it = mListeners.iterator();
255             while (it.hasNext()) {
256                 ((TransactionQueueListener)it.next())
257                     .transactionEnqueued(event);
258             }
259         }
260
261         return true;
262     }
263
264     /**
265      * Suspends processing of transactions in the queue until resume is called.
266      * If suspend is called on a TransactionQueue that is already suspended,
267      * the call has no effect.
268      */

269     public synchronized void suspend() {
270         if (!mSuspended) {
271             mQueue.addFirst(null);
272             notify();
273             mSuspended = true;
274         }
275     }
276
277     /**
278      * Resumes processing of transactions in the queue if suspend was called.
279      * If resume is called on a TransactionQueue that is already running, the
280      * call has no effect, but true is still returned.
281      *
282      * @return false if couldn't resume because no threads available from pool.
283      */

284     public synchronized boolean resume() {
285         if (mSuspended) {
286             mSuspended = false;
287         }
288         return ensureWaitingThread();
289     }
290
291     /**
292      * Make this TransactionQueue go into idle mode and allow it to be
293      * reclaimed by the garbage collector if it is no longer used. Any pending
294      * transactions will be serviced, and any servicing transactions will
295      * finish. If any transactions are added to the TransactionQueue while
296      * idle, it will reactivate itself. New TransactionQueues start out in an
297      * idle state.
298      *
299      * @see #setIdleTimeout
300      * @see #getIdleTimeout
301      */

302     public synchronized void idle() {
303         mQueue.addLast(null);
304         notify();
305     }
306
307     public synchronized void addTransactionQueueListener
308         (TransactionQueueListener listener) {
309
310         mListeners.add(listener);
311     }
312
313     public synchronized void removeTransactionQueueListener
314         (TransactionQueueListener listener) {
315
316         mListeners.remove(listener);
317     }
318
319     public synchronized void addUncaughtExceptionListener
320         (UncaughtExceptionListener listener) {
321
322         mExceptionListeners.add(listener);
323     }
324
325     public synchronized void removeUncaughtExceptionListener
326         (UncaughtExceptionListener listener) {
327
328         mExceptionListeners.remove(listener);
329     }
330
331     /**
332      * Returns the number of currently queued transactions.
333      */

334     public synchronized int getQueueSize() {
335         return mQueue.size();
336     }
337
338     /**
339      * Returns the current amount of worker threads.
340      */

341     public synchronized int getThreadCount() {
342         return mThreadCount;
343     }
344
345     /**
346      * Returns a snapshot of the statistics on this TransactionQueue.
347      */

348     public synchronized TransactionQueueData getStatistics() {
349         return new TransactionQueueData(this,
350                                         mTimeLapseStart,
351                                         System.currentTimeMillis(),
352                                         mQueue.size(),
353                                         mThreadCount,
354                                         mServicingCount,
355                                         mPeakQueueSize,
356                                         mPeakThreadCount,
357                                         mPeakServicingCount,
358                                         mTotalEnqueueAttempts,
359                                         mTotalEnqueued,
360                                         mTotalServiced,
361                                         mTotalExpired,
362                                         mTotalServiceExceptions,
363                                         mTotalUncaughtExceptions,
364                                         mTotalQueueDuration,
365                                         mTotalServiceDuration);
366     }
367
368     /**
369      * Resets all time lapse statistics.
370      */

371     public synchronized void resetStatistics() {
372         mPeakQueueSize = 0;
373         mPeakThreadCount = 0;
374         mPeakServicingCount = 0;
375         mTotalEnqueueAttempts = 0;
376         mTotalEnqueued = 0;
377         mTotalServiced = 0;
378         mTotalExpired = 0;
379         mTotalServiceExceptions = 0;
380         mTotalUncaughtExceptions = 0;
381         mTotalQueueDuration = 0;
382         mTotalServiceDuration = 0;
383
384         mTimeLapseStart = System.currentTimeMillis();
385     }
386
387     /**
388      * Understands and applies the following integer properties.
389      *
390      * <ul>
391      * <li>max.size - setMaximumSize
392      * <li>max.threads - setMaximumThreads
393      * <li>timeout.idle - setIdleTimeout
394      * <li>timeout.transaction - setTransactionTimeout
395      * <li>tune.size - Automatically tunes queue size when "true" and
396      * transaction timeout set.
397      * <li>tune.threads - Automatically tunes maximum thread count.
398      * </ul>
399      */

400     public synchronized void applyProperties(PropertyMap properties) {
401         if (properties.containsKey("max.size")) {
402             setMaximumSize(properties.getInt("max.size"));
403         }
404
405         if (properties.containsKey("max.threads")) {
406             setMaximumThreads(properties.getInt("max.threads"));
407         }
408
409         if (properties.containsKey("timeout.idle")) {
410             setIdleTimeout(properties.getNumber("timeout.idle").longValue());
411         }
412
413         if (properties.containsKey("timeout.transaction")) {
414             setTransactionTimeout
415                 (properties.getNumber("timeout.transaction").longValue());
416         }
417
418         if ("true".equalsIgnoreCase(properties.getString("tune.size"))) {
419             addTransactionQueueListener(new TransactionQueueSizeTuner());
420         }
421
422         if ("true".equalsIgnoreCase(properties.getString("tune.threads"))) {
423             addTransactionQueueListener(new TransactionQueueThreadTuner());
424         }
425     }
426
427     synchronized void startThread(boolean canwait)
428         throws InterruptedException JavaDoc {
429
430         if (mThreadCount < mMaxThreads) {
431             String JavaDoc threadName = getName() + ' ' + (mThreadId++);
432             if (canwait) {
433                 mThreadPool.start(mWorker, threadName);
434             }
435             else {
436                 mThreadPool.start(mWorker, 0, threadName);
437             }
438
439             if (++mThreadCount > mPeakThreadCount) {
440                 mPeakThreadCount = mThreadCount;
441             }
442         }
443     }
444
445     /**
446      * Returns null when the TransactionQueue should go idle.
447      */

448     synchronized TransactionQueueEvent nextTransactionEvent()
449         throws InterruptedException JavaDoc {
450
451         if (mQueue.isEmpty()) {
452             if (mIdleTimeout != 0) {
453                 if (mIdleTimeout < 0) {
454                     wait();
455                 }
456                 else {
457                     wait(mIdleTimeout);
458                 }
459             }
460         }
461
462         if (mQueue.isEmpty()) {
463             return null;
464         }
465
466         return (TransactionQueueEvent)mQueue.removeFirst();
467     }
468
469     synchronized TransactionQueueEvent transactionDequeued
470         (TransactionQueueEvent event) {
471
472         if (++mServicingCount > mPeakServicingCount) {
473             mPeakServicingCount = mServicingCount;
474         }
475
476         TransactionQueueEvent deqEvent = new TransactionQueueEvent(event);
477
478         mTotalQueueDuration +=
479             (deqEvent.getTimestampMillis() - event.getTimestampMillis());
480
481         if (mListeners.size() > 0) {
482             Iterator it = mListeners.iterator();
483             while (it.hasNext()) {
484                 ((TransactionQueueListener)it.next())
485                     .transactionDequeued(deqEvent);
486             }
487         }
488
489         return deqEvent;
490     }
491
492     synchronized void transactionServiced(TransactionQueueEvent event) {
493         TransactionQueueEvent svcEvent = new TransactionQueueEvent(event);
494
495         mTotalServiceDuration +=
496             (svcEvent.getTimestampMillis() - event.getTimestampMillis());
497
498         if (mListeners.size() > 0) {
499             Iterator it = mListeners.iterator();
500             while (it.hasNext()) {
501                 ((TransactionQueueListener)it.next())
502                     .transactionServiced(svcEvent);
503             }
504         }
505
506         // Adjust counters at end in case a listener threw an exception and let
507
// the call to transactionException adjust the counters instead.
508
mServicingCount--;
509         mTotalServiced++;
510     }
511
512     synchronized void transactionExpired(TransactionQueueEvent event) {
513         mServicingCount--;
514         mTotalExpired++;
515
516         if (mListeners.size() > 0) {
517             event = new TransactionQueueEvent(event);
518
519             Iterator it = mListeners.iterator();
520             while (it.hasNext()) {
521                 ((TransactionQueueListener)it.next())
522                     .transactionExpired(event);
523             }
524         }
525     }
526
527     synchronized void transactionException(TransactionQueueEvent event,
528                                            Throwable JavaDoc e) {
529         mServicingCount--;
530         mTotalServiceExceptions++;
531
532         if (mListeners.size() > 0) {
533             event = new TransactionQueueEvent(event, e);
534
535             Iterator it = mListeners.iterator();
536             while (it.hasNext()) {
537                 ((TransactionQueueListener)it.next())
538                     .transactionException(event);
539             }
540         }
541     }
542
543     synchronized void uncaughtException(Throwable JavaDoc e) {
544         mTotalUncaughtExceptions++;
545
546         if (mExceptionListeners.size() > 0) {
547             UncaughtExceptionEvent event =
548                 new UncaughtExceptionEvent(this, e);
549
550             Iterator it = mExceptionListeners.iterator();
551             while (it.hasNext()) {
552                 ((UncaughtExceptionListener)it.next())
553                     .uncaughtException(event);
554             }
555         }
556         else {
557             Thread JavaDoc current = Thread.currentThread();
558             current.getThreadGroup().uncaughtException(current, e);
559         }
560     }
561
562     synchronized boolean exitThread(boolean force) {
563         if (!force && (mThreadCount - mServicingCount) <= 1 &&
564             mQueue.size() > 0 && !mSuspended) {
565
566             // Can't exit thread because transactions are waiting to
567
// be serviced, and no thread is waiting on the queue.
568
return false;
569         }
570         else {
571             mThreadCount--;
572             return true;
573         }
574     }
575
576     private synchronized boolean ensureWaitingThread() {
577         if (mThreadCount <= mServicingCount) {
578             try {
579                 // Only wait if no threads. Otherwise the lock on this object
580
// will prevent threads from entering the exitThread method.
581
startThread(mThreadCount == 0);
582             }
583             catch (NoThreadException e) {
584                 if (!e.isThreadPoolClosed()) {
585                     if (mThreadCount == 0) {
586                         uncaughtException(e);
587                         return false;
588                     }
589                 }
590             }
591             catch (InterruptedException JavaDoc e) {
592                 return false;
593             }
594             catch (Throwable JavaDoc e) {
595                 uncaughtException(e);
596                 return false;
597             }
598         }
599         return true;
600     }
601
602     private class Worker implements Runnable JavaDoc {
603         public void run() {
604             boolean forceExit = false;
605             TransactionQueueEvent event;
606
607             while (true) {
608                 try {
609                     // Phase 1: wait for a transaction
610
try {
611                         if ((event = nextTransactionEvent()) == null) {
612                             // Go into idle mode.
613
continue;
614                         }
615                     }
616                     catch (InterruptedException JavaDoc e) {
617                         forceExit = true;
618                         continue;
619                     }
620
621                     long enqueueTimestamp = event.getTimestampMillis();
622
623                     // Phase 2: spawn off a replacement thread
624
try {
625                         startThread(false);
626                     }
627                     catch (NoThreadException e) {
628                         if (e.isThreadPoolClosed()) {
629                             forceExit = true;
630                             // Don't "continue" because the transaction must
631
// still be serviced first.
632
}
633                     }
634                     catch (InterruptedException JavaDoc e) {
635                         forceExit = true;
636                         // Don't "continue" because the transaction must
637
// still be serviced first.
638
}
639                     catch (Throwable JavaDoc e) {
640                         uncaughtException(e);
641                     }
642                     finally {
643                         // Only indicate that transaction has been dequeued
644
// after a replacement thread has been created.
645
// Queue time is more accurate this way because time
646
// spent waiting for a thread is time spent not being
647
// serviced.
648
try {
649                             event = transactionDequeued(event);
650                         }
651                         catch (Throwable JavaDoc e) {
652                             uncaughtException(e);
653                         }
654                     }
655
656                     long serviceTimestamp = event.getTimestampMillis();
657
658                     // Phase 3: service the transaction
659
long timeout = getTransactionTimeout();
660                     if (timeout >= 0 &&
661                         (serviceTimestamp - enqueueTimestamp) >= timeout) {
662                         try {
663                             event.getTransaction().cancel();
664                         }
665                         finally {
666                             transactionExpired(event);
667                         }
668                     }
669                     else {
670                         try {
671                             event.getTransaction().service();
672                             transactionServiced(event);
673                         }
674                         catch (Throwable JavaDoc e) {
675                             uncaughtException(e);
676
677                             try {
678                                 event.getTransaction().cancel();
679                             }
680                             catch (Throwable JavaDoc e2) {
681                                 uncaughtException(e2);
682                             }
683
684                             transactionException(event, e);
685                         }
686                     }
687                 }
688                 catch (Throwable JavaDoc e) {
689                     uncaughtException(e);
690                 }
691                 finally {
692                     if (exitThread(forceExit)) {
693                         break;
694                     }
695                 }
696             }
697         }
698     }
699 }
700
Popular Tags