KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > util > FastQueue


1 /*
2  * Copyright 1999,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.catalina.cluster.util;
18
/**
 * A fast queue in which the remover thread locks out the adder threads. <br/>Limit the queue
 * length when you have strange producer thread problems.
 *
 * FIXME add i18n support to log messages
 * @author Rainer Jung
 * @author Peter Rossbach
 * @version $Revision: 1.1 $ $Date: 2005/03/14 21:24:30 $
 */

28 /**
29  * @author peter
30  *
31  * TODO To change the template for this generated type comment go to
32  * Window - Preferences - Java - Code Style - Code Templates
33  */

34 public class FastQueue implements IQueue {
35
36     private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
37             .getLog(FastQueue.class);
38
39     /**
40      * This is the actual queue
41      */

42     private SingleRemoveSynchronizedAddLock lock = null;
43
44     /**
45      * First Object at queue (consumer message)
46      */

47     private LinkObject first = null;
48
49     /**
50      * Last object in queue (producer Object)
51      */

52     private LinkObject last = null;
53
54     /**
55      * Current Queue elements size
56      */

57     private int size = 0;
58
59     /**
60      * check lock to detect strange threadings things
61      */

62     private boolean checkLock = false;
63
64     /**
65      * protocol the thread wait times
66      */

67     private boolean timeWait = false;
68
69     /**
70      * calc stats data
71      */

72     private boolean doStats = false;
73
74     /**
75      *
76      */

77     private boolean inAdd = false;
78
79     /**
80      *
81      */

82     private boolean inRemove = false;
83
84     /**
85      *
86      */

87     private boolean inMutex = false;
88
89     /**
90      * limit the queue legnth ( default is unlimited)
91      */

92     private int maxQueueLength = 0;
93
94     /**
95      * addWaitTimeout for producer
96      */

97     private long addWaitTimeout = 10000L;
98
99     
100     /**
101      * removeWaitTimeout for consumer
102      */

103     private long removeWaitTimeout = 30000L;
104
105     /**
106      * enabled the queue
107      */

108     private boolean enabled = true;
109
110     /**
111      * calc all add objects
112      */

113     private long addCounter = 0;
114
115     /**
116      * calc all add objetcs in error state ( see limit queue length)
117      */

118     private long addErrorCounter = 0;
119
120     /**
121      * calc all remove objects
122      */

123     private long removeCounter = 0;
124
125     /**
126      * calc all remove objects failures (hupps probleme detection)
127      */

128     private long removeErrorCounter = 0;
129
130     /**
131      * Calc wait time thread
132      */

133     private long addWait = 0;
134
135     /**
136      * Calc remove time threads
137      */

138     private long removeWait = 0;
139
140     /**
141      * max queue size
142      */

143     private int maxSize = 0;
144
145     /**
146      * avg queue size
147      */

148     private long avgSize = 0;
149
150     /*
151      *
152      */

153     private int maxSizeSample = 0;
154
155     /*
156      *
157      */

158     private long avgSizeSample = 0;
159
160     /**
161      * avg size sample interval
162      */

163     private int sampleInterval = 100;
164
165     /**
166      * Generate Queue SingleRemoveSynchronizedAddLock and set add and wait
167      * Timeouts
168      */

169     public FastQueue() {
170         lock = new SingleRemoveSynchronizedAddLock();
171         lock.setAddWaitTimeout(addWaitTimeout);
172         lock.setRemoveWaitTimeout(removeWaitTimeout);
173     }
174
175     /**
176      * get current add wait timeout
177      *
178      * @return current wait timeout
179      */

180     public long getAddWaitTimeout() {
181         addWaitTimeout = lock.getAddWaitTimeout();
182         return addWaitTimeout;
183     }
184
185     /**
186      * Set add wait timeout (default 10000 msec)
187      *
188      * @param timeout
189      */

190     public void setAddWaitTimeout(long timeout) {
191         addWaitTimeout = timeout;
192         lock.setAddWaitTimeout(addWaitTimeout);
193     }
194
195     /**
196      * get current remove wait timeout
197      *
198      * @return
199      */

200     public long getRemoveWaitTimeout() {
201         removeWaitTimeout = lock.getRemoveWaitTimeout();
202         return removeWaitTimeout;
203     }
204
205     /**
206      * set remove wait timeout ( default 30000 msec)
207      *
208      * @param timeout
209      */

210     public void setRemoveWaitTimeout(long timeout) {
211         removeWaitTimeout = timeout;
212         lock.setRemoveWaitTimeout(removeWaitTimeout);
213     }
214
215     /*
216      * get Max Queue length
217      *
218      * @see org.apache.catalina.cluster.util.IQueue#getMaxQueueLength()
219      */

220     public int getMaxQueueLength() {
221         return maxQueueLength;
222     }
223
224     public void setMaxQueueLength(int length) {
225         maxQueueLength = length;
226     }
227
228     public boolean isEnabled() {
229         return enabled;
230     }
231
232     public void setEnabled(boolean enable) {
233         enabled = enable;
234         if (!enabled) {
235             lock.abortRemove();
236         }
237     }
238
239     /*
240      * @return Returns the checkLock.
241      */

242     public boolean isCheckLock() {
243         return checkLock;
244     }
245
246     /*
247      * @param checkLock The checkLock to set.
248      */

249     public void setCheckLock(boolean checkLock) {
250         this.checkLock = checkLock;
251     }
252
253     /*
254      * @return Returns the doStats.
255      */

256     public boolean isDoStats() {
257         return doStats;
258     }
259
260     /*
261      * @param doStats The doStats to set.
262      */

263     public void setDoStats(boolean doStats) {
264         this.doStats = doStats;
265     }
266
267     /*
268      * @return Returns the timeWait.
269      */

270     public boolean isTimeWait() {
271         return timeWait;
272     }
273
274     /*
275      * @param timeWait The timeWait to set.
276      */

277     public void setTimeWait(boolean timeWait) {
278         this.timeWait = timeWait;
279     }
280
281     public int getSampleInterval() {
282         return sampleInterval;
283     }
284
285     public void setSampleInterval(int interval) {
286         sampleInterval = interval;
287     }
288
289     public long getAddCounter() {
290         return addCounter;
291     }
292
293     public void setAddCounter(long counter) {
294         addCounter = counter;
295     }
296
297     public long getAddErrorCounter() {
298         return addErrorCounter;
299     }
300
301     public void setAddErrorCounter(long counter) {
302         addErrorCounter = counter;
303     }
304
305     public long getRemoveCounter() {
306         return removeCounter;
307     }
308
309     public void setRemoveCounter(long counter) {
310         removeCounter = counter;
311     }
312
313     public long getRemoveErrorCounter() {
314         return removeErrorCounter;
315     }
316
317     public void setRemoveErrorCounter(long counter) {
318         removeErrorCounter = counter;
319     }
320
321     public long getAddWait() {
322         return addWait;
323     }
324
325     public void setAddWait(long wait) {
326         addWait = wait;
327     }
328
329     public long getRemoveWait() {
330         return removeWait;
331     }
332
333     public void setRemoveWait(long wait) {
334         removeWait = wait;
335     }
336
337     /**
338      * @return
339      */

340     public int getMaxSize() {
341         return maxSize;
342     }
343
344     /**
345      * @param size
346      */

347     public void setMaxSize(int size) {
348         maxSize = size;
349     }
350
351     
352     /**
353      * Avg queue size
354      * @return
355      */

356     public long getAvgSize() {
357         if (addCounter > 0) {
358             return avgSize / addCounter;
359         } else {
360             return 0;
361         }
362     }
363
364     /**
365      * reset all stats data
366      */

367     public void resetStatistics() {
368         addCounter = 0;
369         addErrorCounter = 0;
370         removeCounter = 0;
371         removeErrorCounter = 0;
372         avgSize = 0;
373         maxSize = 0;
374         addWait = 0;
375         removeWait = 0;
376     }
377
378     /**
379      * unlock queue for next add
380      */

381     public void unlockAdd() {
382         lock.unlockAdd(size > 0 ? true : false);
383     }
384
385     /**
386      * unlock queue for next remove
387      */

388     public void unlockRemove() {
389         lock.unlockRemove();
390     }
391
392     /**
393      * start queuing
394      */

395     public void start() {
396         setEnabled(true);
397     }
398
399     /**
400      * start queuing
401      */

402     public void stop() {
403         setEnabled(false);
404     }
405
406     public long getSample() {
407         return addCounter % sampleInterval;
408     }
409
410     public int getMaxSizeSample() {
411         return maxSizeSample;
412     }
413
414     public void setMaxSizeSample(int size) {
415         maxSizeSample = size;
416     }
417
418     public long getAvgSizeSample() {
419         long sample = addCounter % sampleInterval;
420         if (sample > 0) {
421             return avgSizeSample / sample;
422         } else if (addCounter > 0) {
423             return avgSizeSample / sampleInterval;
424         } else {
425             return 0;
426         }
427     }
428
429     public int getSize() {
430         int sz;
431         sz = size;
432         return sz;
433     }
434
435     /* Add new data to the queue
436      * @see org.apache.catalina.cluster.util.IQueue#add(java.lang.String, java.lang.Object)
437      * FIXME extract some method
438      */

439     public boolean add(String JavaDoc key, Object JavaDoc data) {
440         boolean ok = true;
441         long time = 0;
442
443         if (!enabled) {
444             if (log.isInfoEnabled())
445                 log.info("FastQueue: queue disabled, add aborted");
446             return false;
447         }
448
449         if (timeWait) {
450             time = System.currentTimeMillis();
451         }
452         lock.lockAdd();
453         try {
454             if (timeWait) {
455                 addWait += (System.currentTimeMillis() - time);
456             }
457
458             if (log.isTraceEnabled()) {
459                 log.trace("FastQueue: add starting with size " + size);
460             }
461             if (checkLock) {
462                 if (inAdd)
463                     log.warn("FastQueue.add: Detected other add");
464                 inAdd = true;
465                 if (inMutex)
466                     log.warn("FastQueue.add: Detected other mutex in add");
467                 inMutex = true;
468             }
469
470             if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
471                 ok = false;
472                 if (log.isTraceEnabled()) {
473                     log.trace("FastQueue: Could not add, since queue is full ("
474                             + size + ">=" + maxQueueLength + ")");
475                 }
476
477             } else {
478                 LinkObject element = new LinkObject(key, data);
479                 if (size == 0) {
480                     first = last = element;
481                     size = 1;
482                 } else {
483                     if (last == null) {
484                         ok = false;
485                         log
486                                 .error("FastQueue: Could not add, since last is null although size is "
487                                         + size + " (>0)");
488                     } else {
489                         last.append(element);
490                         last = element;
491                         size++;
492                     }
493                 }
494
495             }
496
497             if (doStats) {
498                 if (ok) {
499                     if (addCounter % sampleInterval == 0) {
500                         maxSizeSample = 0;
501                         avgSizeSample = 0;
502                     }
503                     addCounter++;
504                     if (size > maxSize) {
505                         maxSize = size;
506                     }
507                     if (size > maxSizeSample) {
508                         maxSizeSample = size;
509                     }
510                     avgSize += size;
511                     avgSizeSample += size;
512                 } else {
513                     addErrorCounter++;
514                 }
515             }
516
517             if (first == null) {
518                 log.error("FastQueue: first is null, size is " + size
519                         + " at end of add");
520             }
521             if (last == null) {
522                 log.error("FastQueue: last is null, size is " + size
523                         + " at end of add");
524             }
525
526             if (checkLock) {
527                 if (!inMutex)
528                     log.warn("FastQueue: Cancelled by other mutex in add");
529                 inMutex = false;
530                 if (!inAdd)
531                     log.warn("FastQueue: Cancelled by other add");
532                 inAdd = false;
533             }
534             if (log.isTraceEnabled()) {
535                 log.trace("FastQueue: add ending with size " + size);
536             }
537
538             if (timeWait) {
539                 time = System.currentTimeMillis();
540             }
541         } finally {
542             lock.unlockAdd(true);
543         }
544         if (timeWait) {
545             addWait += (System.currentTimeMillis() - time);
546         }
547         return ok;
548     }
549
550     /* remove the complete queued object list
551      * @see org.apache.catalina.cluster.util.IQueue#remove()
552      * FIXME extract some method
553      */

