1 10 11 package org.mule.config; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue; 14 import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; 15 import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler; 16 import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue; 17 import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory; 18 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; 19 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; 20 21 import org.mule.impl.work.MuleWorkManager; 22 import org.mule.umo.manager.UMOWorkManager; 23 import org.mule.util.concurrent.NamedThreadFactory; 24 import org.mule.util.concurrent.WaitPolicy; 25 26 31 32 public class ThreadingProfile 33 { 34 35 38 public static final int DEFAULT_MAX_THREADS_ACTIVE = 8; 39 40 43 public static final int DEFAULT_MAX_THREADS_IDLE = 4; 44 45 48 public static final int DEFAULT_MAX_BUFFER_SIZE = 0; 49 50 53 public static final long DEFAULT_MAX_THREAD_TTL = 60000; 54 55 58 public static final long DEFAULT_THREAD_WAIT_TIMEOUT = 30000L; 59 60 63 public static final boolean DEFAULT_DO_THREADING = true; 64 65 68 public static final int WHEN_EXHAUSTED_WAIT = 0; 69 public static final int WHEN_EXHAUSTED_DISCARD = 1; 70 public static final int WHEN_EXHAUSTED_DISCARD_OLDEST = 2; 71 public static final int WHEN_EXHAUSTED_ABORT = 3; 72 public static final int WHEN_EXHAUSTED_RUN = 4; 73 74 77 public static final int DEFAULT_POOL_EXHAUST_ACTION = WHEN_EXHAUSTED_RUN; 78 79 private int maxThreadsActive = DEFAULT_MAX_THREADS_ACTIVE; 80 private int maxThreadsIdle = DEFAULT_MAX_THREADS_IDLE; 81 private int maxBufferSize = DEFAULT_MAX_BUFFER_SIZE; 82 private long threadTTL = DEFAULT_MAX_THREAD_TTL; 83 private long threadWaitTimeout = DEFAULT_THREAD_WAIT_TIMEOUT; 84 private int poolExhaustPolicy = DEFAULT_POOL_EXHAUST_ACTION; 85 private boolean doThreading = DEFAULT_DO_THREADING; 86 private int threadPriority = Thread.NORM_PRIORITY; 87 88 private WorkManagerFactory workManagerFactory = new DefaultWorkManagerFactory(); 89 90 private RejectedExecutionHandler rejectedExecutionHandler; 91 92 private ThreadFactory threadFactory; 93 94 public ThreadingProfile() 95 { 96 super(); 97 } 98 99 public ThreadingProfile(int maxThreadsActive, 100 int maxThreadsIdle, 101 long threadTTL, 102 int poolExhaustPolicy, 103 RejectedExecutionHandler rejectedExecutionHandler, 104 ThreadFactory threadFactory) 105 { 106 this.maxThreadsActive = maxThreadsActive; 107 this.maxThreadsIdle = maxThreadsIdle; 108 this.threadTTL = threadTTL; 109 this.poolExhaustPolicy = poolExhaustPolicy; 110 this.rejectedExecutionHandler = rejectedExecutionHandler; 111 this.threadFactory = threadFactory; 112 } 113 114 public ThreadingProfile(ThreadingProfile tp) 115 { 116 this.maxThreadsActive = tp.getMaxThreadsActive(); 117 this.maxThreadsIdle = tp.getMaxThreadsIdle(); 118 this.maxBufferSize = tp.getMaxBufferSize(); 119 this.threadTTL = tp.getThreadTTL(); 120 this.threadWaitTimeout = tp.getThreadWaitTimeout(); 121 this.poolExhaustPolicy = tp.getPoolExhaustedAction(); 122 this.doThreading = tp.isDoThreading(); 123 this.threadPriority = tp.getThreadPriority(); 124 this.rejectedExecutionHandler = tp.getRejectedExecutionHandler(); 125 this.threadFactory = tp.getThreadFactory(); 126 this.workManagerFactory = tp.getWorkManagerFactory(); 127 } 128 129 public int getMaxThreadsActive() 130 { 131 return maxThreadsActive; 132 } 133 134 public int getMaxThreadsIdle() 135 { 136 return maxThreadsIdle; 137 } 138 139 public long getThreadTTL() 140 { 141 return threadTTL; 142 } 143 144 public long getThreadWaitTimeout() 145 { 146 return threadWaitTimeout; 147 } 148 149 public int getThreadPriority() 150 { 151 return threadPriority; 152 } 153 154 public void setThreadPriority(int threadPriority) 155 { 156 this.threadPriority = threadPriority; 157 } 158 159 public int getPoolExhaustedAction() 160 { 161 return poolExhaustPolicy; 162 } 163 164 public RejectedExecutionHandler getRejectedExecutionHandler() 165 { 166 return rejectedExecutionHandler; 167 } 168 169 public ThreadFactory getThreadFactory() 170 { 171 return threadFactory; 172 } 173 174 public void setMaxThreadsActive(int maxThreadsActive) 175 { 176 this.maxThreadsActive = maxThreadsActive; 177 } 178 179 public void setMaxThreadsIdle(int maxThreadsIdle) 180 { 181 this.maxThreadsIdle = maxThreadsIdle; 182 } 183 184 public void setThreadTTL(long threadTTL) 185 { 186 this.threadTTL = threadTTL; 187 } 188 189 public void setThreadWaitTimeout(long threadWaitTimeout) 190 { 191 this.threadWaitTimeout = threadWaitTimeout; 192 } 193 194 public void setPoolExhaustedAction(int poolExhaustPolicy) 195 { 196 this.poolExhaustPolicy = poolExhaustPolicy; 197 } 198 199 public void setPoolExhaustedActionString(String poolExhaustPolicy) 200 { 201 if (poolExhaustPolicy != null) 202 { 203 if ("WAIT".equals(poolExhaustPolicy)) 204 { 205 this.poolExhaustPolicy = WHEN_EXHAUSTED_WAIT; 206 } 207 else if ("ABORT".equals(poolExhaustPolicy)) 208 { 209 this.poolExhaustPolicy = WHEN_EXHAUSTED_ABORT; 210 } 211 else if ("DISCARD".equals(poolExhaustPolicy)) 212 { 213 this.poolExhaustPolicy = WHEN_EXHAUSTED_DISCARD; 214 } 215 else if ("DISCARD_OLDEST".equals(poolExhaustPolicy)) 216 { 217 this.poolExhaustPolicy = WHEN_EXHAUSTED_DISCARD_OLDEST; 218 } 219 else 220 { 221 this.poolExhaustPolicy = WHEN_EXHAUSTED_RUN; 222 } 223 } 224 } 225 226 public void setBlockedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) 227 { 228 this.rejectedExecutionHandler = rejectedExecutionHandler; 229 } 230 231 public void setThreadFactory(ThreadFactory threadFactory) 232 { 233 this.threadFactory = threadFactory; 234 } 235 236 public int getMaxBufferSize() 237 { 238 return maxBufferSize; 239 } 240 241 public void setMaxBufferSize(int maxBufferSize) 242 { 243 this.maxBufferSize = maxBufferSize; 244 } 245 246 public WorkManagerFactory getWorkManagerFactory() 247 { 248 return workManagerFactory; 249 } 250 251 public void setWorkManagerFactory(WorkManagerFactory workManagerFactory) 252 { 253 this.workManagerFactory = workManagerFactory; 254 } 255 256 public UMOWorkManager createWorkManager(String name) 257 { 258 return workManagerFactory.createWorkManager(this, name); 259 } 260 261 public ThreadPoolExecutor createPool() 262 { 263 return createPool(null); 264 } 265 266 public ThreadPoolExecutor createPool(String name) 267 { 268 BlockingQueue buffer; 269 270 if (maxBufferSize > 0 && maxThreadsActive > 1) 271 { 272 buffer = new ArrayBlockingQueue(maxBufferSize); 273 } 274 else 275 { 276 buffer = new SynchronousQueue(); 277 } 278 279 if (maxThreadsActive < maxThreadsIdle) 280 { 281 maxThreadsIdle = maxThreadsActive; 282 } 283 284 ThreadPoolExecutor pool = new ThreadPoolExecutor(maxThreadsIdle, maxThreadsActive, threadTTL, 285 TimeUnit.MILLISECONDS, buffer); 286 287 if (rejectedExecutionHandler != null) 288 { 289 pool.setRejectedExecutionHandler(rejectedExecutionHandler); 290 } 291 292 if (name != null) 293 { 294 threadFactory = new NamedThreadFactory(name, threadPriority); 295 pool.setThreadFactory(threadFactory); 296 } 297 298 switch (poolExhaustPolicy) 299 { 300 case WHEN_EXHAUSTED_DISCARD_OLDEST : 301 { 302 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); 303 break; 304 } 305 case WHEN_EXHAUSTED_RUN : 306 { 307 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 308 break; 309 } 310 case WHEN_EXHAUSTED_ABORT : 311 { 312 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 313 break; 314 } 315 case WHEN_EXHAUSTED_DISCARD : 316 { 317 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); 318 break; 319 } 320 case WHEN_EXHAUSTED_WAIT : 321 { 322 pool.setRejectedExecutionHandler(new WaitPolicy(threadWaitTimeout, TimeUnit.MILLISECONDS)); 323 break; 324 } 325 default : 326 { 327 pool.setRejectedExecutionHandler(new WaitPolicy(threadWaitTimeout, TimeUnit.MILLISECONDS)); 328 break; 329 } 330 } 331 332 return pool; 333 } 334 335 public boolean isDoThreading() 336 { 337 return doThreading; 338 } 339 340 public void setDoThreading(boolean doThreading) 341 { 342 this.doThreading = doThreading; 343 } 344 345 public String toString() 346 { 347 return "ThreadingProfile{" + "maxThreadsActive=" + maxThreadsActive + ", maxThreadsIdle=" 348 + maxThreadsIdle + ", maxBufferSize=" + maxBufferSize + ", threadTTL=" + threadTTL 349 + ", poolExhaustPolicy=" + poolExhaustPolicy + ", threadWaitTimeout=" + threadWaitTimeout 350 + ", doThreading=" + doThreading + ", threadPriority=" + threadPriority 351 + ", workManagerFactory=" + workManagerFactory + ", rejectedExecutionHandler=" 352 + rejectedExecutionHandler + ", threadFactory=" + threadFactory + "}"; 353 } 354 355 public static interface WorkManagerFactory 356 { 357 UMOWorkManager createWorkManager(ThreadingProfile profile, String name); 358 } 359 360 private class DefaultWorkManagerFactory implements WorkManagerFactory 361 { 362 public UMOWorkManager createWorkManager(ThreadingProfile profile, String name) 363 { 364 return new MuleWorkManager(profile, name); 365 } 366 } 367 368 } 369 | Popular Tags |