1 package org.objectweb.celtix.bus.workqueue; 2 3 import java.util.concurrent.ArrayBlockingQueue ; 4 import java.util.concurrent.RejectedExecutionException ; 5 import java.util.concurrent.ThreadPoolExecutor ; 6 import java.util.concurrent.TimeUnit ; 7 import java.util.logging.Level ; 8 import java.util.logging.Logger ; 9 10 import org.objectweb.celtix.common.logging.LogUtils; 11 import org.objectweb.celtix.workqueue.AutomaticWorkQueue; 12 13 public class AutomaticWorkQueueImpl extends ThreadPoolExecutor implements AutomaticWorkQueue { 14 15 static final int DEFAULT_MAX_QUEUE_SIZE = 128; 16 private static final Logger LOG = 17 LogUtils.getL7dLogger(AutomaticWorkQueueImpl.class); 18 19 int maxQueueSize; 20 21 AutomaticWorkQueueImpl(int mqs, int initialThreads, int highWaterMark, int lowWaterMark, 22 long dequeueTimeout) { 23 24 super(-1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark, 25 -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark, 26 TimeUnit.MILLISECONDS.toMillis(dequeueTimeout), TimeUnit.MILLISECONDS, 27 mqs == -1 ? new ArrayBlockingQueue <Runnable >(DEFAULT_MAX_QUEUE_SIZE) 28 : new ArrayBlockingQueue <Runnable >(mqs)); 29 30 maxQueueSize = mqs == -1 ? DEFAULT_MAX_QUEUE_SIZE : mqs; 31 lowWaterMark = -1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark; 32 highWaterMark = -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark; 33 34 StringBuffer buf = new StringBuffer (); 35 buf.append("Constructing automatic work queue with:\n"); 36 buf.append("max queue size: " + maxQueueSize + "\n"); 37 buf.append("initialThreads: " + initialThreads + "\n"); 38 buf.append("lowWaterMark: " + lowWaterMark + "\n"); 39 buf.append("highWaterMark: " + highWaterMark + "\n"); 40 LOG.fine(buf.toString()); 41 42 if (initialThreads > highWaterMark) { 43 initialThreads = highWaterMark; 44 } 45 46 if (initialThreads < Integer.MAX_VALUE && initialThreads > 0) { 51 setCorePoolSize(initialThreads); 52 int started = prestartAllCoreThreads(); 53 if (started < initialThreads) { 54 LOG.log(Level.WARNING, "THREAD_START_FAILURE_MSG", new Object [] {started, initialThreads}); 55 } 56 setCorePoolSize(lowWaterMark); 57 } 58 } 59 60 public String toString() { 61 StringBuffer buf = new StringBuffer (); 62 buf.append(super.toString()); 63 buf.append(" [queue size: "); 64 buf.append(getSize()); 65 buf.append(", max size: "); 66 buf.append(maxQueueSize); 67 buf.append(", threads: "); 68 buf.append(getPoolSize()); 69 buf.append(", active threads: "); 70 buf.append(getActiveCount()); 71 buf.append(", low water mark: "); 72 buf.append(getLowWaterMark()); 73 buf.append(", high water mark: "); 74 buf.append(getHighWaterMark()); 75 buf.append("]"); 76 return buf.toString(); 77 } 78 79 81 84 public void execute(Runnable work, long timeout) { 85 try { 86 execute(work); 87 } catch (RejectedExecutionException ree) { 88 try { 89 getQueue().offer(work, timeout, TimeUnit.MILLISECONDS); 90 } catch (InterruptedException ie) { 91 throw new RejectedExecutionException (ie); 92 } 93 } 94 } 95 96 99 public void schedule(final Runnable work, final long delay) { 100 execute(new Runnable () { 103 public void run() { 104 try { 105 Thread.sleep(delay); 106 } catch (InterruptedException ie) { 107 } 109 work.run(); 110 } 111 }); 112 } 113 114 116 public void shutdown(boolean processRemainingWorkItems) { 117 if (!processRemainingWorkItems) { 118 getQueue().clear(); 119 } 120 shutdown(); 121 } 122 123 127 long getMaxSize() { 128 return maxQueueSize; 129 } 130 131 135 public long getSize() { 136 return getQueue().size(); 137 } 138 139 140 public boolean isEmpty() { 141 return getQueue().size() == 0; 142 } 143 144 boolean isFull() { 145 return getQueue().remainingCapacity() == 0; 146 } 147 148 int getHighWaterMark() { 149 int hwm = getMaximumPoolSize(); 150 return hwm == Integer.MAX_VALUE ? -1 : hwm; 151 } 152 153 int getLowWaterMark() { 154 int lwm = getCorePoolSize(); 155 return lwm == Integer.MAX_VALUE ? -1 : lwm; 156 } 157 158 void setHighWaterMark(int hwm) { 159 setMaximumPoolSize(hwm < 0 ? Integer.MAX_VALUE : hwm); 160 } 161 162 void setLowWaterMark(int lwm) { 163 setCorePoolSize(lwm < 0 ? 0 : lwm); 164 } 165 } 166 | Popular Tags |