1 45 package net.jforum.util.concurrent.executor; 46 47 import net.jforum.util.concurrent.Executor; 48 import net.jforum.util.concurrent.Queue; 49 import net.jforum.util.concurrent.Result; 50 import net.jforum.util.concurrent.Task; 51 52 55 public class PooledExecutor implements Executor 56 { 57 static final int DEFAULT_MIN_SIZE = Runtime.getRuntime().availableProcessors(); 58 static final int DEFAULT_MAX_SIZE = 4 * DEFAULT_MIN_SIZE; 59 static final int DEFAULT_MAX_IDLE = DEFAULT_MIN_SIZE; 60 static final long DEFAULT_KEEP_ALIVE = 60 * 1000; 61 62 static final long DEFAULT_CREATION_DELAY = 1000; 64 65 private final Queue queue; 66 private final Object lock = new Object (); 67 68 private int minSize = DEFAULT_MIN_SIZE; 69 private int maxSize = DEFAULT_MAX_SIZE; 70 private int maxIdle = DEFAULT_MAX_IDLE; 71 private long keepAlive = DEFAULT_KEEP_ALIVE; 72 private long minCreationDelay = DEFAULT_CREATION_DELAY; 73 74 private int threadCount = 0; 75 private long lastCreation = 0; 76 private int waiting = 0; 77 78 private void createThread() 79 { 80 long curtime = System.currentTimeMillis(); 81 if (threadCount > 0 82 && minCreationDelay > 0 83 && curtime - lastCreation < minCreationDelay) { 84 return; 85 } 86 87 Thread worker = new Thread (new Worker(),"jforum"); 88 worker.setDaemon(true); 89 worker.start(); 90 91 lastCreation = curtime; 92 ++threadCount; 93 } 94 95 private class Worker extends AbstractWorker 96 { 97 protected Object take() throws InterruptedException { 98 synchronized(lock) { 99 ++waiting; 100 101 if((threadCount > maxSize && maxSize > 0) 103 || (waiting > maxIdle && maxIdle > 0 && threadCount > minSize)) 106 return null; 107 } 108 109 try { 110 if(keepAlive >= 0) { 111 return queue.pool(keepAlive); 112 } 113 114 return queue.get(); 115 } finally { 116 synchronized(lock) { 117 --waiting; 118 } 119 } 120 } 121 122 protected void cleanup() { 123 synchronized(lock) { 124 if(--threadCount < minSize) 125 createThread(); 126 } 127 } 128 } 129 130 public PooledExecutor(final Queue queue) { 131 this.queue = queue; 132 } 133 134 protected void queue(Object obj) throws InterruptedException 135 { 136 for(;;) { 137 synchronized(lock) { 138 if(threadCount < minSize) { 139 createThread(); 140 } 141 142 if(queue.offer(obj, 0)) { 143 break; 144 } 145 146 if(threadCount < maxSize) { 147 createThread(); 148 } 149 150 if(queue.offer(obj, 0)) { 151 break; 152 } 153 } 154 } 155 } 156 157 public void execute(Task task) throws InterruptedException 158 { 159 queue(task); 160 } 161 162 public Result executeWithResult(Task task) throws InterruptedException 163 { 164 SimpleResult result = new SimpleResult(task); 165 queue(result); 166 167 return result; 168 } 169 170 public long getKeepAlive() 171 { 172 return keepAlive; 173 } 174 175 public int getMaxIdle() 176 { 177 return maxIdle; 178 } 179 180 public int getMaxSize() { 181 return maxSize; 182 } 183 184 public long getMinCreationDelay() 185 { 186 return minCreationDelay; 187 } 188 189 public int getMinSize() 190 { 191 return minSize; 192 } 193 194 195 public void setKeepAlive(long l) 196 { 197 synchronized(lock) { 198 keepAlive = l; 199 } 200 } 201 202 public void setMaxIdle(int i) 203 { 204 synchronized(lock) { 205 maxIdle = i; 206 } 207 } 208 209 public void setMaxSize(int maxSize) 210 { 211 synchronized(lock) { 212 if(maxSize > 0 && maxSize < minSize) { 213 throw new IllegalArgumentException ("max size smaller than min size"); 214 } 215 216 this.maxSize = maxSize; 217 } 218 } 219 220 public void setMinCreationDelay(long l) 221 { 222 synchronized(lock) { 223 minCreationDelay = l; 224 } 225 } 226 227 public void setMinSize(int minSize) 228 { 229 synchronized(lock) { 230 if(minSize < 1 || (maxSize > 0 && maxSize < minSize)) { 231 throw new IllegalArgumentException ("max size smaller than min size"); 232 } 233 234 this.minSize = minSize; 235 } 236 } 237 238 } 239 | Popular Tags |