554     public LinkObject remove() {
555         LinkObject element;
556         boolean gotLock;
557         long time = 0;
558
559         if (!enabled) {
560             if (log.isInfoEnabled())
561                 log.info("FastQueue: queue disabled, remove aborted");
562             return null;
563         }
564
565         if (timeWait) {
566             time = System.currentTimeMillis();
567         }
568         gotLock = lock.lockRemove();
569         try {
570
571             if (!gotLock) {
572                 if (enabled) {
573                     if (timeWait) {
574                         removeWait += (System.currentTimeMillis() - time);
575                     }
576                     if (doStats) {
577                         removeErrorCounter++;
578                     }
579                     if (log.isInfoEnabled())
580                         log
581                                 .info("FastQueue: Remove aborted although queue enabled");
582                 } else {
583                     if (log.isInfoEnabled())
584                         log.info("FastQueue: queue disabled, remove aborted");
585                 }
586                 return null;
587             }
588
589             if (timeWait) {
590                 removeWait += (System.currentTimeMillis() - time);
591             }
592
593             if (log.isTraceEnabled()) {
594                 log.trace("FastQueue: remove starting with size " + size);
595             }
596             if (checkLock) {
597                 if (inRemove)
598                     log.warn("FastQueue: Detected other remove");
599                 inRemove = true;
600                 if (inMutex)
601                     log.warn("FastQueue: Detected other mutex in remove");
602                 inMutex = true;
603             }
604
605             element = first;
606
607             if (doStats) {
608                 if (element != null) {
609                     removeCounter++;
610                 } else {
611                     removeErrorCounter++;
612                     log
613                             .error("FastQueue: Could not remove, since first is null although size is "
614                                     + size + " (>0)");
615                 }
616             }
617
618             first = last = null;
619             size = 0;
620
621             if (checkLock) {
622                 if (!inMutex)
623                     log.warn("FastQueue: Cancelled by other mutex in remove");
624                 inMutex = false;
625                 if (!inRemove)
626                     log.warn("FastQueue: Cancelled by other remove");
627                 inRemove = false;
628             }
629             if (log.isTraceEnabled()) {
630                 log.trace("FastQueue: remove ending with size " + size);
631             }
632
633             if (timeWait) {
634                 time = System.currentTimeMillis();
635             }
636         } finally {
637             lock.unlockRemove();
638         }
639         if (timeWait) {
640             removeWait += (System.currentTimeMillis() - time);
641         }
642         return element;
643     }
644
645 }
Popular Tags