package com.sun.enterprise.web.connector.grizzly;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A {@link Pipeline} implementation that hands its tasks to a
 * {@link ThreadPoolExecutor} fed by a bounded, fair {@link ArrayBlockingQueue}.
 *
 * <p>The pipeline registers itself as the executor's
 * {@link RejectedExecutionHandler}: a task the executor cannot accept is
 * cancelled and handed back to its selector thread.
 *
 * <p>Every method other than the simple setters/getters requires
 * {@link #initPipeline()} to have been called first, because that is where
 * the executor is created.
 */
public class ThreadPoolExecutorPipeline implements Pipeline,
        RejectedExecutionHandler {

    /**
     * Not read or written by this class after initialisation; kept as-is.
     */
    private int waitingThreads = 0;

    /**
     * Size of the executor's thread pool (used as both core and maximum size).
     */
    private int maxThreads = 20;

    /**
     * Configured minimum number of threads. Stored and reported by
     * {@link #toString()} only; it is not used to size the executor.
     */
    private int minThreads = 10;

    /**
     * Port value passed to the thread factory and appended to the name
     * returned by {@link #getName()}.
     */
    private int port = 8080;

    /**
     * Not read or written by this class after initialisation; kept as-is.
     */
    private int threadCount = 0;

    /**
     * Base name of this pipeline, passed to the thread factory.
     */
    private String name;

    /**
     * Thread priority passed to the thread factory.
     */
    private int priority = Thread.NORM_PRIORITY;

    /**
     * {@code true} once {@link #initPipeline()} has completed and until
     * {@link #stopPipeline()} is called.
     */
    private boolean isStarted = false;

    /**
     * The executor running the tasks; {@code null} until
     * {@link #initPipeline()} has been called.
     */
    private ThreadPoolExecutor workerThreads;

    /**
     * The bounded work queue backing {@link #workerThreads}.
     */
    private ArrayBlockingQueue<Runnable> arrayBlockingQueue;

    /**
     * Maximum number of queued tasks before new tasks are refused. A value
     * of zero or less means no explicit limit has been configured.
     */
    private int maxQueueSizeInBytes = -1;

    /**
     * Queue capacity used when {@link #maxQueueSizeInBytes} has not been set
     * to a positive value ({@link ArrayBlockingQueue} requires a capacity of
     * at least one).
     */
    private int queueSizeInBytes = 4096;

    /**
     * Optional statistics collector; may be {@code null}.
     */
    protected PipelineStatistic pipelineStat;


    /**
     * Creates the work queue and the executor. Calling this method on an
     * already initialised pipeline does nothing.
     */
    public void initPipeline() {
        if (isStarted) {
            return;
        }

        // ArrayBlockingQueue rejects a capacity < 1 with an
        // IllegalArgumentException, so fall back to the default capacity
        // when no positive limit has been configured.
        int capacity = maxQueueSizeInBytes > 0
                ? maxQueueSizeInBytes
                : queueSizeInBytes;
        arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(capacity, true);

        workerThreads = new ThreadPoolExecutor(
                maxThreads,
                maxThreads,
                0L,
                TimeUnit.MILLISECONDS,
                arrayBlockingQueue,
                new GrizzlyThreadFactory(name, port, priority),
                this);

        // Only mark the pipeline as started once the executor really exists,
        // so a failed initialisation can be retried.
        isStarted = true;
    }


    /**
     * Does nothing: this pipeline has no separate start step, the executor
     * is fully set up by {@link #initPipeline()}.
     */
    public void startPipeline() {
        // Intentionally empty.
    }


    /**
     * Shuts the executor down via {@link ThreadPoolExecutor#shutdown()}.
     * Does nothing when the pipeline is not started.
     */
    public void stopPipeline() {
        if (!isStarted) {
            return;
        }
        isStarted = false;
        workerThreads.shutdown();
    }


    /**
     * Submits a task for execution.
     *
     * <p>When a positive queue limit is configured and the queue already
     * holds that many tasks, the task is cancelled and returned to its
     * selector thread instead of being submitted.
     *
     * @param task the task to execute; it is cast to {@link Runnable}
     */
    public void addTask(Task task) {
        // A non-positive limit means "not configured": the check is skipped
        // and overflow is left to rejectedExecution(). ">=" is required
        // because the queue's capacity equals the limit, so its size can
        // never exceed it.
        if (maxQueueSizeInBytes > 0
                && workerThreads.getQueue().size() >= maxQueueSizeInBytes) {
            // pipelineStat is optional, so do not dereference it blindly.
            String limit = (pipelineStat != null)
                    ? String.valueOf(pipelineStat.getQueueSizeInBytes())
                    : String.valueOf(maxQueueSizeInBytes);
            task.cancelTask("Maximum Connections Reached: "
                    + limit
                    + " -- Retry later", HtmlHelper.OK);
            task.getSelectorThread().returnTask(task);
            return;
        }

        workerThreads.execute((Runnable) task);

        if (pipelineStat != null) {
            pipelineStat.gather(size());
        }
    }


    /**
     * Not supported by this implementation.
     *
     * @return always {@code null}
     */
    public Task getTask() {
        return null;
    }


    /**
     * @return the number of tasks currently waiting in the executor's queue
     */
    public int size() {
        return workerThreads.getQueue().size();
    }


    /**
     * Asks the thread factory to interrupt one of its threads.
     *
     * @param threadID identifier of the thread to interrupt
     * @return the value returned by
     *         {@code GrizzlyThreadFactory.interruptThread(threadID)}
     */
    public boolean interruptThread(long threadID) {
        GrizzlyThreadFactory factory =
                (GrizzlyThreadFactory) workerThreads.getThreadFactory();
        return factory.interruptThread(threadID);
    }


    /**
     * @return the executor's pool size minus its active thread count
     */
    public int getWaitingThread() {
        return workerThreads.getPoolSize() - workerThreads.getActiveCount();
    }


    /**
     * Sets the pool size used by the next {@link #initPipeline()} call.
     *
     * @param maxThreads the number of threads
     */
    public void setMaxThreads(int maxThreads) {
        this.maxThreads = maxThreads;
    }


    /**
     * @return the configured pool size
     */
    public int getMaxThreads() {
        return maxThreads;
    }


    /**
     * @return the executor's current pool size
     */
    public int getCurrentThreadCount() {
        return workerThreads.getPoolSize();
    }


    /**
     * @return the executor's current active thread count
     */
    public int getCurrentThreadsBusy() {
        return workerThreads.getActiveCount();
    }


    /**
     * @return the same value as {@link #getWaitingThread()}
     */
    public int getMaxSpareThreads() {
        return getWaitingThread();
    }


    /**
     * Sets the priority passed to the thread factory by the next
     * {@link #initPipeline()} call.
     *
     * @param priority the thread priority
     */
    public void setPriority(int priority) {
        this.priority = priority;
    }


    /**
     * Sets the base name of this pipeline.
     *
     * @param name the base name
     */
    public void setName(String name) {
        this.name = name;
    }


    /**
     * @return the base name concatenated with the port
     */
    public String getName() {
        return name + port;
    }


    /**
     * Sets the port associated with this pipeline.
     *
     * @param port the port
     */
    public void setPort(int port) {
        this.port = port;
    }


    /**
     * Stores the minimum thread count. The value is not used to size the
     * executor.
     *
     * @param minThreads the minimum number of threads
     */
    public void setMinThreads(int minThreads) {
        this.minThreads = minThreads;
    }


    /**
     * Sets the maximum number of queued tasks and forwards the value to the
     * statistics collector when one is installed.
     *
     * @param maxQueueSizeInBytes the queue limit
     */
    public void setQueueSizeInBytes(int maxQueueSizeInBytes) {
        this.maxQueueSizeInBytes = maxQueueSizeInBytes;
        if (pipelineStat != null) {
            pipelineStat.setQueueSizeInBytes(maxQueueSizeInBytes);
        }
    }


    /**
     * @return the configured queue limit
     */
    public int getQueueSizeInBytes() {
        return maxQueueSizeInBytes;
    }


    /**
     * @return a short description made of the name and the thread settings
     */
    @Override
    public String toString() {
        return "name: " + name + " maxThreads: " + maxThreads
                + " minThreads:" + minThreads;
    }


    /**
     * Invoked by the executor when it cannot accept a task: the task is
     * cancelled and returned to its selector thread.
     *
     * @param r        the rejected task; it is cast to {@link Task}
     * @param executor the executor that rejected the task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        Task task = (Task) r;
        task.cancelTask("Maximum Connections Reached -- Retry later",
                HtmlHelper.OK);
        task.getSelectorThread().returnTask(task);
    }


    /**
     * Ignored by this implementation.
     *
     * @param threadsIncrement unused
     */
    public void setThreadsIncrement(int threadsIncrement) {
        // Intentionally empty.
    }


    /**
     * Ignored by this implementation.
     *
     * @param threadsTimeout unused
     */
    public void setThreadsTimeout(int threadsTimeout) {
        // Intentionally empty.
    }


    /**
     * @return always {@code 0}
     */
    public int getMinSpareThreads() {
        return 0;
    }


    /**
     * Ignored by this implementation.
     *
     * @param minSpareThreads unused
     */
    public void setMinSpareThreads(int minSpareThreads) {
        // Intentionally empty.
    }


    /**
     * Installs the statistics collector.
     *
     * @param pipelineStatistic the collector, or {@code null} to disable
     *                          statistics gathering
     */
    public void setPipelineStatistic(PipelineStatistic pipelineStatistic) {
        this.pipelineStat = pipelineStatistic;
    }


    /**
     * @return the installed statistics collector, possibly {@code null}
     */
    public PipelineStatistic getPipelineStatistic() {
        return pipelineStat;
    }
}