1 22 package org.jboss.resource.work; 23 24 import javax.management.ObjectName ; 25 import javax.resource.spi.work.ExecutionContext ; 26 import javax.resource.spi.work.Work ; 27 import javax.resource.spi.work.WorkException ; 28 import javax.resource.spi.work.WorkListener ; 29 import javax.resource.spi.work.WorkManager ; 30 import javax.transaction.xa.Xid ; 31 32 import org.jboss.system.ServiceMBeanSupport; 33 import org.jboss.tm.JBossXATerminator; 34 import org.jboss.util.threadpool.Task; 35 import org.jboss.util.threadpool.ThreadPool; 36 37 43 public class JBossWorkManager extends ServiceMBeanSupport implements WorkManager , JBossWorkManagerMBean 44 { 45 46 private boolean trace = log.isTraceEnabled(); 47 48 49 private ThreadPool threadPool; 50 51 52 private ObjectName threadPoolName; 53 54 55 private JBossXATerminator xaTerminator; 56 57 58 private ObjectName xaTerminatorName; 59 60 65 public ThreadPool getThreadPool() 66 { 67 return threadPool; 68 } 69 70 75 public void setThreadPool(ThreadPool threadPool) 76 { 77 this.threadPool = threadPool; 78 } 79 80 public ObjectName getThreadPoolName() 81 { 82 return threadPoolName; 83 } 84 85 public void setThreadPoolName(ObjectName threadPoolName) 86 { 87 this.threadPoolName = threadPoolName; 88 } 89 90 public ObjectName getXATerminatorName() 91 { 92 return xaTerminatorName; 93 } 94 95 public void setXATerminatorName(ObjectName xaTerminatorName) 96 { 97 this.xaTerminatorName = xaTerminatorName; 98 } 99 100 public WorkManager getInstance() 101 { 102 return this; 103 } 104 105 public void doWork(Work work, long startTimeout, ExecutionContext ctx, WorkListener listener) throws WorkException 106 { 107 if (ctx == null) 108 ctx = new ExecutionContext (); 109 WorkWrapper wrapper = new WorkWrapper(this, work, Task.WAIT_FOR_COMPLETE, startTimeout, ctx, listener); 110 importWork(wrapper); 111 executeWork(wrapper); 112 if (wrapper.getWorkException() != null) 113 throw wrapper.getWorkException(); 114 } 115 116 public void doWork(Work work) throws WorkException 117 { 118 doWork(work, WorkManager.INDEFINITE, null, null); 119 } 120 121 public long startWork(Work work, long startTimeout, ExecutionContext ctx, WorkListener listener) throws WorkException 122 { 123 if (ctx == null) 124 ctx = new ExecutionContext (); 125 WorkWrapper wrapper = new WorkWrapper(this, work, Task.WAIT_FOR_START, startTimeout, ctx, listener); 126 importWork(wrapper); 127 executeWork(wrapper); 128 if (wrapper.getWorkException() != null) 129 throw wrapper.getWorkException(); 130 return wrapper.getBlockedElapsed(); 131 } 132 133 public long startWork(Work work) throws WorkException 134 { 135 return startWork(work, WorkManager.INDEFINITE, null, null); 136 } 137 138 public void scheduleWork(Work work, long startTimeout, ExecutionContext ctx, WorkListener listener) throws WorkException 139 { 140 if (ctx == null) 141 ctx = new ExecutionContext (); 142 WorkWrapper wrapper = new WorkWrapper(this, work, Task.WAIT_NONE, startTimeout, ctx, listener); 143 importWork(wrapper); 144 executeWork(wrapper); 145 if (wrapper.getWorkException() != null) 146 throw wrapper.getWorkException(); 147 } 148 149 public void scheduleWork(Work work) throws WorkException 150 { 151 scheduleWork(work, WorkManager.INDEFINITE, null, null); 152 } 153 154 protected void startService() throws Exception 155 { 156 if (threadPoolName == null) 157 throw new IllegalStateException ("No thread pool name"); 158 159 threadPool = (ThreadPool) server.getAttribute(threadPoolName, "Instance"); 160 161 if (xaTerminatorName == null) 162 throw new IllegalStateException ("No xa terminator name"); 163 164 xaTerminator = (JBossXATerminator) server.getAttribute(xaTerminatorName, "XATerminator"); 165 } 166 167 173 protected void importWork(WorkWrapper wrapper) throws WorkException 174 { 175 trace = log.isTraceEnabled(); 176 if (trace) 177 log.trace("Importing work " + wrapper); 178 179 ExecutionContext ctx = wrapper.getExecutionContext(); 180 if (ctx != null) 181 { 182 Xid xid = ctx.getXid(); 183 if (xid != null) 184 { 185 long timeout = ctx.getTransactionTimeout(); 186 xaTerminator.registerWork(wrapper.getWork(), xid, timeout); 187 } 188 } 189 if (trace) 190 log.trace("Imported work " + wrapper); 191 } 192 193 199 protected void executeWork(WorkWrapper wrapper) throws WorkException 200 { 201 if (trace) 202 log.trace("Submitting work to thread pool " + wrapper); 203 204 threadPool.runTaskWrapper(wrapper); 205 206 if (trace) 207 log.trace("Submitted work to thread pool " + wrapper); 208 } 209 210 216 protected void startWork(WorkWrapper wrapper) throws WorkException 217 { 218 if (trace) 219 log.trace("Starting work " + wrapper); 220 221 ExecutionContext ctx = wrapper.getExecutionContext(); 222 if (ctx != null) 223 { 224 Xid xid = ctx.getXid(); 225 if (xid != null) 226 { 227 xaTerminator.startWork(wrapper.getWork(), xid); 228 } 229 } 230 if (trace) 231 log.trace("Started work " + wrapper); 232 } 233 234 240 protected void endWork(WorkWrapper wrapper) 241 { 242 if (trace) 243 log.trace("Ending work " + wrapper); 244 245 ExecutionContext ctx = wrapper.getExecutionContext(); 246 if (ctx != null) 247 { 248 Xid xid = ctx.getXid(); 249 if (xid != null) 250 { 251 xaTerminator.endWork(wrapper.getWork(), xid); 252 } 253 } 254 if (trace) 255 log.trace("Ended work " + wrapper); 256 } 257 258 264 protected void cancelWork(WorkWrapper wrapper) 265 { 266 if (trace) 267 log.trace("Cancel work " + wrapper); 268 269 ExecutionContext ctx = wrapper.getExecutionContext(); 270 if (ctx != null) 271 { 272 Xid xid = ctx.getXid(); 273 if (xid != null) 274 { 275 xaTerminator.cancelWork(wrapper.getWork(), xid); 276 } 277 } 278 if (trace) 279 log.trace("Canceled work " + wrapper); 280 } 281 } 282 | Popular Tags |