1 25 26 package org.objectweb.easybeans.jca.workmanager; 27 28 import java.util.LinkedList ; 29 30 import javax.resource.spi.work.ExecutionContext ; 31 import javax.resource.spi.work.Work ; 32 import javax.resource.spi.work.WorkCompletedException ; 33 import javax.resource.spi.work.WorkEvent ; 34 import javax.resource.spi.work.WorkException ; 35 import javax.resource.spi.work.WorkListener ; 36 import javax.resource.spi.work.WorkManager ; 37 import javax.resource.spi.work.WorkRejectedException ; 38 import javax.transaction.NotSupportedException ; 39 import javax.transaction.SystemException ; 40 import javax.transaction.TransactionManager ; 41 import javax.transaction.xa.Xid ; 42 43 import org.objectweb.easybeans.log.JLog; 44 import org.objectweb.easybeans.log.JLogFactory; 45 import org.objectweb.jotm.Current; 46 47 52 public class ResourceWorkManager implements WorkManager { 53 54 57 private static final long MILLISECONDS = 1000L; 58 59 62 private static JLog logger = JLogFactory.getLog(ResourceWorkManager.class); 63 64 67 private LinkedList <ResourceWork> workList = new LinkedList <ResourceWork>(); 68 69 72 private static int poolnumber = 0; 73 74 78 private static int threadnumber = 0; 79 80 83 private int maxpoolsz; 84 85 88 private int minpoolsz; 89 90 93 private int poolsz; 94 95 98 private int freeThreads; 99 100 103 private long waitingTime; 104 105 108 private boolean stopped = false; 109 110 113 private static final long FEW_MORE_SECONDS = 3000; 114 115 118 private TransactionManager transactionManager; 119 120 127 @SuppressWarnings ("boxing") 128 public ResourceWorkManager(final TransactionManager transactionManager, final int minsz, final int maxsz, 129 final long threadwait) { 130 this.minpoolsz = minsz; 131 this.maxpoolsz = maxsz; 132 this.waitingTime = threadwait * MILLISECONDS; 133 this.transactionManager = transactionManager; 134 poolnumber++; 136 if (logger.isDebugEnabled()) { 137 logger.debug("thread pool {0}", poolnumber); 138 logger.debug("minpool size = {0} and maxpool sizez = {1}", minsz, maxsz); 139 } 140 for (poolsz = 0; poolsz < minsz; poolsz++) { 142 ResourceWorkThread resourceWorkThread = new ResourceWorkThread(this, poolnumber, threadnumber++); 143 resourceWorkThread.start(); 144 } 145 } 146 147 150 public int getCurrentPoolSize() { 151 return poolsz; 152 } 153 154 157 public int getMinPoolSize() { 158 return minpoolsz; 159 } 160 161 164 public int getMaxPoolSize() { 165 return maxpoolsz; 166 } 167 168 172 public void setMinPoolSize(final int minsz) { 173 minpoolsz = minsz; 174 } 175 176 180 public void setMaxPoolSize(final int maxsz) { 181 maxpoolsz = maxsz; 182 } 183 184 196 public void doWork(final Work work) throws WorkRejectedException , WorkCompletedException , WorkException { 197 doMyWork(work, INDEFINITE, null, null, 0); 198 } 199 200 219 public void doWork(final Work work, final long timeout, final ExecutionContext executionContext, 220 final WorkListener workListener) throws WorkRejectedException , WorkCompletedException , WorkException { 221 if (workListener != null) { 222 workListener.workAccepted(new WorkEvent (this, WorkEvent.WORK_ACCEPTED, work, null)); 223 } 224 doMyWork(work, timeout, executionContext, workListener, System.currentTimeMillis()); 225 } 226 227 241 public long startWork(final Work work) throws WorkRejectedException , WorkException { 242 return startWork(work, INDEFINITE, null, null); 243 } 244 245 268 public long startWork(final Work work, final long timeout, final ExecutionContext executionContext, 269 final WorkListener workListener) throws WorkRejectedException , WorkException { 270 271 ResourceWork resourceWork = new ResourceWork(work, timeout, executionContext, workListener); 272 if (workListener != null) { 273 workListener.workAccepted(new WorkEvent (this, WorkEvent.WORK_ACCEPTED, work, null)); 274 } 275 long starttime = System.currentTimeMillis(); 276 long duration = 0; 277 synchronized (workList) { 278 workList.add(resourceWork); 279 if (poolsz < maxpoolsz && workList.size() > freeThreads) { 280 poolsz++; 282 ResourceWorkThread resourceWorkThread = new ResourceWorkThread(this, threadnumber++, poolnumber); 283 resourceWorkThread.start(); 284 } else { 285 workList.notify(); 286 } 287 } 288 boolean started = false; 290 synchronized (resourceWork) { 291 if (!resourceWork.isStarted()) { 292 try { 293 long waittime = waitingTime; 295 if (timeout < waittime) { 296 waittime = timeout + FEW_MORE_SECONDS; 297 } 298 resourceWork.wait(waittime); 299 } catch (InterruptedException e) { 300 throw new WorkRejectedException ("Interrupted"); 301 } 302 } 303 started = resourceWork.isStarted(); 304 } 305 duration = System.currentTimeMillis() - starttime; 306 if (!started) { 307 synchronized (workList) { 308 if (!workList.remove(resourceWork)) { 310 logger.debug("Cannot remove the work"); 311 } 312 throw new WorkRejectedException (WorkException.START_TIMED_OUT); 313 } 314 } 315 return duration; 316 } 317 318 330 public void scheduleWork(final Work work) throws WorkRejectedException , WorkException { 331 scheduleWork(work, INDEFINITE, null, null); 332 } 333 334 354 public void scheduleWork(final Work work, final long timeout, final ExecutionContext executionContext, 355 final WorkListener workListener) throws WorkRejectedException , WorkException { 356 357 ResourceWork resourceWork = new ResourceWork(work, timeout, executionContext, workListener); 358 if (workListener != null) { 359 workListener.workAccepted(new WorkEvent (this, WorkEvent.WORK_ACCEPTED, work, null)); 360 } 361 synchronized (workList) { 362 workList.add(resourceWork); 363 if (poolsz < maxpoolsz && workList.size() > freeThreads) { 364 poolsz++; 366 ResourceWorkThread resourceWorkThread = new ResourceWorkThread(this, threadnumber++, poolnumber); 367 resourceWorkThread.start(); 368 } else { 369 workList.notify(); 371 } 372 } 373 } 374 375 390 @SuppressWarnings ("boxing") 391 private void doMyWork(final Work work, final long timeout, final ExecutionContext executionContext, 392 final WorkListener workListener, final long creationTime) throws WorkException { 393 394 if (workListener != null) { 396 long duration = System.currentTimeMillis() - creationTime; 397 if (duration > timeout) { 398 logger.warn("REJECTED: duration= {0}", duration); 400 workListener.workRejected(new WorkEvent (this, WorkEvent.WORK_REJECTED, work, null)); 401 return; 402 } 403 workListener.workStarted(new WorkEvent (this, WorkEvent.WORK_STARTED, work, null)); 404 } 405 406 Xid xid = null; 408 if (executionContext != null) { 409 xid = executionContext.getXid(); 410 if (xid != null) { 411 long txtimeout = executionContext.getTransactionTimeout(); 412 try { 413 if (txtimeout != WorkManager.UNKNOWN) { 414 ((Current) transactionManager).begin(xid, txtimeout); 415 } else { 416 ((Current) transactionManager).begin(xid); 417 } 418 } catch (NotSupportedException e) { 419 throw new WorkException ("Error starting a new transaction", e); 420 } catch (SystemException e) { 421 throw new WorkException ("Error starting a new transaction", e); 422 } 423 } 424 } 425 426 try { 427 work.run(); 428 if (workListener != null) { 430 workListener.workCompleted(new WorkEvent (this, WorkEvent.WORK_COMPLETED, work, null)); 431 } 432 } catch (Exception e) { 433 if (workListener != null) { 434 workListener.workCompleted(new WorkEvent (this, WorkEvent.WORK_COMPLETED, work, null)); 435 } 436 throw new WorkCompletedException (e); 437 } finally { 438 if (xid != null) { 439 ((Current) transactionManager).clearThreadTx(); 440 } 441 } 442 } 443 444 450 public void nextWork() throws WorkException , InterruptedException , ResourceWorkManagerStoppedException { 451 ResourceWork run = null; 452 boolean haswait = false; 453 synchronized (workList) { 454 while (workList.isEmpty()) { 455 if ((haswait && freeThreads > minpoolsz) || stopped) { 456 poolsz--; 457 throw new ResourceWorkManagerStoppedException("Manager is stopped"); 458 } 459 try { 460 freeThreads++; 461 workList.wait(waitingTime); 462 freeThreads--; 463 haswait = true; 464 } catch (InterruptedException e) { 465 freeThreads--; 466 poolsz--; 467 throw e; 468 } 469 } 470 run = workList.removeFirst(); 471 synchronized (run) { 473 logger.debug("Starting a new work"); 474 run.setStarted(); 475 run.notify(); 476 } 477 } 478 doMyWork(run.getWork(), run.getTimeout(), run.getExecutionContext(), run.getWorkListener(), run 479 .getCreationTime()); 480 } 481 482 485 public synchronized void stopThreads() { 486 stopped = true; 487 notifyAll(); 488 poolnumber--; 489 } 490 491 } 492 | Popular Tags |