1 10 11 27 28 package org.mule.impl.work; 29 30 import edu.emory.mathcs.backport.java.util.concurrent.Executor; 31 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 import org.mule.MuleManager; 35 import org.mule.config.ThreadingProfile; 36 import org.mule.umo.UMOException; 37 import org.mule.umo.manager.UMOWorkManager; 38 39 import javax.resource.spi.XATerminator ; 40 import javax.resource.spi.work.ExecutionContext ; 41 import javax.resource.spi.work.Work ; 42 import javax.resource.spi.work.WorkCompletedException ; 43 import javax.resource.spi.work.WorkException ; 44 import javax.resource.spi.work.WorkListener ; 45 46 52 public class MuleWorkManager implements UMOWorkManager 53 { 54 57 protected static final Log logger = LogFactory.getLog(MuleWorkManager.class); 58 59 63 private WorkExecutorPool syncWorkExecutorPool; 64 65 69 private WorkExecutorPool startWorkExecutorPool; 70 71 75 private WorkExecutorPool scheduledWorkExecutorPool; 76 77 private final WorkExecutor scheduleWorkExecutor = new ScheduleWorkExecutor(); 78 private final WorkExecutor startWorkExecutor = new StartWorkExecutor(); 79 private final WorkExecutor syncWorkExecutor = new SyncWorkExecutor(); 80 81 84 public MuleWorkManager() 85 { 86 this(MuleManager.getConfiguration().getDefaultThreadingProfile(), null); 87 } 88 89 public MuleWorkManager(ThreadingProfile profile, String name) 90 { 91 if (name == null) 92 { 93 name = "WorkManager#" + hashCode(); 94 } 95 syncWorkExecutorPool = new NullWorkExecutorPool(profile, name); 96 startWorkExecutorPool = new NullWorkExecutorPool(profile, name); 97 scheduledWorkExecutorPool = new NullWorkExecutorPool(profile, name); 98 } 99 100 public void start() throws UMOException 101 { 102 syncWorkExecutorPool = syncWorkExecutorPool.start(); 103 startWorkExecutorPool = startWorkExecutorPool.start(); 104 scheduledWorkExecutorPool = scheduledWorkExecutorPool.start(); 105 } 106 107 public void stop() throws UMOException 108 { 109 syncWorkExecutorPool = syncWorkExecutorPool.stop(); 110 startWorkExecutorPool = startWorkExecutorPool.stop(); 111 scheduledWorkExecutorPool = scheduledWorkExecutorPool.stop(); 112 } 113 114 public void dispose() 115 { 116 try 117 { 118 stop(); 119 } 120 catch (UMOException e) 121 { 122 logger.warn("Error while disposing Work Manager: " + e.getMessage(), e); 123 124 } 125 } 126 127 public XATerminator getXATerminator() 129 { 130 return null; 131 } 132 133 public int getSyncThreadCount() 134 { 135 return syncWorkExecutorPool.getPoolSize(); 136 } 137 138 public int getSyncMaximumPoolSize() 139 { 140 return syncWorkExecutorPool.getMaximumPoolSize(); 141 } 142 143 public void setSyncMaximumPoolSize(int maxSize) 144 { 145 syncWorkExecutorPool.setMaximumPoolSize(maxSize); 146 } 147 148 public int getStartThreadCount() 149 { 150 return startWorkExecutorPool.getPoolSize(); 151 } 152 153 public int getStartMaximumPoolSize() 154 { 155 return startWorkExecutorPool.getMaximumPoolSize(); 156 } 157 158 public void setStartMaximumPoolSize(int maxSize) 159 { 160 startWorkExecutorPool.setMaximumPoolSize(maxSize); 161 } 162 163 public int getScheduledThreadCount() 164 { 165 return scheduledWorkExecutorPool.getPoolSize(); 166 } 167 168 public int getScheduledMaximumPoolSize() 169 { 170 return scheduledWorkExecutorPool.getMaximumPoolSize(); 171 } 172 173 public void setScheduledMaximumPoolSize(int maxSize) 174 { 175 scheduledWorkExecutorPool.setMaximumPoolSize(maxSize); 176 } 177 178 183 public void doWork(Work work) throws WorkException 184 { 185 executeWork(new WorkerContext(work), syncWorkExecutor, syncWorkExecutorPool); 186 } 187 188 195 public void doWork(Work work, long startTimeout, ExecutionContext execContext, WorkListener workListener) 196 throws WorkException 197 { 198 WorkerContext workWrapper = new WorkerContext(work, startTimeout, execContext, workListener); 199 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 200 executeWork(workWrapper, syncWorkExecutor, syncWorkExecutorPool); 201 } 202 203 208 public long startWork(Work work) throws WorkException 209 { 210 WorkerContext workWrapper = new WorkerContext(work); 211 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 212 executeWork(workWrapper, startWorkExecutor, startWorkExecutorPool); 213 return System.currentTimeMillis() - workWrapper.getAcceptedTime(); 214 } 215 216 223 public long startWork(Work work, 224 long startTimeout, 225 ExecutionContext execContext, 226 WorkListener workListener) throws WorkException 227 { 228 WorkerContext workWrapper = new WorkerContext(work, startTimeout, execContext, workListener); 229 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 230 executeWork(workWrapper, startWorkExecutor, startWorkExecutorPool); 231 return System.currentTimeMillis() - workWrapper.getAcceptedTime(); 232 } 233 234 239 public void scheduleWork(Work work) throws WorkException 240 { 241 WorkerContext workWrapper = new WorkerContext(work); 242 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 243 executeWork(workWrapper, scheduleWorkExecutor, scheduledWorkExecutorPool); 244 } 245 246 253 public void scheduleWork(Work work, 254 long startTimeout, 255 ExecutionContext execContext, 256 WorkListener workListener) throws WorkException 257 { 258 WorkerContext workWrapper = new WorkerContext(work, startTimeout, execContext, workListener); 259 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 260 executeWork(workWrapper, scheduleWorkExecutor, scheduledWorkExecutorPool); 261 } 262 263 270 private void executeWork(WorkerContext work, WorkExecutor workExecutor, Executor pooledExecutor) 271 throws WorkException 272 { 273 work.workAccepted(this); 274 try 275 { 276 workExecutor.doExecute(work, pooledExecutor); 277 WorkException exception = work.getWorkException(); 278 if (null != exception) 279 { 280 throw exception; 281 } 282 } 283 catch (InterruptedException e) 284 { 285 WorkCompletedException wcj = new WorkCompletedException ("The execution has been interrupted.", e); 286 wcj.setErrorCode(WorkException.INTERNAL); 287 throw wcj; 288 } 289 } 290 } 291 | Popular Tags |