1 27 28 package javax.management.timer; 29 30 import java.util.*; 31 import java.io.*; 32 33 39 40 public class Scheduler extends Thread { 41 42 46 int MAX_THREADS = 4; 47 48 51 52 public int getMaxThreads() { 53 return MAX_THREADS ; 54 } 55 56 private static int DEFAULT_MAX_THREADS = 4; 57 58 63 64 public static int getDefaultMaxThreads() { 65 return DEFAULT_MAX_THREADS ; 66 } 67 72 73 public static void setDefaultMaxThreads(int i) { 74 if ( (i>=0) && (i<100) ) { 75 DEFAULT_MAX_THREADS=i; 76 } 77 } 78 79 86 87 public boolean setMaxThreads(int i) { 88 if (isAlive()) return false; 89 if ( (i>=0) && (i<100) ) { 90 MAX_THREADS=i; 91 return true ; 92 } 93 return false ; 94 } 95 96 97 101 static int TOTAL_THREADS = 0; 102 103 104 108 109 public static int getTotalThreads() { 110 return TOTAL_THREADS ; 111 } 112 113 static boolean STOP_ALL = false; 115 116 boolean STOP_THIS = false; 118 119 int NUM_THREADS_STOPPED=0; 121 122 Vector runnables = new Vector(); Vector times = new Vector(); Vector workers = new Vector(); Vector ready_tasks = new Vector(); 127 128 131 private static Hashtable schedulers = new Hashtable(15) ; 132 133 private static Hashtable maxThreads = new Hashtable(15); 134 135 private static String confFile = "conf/threads.conf" ; 138 139 private static boolean readConfFile = false ; 140 141 public static String getConfFile() { 142 return confFile; 143 } 144 145 public static void setConfFile(String s) { 146 confFile=s; 147 } 148 149 private static synchronized void readTheConfFile() { 150 if (readConfFile) return; 151 BufferedReader is = null; 152 String line = null; 153 try { 154 File ff = new File(confFile); 155 if (!ff.exists()) { 156 readConfFile=true; 158 return ; 159 } 160 is = new BufferedReader(new FileReader(ff)); 161 while ( (line = is.readLine()) != null) { 162 if (line.trim().equals("")) continue; 163 else if (line.startsWith("#")) continue; 164 else { 165 StringTokenizer tok = new StringTokenizer(line); 166 if (tok.countTokens()==2) { 167 String s = tok.nextToken(); 168 int numb =-1; 169 try { 170 numb =Integer.parseInt(tok.nextToken()); 171 } catch ( NumberFormatException nfe) {} 172 if ( (numb<0) || (numb>100) ) { 173 System.err.println(" Invalid line in the conf file "+ 174 confFile+ 175 " :"+line); 176 continue; 177 } 178 maxThreads.put(s,new Integer (numb)); 179 } else { 180 System.err.println(" Invalid line in the conf file "+confFile+ 181 " :"+line); 182 } 183 } 184 } 185 } catch (IOException e) { 186 System.err.println("Scheduler File read Error:"+confFile + ": "+e); 187 } catch (SecurityException anye) {} 188 readConfFile = true ; 189 } 190 191 public static Scheduler createScheduler(String nam) { 192 return createScheduler(nam,-1); 193 } 194 195 196 201 202 public static Scheduler createScheduler(String nam,int maxThreadNumber) { 203 synchronized (schedulers) { 204 if (nam==null) return null; 205 Scheduler sch = getScheduler(nam); 206 if (sch!=null) return sch ; 207 readTheConfFile(); 208 if ((maxThreadNumber<=0) || (maxThreadNumber>100) ) { 209 Integer integ = (Integer )maxThreads.get(nam); 210 if (integ!=null) sch =new Scheduler (nam,integ.intValue()); 211 else sch =new Scheduler (nam); 212 } else sch =new Scheduler (nam,maxThreadNumber); 213 schedulers.put(nam ,sch); 214 return sch; 217 } 218 } 219 226 227 public static Scheduler getScheduler(String nam) { 228 if (nam==null) return null; 229 return (Scheduler )schedulers.get(nam); 230 } 231 232 236 private Scheduler(String name,int maxThreads) 237 { 238 super(name); 239 MAX_THREADS = maxThreads; 240 } 241 245 private Scheduler(String name) 246 { 247 this(name,DEFAULT_MAX_THREADS); 248 } 249 250 253 public synchronized void scheduleTask(Runnable task, Date when) 254 { 255 if (when == null) when = new Date(); 256 for (int i=0;i<times.size();i++) { 257 Date d = (Date) times.elementAt(i); 258 if (d.after(when)) { 259 times.insertElementAt(when,i); 260 runnables.insertElementAt(task,i); 261 return; 262 } 263 } 264 times.addElement(when); 265 runnables.addElement(task); 266 notifyAll(); 267 } 268 269 272 273 public synchronized void removeTask(Runnable task) 274 { 275 if (task == null) return; 276 for (int i=0;i<runnables.size();i++) { 277 Runnable r = (Runnable ) runnables.elementAt(i); 278 if (task.equals(r)) { 279 runnables.removeElement(r); 280 times.removeElementAt(i); 281 i--; 282 } 283 } 284 } 285 286 290 291 public boolean resumeAll() { 292 synchronized (schedulers) { 293 if (!STOP_ALL) return false; 294 STOP_ALL=false; 295 for (Enumeration en=schedulers.elements();en.hasMoreElements();) { 296 Scheduler sch = (Scheduler )en.nextElement(); 297 sch.start(); 298 } 299 } 300 return false ; 301 } 302 303 public void killScheduler() 306 { 307 if (STOP_ALL) return ; 308 STOP_THIS= true; 309 STOP_ALL = true; 310 try{ 311 for (int i=0;i<workers.size();i++) { 312 WorkerThread worker = (WorkerThread ) workers.elementAt(i); 313 worker.wakeUp(); 314 } 315 }catch(Throwable th){th.printStackTrace();} 316 for(int i = 0; i < 50; i++) { 318 if(isAlive()) 319 { 320 try{ 321 Thread.sleep(20); 322 }catch(Exception e){} 323 } 324 else 325 break; 326 } 327 } 328 333 334 public static boolean stopAll() 335 { 336 synchronized (schedulers) { 337 if (STOP_ALL) return false; 338 int count=0; 339 STOP_ALL = true; 340 int totalorg =TOTAL_THREADS; 341 while (TOTAL_THREADS>0) 342 { 343 if(count >= STOP_TIME_OUT) { 344 System.err.println("Schedulers did not stop properly: " 345 + (totalorg-TOTAL_THREADS) + 346 " threads stopped out of " + 347 totalorg); 348 System.err.println("The remaining "+ 349 TOTAL_THREADS+ 350 " threads did not stop in "+ 351 STOP_TIME_OUT+ " seconds "); 352 return false; 353 } 354 try { 355 Thread.sleep(1000); 356 count++; 357 } 358 catch (Exception e){} 359 } 360 System.out.println((totalorg-TOTAL_THREADS) + 361 " of the "+totalorg+ 362 " active threads in the control "+ 363 " of the schedulers stopped"); 364 365 TOTAL_THREADS=0; 366 return true; 367 } 368 } 369 370 371 378 379 public boolean stopThis() 380 { 381 int count=0; 382 STOP_THIS = true; 383 while (NUM_THREADS_STOPPED < MAX_THREADS) 384 { 385 if(count >= STOP_TIME_OUT) { 386 System.err.println("Scheduler:"+getName()+" did not stop properly: " + NUM_THREADS_STOPPED + " threads stopped out of " + MAX_THREADS); 387 System.err.println("The remaining "+ 388 (MAX_THREADS-NUM_THREADS_STOPPED)+ 389 " threads of scheduler:"+getName()+ 390 "did not stop in "+ 391 STOP_TIME_OUT+ " seconds "); 392 393 return false; 394 } 395 try { 396 Thread.sleep(1000); 397 count++; 398 } 399 catch (Exception e){} 400 } 401 System.out.println(NUM_THREADS_STOPPED + "out of "+MAX_THREADS+ 402 " active threads stopped in "+ 403 " Scheduler:"+getName()); 404 405 return true; 406 } 407 408 415 416 public boolean cleanUp() { 417 synchronized (schedulers) { 423 times.removeAllElements(); 424 runnables.removeAllElements(); 425 ready_tasks.removeAllElements(); 426 workers.removeAllElements(); 427 schedulers.remove(getName()); 428 return true; 429 } 430 } 431 432 static int STOP_TIME_OUT = 15; 433 434 441 442 443 444 public void setStopTimeout(int timeout) 445 { 446 STOP_TIME_OUT = timeout; 447 } 448 449 450 synchronized Runnable getTheWork() { 451 while (times.size() == 0) try { 452 wait(10); 453 if ( (STOP_ALL) || (STOP_THIS) )return null; 454 } catch (InterruptedException iex) {} 455 456 Date first = (Date) times.firstElement(); 457 Date now = new Date(); 458 if (first.after(now)) return null; 459 Runnable task = (Runnable ) runnables.firstElement(); 460 runnables.removeElement(task); 461 times.removeElement(first); 462 return task; 463 } 464 465 466 467 public void run() { 468 synchronized (schedulers) { 469 STOP_ALL=false; 470 STOP_THIS=false; 471 } 472 startWorkers(); if (MAX_THREADS==0) return ; 474 while ( (!STOP_ALL) && (!STOP_THIS) ) 475 try { 476 Runnable task = getTheWork(); 477 if (task == null) { 478 try { 479 waitSchedule(); 480 } catch (InterruptedException iex) {} 481 } else { 482 startTask(task); 483 } 484 } catch (Exception ex) { 485 System.err.println("Exception scheduling task in scheduler:" 486 +getName()+" "+ ex); 487 } 489 return; 490 } 491 492 synchronized void waitSchedule() throws InterruptedException { 493 wait(10); 494 } 495 496 497 synchronized void startTask(Runnable task) { 498 ready_tasks.addElement(task); 499 for (int i=0;i<workers.size();i++) { WorkerThread worker = (WorkerThread ) workers.elementAt(i); 501 worker.wakeUp(); 502 } 503 } 504 505 506 507 synchronized Runnable getNextTask() { 508 if (ready_tasks.size() == 0) return null; 509 Runnable task = (Runnable ) ready_tasks.firstElement(); 510 ready_tasks.removeElement(task); 511 return task; 512 } 513 514 515 synchronized void startWorkers() { 516 if ( (STOP_ALL) || (STOP_THIS) ) return; 517 workers = new Vector(); 518 for (int i=0;i<MAX_THREADS;i++) { 519 TOTAL_THREADS += 1; 520 WorkerThread worker = new WorkerThread (this,getName()+"-"+(i+1)); 521 workers.addElement(worker); 522 worker.start(); 523 } 524 } 525 526 public void deregisterThisScheduler(String nam) { 528 if(nam == null) 529 return; 530 else 531 { 532 Scheduler s = (Scheduler )schedulers.remove(nam); 533 s.stopAll(); 534 } 535 536 } 537 538 } 539 540 541 542 543 544 545 | Popular Tags |