1 16 17 package org.springframework.jca.work; 18 19 import javax.resource.spi.work.ExecutionContext ; 20 import javax.resource.spi.work.Work ; 21 import javax.resource.spi.work.WorkAdapter ; 22 import javax.resource.spi.work.WorkCompletedException ; 23 import javax.resource.spi.work.WorkEvent ; 24 import javax.resource.spi.work.WorkException ; 25 import javax.resource.spi.work.WorkListener ; 26 import javax.resource.spi.work.WorkManager ; 27 import javax.resource.spi.work.WorkRejectedException ; 28 29 import org.springframework.core.task.AsyncTaskExecutor; 30 import org.springframework.core.task.SimpleAsyncTaskExecutor; 31 import org.springframework.core.task.SyncTaskExecutor; 32 import org.springframework.core.task.TaskExecutor; 33 import org.springframework.core.task.TaskRejectedException; 34 import org.springframework.core.task.TaskTimeoutException; 35 import org.springframework.util.Assert; 36 37 63 public class SimpleTaskWorkManager implements WorkManager { 64 65 private TaskExecutor syncTaskExecutor = new SyncTaskExecutor(); 66 67 private TaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor(); 68 69 70 75 public void setSyncTaskExecutor(TaskExecutor syncTaskExecutor) { 76 this.syncTaskExecutor = syncTaskExecutor; 77 } 78 79 86 public void setAsyncTaskExecutor(TaskExecutor asyncTaskExecutor) { 87 this.asyncTaskExecutor = asyncTaskExecutor; 88 } 89 90 91 public void doWork(Work work) throws WorkException { 92 doWork(work, WorkManager.INDEFINITE, null, null); 93 } 94 95 public void doWork(Work work, long startTimeout, ExecutionContext executionContext, WorkListener workListener) 96 throws WorkException { 97 98 Assert.state(this.syncTaskExecutor != null, "No 'syncTaskExecutor' set"); 99 executeWork(this.syncTaskExecutor, work, startTimeout, false, executionContext, workListener); 100 } 101 102 public long startWork(Work work) throws WorkException { 103 return startWork(work, WorkManager.INDEFINITE, null, null); 104 } 105 106 public long startWork(Work work, long startTimeout, ExecutionContext executionContext, WorkListener workListener) 107 throws WorkException { 108 109 Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set"); 110 return executeWork(this.asyncTaskExecutor, work, startTimeout, true, executionContext, workListener); 111 } 112 113 public void scheduleWork(Work work) throws WorkException { 114 scheduleWork(work, WorkManager.INDEFINITE, null, null); 115 } 116 117 public void scheduleWork(Work work, long startTimeout, ExecutionContext executionContext, WorkListener workListener) 118 throws WorkException { 119 120 Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set"); 121 executeWork(this.asyncTaskExecutor, work, startTimeout, false, executionContext, workListener); 122 } 123 124 125 137 protected long executeWork(TaskExecutor taskExecutor, Work work, long startTimeout, 138 boolean blockUntilStarted, ExecutionContext executionContext, WorkListener workListener) 139 throws WorkException { 140 141 if (executionContext != null && executionContext.getXid() != null) { 142 throw new WorkException ("SimpleTaskWorkManager does not supported imported XIDs: " + executionContext.getXid()); 143 } 144 WorkListener workListenerToUse = workListener; 145 if (workListenerToUse == null) { 146 workListenerToUse = new WorkAdapter (); 147 } 148 149 boolean isAsync = (taskExecutor instanceof AsyncTaskExecutor); 150 DelegatingWorkAdapter workHandle = new DelegatingWorkAdapter(work, workListenerToUse, !isAsync); 151 try { 152 if (isAsync) { 153 ((AsyncTaskExecutor) taskExecutor).execute(workHandle, startTimeout); 154 } 155 else { 156 taskExecutor.execute(workHandle); 157 } 158 } 159 catch (TaskTimeoutException ex) { 160 WorkException wex = new WorkRejectedException ("TaskExecutor rejected Work because of timeout: " + work, ex); 161 wex.setErrorCode(WorkException.START_TIMED_OUT); 162 workListenerToUse.workRejected(new WorkEvent (this, WorkEvent.WORK_REJECTED, work, wex)); 163 throw wex; 164 } 165 catch (TaskRejectedException ex) { 166 WorkException wex = new WorkRejectedException ("TaskExecutor rejected Work: " + work, ex); 167 wex.setErrorCode(WorkException.INTERNAL); 168 workListenerToUse.workRejected(new WorkEvent (this, WorkEvent.WORK_REJECTED, work, wex)); 169 throw wex; 170 } 171 catch (Throwable ex) { 172 WorkException wex = new WorkException ("TaskExecutor failed to execute Work: " + work, ex); 173 wex.setErrorCode(WorkException.INTERNAL); 174 throw wex; 175 } 176 if (isAsync) { 177 workListenerToUse.workAccepted(new WorkEvent (this, WorkEvent.WORK_ACCEPTED, work, null)); 178 } 179 180 if (blockUntilStarted) { 181 long acceptanceTime = System.currentTimeMillis(); 182 synchronized (workHandle.monitor) { 183 try { 184 while (!workHandle.started) { 185 workHandle.monitor.wait(); 186 } 187 } 188 catch (InterruptedException ex) { 189 Thread.currentThread().interrupt(); 190 } 191 } 192 return (System.currentTimeMillis() - acceptanceTime); 193 } 194 else { 195 return WorkManager.UNKNOWN; 196 } 197 } 198 199 200 204 private static class DelegatingWorkAdapter implements Work { 205 206 private final Work work; 207 208 private final WorkListener workListener; 209 210 private final boolean acceptOnExecution; 211 212 public final Object monitor = new Object (); 213 214 public boolean started = false; 215 216 public DelegatingWorkAdapter(Work work, WorkListener workListener, boolean acceptOnExecution) { 217 this.work = work; 218 this.workListener = workListener; 219 this.acceptOnExecution = acceptOnExecution; 220 } 221 222 public void run() { 223 if (this.acceptOnExecution) { 224 this.workListener.workAccepted(new WorkEvent (this, WorkEvent.WORK_ACCEPTED, work, null)); 225 } 226 synchronized (this.monitor) { 227 this.started = true; 228 this.monitor.notify(); 229 } 230 this.workListener.workStarted(new WorkEvent (this, WorkEvent.WORK_STARTED, this.work, null)); 231 try { 232 this.work.run(); 233 } 234 catch (RuntimeException ex) { 235 this.workListener.workCompleted( 236 new WorkEvent (this, WorkEvent.WORK_COMPLETED, this.work, new WorkCompletedException (ex))); 237 throw ex; 238 } 239 catch (Error err) { 240 this.workListener.workCompleted( 241 new WorkEvent (this, WorkEvent.WORK_COMPLETED, this.work, new WorkCompletedException (err))); 242 throw err; 243 } 244 this.workListener.workCompleted(new WorkEvent (this, WorkEvent.WORK_COMPLETED, this.work, null)); 245 } 246 247 public void release() { 248 this.work.release(); 249 } 250 } 251 252 } 253 | Popular Tags |