KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > geronimo > connector > work > WorkerContext


1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.geronimo.connector.work;
19
20 import javax.resource.spi.work.ExecutionContext JavaDoc;
21 import javax.resource.spi.work.Work JavaDoc;
22 import javax.resource.spi.work.WorkAdapter JavaDoc;
23 import javax.resource.spi.work.WorkCompletedException JavaDoc;
24 import javax.resource.spi.work.WorkEvent JavaDoc;
25 import javax.resource.spi.work.WorkException JavaDoc;
26 import javax.resource.spi.work.WorkListener JavaDoc;
27 import javax.resource.spi.work.WorkManager JavaDoc;
28 import javax.resource.spi.work.WorkRejectedException JavaDoc;
29 import javax.transaction.xa.XAException JavaDoc;
30 import javax.transaction.InvalidTransactionException JavaDoc;
31 import javax.transaction.SystemException JavaDoc;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35
36 import org.apache.geronimo.transaction.manager.ImportedTransactionActiveException;
37 import org.apache.geronimo.transaction.manager.XAWork;
38 import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
39
40 /**
41  * Work wrapper providing an execution context to a Work instance.
42  *
43  * @version $Rev: 476049 $ $Date: 2006-11-16 23:35:17 -0500 (Thu, 16 Nov 2006) $
44  */

