KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > xsocket > DynamicWorkerPool


1 // $Id: DynamicWorkerPool.java 1365 2007-06-23 10:06:40Z grro $
2
/*
3  * Copyright (c) xsocket.org, 2006 - 2007. All rights reserved.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
20  * The latest copy of this software may be found on http://www.xsocket.org/
21  */

22 package org.xsocket;
23
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.xsocket.IWorkerPool;
41
42
43 /**
44  * @deprecated use a ThreadPool implementation of java.util.concurrent package ({@link Executor}) instead
45  *
46  * @author grro@xsocket.org
47  */

48 public final class DynamicWorkerPool implements IWorkerPool {
49     
50     private static final Logger JavaDoc LOG = Logger.getLogger(DynamicWorkerPool.class.getName());
51
52     public static final String JavaDoc WORKER_PREFIX = "xWorker";
53     
54     public static final int DEFAULT_LOAD_THRESHOLD_DECREASE = 20;
55     public static final int DEFAULT_LOAD_THRESHOLD_INCREASE = 80;
56     public static final int DEFAULT_ADJUST_CHECK_PERIOD = 1000;
57     
58     
59     private static final int SLUGGISH_PERIOD = 30 * 1000;
60
61     private final Timer JavaDoc timer = new Timer JavaDoc("xDynamicWorkerPoolTimer", true);
62
63     
64     private ThreadPoolExecutor JavaDoc executor = null;
65     private final Object JavaDoc decLock = new Object JavaDoc();
66
67     private SizeManager sizeManager = null;
68     private int minSize = 0;
69     private int maxSize = 0;
70
71     
72     /**
73      *
74      * @param minSize the min pool size (and the initial size)
75      * @param maxSize the max pool size
76      */

77     public DynamicWorkerPool(int minSize, int maxSize) {
78         this.minSize = minSize;
79         this.maxSize = maxSize;
80         
81         int initial = minSize;
82         if (initial < 1) {
83             initial = 1;
84         }
85         
86         executor = new ThreadPoolExecutor JavaDoc(initial, initial
87                                           , 0L, TimeUnit.MILLISECONDS
88                                           , new LinkedBlockingQueue JavaDoc<Runnable JavaDoc>()
89                                           , new WorkerThreadFactory());
90         
91         sizeManager = new SizeManager(DEFAULT_ADJUST_CHECK_PERIOD);
92         
93         // reduce timer priority
94
timer.schedule(new TimerTask JavaDoc() {
95               public void run() {
96                 Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
97               }
98             }, 0);
99     }
100     
101     
102     /**
103      * {@inheritDoc}
104      */

105     public void execute(Runnable JavaDoc command) {
106         try {
107             if (minSize < 1) {
108                 synchronized (decLock) {
109                     if (getPoolSize() < 1) {
110                         sizeManager.timeLastChange = System.currentTimeMillis(); // prevent immediately decrease
111
executor.setCorePoolSize(1);
112                     }
113                 }
114             }
115             
116             executor.execute(command);
117             
118         } catch (RejectedExecutionException JavaDoc e) {
119             
120             // is pool is shutdown run dedicated thread
121
if (executor.isShutdown()) {
122                 Thread JavaDoc t = new Thread JavaDoc(command);
123                 t.setDaemon(true);
124                 t.start();
125                 
126             // houldn't occur
127
} else {
128                 LOG.warning("couldn't process command " + command + ". Reason: " + e.toString());
129             }
130         }
131     }
132         
133     
134
135     /**
136      * {@inheritDoc}
137      */

138     public <T> List JavaDoc<Future JavaDoc<T>> invokeAll(Collection JavaDoc<Callable JavaDoc<T>> tasks) throws InterruptedException JavaDoc {
139         return executor.invokeAll(tasks);
140     }
141     
142     
143     /**
144      * {@inheritDoc}
145      */

146     public int getActiveCount() {
147         return executor.getActiveCount();
148     }
149
150     
151     /**
152      * {@inheritDoc}
153      */

154     public int getPoolSize() {
155         return executor.getCorePoolSize();
156     }
157
158
159     
160     /**
161      * return the load (range: 0...100)
162      *
163      * @return the load
164      */

165     public int getLoad() {
166         return (int) sizeManager.load.getValue();
167     }
168     
169     private int currentLoad() {
170         int currentSize = getPoolSize();
171         if (currentSize == 0) {
172             return 0;
173         }
174         
175         int activeCount = getActiveCount();
176         if (activeCount == 0) {
177             return 0;
178         }
179         
180         return (int) ((activeCount * 100) / currentSize);
181     }
182     
183
184     /**
185      * @return the maximum pool size
186      */

187     public int getMaximumPoolSize() {
188         return maxSize;
189     }
190     
191     
192
193     /**
194      * @return the minimum pool size
195      */

196     public int getMinimumPoolSize() {
197         return minSize;
198     }
199     
200     
201     /**
202      * @return true, if the worker pool is open
203      */

204     public boolean isOpen() {
205         return !executor.isShutdown();
206     }
207     
208     
209     /**
210      *
211      */

212     public void close() {
213         sizeManager.shutdown();
214         executor.shutdownNow();
215         
216         timer.cancel();
217     }
218     
219
220     /**
221      *
222      * @param adjustPeriodSec the adjust check period of the pool size in seconds
223      */

224     public void setAdjustPeriod(int adjustPeriodSec) {
225         sizeManager.setAdjustPeriod(adjustPeriodSec);
226     }
227     
228     
229     /**
230      * @return the adjust check period of the pool size in seconds
231      */

232     public int getAdjustPeriod() {
233         return sizeManager.getAdjustPeriod();
234     }
235
236     
237     /**
238      * @return the increase threshold
239      */

240     public int getThresholdIncrease() {
241         return sizeManager.getIncThreshold();
242     }
243
244     
245     /**
246      * @param incThreshold the increase threshold
247      */

248     public void setThresholdIncrease(int incThreshold) {
249         sizeManager.setIncThreshold(incThreshold);
250     }
251
252
253     /**
254      * @return the decrease threshold
255      */

256     public int getThresholdDecrease() {
257         return sizeManager.getDecThreshold();
258     }
259     
260     
261     /**
262      * @param decThreshold the decrease threshold
263      */

264     public void setThresholdDecrease(int decThreshold) {
265         sizeManager.setDecThreshold(decThreshold);
266     }
267     
268
269
270     /**
271      * @return the load
272      */

273     int getLoadSluggish() {
274         return (int) sizeManager.sluggishLoad.getValue();
275     }
276     
277     
278     /**
279      * @return the decrease rate
280      */

281     long getDecRate() {
282         return sizeManager.getDecRate();
283     }
284     
285     
286     /**
287      * {@inheritDoc}
288      */

289     @Override JavaDoc
290     public String JavaDoc toString() {
291         return "DynamicWorkerPool (size=" + getPoolSize() + ", running=" + getActiveCount()+ ", load=" + getLoad()
292              + ", minSize=" + getMinimumPoolSize() + ", maxSize=" + getMaximumPoolSize() + ", isOpen=" + isOpen() + ")";
293     }
294     
295     
296     private static final class WorkerThreadFactory implements ThreadFactory JavaDoc {
297         private static int poolCounter = 0;
298         private int threadCounter = 0;
299         private String JavaDoc namePrefix = null;
300         
301         WorkerThreadFactory() {
302             namePrefix = WORKER_PREFIX + "-" + (++poolCounter) + "-";
303         }
304         
305         public Thread JavaDoc newThread(Runnable JavaDoc r) {
306             Thread JavaDoc t = new Thread JavaDoc(r);
307             t.setName(namePrefix + (++threadCounter));
308             t.setDaemon(true);
309             t.setPriority(Thread.NORM_PRIORITY);
310             return t;
311         }
312     }
313     
314     
315     private final class SizeManager {
316         private long timeLastChange = System.currentTimeMillis();
317         
318         private int adjustPeriodSec = 0;
319         
320         private int decRate = 0;
321         private Average load = null;
322         private Average sluggishLoad = null;
323         
324         private int incThreshold = DEFAULT_LOAD_THRESHOLD_INCREASE;
325         private int decThreshold = DEFAULT_LOAD_THRESHOLD_DECREASE;
326         
327         private TimerTask JavaDoc task = null;
328         
329
330         
331         
332         public SizeManager(int period) {
333             load = new Average(3);
334             sluggishLoad = new Average((int) (SLUGGISH_PERIOD / period));
335             decRate = 5 * period;
336
337             setAdjustPeriod(period);
338         }
339         
340         
341         public int getDecThreshold() {
342             return decThreshold;
343         }
344         
345         public int getIncThreshold() {
346             return incThreshold;
347         }
348         
349         public long getDecRate() {
350             return decRate;
351         }
352         
353         public void setDecThreshold(int decThreshold) {
354             this.decThreshold = decThreshold;
355         }
356         
357         public void setIncThreshold(int incThreshold) {
358             this.incThreshold = incThreshold;
359         }
360         
361         public void setAdjustPeriod(int period) {
362             this.adjustPeriodSec = period;
363             
364             if (task != null) {
365                 task.cancel();
366             }
367             
368             
369             task = new TimerTask JavaDoc() {
370                 @Override JavaDoc
371                 public void run() {
372                     adjustWorkerSize();
373                 }
374             };
375             timer.schedule(task, period, period);
376         }
377         
378         
379         public int getAdjustPeriod() {
380             return adjustPeriodSec;
381         }
382         
383         
384         public void shutdown() {
385             if (task != null) {
386                 task.cancel();
387             }
388         }
389         
390         
391         private void adjustWorkerSize() {
392
393             try {
394                 int size = getPoolSize();
395                 double load = currentLoad();
396                 
397                 checkForInc(size, load);
398                 checkForDec(size, load);
399             } catch (Exception JavaDoc e) {
400                 if (LOG.isLoggable(Level.FINE)) {
401                     LOG.fine("error occured vy adjusting pool size. Reason: " + e.toString());
402                 }
403             }
404         }
405     
406         
407         private void checkForInc(int size, double currentLoad) {
408             load.add((int) currentLoad);
409             
410             if ((load.getValue() >= incThreshold)) {
411                 if (size < maxSize) {
412                     if (LOG.isLoggable(Level.FINE)) {
413                         LOG.fine("average load is " + load.getValue() + " increase pool size. new size is "
414                                 + (size + 1) + " (minSize=" + getMinimumPoolSize()
415                                 + ", maxSize=" + getMaximumPoolSize() + ")");
416                     }
417                     executor.setCorePoolSize(size + 1);
418                     timeLastChange = System.currentTimeMillis();
419                 }
420             }
421         }
422         
423         
424         private void checkForDec(int size, double currentLoad) {
425             sluggishLoad.add((int) currentLoad);
426     
427             if (load.getValue() <= decThreshold) {
428                 if ((sluggishLoad.getValue() <= decThreshold) && (size > minSize)) {
429                     if (System.currentTimeMillis() > (timeLastChange + decRate)) {
430                         if (size == 1) {
431                             synchronized (decLock) {
432                                 if (!executor.getQueue().isEmpty()) {
433                                     if (LOG.isLoggable(Level.FINE)) {
434                                         LOG.fine("average load is " + sluggishLoad.getValue() + " decrease pool size. new size is "
435                                                 + (size - 1) + " minSize=" + getMinimumPoolSize()
436                                                 + ", maxSize=" + getMaximumPoolSize() + ")");
437                                     }
438                                     executor.setCorePoolSize(size - 1);
439                                     timeLastChange = System.currentTimeMillis();
440                                 }
441                             }
442                         }
443                     }
444                 }
445             }
446         }
447     }
448
449
450
451     
452     private static final class Average {
453         private LinkedList JavaDoc<Double JavaDoc> list = new LinkedList JavaDoc<Double JavaDoc>();
454         private int capacity = 0;
455         
456         Average(int capacity) {
457             this.capacity = capacity;
458         }
459         
460         public void add(double size) {
461             list.addLast(size);
462             if (list.size() > capacity) {
463                 list.removeFirst();
464             }
465         }
466         
467         public void clear() {
468             list.clear();
469         }
470         
471         @SuppressWarnings JavaDoc("unchecked")
472         public double getValue() {
473             if (list.size() == 0) {
474                 return 0;
475             }
476
477             double i = 0;
478             LinkedList JavaDoc<Double JavaDoc> copy = (LinkedList JavaDoc<Double JavaDoc>) list.clone();
479             
480             for (double size : copy) {
481                 i += size;
482             }
483             
484             if (i <= 0) {
485                 return 0;
486             }
487             
488             i = i / copy.size();
489             
490             return i;
491         }
492     }
493 }
494
Popular Tags