package com.sun.enterprise.web.connector.grizzly;

import java.util.LinkedList;
import java.util.NoSuchElementException;

/**
 * A {@link Pipeline} whose task queue is the inherited {@link LinkedList}.
 *
 * <p>Tasks are appended by {@link #addTask(Task)} and removed from the head by
 * {@link #getTask()}. Both methods are {@code synchronized} on this instance
 * and use {@code wait()}/{@code notify()} on it to hand tasks to worker
 * threads. The remaining accessors and life-cycle methods are not
 * synchronized.
 *
 * <p>Note that {@link #isEmpty()} is overridden and does <em>not</em> simply
 * mean "the list has no elements"; see its documentation.
 */
public class LinkedListPipeline extends LinkedList implements Pipeline {

    /**
     * Number of threads currently blocked inside {@link #getTask()}.
     */
    protected int waitingThreads = 0;

    /**
     * Upper bound on the number of worker threads this pipeline creates.
     */
    protected int maxThreads = 20;

    /**
     * Number of worker threads created by {@link #initPipeline()} and started
     * by {@link #startPipeline()}.
     */
    protected int minThreads = 5;

    /**
     * Minimum spare threads setting. This class only stores and returns the
     * value; the constructor caps it to {@code minThreads}.
     */
    protected int minSpareThreads = 2;

    /**
     * Port number; used to build worker thread names and {@link #getName()}.
     */
    protected int port = 8080;

    /**
     * Number of worker threads created so far.
     */
    protected int threadCount = 0;

    /**
     * Name of this pipeline; used as a prefix for worker thread names.
     */
    protected String name;

    /**
     * Priority assigned to every worker thread created by this pipeline.
     */
    protected int priority = Thread.NORM_PRIORITY;

    /**
     * {@code true} once {@link #startPipeline()} has run and until
     * {@link #stopPipeline()} is invoked.
     */
    protected boolean isStarted = false;

    /**
     * The worker threads created so far; slots at index
     * {@code >= threadCount} are unused.
     */
    protected WorkerThread[] workerThreads;

    /**
     * Queue limit checked by {@link #addTask(Task)}; {@code -1} disables the
     * check. Despite the name, the value is compared against the number of
     * queued tasks ({@code size()}), not a byte count.
     */
    protected int maxQueueSizeInBytes = -1;

    /**
     * Number of worker threads to add each time {@link #addTask(Task)}
     * decides to grow the pool.
     */
    protected int threadsIncrement = 1;

    /**
     * Thread timeout setting. Stored by {@link #setThreadsTimeout(int)} but
     * not read anywhere in this class.
     */
    protected int threadsTimeout = Constants.DEFAULT_TIMEOUT;

    /**
     * Optional statistics collector; when non-null it is fed the queue size
     * after every {@link #getTask()} call.
     */
    protected PipelineStatistic pipelineStat;


    /**
     * Creates a pipeline that uses the default field values.
     */
    public LinkedListPipeline() {
        super();
    }


    /**
     * Creates a pipeline with the given settings. If {@code minThreads} is
     * smaller than the current {@code minSpareThreads}, the latter is lowered
     * to {@code minThreads}.
     *
     * @param maxThreads upper bound on the number of worker threads
     * @param minThreads number of worker threads created at initialization
     * @param name       pipeline name, used as worker thread name prefix
     * @param port       port number, used in worker thread names
     * @param priority   priority given to the worker threads
     */
    public LinkedListPipeline(int maxThreads, int minThreads, String name,
                              int port, int priority) {
        this.maxThreads = maxThreads;
        this.port = port;
        this.name = name;
        this.minThreads = minThreads;
        this.priority = priority;

        if (minThreads < minSpareThreads) {
            minSpareThreads = minThreads;
        }
    }