45 public class WorkerContext implements Work JavaDoc {
46
47     private static final Log log = LogFactory.getLog(WorkerContext.class);
48
49     /**
50      * Null WorkListener used as the default WorkListener.
51      */

52     private static final WorkListener JavaDoc NULL_WORK_LISTENER = new WorkAdapter JavaDoc() {
53         public void workRejected(WorkEvent JavaDoc event) {
54             if (event.getException() != null) {
55                 if (event.getException() instanceof WorkCompletedException JavaDoc && event.getException().getCause() != null) {
56                     log.error(event.getWork().toString(), event.getException().getCause());
57                 } else {
58                     log.error(event.getWork().toString(), event.getException());
59                 }
60             }
61         }
62     };
63
64     /**
65      * Priority of the thread, which will execute this work.
66      */

67     private int threadPriority;
68
69     /**
70      * Actual work to be executed.
71      */

72     private Work JavaDoc adaptee;
73
74     /**
75      * Indicates if this work has been accepted.
76      */

77     private boolean isAccepted;
78
79     /**
80      * System.currentTimeMillis() when the wrapped Work has been accepted.
81      */

82     private long acceptedTime;
83
84     /**
85      * Number of times that the execution of this work has been tried.
86      */

87     private int nbRetry;
88
89     /**
90      * Time duration (in milliseconds) within which the execution of the Work
91      * instance must start.
92      */

93     private long startTimeOut;
94
95     /**
96      * Execution context of the actual work to be executed.
97      */

98     private final ExecutionContext JavaDoc executionContext;
99
100     private final XAWork xaWork;
101
102     /**
103      * Listener to be notified during the life-cycle of the work treatment.
104      */

105     private WorkListener JavaDoc workListener = NULL_WORK_LISTENER;
106
107     /**
108      * Work exception, if any.
109      */

110     private WorkException JavaDoc workException;
111
112     /**
113      * A latch, which is released when the work is started.
114      */

115     private CountDownLatch startLatch = new CountDownLatch(1);
116
117     /**
118      * A latch, which is released when the work is completed.
119      */

120     private CountDownLatch endLatch = new CountDownLatch(1);
121
122     /**
123      * Create a WorkWrapper.
124      *
125      * @param work Work to be wrapped.
126      * @param xaWork
127      */

128     public WorkerContext(Work JavaDoc work, XAWork xaWork) {
129         adaptee = work;
130         executionContext = null;
131         this.xaWork = xaWork;
132     }
133
134     /**
135      * Create a WorkWrapper with the specified execution context.
136      *
137      * @param aWork Work to be wrapped.
138      * @param aStartTimeout a time duration (in milliseconds) within which the
139      * execution of the Work instance must start.
140      * @param execContext an object containing the execution context with which
141      * the submitted Work instance must be executed.
142      * @param workListener an object which would be notified when the various
143      * Work processing events (work accepted, work rejected, work started,
144      */

145     public WorkerContext(Work JavaDoc aWork,
146                          long aStartTimeout,
147                          ExecutionContext JavaDoc execContext,
148                          XAWork xaWork,
149                          WorkListener JavaDoc workListener) {
150         adaptee = aWork;
151         startTimeOut = aStartTimeout;
152         executionContext = execContext;
153         this.xaWork = xaWork;
154         if (null != workListener) {
155             this.workListener = workListener;
156         }
157     }
158
159     /* (non-Javadoc)
160      * @see javax.resource.spi.work.Work#release()
161      */

162     public void release() {
163         adaptee.release();
164     }
165
166     /**
167      * Defines the thread priority level of the thread, which will be dispatched
168      * to process this work. This priority level must be the same one for a
169      * given resource adapter.
170      *
171      * @param aPriority Priority of the thread to be used to process the wrapped
172      * Work instance.
173      */

174     public void setThreadPriority(int aPriority) {
175         threadPriority = aPriority;
176     }
177
178     /**
179      * Gets the priority level of the thread, which will be dispatched
180      * to process this work. This priority level must be the same one for a
181      * given resource adapter.
182      *
183      * @return The priority level of the thread to be dispatched to
184      * process the wrapped Work instance.
185      */

186     public int getThreadPriority() {
187         return threadPriority;
188     }
189
190     /**
191      * Call-back method used by a Work executor in order to notify this
192      * instance that the wrapped Work instance has been accepted.
193      *
194      * @param anObject Object on which the event initially occurred. It should
195      * be the work executor.
196      */

197     public synchronized void workAccepted(Object JavaDoc anObject) {
198         isAccepted = true;
199         acceptedTime = System.currentTimeMillis();
200         workListener.workAccepted(new WorkEvent JavaDoc(anObject,
201                 WorkEvent.WORK_ACCEPTED, adaptee, null));
202     }
203
204     /**
205      * System.currentTimeMillis() when the Work has been accepted. This method
206      * can be used to compute the duration of a work.
207      *
208      * @return When the work has been accepted.
209      */

210     public synchronized long getAcceptedTime() {
211         return acceptedTime;
212     }
213
214     /**
215      * Gets the time duration (in milliseconds) within which the execution of
216      * the Work instance must start.
217      *
218      * @return Time out duration.
219      */

220     public long getStartTimeout() {
221         return startTimeOut;
222     }
223
224     /**
225      * Used by a Work executor in order to know if this work, which should be
226      * accepted but not started has timed out. This method MUST be called prior
227      * to retry the execution of a Work.
228      *
229      * @return true if the Work has timed out and false otherwise.
230      */

231     public synchronized boolean isTimedOut() {
232         assert isAccepted: "The work is not accepted.";
233         // A value of 0 means that the work never times out.
234
//??? really?
235
if (0 == startTimeOut || startTimeOut == WorkManager.INDEFINITE) {
236             return false;
237         }
238         boolean isTimeout = acceptedTime + startTimeOut > 0 &&
239                 System.currentTimeMillis() > acceptedTime + startTimeOut;
240         if (log.isDebugEnabled()) {
241             log.debug(this
242                     + " accepted at "
243                     + acceptedTime
244                     + (isTimeout ? " has timed out." : " has not timed out. ")
245                     + nbRetry
246                     + " retries have been performed.");
247         }
248         if (isTimeout) {
249             workException = new WorkRejectedException JavaDoc(this + " has timed out.",
250                     WorkException.START_TIMED_OUT);
251             workListener.workRejected(new WorkEvent JavaDoc(this,
252                     WorkEvent.WORK_REJECTED,
253                     adaptee,
254                     workException));
255             return true;
256         }
257         nbRetry++;
258         return isTimeout;
259     }
260
261     /**
262      * Gets the WorkException, if any, thrown during the execution.
263      *
264      * @return WorkException, if any.
265      */

266     public synchronized WorkException JavaDoc getWorkException() {
267         return workException;
268     }
269
270     /* (non-Javadoc)
271      * @see java.lang.Runnable#run()
272      */

273     public void run() {
274         if (isTimedOut()) {
275             // In case of a time out, one releases the start and end latches
276
// to prevent a dead-lock.
277
startLatch.countDown();
278             endLatch.countDown();
279             return;
280         }
281         // Implementation note: the work listener is notified prior to release
282
// the start lock. This behavior is intentional and seems to be the
283
// more conservative.
284
workListener.workStarted(new WorkEvent JavaDoc(this, WorkEvent.WORK_STARTED, adaptee, null));
285         startLatch.countDown();
286         //Implementation note: we assume this is being called without an interesting TransactionContext,
287
//and ignore/replace whatever is associated with the current thread.
288
try {
289             if (executionContext == null || executionContext.getXid() == null) {
290                 adaptee.run();
291             } else {
292                 try {
293                     long transactionTimeout = executionContext.getTransactionTimeout();
294                     //translate -1 value to 0 to indicate default transaction timeout.
295
xaWork.begin(executionContext.getXid(), transactionTimeout < 0 ? 0 : transactionTimeout);
296                 } catch (XAException JavaDoc e) {
297                     throw new WorkCompletedException JavaDoc("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e);
298                 } catch (InvalidTransactionException JavaDoc e) {
299                     throw new WorkCompletedException JavaDoc("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e);
300                 } catch (SystemException JavaDoc e) {
301                     throw new WorkCompletedException JavaDoc("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e);
302                 } catch (ImportedTransactionActiveException e) {
303                     throw new WorkCompletedException JavaDoc("Transaction already active for xid " + executionContext.getXid(), WorkCompletedException.TX_CONCURRENT_WORK_DISALLOWED);
304                 }
305                 try {
306                     adaptee.run();
307                 } finally {
308                     xaWork.end(executionContext.getXid());
309                 }
310
311             }
312             workListener.workCompleted(new WorkEvent JavaDoc(this, WorkEvent.WORK_COMPLETED, adaptee, null));
313         } catch (Throwable JavaDoc e) {
314             workException = (WorkException JavaDoc) (e instanceof WorkCompletedException JavaDoc ? e : new WorkCompletedException JavaDoc("Unknown error", WorkCompletedException.UNDEFINED).initCause(e));
315             workListener.workCompleted(new WorkEvent JavaDoc(this, WorkEvent.WORK_REJECTED, adaptee,
316                     workException));
317         } finally {
318             endLatch.countDown();
319         }
320     }
321
322     /**
323      * Provides a latch, which can be used to wait the start of a work
324      * execution.
325      *
326      * @return Latch that a caller can acquire to wait for the start of a
327      * work execution.
328      */

329     public synchronized CountDownLatch provideStartLatch() {
330         return startLatch;
331     }
332
333     /**
334      * Provides a latch, which can be used to wait the end of a work
335      * execution.
336      *
337      * @return Latch that a caller can acquire to wait for the end of a
338      * work execution.
339      */

340     public synchronized CountDownLatch provideEndLatch() {
341         return endLatch;
342     }
343
344     public String JavaDoc toString() {
345         return "Work :" + adaptee;
346     }
347
348 }
349
Popular Tags