1 17 18 package org.apache.geronimo.connector.work; 19 20 import javax.resource.spi.work.ExecutionContext ; 21 import javax.resource.spi.work.Work ; 22 import javax.resource.spi.work.WorkAdapter ; 23 import javax.resource.spi.work.WorkCompletedException ; 24 import javax.resource.spi.work.WorkEvent ; 25 import javax.resource.spi.work.WorkException ; 26 import javax.resource.spi.work.WorkListener ; 27 import javax.resource.spi.work.WorkManager ; 28 import javax.resource.spi.work.WorkRejectedException ; 29 import javax.transaction.xa.XAException ; 30 import javax.transaction.InvalidTransactionException ; 31 import javax.transaction.SystemException ; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 36 import org.apache.geronimo.transaction.manager.ImportedTransactionActiveException; 37 import org.apache.geronimo.transaction.manager.XAWork; 38 import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; 39 40 45 public class WorkerContext implements Work { 46 47 private static final Log log = LogFactory.getLog(WorkerContext.class); 48 49 52 private static final WorkListener NULL_WORK_LISTENER = new WorkAdapter () { 53 public void workRejected(WorkEvent event) { 54 if (event.getException() != null) { 55 if (event.getException() instanceof WorkCompletedException && event.getException().getCause() != null) { 56 log.error(event.getWork().toString(), event.getException().getCause()); 57 } else { 58 log.error(event.getWork().toString(), event.getException()); 59 } 60 } 61 } 62 }; 63 64 67 private int threadPriority; 68 69 72 private Work adaptee; 73 74 77 private boolean isAccepted; 78 79 82 private long acceptedTime; 83 84 87 private int nbRetry; 88 89 93 private long startTimeOut; 94 95 98 private final ExecutionContext executionContext; 99 100 private final XAWork xaWork; 101 102 105 private WorkListener workListener = NULL_WORK_LISTENER; 106 107 110 private WorkException workException; 111 112 115 private CountDownLatch startLatch = new CountDownLatch(1); 116 117 120 private CountDownLatch endLatch = new CountDownLatch(1); 121 122 128 public WorkerContext(Work work, XAWork xaWork) { 129 adaptee = work; 130 executionContext = null; 131 this.xaWork = xaWork; 132 } 133 134 145 public WorkerContext(Work aWork, 146 long aStartTimeout, 147 ExecutionContext execContext, 148 XAWork xaWork, 149 WorkListener workListener) { 150 adaptee = aWork; 151 startTimeOut = aStartTimeout; 152 executionContext = execContext; 153 this.xaWork = xaWork; 154 if (null != workListener) { 155 this.workListener = workListener; 156 } 157 } 158 159 162 public void release() { 163 adaptee.release(); 164 } 165 166 174 public void setThreadPriority(int aPriority) { 175 threadPriority = aPriority; 176 } 177 178 186 public int getThreadPriority() { 187 return threadPriority; 188 } 189 190 197 public synchronized void workAccepted(Object anObject) { 198 isAccepted = true; 199 acceptedTime = System.currentTimeMillis(); 200 workListener.workAccepted(new WorkEvent (anObject, 201 WorkEvent.WORK_ACCEPTED, adaptee, null)); 202 } 203 204 210 public synchronized long getAcceptedTime() { 211 return acceptedTime; 212 } 213 214 220 public long getStartTimeout() { 221 return startTimeOut; 222 } 223 224 231 public synchronized boolean isTimedOut() { 232 assert isAccepted: "The work is not accepted."; 233 if (0 == startTimeOut || startTimeOut == WorkManager.INDEFINITE) { 236 return false; 237 } 238 boolean isTimeout = acceptedTime + startTimeOut > 0 && 239 System.currentTimeMillis() > acceptedTime + startTimeOut; 240 if (log.isDebugEnabled()) { 241 log.debug(this 242 + " accepted at " 243 + acceptedTime 244 + (isTimeout ? " has timed out." : " has not timed out. ") 245 + nbRetry 246 + " retries have been performed."); 247 } 248 if (isTimeout) { 249 workException = new WorkRejectedException (this + " has timed out.", 250 WorkException.START_TIMED_OUT); 251 workListener.workRejected(new WorkEvent (this, 252 WorkEvent.WORK_REJECTED, 253 adaptee, 254 workException)); 255 return true; 256 } 257 nbRetry++; 258 return isTimeout; 259 } 260 261 266 public synchronized WorkException getWorkException() { 267 return workException; 268 } 269 270 273 public void run() { 274 if (isTimedOut()) { 275 startLatch.countDown(); 278 endLatch.countDown(); 279 return; 280 } 281 workListener.workStarted(new WorkEvent (this, WorkEvent.WORK_STARTED, adaptee, null)); 285 startLatch.countDown(); 286 try { 289 if (executionContext == null || executionContext.getXid() == null) { 290 adaptee.run(); 291 } else { 292 try { 293 long transactionTimeout = executionContext.getTransactionTimeout(); 294 xaWork.begin(executionContext.getXid(), transactionTimeout < 0 ? 0 : transactionTimeout); 296 } catch (XAException e) { 297 throw new WorkCompletedException ("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e); 298 } catch (InvalidTransactionException e) { 299 throw new WorkCompletedException ("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e); 300 } catch (SystemException e) { 301 throw new WorkCompletedException ("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e); 302 } catch (ImportedTransactionActiveException e) { 303 throw new WorkCompletedException ("Transaction already active for xid " + executionContext.getXid(), WorkCompletedException.TX_CONCURRENT_WORK_DISALLOWED); 304 } 305 try { 306 adaptee.run(); 307 } finally { 308 xaWork.end(executionContext.getXid()); 309 } 310 311 } 312 workListener.workCompleted(new WorkEvent (this, WorkEvent.WORK_COMPLETED, adaptee, null)); 313 } catch (Throwable e) { 314 workException = (WorkException ) (e instanceof WorkCompletedException ? e : new WorkCompletedException ("Unknown error", WorkCompletedException.UNDEFINED).initCause(e)); 315 workListener.workCompleted(new WorkEvent (this, WorkEvent.WORK_REJECTED, adaptee, 316 workException)); 317 } finally { 318 endLatch.countDown(); 319 } 320 } 321 322 329 public synchronized CountDownLatch provideStartLatch() { 330 return startLatch; 331 } 332 333 340 public synchronized CountDownLatch provideEndLatch() { 341 return endLatch; 342 } 343 344 public String toString() { 345 return "Work :" + adaptee; 346 } 347 348 } 349 | Popular Tags |