    /**
     * Creates a pipeline whose worker threads run at
     * {@link Thread#NORM_PRIORITY}.
     *
     * @param maxThreads upper bound on the number of worker threads
     * @param minThreads number of worker threads created at initialization
     * @param name       pipeline name, used as worker thread name prefix
     * @param port       port number, used in worker thread names
     */
    public LinkedListPipeline(int maxThreads, int minThreads, String name,
                              int port) {
        this(maxThreads, minThreads, name, port, Thread.NORM_PRIORITY);
    }


    // ------------------------------------------------------- Life cycle --//


    /**
     * Allocates the worker thread array (sized to {@code maxThreads}) and
     * creates, without starting, {@code minThreads} worker threads.
     * {@code minThreads} is first capped to {@code maxThreads}.
     */
    public void initPipeline() {
        if (minThreads > maxThreads) {
            minThreads = maxThreads;
        }

        workerThreads = new WorkerThread[maxThreads];
        increaseWorkerThread(minThreads, false);
    }


    /**
     * Starts the first {@code minThreads} worker threads. Does nothing if the
     * pipeline is already started. {@link #initPipeline()} must have been
     * called beforehand so that the threads exist.
     */
    public void startPipeline() {
        if (!isStarted) {
            for (int i = 0; i < minThreads; i++) {
                workerThreads[i].start();
            }
            isStarted = true;
        }
    }


    /**
     * Calls {@code terminate()} on every worker thread created so far. Does
     * nothing if the pipeline is not started.
     */
    public void stopPipeline() {
        if (isStarted) {
            for (int i = 0; i < threadCount; i++) {
                workerThreads[i].terminate();
            }
            isStarted = false;
        }
    }


    /**
     * Creates {@code increment} additional worker threads, stores them after
     * the existing ones and bumps {@code threadCount} accordingly.
     *
     * <p>The backing array is allocated or grown on demand, so this method no
     * longer fails when {@code maxThreads} was raised after
     * {@link #initPipeline()} or when {@link #initPipeline()} was never
     * called.
     *
     * @param increment   number of threads to create; values {@code <= 0}
     *                    create nothing
     * @param startThread whether each new thread is started immediately
     */
    protected void increaseWorkerThread(int increment, boolean startThread) {
        if (increment <= 0) {
            return;
        }

        int currentCount = threadCount;
        int increaseCount = threadCount + increment;
        ensureWorkerCapacity(increaseCount);

        for (int i = currentCount; i < increaseCount; i++) {
            WorkerThread workerThread = new WorkerThread(this,
                    name + "WorkerThread-" + port + "-" + i);
            workerThread.setPriority(priority);

            if (startThread) {
                workerThread.start();
            }

            workerThreads[i] = workerThread;
            threadCount++;
        }
    }


    /**
     * Makes sure {@code workerThreads} exists and has room for
     * {@code required} entries, preserving the entries already stored.
     */
    private void ensureWorkerCapacity(int required) {
        if (workerThreads == null) {
            int capacity = Math.max(required, Math.max(maxThreads, 0));
            workerThreads = new WorkerThread[capacity];
            return;
        }

        if (required > workerThreads.length) {
            WorkerThread[] grown = new WorkerThread[required];
            System.arraycopy(workerThreads, 0, grown, 0,
                    workerThreads.length);
            workerThreads = grown;
        }
    }


