1 23 24 package org.objectweb.dream.control.activity.task.thread; 25 26 import java.util.ArrayList ; 27 import java.util.HashSet ; 28 import java.util.Iterator ; 29 import java.util.LinkedList ; 30 import java.util.List ; 31 import java.util.Set ; 32 33 import org.objectweb.dream.control.activity.task.Task; 34 import org.objectweb.dream.control.activity.task.TaskLifeCycleController; 35 import org.objectweb.dream.control.activity.task.TaskStoppedListener; 36 import org.objectweb.fractal.api.NoSuchInterfaceException; 37 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 38 import org.objectweb.fractal.julia.control.lifecycle.ChainedIllegalLifeCycleException; 39 import org.objectweb.util.monolog.api.BasicLevel; 40 41 46 public class ThreadPoolTask extends AbstractThreadTask 47 implements 48 TaskLifeCycleController, 49 ThreadPoolController 50 { 51 52 Object threadTaskLock = new Object (); 53 long waitTimeout = DEFAULT_WAIT_TIMEOUT; 54 int capacity = 0; 55 LinkedList activeThreads = new LinkedList (); 56 Set waitingThreads = new HashSet (); 57 58 List taskStoppedListeners = new ArrayList (); 59 boolean stopping; 60 61 64 protected boolean isExecuting() 65 { 66 return ((PoolThread) Thread.currentThread()).executing; 67 } 68 69 72 protected void setExecuting(boolean b) 73 { 74 ((PoolThread) Thread.currentThread()).executing = b; 75 } 76 77 81 84 public int getNbActiveThreads() 85 { 86 synchronized (threadTaskLock) 87 { 88 return activeThreads.size(); 89 } 90 } 91 92 95 public void addThreads(int i) throws ThreadPoolOverflowException, 96 IllegalLifeCycleException 97 { 98 synchronized (threadTaskLock) 99 { 100 if (getFcState() == STOPPED) 101 { 102 throw new IllegalLifeCycleException( 103 "Can't add thread in the stopped state."); 104 } 105 if (getNbActiveThreads() + i > getCapacity()) 106 { 107 throw new ThreadPoolOverflowException( 108 "Can't add Thread in pool due to capacity limitation"); 109 } 110 for (int j = 0; j < i; j++) 111 { 112 PoolThread t = null; 113 synchronized (waitingThreads) 114 { 115 if (!waitingThreads.isEmpty()) 116 { 117 Iterator iterator = waitingThreads.iterator(); 119 t = (PoolThread) iterator.next(); 120 iterator.remove(); 121 synchronized (activeThreads) 122 { 123 activeThreads.add(t); 124 } 125 synchronized (t) 126 { 127 logger.log(BasicLevel.DEBUG, "Notify a waiting thread task"); 128 t.revive = true; 129 t.notify(); 130 } 131 continue; 132 } 133 } 134 logger.log(BasicLevel.DEBUG, "Creates a new thread task"); 135 t = new PoolThread(); 136 synchronized (activeThreads) 137 { 138 activeThreads.add(t); 139 } 140 t.start(); 141 } 142 } 143 } 144 145 148 public void removeThreads(int i) throws IllegalLifeCycleException 149 { 150 synchronized (threadTaskLock) 151 { 152 if (getFcState() == STOPPED) 153 { 154 throw new IllegalLifeCycleException( 155 "Can't remove thread in the stopped state."); 156 } 157 for (int j = 0; j < i; j++) 158 { 159 PoolThread t; 160 synchronized (activeThreads) 161 { 162 t = (PoolThread) activeThreads.removeFirst(); 163 } 164 logger.log(BasicLevel.DEBUG, "Stop a thread task"); 165 t.executing = false; 166 t.interrupt(); 167 } 168 } 169 } 170 171 174 public void setCapacity(int i) 175 { 176 capacity = i; 177 } 178 179 182 public int getCapacity() 183 { 184 return capacity; 185 } 186 187 190 public long getWaitTimeout() 191 { 192 return waitTimeout; 193 } 194 195 198 public void setWaitTimeout(long millis) 199 { 200 waitTimeout = millis; 201 } 202 203 207 210 public void startFc() throws IllegalLifeCycleException 211 { 212 if (getFcState() == STARTED) 213 { 214 return; 215 } 216 synchronized (threadTaskLock) 217 { 218 super.startFc(); 219 stopping = false; 220 } 221 } 222 223 226 public void stopFc() throws IllegalLifeCycleException 227 { 228 if (getFcState() == STOPPED) 229 { 230 return; 231 } 232 synchronized (threadTaskLock) 233 { 234 logger.log(BasicLevel.DEBUG, "Stopping thread pool task"); 235 stopping = true; 236 interruptPool(); 237 boolean noActiveThread; 238 synchronized (activeThreads) 239 { 240 noActiveThread = activeThreads.isEmpty(); 241 } 242 while (!noActiveThread) 243 { 244 try 245 { 246 logger.log(BasicLevel.DEBUG, "Join threads of the task"); 247 threadTaskLock.wait(); 248 } 249 catch (InterruptedException e) 250 { 251 throw new ChainedIllegalLifeCycleException(e, weaveableC, 252 "Interrupted while waiting for the end of the thread pool."); 253 } 254 synchronized (activeThreads) 255 { 256 noActiveThread = activeThreads.isEmpty(); 257 } 258 } 259 } 260 } 261 262 265 public void asyncStop(TaskStoppedListener listener) 266 { 267 if (getFcState() == STOPPED) 268 { 269 Task taskItf = null; 271 try 272 { 273 taskItf = (Task) weaveableC.getFcInterface("task"); 274 } 275 catch (NoSuchInterfaceException ignored) 276 { 277 } 279 listener.taskStopped(taskItf); 280 return; 281 } 282 synchronized (threadTaskLock) 283 { 284 logger.log(BasicLevel.DEBUG, "Stopping asynchronously thread pool task"); 285 stopping = true; 286 interruptPool(); 287 taskStoppedListeners.add(listener); 288 } 289 } 290 291 295 protected void interruptPool() 296 { 297 logger.log(BasicLevel.DEBUG, "Interrupting threads of the task"); 298 synchronized (activeThreads) 299 { 300 Iterator iter = activeThreads.iterator(); 301 while (iter.hasNext()) 302 { 303 PoolThread thread = (PoolThread) iter.next(); 304 thread.executing = false; 305 thread.interrupt(); 306 } 307 } 308 synchronized (waitingThreads) 309 { 310 Iterator iter = waitingThreads.iterator(); 311 while (iter.hasNext()) 312 { 313 PoolThread thread = (PoolThread) iter.next(); 314 thread.executing = false; 315 thread.interrupt(); 316 } 317 } 318 } 319 320 324 private class PoolThread extends Thread 325 { 326 327 boolean executing = true; 328 boolean revive; 329 330 334 337 public void run() 338 { 339 logger.log(BasicLevel.DEBUG, "Begin of the run method"); 340 boolean runThread = true; 341 do 342 { 343 revive = false; 344 try 345 { 346 execute(null); 347 } 348 catch (InterruptedException e) 349 { 350 logger.log(BasicLevel.DEBUG, "Interrupted"); 351 } 352 finally 353 { 354 boolean noActiveThread; 355 synchronized (activeThreads) 356 { 357 activeThreads.remove(this); 359 if (activeThreads.size() >= getCapacity()) 360 { 361 logger.log(BasicLevel.DEBUG, "More active threads than capacity"); 362 runThread = false; 363 } 364 noActiveThread = activeThreads.isEmpty(); 365 } 366 List listeners = null; 367 synchronized (threadTaskLock) 368 { 369 if (stopping && noActiveThread) 370 { 371 logger.log(BasicLevel.DEBUG, 372 "No more active thread component stopped"); 373 try 374 { 375 ThreadPoolTask.super.stopFc(); 377 stopping = false; 378 } 379 catch (IllegalLifeCycleException ignored) 380 { 381 } 383 threadTaskLock.notifyAll(); 385 listeners = taskStoppedListeners; 386 taskStoppedListeners = new ArrayList (); 387 } 388 } 389 if (listeners != null) 390 { 391 Task taskItf = null; 393 try 394 { 395 taskItf = (Task) weaveableC.getFcInterface("task"); 396 } 397 catch (NoSuchInterfaceException ignored) 398 { 399 } 401 Iterator iter = listeners.iterator(); 402 while (iter.hasNext()) 403 { 404 TaskStoppedListener listener = (TaskStoppedListener) iter.next(); 405 listener.taskStopped(taskItf); 406 } 407 break; 409 } 410 synchronized (waitingThreads) 411 { 412 waitingThreads.add(this); 414 } 415 try 416 { 417 synchronized (this) 418 { 419 if (!revive && runThread) 420 { 421 try 422 { 423 logger.log(BasicLevel.DEBUG, "Waits for revive"); 424 this.wait(waitTimeout); 425 executing = true; 426 } 427 catch (InterruptedException e1) 428 { 429 logger.log(BasicLevel.DEBUG, 430 "Interrupted while waiting for timeout"); 431 } 432 } 433 } 434 } 435 finally 436 { 437 synchronized (waitingThreads) 438 { 439 waitingThreads.remove(this); 441 } 442 } 443 } 444 } 445 while (revive && runThread); 446 logger.log(BasicLevel.DEBUG, "End of the run method"); 447 } 448 } 449 } | Popular Tags |