1 25 26 27 package org.objectweb.jonas_lib; 28 29 import java.util.LinkedList ; 30 31 import javax.resource.spi.work.ExecutionContext ; 32 import javax.resource.spi.work.Work ; 33 import javax.resource.spi.work.WorkCompletedException ; 34 import javax.resource.spi.work.WorkEvent ; 35 import javax.resource.spi.work.WorkException ; 36 import javax.resource.spi.work.WorkListener ; 37 import javax.resource.spi.work.WorkManager ; 38 import javax.resource.spi.work.WorkRejectedException ; 39 import javax.transaction.NotSupportedException ; 40 import javax.transaction.SystemException ; 41 import javax.transaction.xa.Xid ; 42 43 import org.objectweb.jonas.common.Log; 44 import org.objectweb.jotm.Current; 45 import org.objectweb.transaction.jta.TransactionManager; 46 import org.objectweb.util.monolog.api.BasicLevel; 47 import org.objectweb.util.monolog.api.Logger; 48 49 50 51 55 public class JWorkManager implements WorkManager { 56 57 protected LinkedList workList = new LinkedList (); 58 59 protected static int poolnumber = 0; 60 protected static int threadnumber = 0; 61 62 protected int maxpoolsz; 63 protected int minpoolsz; 64 protected int poolsz; protected int freeThreads; protected long waitingTime; 68 protected boolean valid = true; 70 protected static final long FEW_MORE_SECONDS = 3000; 71 72 private static Logger logger = null; 73 74 private TransactionManager tm; 75 76 80 public JWorkManager(int minsz, int maxsz, TransactionManager tm, long threadwait) { 81 minpoolsz = minsz; 82 maxpoolsz = maxsz; 83 waitingTime = threadwait * 1000L; 84 this.tm = tm; 85 poolnumber++; 86 logger = Log.getLogger(Log.JONAS_WORK_MGR_PREFIX); 87 if (logger.isLoggable(BasicLevel.DEBUG)) { 88 logger.log(BasicLevel.DEBUG, "thread pool #" + poolnumber); 89 logger.log(BasicLevel.DEBUG, "minpoolsz = " + minsz + " maxpoolsz = " + maxsz); 90 } 91 for (poolsz = 0; poolsz < minsz; poolsz++) { 92 WorkThread st = new WorkThread(this, threadnumber++, poolnumber); 93 st.start(); 94 } 95 } 96 97 101 104 public int getCurrentPoolSize() { 105 return poolsz; 106 } 107 108 111 public int getMinPoolSize() { 112 return minpoolsz; 113 } 114 115 118 public int getMaxPoolSize() { 119 return maxpoolsz; 120 } 121 122 126 public void setMinPoolSize(int minsz) { 127 minpoolsz = minsz; 128 } 129 130 134 public void setMaxPoolSize(int maxsz) { 135 maxpoolsz = maxsz; 136 } 137 138 142 152 public void doWork(Work work) throws WorkException { 153 doMyWork(work, INDEFINITE, null, null, 0); 154 } 155 156 175 public void doWork(Work work, long timeout, ExecutionContext ectx, WorkListener listener) throws WorkException { 176 if (logger.isLoggable(BasicLevel.DEBUG)) { 177 logger.log(BasicLevel.DEBUG, ""); 178 } 179 if (listener != null) { 180 listener.workAccepted(new WorkEvent (this, WorkEvent.WORK_ACCEPTED, work, null)); 181 } 182 doMyWork(work, timeout, ectx, listener, System.currentTimeMillis()); 183 } 184 185 199 public long startWork(Work work) throws WorkException { 200 return startWork(work, INDEFINITE, null, null); 201 } 202 203 226 public long startWork(Work work, long timeout, ExecutionContext ectx, WorkListener listener) throws WorkException { 227 if (logger.isLoggable(BasicLevel.DEBUG)) { 228 logger.log(BasicLevel.DEBUG, ""); 229 } 230 JWork mywork = new JWork(work, timeout, ectx, listener); 231 if (listener != null) { 232 listener.workAccepted(new WorkEvent (this, WorkEvent.WORK_ACCEPTED, work, null)); 233 } 234 long starttime = System.currentTimeMillis(); 235 long duration = 0; 236 synchronized (workList) { 237 workList.add(mywork); 238 if (poolsz < maxpoolsz && workList.size() > freeThreads) { 239 poolsz++; 241 WorkThread st = new WorkThread(this, threadnumber++, poolnumber); 242 st.start(); 243 } else { 244 workList.notify(); 245 } 246 } 247 boolean started = false; 249 synchronized (mywork) { 250 if (! mywork.isStarted()) { 251 try { 252 long waittime = waitingTime; 254 if (timeout < waittime) { 255 waittime = timeout + FEW_MORE_SECONDS; 256 } 257 mywork.wait(waittime); 258 } catch (InterruptedException e) { 259 throw new WorkRejectedException ("Interrupted"); 260 } 261 } 262 started = mywork.isStarted(); 263 } 264 duration = System.currentTimeMillis() - starttime; 265 if (! started) { 266 synchronized (workList) { 267 if (! workList.remove(mywork)) { 269 if (logger.isLoggable(BasicLevel.DEBUG)) { 270 logger.log(BasicLevel.DEBUG, "cannot remove work"); 271 } 272 } 273 throw new WorkRejectedException (WorkException.START_TIMED_OUT); 274 } 275 } 276 return duration; 277 } 278 279 299 public void scheduleWork(Work work) throws WorkException { 300 scheduleWork(work, INDEFINITE, null, null); 301 } 302 303 323 public void scheduleWork(Work work, long timeout, ExecutionContext ectx, WorkListener listener) throws WorkException { 324 if (logger.isLoggable(BasicLevel.DEBUG)) { 325 logger.log(BasicLevel.DEBUG, ""); 326 } 327 JWork mywork = new JWork(work, timeout, ectx, listener); 328 if (listener != null) { 329 listener.workAccepted(new WorkEvent (this, WorkEvent.WORK_ACCEPTED, work, null)); 330 } 331 synchronized (workList) { 332 workList.add(mywork); 333 if (poolsz < maxpoolsz && workList.size() > freeThreads) { 334 poolsz++; 336 WorkThread st = new WorkThread(this, threadnumber++, poolnumber); 337 st.start(); 338 } else { 339 workList.notify(); 341 } 342 } 343 } 344 345 348 private void doMyWork(Work work, long timeout, ExecutionContext ectx, WorkListener listener, long creationTime) throws WorkException { 349 if (logger.isLoggable(BasicLevel.DEBUG)) { 350 logger.log(BasicLevel.DEBUG, "timeout=" + timeout); 351 } 352 if (listener != null) { 354 long duration = System.currentTimeMillis() - creationTime; 355 if (duration > timeout) { 356 logger.log(BasicLevel.WARN, "REJECTED: duration=" + duration); 358 listener.workRejected(new WorkEvent (this, WorkEvent.WORK_REJECTED, work, null)); 359 return; 360 } 361 listener.workStarted(new WorkEvent (this, WorkEvent.WORK_STARTED, work, null)); 362 } 363 364 Xid xid = null; 367 if (ectx != null) { 368 xid = ectx.getXid(); 369 if (xid != null) { 370 long txtimeout = ectx.getTransactionTimeout(); 371 try { 372 if (txtimeout != WorkManager.UNKNOWN) { 373 ((Current) tm).begin(xid, txtimeout); 374 } else { 375 ((Current) tm).begin(xid); 376 } 377 } catch (NotSupportedException e) { 378 throw new WorkException ("Error starting a new transaction", e); 379 } catch (SystemException e) { 380 throw new WorkException ("Error starting a new transaction", e); 381 } 382 } 383 } 384 385 try { 386 work.run(); 387 if (listener != null) { 389 listener.workCompleted(new WorkEvent (this, WorkEvent.WORK_COMPLETED, work, null)); 390 } 391 } catch (Exception e) { 392 if (listener != null) { 393 listener.workCompleted(new WorkEvent (this, WorkEvent.WORK_COMPLETED, work, null)); 394 } 395 throw new WorkCompletedException (e); 396 } finally { 397 if (xid != null) { 398 ((Current) tm).clearThreadTx(); 399 } 400 } 401 } 402 403 407 public void nextWork() throws WorkException , InterruptedException { 408 JWork run = null; 409 boolean haswait = false; 410 synchronized (workList) { 411 while (workList.isEmpty()) { 412 if ((haswait && freeThreads > minpoolsz) || !valid) { 413 poolsz--; 414 throw new InterruptedException ("Thread ending"); 415 } 416 try { 417 freeThreads++; 418 if (logger.isLoggable(BasicLevel.DEBUG)) { 419 logger.log(BasicLevel.DEBUG, "waiting"); 420 } 421 workList.wait(waitingTime); 422 if (logger.isLoggable(BasicLevel.DEBUG)) { 423 logger.log(BasicLevel.DEBUG, "notified"); 424 } 425 freeThreads--; 426 haswait = true; 427 } catch (InterruptedException e) { 428 freeThreads--; 429 poolsz--; 430 throw e; 431 } 432 } 433 run = (JWork) workList.removeFirst(); 434 synchronized(run) { 436 if (logger.isLoggable(BasicLevel.DEBUG)) { 437 logger.log(BasicLevel.DEBUG, "start new work"); 438 } 439 run.setStarted(); 440 run.notify(); 441 } 442 } 443 doMyWork(run.getWork(), run.getTimeout(), run.getExecutionContext(), run.getWorkListener(), 444 run.getCreationTime()); 445 } 446 447 450 public synchronized void stopThreads() { 451 if (logger.isLoggable(BasicLevel.DEBUG)) { 452 logger.log(BasicLevel.DEBUG, ""); 453 } 454 valid = false; 455 notifyAll(); 456 poolnumber--; 457 } 458 459 class JWork { 460 private Work work; 461 private long timeout; 462 private ExecutionContext ectx; 463 private WorkListener listener; 464 private long creationTime; 465 private boolean started = false; 466 467 public JWork(Work work, long timeout, ExecutionContext ectx, WorkListener listener) { 468 this.work = work; 469 this.timeout = timeout; 470 this.ectx = ectx; 471 this.listener = listener; 472 creationTime = System.currentTimeMillis(); 473 if (logger.isLoggable(BasicLevel.DEBUG)) { 474 logger.log(BasicLevel.DEBUG, "timeout=" + timeout); 475 } 476 } 477 478 public Work getWork() { 479 return work; 480 } 481 482 public long getTimeout() { 483 return timeout; 484 } 485 486 public ExecutionContext getExecutionContext() { 487 return ectx; 488 } 489 490 public WorkListener getWorkListener() { 491 return listener; 492 } 493 494 public long getCreationTime() { 495 return creationTime; 496 } 497 498 public boolean isStarted() { 499 return started; 500 } 501 502 public void setStarted() { 503 if (logger.isLoggable(BasicLevel.DEBUG)) { 504 logger.log(BasicLevel.DEBUG, ""); 505 } 506 started = true; 507 } 508 } 509 510 513 class WorkThread extends Thread { 514 515 private JWorkManager mgr; 516 private int number; 517 518 524 WorkThread(JWorkManager m, int num, int wm) { 525 mgr = m; 526 number = num; 527 setName("WorkThread-" + wm + "/" + num); 528 } 529 530 public void run() { 531 if (logger.isLoggable(BasicLevel.DEBUG)) { 532 logger.log(BasicLevel.DEBUG, "running"); 533 } 534 while (true) { 535 try { 536 mgr.nextWork(); 537 } catch (InterruptedException e) { 538 if (logger.isLoggable(BasicLevel.DEBUG)) { 539 logger.log(BasicLevel.DEBUG, "Exiting: ", e); 540 } 541 return; 542 } catch (WorkException e) { 543 logger.log(BasicLevel.ERROR, "Exception during work run: ", e); 544 } 545 } 546 } 547 548 } 549 550 } 551 552 | Popular Tags |