    /**
     * Interrupts the thread with the given id, provided it belongs to the
     * thread group of the first worker thread and is not currently in the
     * {@code RUNNABLE} state.
     *
     * @param threadID id of the thread to interrupt, as returned by
     *                 {@link Thread#getId()}
     * @return {@code true} if a matching thread was interrupted;
     *         {@code false} if no worker thread exists yet, no thread
     *         matched, the matching thread was {@code RUNNABLE}, or the
     *         interrupt attempt failed
     */
    public boolean interruptThread(long threadID) {
        // Nothing to look up before the first worker thread exists.
        if (workerThreads == null || workerThreads.length == 0
                || workerThreads[0] == null) {
            return false;
        }

        ThreadGroup threadGroup = workerThreads[0].getThreadGroup();
        // NOTE(review): Thread.getThreadGroup() returns null for a
        // terminated thread; assumes WorkerThread inherits that behaviour.
        if (threadGroup == null) {
            return false;
        }

        Thread[] threads = new Thread[threadGroup.activeCount()];
        threadGroup.enumerate(threads);

        for (Thread thread : threads) {
            if (thread == null || thread.getId() != threadID) {
                continue;
            }
            if (Thread.State.RUNNABLE == thread.getState()) {
                continue;
            }
            try {
                thread.interrupt();
                return true;
            } catch (RuntimeException ignored) {
                // Best effort: e.g. a SecurityException means we are not
                // allowed to interrupt this thread; report failure below.
            }
        }
        return false;
    }


    // ------------------------------------------------------- Task queue --//


    /**
     * Appends a task to the queue and wakes up one waiting thread.
     *
     * <p>If a queue limit is configured ({@code maxQueueSizeInBytes != -1})
     * and the number of queued tasks already exceeds it, the task is
     * cancelled and handed back to its selector thread instead of being
     * queued.
     *
     * <p>After queuing, if the pool may still grow and there were more queued
     * tasks than waiting threads (both measured before this task was added),
     * up to {@code threadsIncrement} new worker threads are created and
     * started, never exceeding {@code maxThreads}.
     *
     * @param task the task to queue
     */
    public synchronized void addTask(Task task) {
        int queueSize = size();
        if (maxQueueSizeInBytes != -1 && maxQueueSizeInBytes < queueSize) {
            task.cancelTask("Maximum Connections Reached: "
                    + maxQueueSizeInBytes + " -- Retry later",
                    HtmlHelper.OK);
            task.getSelectorThread().returnTask(task);
            return;
        }

        addLast(task);
        notify();

        if (threadCount < maxThreads && waitingThreads < queueSize) {
            // Clamp a local copy only: the configured threadsIncrement must
            // survive this call unchanged.
            int left = maxThreads - threadCount;
            int increment = Math.min(threadsIncrement, left);
            increaseWorkerThread(increment, true);
        }
    }


    /**
     * Removes and returns the task at the head of the queue.
     *
     * <p>If {@link #isEmpty()} reports {@code true}, the calling thread waits
     * once on this instance until it is notified or interrupted, and is
     * counted in {@code waitingThreads} while it waits. The wait is
     * intentionally not retried in a loop: if no task is available after
     * waking up, {@code null} is returned.
     *
     * <p>When a statistics collector is set, it receives the queue size after
     * the removal attempt.
     *
     * @return the next task, or {@code null} if the queue held no element
     *         after waking up
     */
    public synchronized Task getTask() {
        if (isEmpty()) {
            try {
                waitingThreads++;
                wait();
            } catch (InterruptedException e) {
                // Leaves the interrupt status cleared.
                // NOTE(review): presumably so that a later wait() does not
                // throw immediately - confirm against WorkerThread.
                Thread.interrupted();
            }
            waitingThreads--;
        }

        try {
            return (Task) removeFirst();
        } catch (NoSuchElementException e) {
            return null;
        } finally {
            if (pipelineStat != null) {
                pipelineStat.gather(size());
            }
        }
    }


    /**
     * Reports whether there are no more queued tasks than waiting threads,
     * i.e. {@code size() - waitingThreads <= 0}. This differs from
     * {@link LinkedList#isEmpty()}: it can return {@code true} while the list
     * still holds elements.
     *
     * @return {@code true} if the number of queued tasks does not exceed the
     *         number of waiting threads
     */
    @Override
    public boolean isEmpty() {
        return (size() - waitingThreads <= 0);
    }


    // ------------------------------------------------------- Properties --//


    /**
     * @return the number of threads currently waiting in {@link #getTask()}
     */
    public int getWaitingThread() {
        return waitingThreads;
    }


