KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > workqueue > AutomaticWorkQueueImpl


1 package org.objectweb.celtix.bus.workqueue;
2
3 import java.util.concurrent.ArrayBlockingQueue JavaDoc;
4 import java.util.concurrent.RejectedExecutionException JavaDoc;
5 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
6 import java.util.concurrent.TimeUnit JavaDoc;
7 import java.util.logging.Level JavaDoc;
8 import java.util.logging.Logger JavaDoc;
9
10 import org.objectweb.celtix.common.logging.LogUtils;
11 import org.objectweb.celtix.workqueue.AutomaticWorkQueue;
12
13 public class AutomaticWorkQueueImpl extends ThreadPoolExecutor JavaDoc implements AutomaticWorkQueue {
14
15     static final int DEFAULT_MAX_QUEUE_SIZE = 128;
16     private static final Logger JavaDoc LOG =
17         LogUtils.getL7dLogger(AutomaticWorkQueueImpl.class);
18     
19     int maxQueueSize;
20
21     AutomaticWorkQueueImpl(int mqs, int initialThreads, int highWaterMark, int lowWaterMark,
22                            long dequeueTimeout) {
23         
24         super(-1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark,
25             -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark,
26                 TimeUnit.MILLISECONDS.toMillis(dequeueTimeout), TimeUnit.MILLISECONDS,
27                 mqs == -1 ? new ArrayBlockingQueue JavaDoc<Runnable JavaDoc>(DEFAULT_MAX_QUEUE_SIZE)
28                     : new ArrayBlockingQueue JavaDoc<Runnable JavaDoc>(mqs));
29         
30         maxQueueSize = mqs == -1 ? DEFAULT_MAX_QUEUE_SIZE : mqs;
31         lowWaterMark = -1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark;
32         highWaterMark = -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark;
33                 
34         StringBuffer JavaDoc buf = new StringBuffer JavaDoc();
35         buf.append("Constructing automatic work queue with:\n");
36         buf.append("max queue size: " + maxQueueSize + "\n");
37         buf.append("initialThreads: " + initialThreads + "\n");
38         buf.append("lowWaterMark: " + lowWaterMark + "\n");
39         buf.append("highWaterMark: " + highWaterMark + "\n");
40         LOG.fine(buf.toString());
41
42         if (initialThreads > highWaterMark) {
43             initialThreads = highWaterMark;
44         }
45
46         // as we cannot prestart more core than corePoolSize initial threads, we temporarily
47
// change the corePoolSize to the number of initial threads
48
// this is important as otherwise these threads will be created only when the queue has filled up,
49
// potentially causing problems with starting up under heavy load
50
if (initialThreads < Integer.MAX_VALUE && initialThreads > 0) {
51             setCorePoolSize(initialThreads);
52             int started = prestartAllCoreThreads();
53             if (started < initialThreads) {
54                 LOG.log(Level.WARNING, "THREAD_START_FAILURE_MSG", new Object JavaDoc[] {started, initialThreads});
55             }
56             setCorePoolSize(lowWaterMark);
57         }
58     }
59
60     public String JavaDoc toString() {
61         StringBuffer JavaDoc buf = new StringBuffer JavaDoc();
62         buf.append(super.toString());
63         buf.append(" [queue size: ");
64         buf.append(getSize());
65         buf.append(", max size: ");
66         buf.append(maxQueueSize);
67         buf.append(", threads: ");
68         buf.append(getPoolSize());
69         buf.append(", active threads: ");
70         buf.append(getActiveCount());
71         buf.append(", low water mark: ");
72         buf.append(getLowWaterMark());
73         buf.append(", high water mark: ");
74         buf.append(getHighWaterMark());
75         buf.append("]");
76         return buf.toString();
77     }
78     
79     // WorkQueue interface
80

81     /* (non-Javadoc)
82      * @see org.objectweb.celtix.workqueue.WorkQueue#execute(java.lang.Runnable, long)
83      */

84     public void execute(Runnable JavaDoc work, long timeout) {
85         try {
86             execute(work);
87         } catch (RejectedExecutionException JavaDoc ree) {
88             try {
89                 getQueue().offer(work, timeout, TimeUnit.MILLISECONDS);
90             } catch (InterruptedException JavaDoc ie) {
91                 throw new RejectedExecutionException JavaDoc(ie);
92             }
93         }
94     }
95
96     /* (non-Javadoc)
97      * @see org.objectweb.celtix.workqueue.WorkQueue#schedule(java.lang.Runnable, long)
98      */

99     public void schedule(final Runnable JavaDoc work, final long delay) {
100         // temporary implementation, replace with shared long-lived scheduler
101
// task
102
execute(new Runnable JavaDoc() {
103             public void run() {
104                 try {
105                     Thread.sleep(delay);
106                 } catch (InterruptedException JavaDoc ie) {
107                     // ignore
108
}
109                 work.run();
110             }
111         });
112     }
113     
114     // AutomaticWorkQueue interface
115

116     public void shutdown(boolean processRemainingWorkItems) {
117         if (!processRemainingWorkItems) {
118             getQueue().clear();
119         }
120         shutdown();
121     }
122
123     /**
124      * Gets the maximum size (capacity) of the backing queue.
125      * @return the maximum size (capacity) of the backing queue.
126      */

127     long getMaxSize() {
128         return maxQueueSize;
129     }
130
131     /**
132      * Gets the current size of the backing queue.
133      * @return the current size of the backing queue.
134      */

135     public long getSize() {
136         return getQueue().size();
137     }
138
139
140     public boolean isEmpty() {
141         return getQueue().size() == 0;
142     }
143
144     boolean isFull() {
145         return getQueue().remainingCapacity() == 0;
146     }
147
148     int getHighWaterMark() {
149         int hwm = getMaximumPoolSize();
150         return hwm == Integer.MAX_VALUE ? -1 : hwm;
151     }
152
153     int getLowWaterMark() {
154         int lwm = getCorePoolSize();
155         return lwm == Integer.MAX_VALUE ? -1 : lwm;
156     }
157
158     void setHighWaterMark(int hwm) {
159         setMaximumPoolSize(hwm < 0 ? Integer.MAX_VALUE : hwm);
160     }
161
162     void setLowWaterMark(int lwm) {
163         setCorePoolSize(lwm < 0 ? 0 : lwm);
164     }
165 }
166
Popular Tags