    /**
     * Sets the upper bound on the number of worker threads.
     *
     * @param maxThreads the new maximum
     */
    public void setMaxThreads(int maxThreads) {
        this.maxThreads = maxThreads;
    }


    /**
     * @return the upper bound on the number of worker threads
     */
    public int getMaxThreads() {
        return maxThreads;
    }


    /**
     * @return the number of worker threads created so far
     */
    public int getCurrentThreadCount() {
        return threadCount;
    }


    /**
     * @return the number of created worker threads that are not currently
     *         waiting in {@link #getTask()}
     */
    public int getCurrentThreadsBusy() {
        return (threadCount - waitingThreads);
    }


    /**
     * Returns {@code maxThreads}; this class keeps no separate max-spare
     * setting.
     *
     * @return the value of {@code maxThreads}
     */
    public int getMaxSpareThreads() {
        return maxThreads;
    }


    /**
     * @return the minimum spare threads setting
     */
    public int getMinSpareThreads() {
        return minSpareThreads;
    }


    /**
     * Sets the minimum spare threads setting.
     *
     * @param minSpareThreads the new value
     */
    public void setMinSpareThreads(int minSpareThreads) {
        this.minSpareThreads = minSpareThreads;
    }


    /**
     * Sets the priority given to worker threads created from now on.
     *
     * @param priority the thread priority
     */
    public void setPriority(int priority) {
        this.priority = priority;
    }


    /**
     * Sets the pipeline name.
     *
     * @param name the new name
     */
    public void setName(String name) {
        this.name = name;
    }


    /**
     * @return the pipeline name immediately followed by the port number
     */
    public String getName() {
        return name + port;
    }


    /**
     * Sets the port number used in thread names and {@link #getName()}.
     *
     * @param port the port number
     */
    public void setPort(int port) {
        this.port = port;
    }


    /**
     * Sets the number of worker threads created by {@link #initPipeline()}.
     *
     * @param minThreads the new minimum
     */
    public void setMinThreads(int minThreads) {
        this.minThreads = minThreads;
    }


    /**
     * @return a string listing the name, {@code maxThreads} and
     *         {@code minThreads} of this pipeline
     */
    @Override
    public String toString() {
        return "name: " + name + " maxThreads: " + maxThreads
                + " minThreads:" + minThreads;
    }


    /**
     * Sets how many worker threads are added each time the pool grows.
     *
     * @param threadsIncrement the number of threads to add per growth step
     */
    public void setThreadsIncrement(int threadsIncrement) {
        this.threadsIncrement = threadsIncrement;
    }


    /**
     * Stores the thread timeout setting.
     *
     * @param threadsTimeout the timeout value
     */
    public void setThreadsTimeout(int threadsTimeout) {
        this.threadsTimeout = threadsTimeout;
    }


    /**
     * @return the number of tasks currently held in the queue
     */
    public int getTaskQueuedCount() {
        return size();
    }


    /**
     * Sets the queue limit checked by {@link #addTask(Task)}; {@code -1}
     * disables the check.
     *
     * @param maxQueueSizeInBytesCount the new limit
     */
    public void setQueueSizeInBytes(int maxQueueSizeInBytesCount) {
        this.maxQueueSizeInBytes = maxQueueSizeInBytesCount;
    }


    /**
     * @return the queue limit checked by {@link #addTask(Task)}, or
     *         {@code -1} if unlimited
     */
    public int getQueueSizeInBytes() {
        return maxQueueSizeInBytes;
    }


    /**
     * Sets the statistics collector fed by {@link #getTask()}.
     *
     * @param pipelineStatistic the collector, or {@code null} for none
     */
    public void setPipelineStatistic(PipelineStatistic pipelineStatistic) {
        this.pipelineStat = pipelineStatistic;
    }


    /**
     * @return the statistics collector, or {@code null} if none is set
     */
    public PipelineStatistic getPipelineStatistic() {
        return pipelineStat;
    }

}