KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > easybeans > jca > workmanager > ResourceWorkManager


1 /**
2  * EasyBeans
3  * Copyright (C) 2006 Bull S.A.S.
4  * Contact: easybeans@objectweb.org
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA
20  *
21  * --------------------------------------------------------------------------
22  * $Id: ResourceWorkManager.java 1140 2006-10-06 09:21:59Z benoitf $
23  * --------------------------------------------------------------------------
24  */

25
26 package org.objectweb.easybeans.jca.workmanager;
27
28 import java.util.LinkedList JavaDoc;
29
30 import javax.resource.spi.work.ExecutionContext JavaDoc;
31 import javax.resource.spi.work.Work JavaDoc;
32 import javax.resource.spi.work.WorkCompletedException JavaDoc;
33 import javax.resource.spi.work.WorkEvent JavaDoc;
34 import javax.resource.spi.work.WorkException JavaDoc;
35 import javax.resource.spi.work.WorkListener JavaDoc;
36 import javax.resource.spi.work.WorkManager JavaDoc;
37 import javax.resource.spi.work.WorkRejectedException JavaDoc;
38 import javax.transaction.NotSupportedException JavaDoc;
39 import javax.transaction.SystemException JavaDoc;
40 import javax.transaction.TransactionManager JavaDoc;
41 import javax.transaction.xa.Xid JavaDoc;
42
43 import org.objectweb.easybeans.log.JLog;
44 import org.objectweb.easybeans.log.JLogFactory;
45 import org.objectweb.jotm.Current;
46
47 /**
48  * Implementation of the Resource Work Manager API.
49  * @author Philippe Durieux (JOnAS)
50  * @author Florent Benoit (EasyBeans)
51  */

52 public class ResourceWorkManager implements WorkManager JavaDoc {
53
54     /**
55      * MilliSeconds value.
56      */

57     private static final long MILLISECONDS = 1000L;
58
59     /**
60      * Logger.
61      */

62     private static JLog logger = JLogFactory.getLog(ResourceWorkManager.class);
63
64     /**
65      * List of ResourceWork (which wrap Work object).
66      */

67     private LinkedList JavaDoc<ResourceWork> workList = new LinkedList JavaDoc<ResourceWork>();
68
69     /**
70      * Identifier of this pool.
71      */

72     private static int poolnumber = 0;
73
74     /**
75      * Thread number (when building ResourceWorkThread, it assigns a new thread
76      * number).
77      */

78     private static int threadnumber = 0;
79
80     /**
81      * The maximum size of the pool.
82      */

83     private int maxpoolsz;
84
85     /**
86      * The minimum size of the pool.
87      */

88     private int minpoolsz;
89
90     /**
91      * The current size of thread pool.
92      */

93     private int poolsz;
94
95     /**
96      * Threads that are ready to work.
97      */

98     private int freeThreads;
99
100     /**
101      * The time to wait (in millisec).
102      */

103     private long waitingTime;
104
105     /**
106      * Pool status : by default, it is not stopped.
107      */

108     private boolean stopped = false;
109
110     /**
111      * Wait few more seconds when waiting.
112      */

113     private static final long FEW_MORE_SECONDS = 3000;
114
115     /**
116      * TransactionManager to use.
117      */

118     private TransactionManager JavaDoc transactionManager;
119
120     /**
121      * Default Constructor.
122      * @param minsz the minimum pool size
123      * @param maxsz the maximum pool size
124      * @param transactionManager the transaction manager to use.
125      * @param threadwait max time in seconds a thread will wait.
126      */

127     @SuppressWarnings JavaDoc("boxing")
128     public ResourceWorkManager(final TransactionManager JavaDoc transactionManager, final int minsz, final int maxsz,
129             final long threadwait) {
130         this.minpoolsz = minsz;
131         this.maxpoolsz = maxsz;
132         this.waitingTime = threadwait * MILLISECONDS;
133         this.transactionManager = transactionManager;
134         // new identifier
135
poolnumber++;
136         if (logger.isDebugEnabled()) {
137             logger.debug("thread pool {0}", poolnumber);
138             logger.debug("minpool size = {0} and maxpool sizez = {1}", minsz, maxsz);
139         }
140         // Build threads for work.
141
for (poolsz = 0; poolsz < minsz; poolsz++) {
142             ResourceWorkThread resourceWorkThread = new ResourceWorkThread(this, poolnumber, threadnumber++);
143             resourceWorkThread.start();
144         }
145     }
146
147     /**
148      * @return current pool size
149      */

150     public int getCurrentPoolSize() {
151         return poolsz;
152     }
153
154     /**
155      * @return min pool size
156      */

157     public int getMinPoolSize() {
158         return minpoolsz;
159     }
160
161     /**
162      * @return max pool size
163      */

164     public int getMaxPoolSize() {
165         return maxpoolsz;
166     }
167
168     /**
169      * Sets the min pool size.
170      * @param minsz the min pool size.
171      */

172     public void setMinPoolSize(final int minsz) {
173         minpoolsz = minsz;
174     }
175
176     /**
177      * Sets the max pool size.
178      * @param maxsz the max pool size.
179      */

180     public void setMaxPoolSize(final int maxsz) {
181         maxpoolsz = maxsz;
182     }
183
184     /**
185      * Accepts a Work instance for processing. This call blocks until the Work
186      * instance completes execution. There is no guarantee on when the accepted
187      * Work instance would start execution ie., there is no time constraint to
188      * start execution.
189      * @param work The unit of work to be done. Could be long or short-lived.
190      * @throws WorkRejectedException a Work instance has been rejected from
191      * further processing.
192      * @throws WorkCompletedException a Work instance has completed execution
193      * with an exception.
194      * @throws WorkException if work is not done
195      */

196     public void doWork(final Work JavaDoc work) throws WorkRejectedException JavaDoc, WorkCompletedException JavaDoc, WorkException JavaDoc {
197         doMyWork(work, INDEFINITE, null, null, 0);
198     }
199
200     /**
201      * Accepts a Work instance for processing. This call blocks until the Work
202      * instance completes execution.
203      * @param work The unit of work to be done. Could be long or short-lived.
204      * @param timeout a time duration (in milliseconds) within which the
205      * execution of the Work instance must start. Otherwise, the Work
206      * instance is rejected with a WorkRejectedException set to an
207      * appropriate error code (WorkRejectedException.TIMED_OUT).
208      * @param executionContext an object containing the execution context with
209      * which the submitted Work instance must be executed.
210      * @param workListener an object which would be notified when the various
211      * Work processing events (work accepted, work rejected, work
212      * started, work completed) occur.
213      * @throws WorkRejectedException a Work instance has been rejected from
214      * further processing.
215      * @throws WorkCompletedException a Work instance has completed execution
216      * with an exception.
217      * @throws WorkException if work is not done
218      */

219     public void doWork(final Work JavaDoc work, final long timeout, final ExecutionContext JavaDoc executionContext,
220             final WorkListener JavaDoc workListener) throws WorkRejectedException JavaDoc, WorkCompletedException JavaDoc, WorkException JavaDoc {
221         if (workListener != null) {
222             workListener.workAccepted(new WorkEvent JavaDoc(this, WorkEvent.WORK_ACCEPTED, work, null));
223         }
224         doMyWork(work, timeout, executionContext, workListener, System.currentTimeMillis());
225     }
226
227     /**
228      * Accepts a Work instance for processing. This call blocks until the Work
229      * instance starts execution but not until its completion. There is no
230      * guarantee on when the accepted Work instance would start execution ie.,
231      * there is no time constraint to start execution.
232      * @param work The unit of work to be done. Could be long or short-lived.
233      * @return the time elapsed (in milliseconds) from Work acceptance until
234      * start of execution. Note, this does not offer real-time
235      * guarantees. It is valid to return -1, if the actual start delay
236      * duration is unknown.
237      * @throws WorkRejectedException a Work instance has been rejected from
238      * further processing.
239      * @throws WorkException if work is not started
240      */

241     public long startWork(final Work JavaDoc work) throws WorkRejectedException JavaDoc, WorkException JavaDoc {
242         return startWork(work, INDEFINITE, null, null);
243     }
244
245     /**
246      * Accepts a Work instance for processing. This call blocks until the Work
247      * instance starts execution but not until its completion. There is no
248      * guarantee on when the accepted Work instance would start execution ie.,
249      * there is no time constraint to start execution.
250      * @param work The unit of work to be done. Could be long or short-lived.
251      * @param timeout a time duration (in milliseconds) within which the
252      * execution of the Work instance must start. Otherwise, the Work
253      * instance is rejected with a WorkRejectedException set to an
254      * appropriate error code (WorkRejectedException.TIMED_OUT).
255      * @param executionContext an object containing the execution context with
256      * which the submitted Work instance must be executed.
257      * @param workListener an object which would be notified when the various
258      * Work processing events (work accepted, work rejected, work
259      * started, work completed) occur.
260      * @return the time elapsed (in milliseconds) from Work acceptance until
261      * start of execution. Note, this does not offer real-time
262      * guarantees. It is valid to return -1, if the actual start delay
263      * duration is unknown.
264      * @throws WorkRejectedException a Work instance has been rejected from
265      * further processing.
266      * @throws WorkException if work is not started
267      */

268     public long startWork(final Work JavaDoc work, final long timeout, final ExecutionContext JavaDoc executionContext,
269             final WorkListener JavaDoc workListener) throws WorkRejectedException JavaDoc, WorkException JavaDoc {
270
271         ResourceWork resourceWork = new ResourceWork(work, timeout, executionContext, workListener);
272         if (workListener != null) {
273             workListener.workAccepted(new WorkEvent JavaDoc(this, WorkEvent.WORK_ACCEPTED, work, null));
274         }
275         long starttime = System.currentTimeMillis();
276         long duration = 0;
277         synchronized (workList) {
278             workList.add(resourceWork);
279             if (poolsz < maxpoolsz && workList.size() > freeThreads) {
280                 // We need one more thread.
281
poolsz++;
282                 ResourceWorkThread resourceWorkThread = new ResourceWorkThread(this, threadnumber++, poolnumber);
283                 resourceWorkThread.start();
284             } else {
285                 workList.notify();
286             }
287         }
288         // Wait until my work is started.
289
boolean started = false;
290         synchronized (resourceWork) {
291             if (!resourceWork.isStarted()) {
292                 try {
293                     // No need to wait after timeout is elapsed
294
long waittime = waitingTime;
295                     if (timeout < waittime) {
296                         waittime = timeout + FEW_MORE_SECONDS;
297                     }
298                     resourceWork.wait(waittime);
299                 } catch (InterruptedException JavaDoc e) {
300                     throw new WorkRejectedException JavaDoc("Interrupted");
301                 }
302             }
303             started = resourceWork.isStarted();
304         }
305         duration = System.currentTimeMillis() - starttime;
306         if (!started) {
307             synchronized (workList) {
308                 // Remove the work in the list
309
if (!workList.remove(resourceWork)) {
310                     logger.debug("Cannot remove the work");
311                 }
312                 throw new WorkRejectedException JavaDoc(WorkException.START_TIMED_OUT);
313             }
314         }
315         return duration;
316     }
317
318     /**
319      * Accepts a Work instance for processing. This call does not block and
320      * returns immediately once a Work instance has been accepted for
321      * processing. There is no guarantee on when the submitted Work instance
322      * would start execution ie., there is no time constraint to start
323      * execution.
324      * @param work The unit of work to be done. Could be long or short-lived.
325      * @throws WorkRejectedException - indicates that a Work instance has been
326      * rejected from further processing. This can occur due to internal
327      * factors.
328      * @throws WorkException if work is not scheduled.
329      */

330     public void scheduleWork(final Work JavaDoc work) throws WorkRejectedException JavaDoc, WorkException JavaDoc {
331         scheduleWork(work, INDEFINITE, null, null);
332     }
333
334     /**
335      * Accepts a Work instance for processing. This call does not block and
336      * returns immediately once a Work instance has been accepted for
337      * processing. There is no guarantee on when the submitted Work instance
338      * would start execution ie., there is no time constraint to start
339      * execution.
340      * @param work The unit of work to be done. Could be long or short-lived.
341      * @param timeout a time duration (in milliseconds) within which the
342      * execution of the Work instance must start. Otherwise, the Work
343      * instance is rejected with a WorkRejectedException set to an
344      * appropriate error code (WorkRejectedException.TIMED_OUT).
345      * @param executionContext an object containing the execution context with
346      * which the submitted Work instance must be executed.
347      * @param workListener an object which would be notified when the various
348      * Work processing events (work accepted, work rejected, work
349      * started, work completed) occur.
350      * @throws WorkRejectedException a Work instance has been rejected from
351      * further processing.
352      * @throws WorkException if work is not scheduled.
353      */

354     public void scheduleWork(final Work JavaDoc work, final long timeout, final ExecutionContext JavaDoc executionContext,
355             final WorkListener JavaDoc workListener) throws WorkRejectedException JavaDoc, WorkException JavaDoc {
356
357         ResourceWork resourceWork = new ResourceWork(work, timeout, executionContext, workListener);
358         if (workListener != null) {
359             workListener.workAccepted(new WorkEvent JavaDoc(this, WorkEvent.WORK_ACCEPTED, work, null));
360         }
361         synchronized (workList) {
362             workList.add(resourceWork);
363             if (poolsz < maxpoolsz && workList.size() > freeThreads) {
364                 // We need one more thread.
365
poolsz++;
366                 ResourceWorkThread resourceWorkThread = new ResourceWorkThread(this, threadnumber++, poolnumber);
367                 resourceWorkThread.start();
368             } else {
369                 // Just wake up a thread waiting for work.
370
workList.notify();
371             }
372         }
373     }
374
375     /**
376      * Internal method doing the work.
377      * @param work The unit of work to be done. Could be long or short-lived.
378      * @param timeout a time duration (in milliseconds) within which the
379      * execution of the Work instance must start. Otherwise, the Work
380      * instance is rejected with a WorkRejectedException set to an
381      * appropriate error code (WorkRejectedException.TIMED_OUT).
382      * @param executionContext an object containing the execution context with
383      * which the submitted Work instance must be executed.
384      * @param workListener an object which would be notified when the various
385      * Work processing events (work accepted, work rejected, work
386      * started, work completed) occur.
387      * @param creationTime the date of the creation of the work
388      * @throws WorkException if work is not performed.
389      */

390     @SuppressWarnings JavaDoc("boxing")
391     private void doMyWork(final Work JavaDoc work, final long timeout, final ExecutionContext JavaDoc executionContext,
392             final WorkListener JavaDoc workListener, final long creationTime) throws WorkException JavaDoc {
393
394         // Notify the listener that the work is started or rejected by timeout.
395
if (workListener != null) {
396             long duration = System.currentTimeMillis() - creationTime;
397             if (duration > timeout) {
398                 // This can occur only in case of scheduleWork
399
logger.warn("REJECTED: duration= {0}", duration);
400                 workListener.workRejected(new WorkEvent JavaDoc(this, WorkEvent.WORK_REJECTED, work, null));
401                 return;
402             }
403             workListener.workStarted(new WorkEvent JavaDoc(this, WorkEvent.WORK_STARTED, work, null));
404         }
405
406         // Setup ExecutionContext
407
Xid JavaDoc xid = null;
408         if (executionContext != null) {
409             xid = executionContext.getXid();
410             if (xid != null) {
411                 long txtimeout = executionContext.getTransactionTimeout();
412                 try {
413                     if (txtimeout != WorkManager.UNKNOWN) {
414                         ((Current) transactionManager).begin(xid, txtimeout);
415                     } else {
416                         ((Current) transactionManager).begin(xid);
417                     }
418                 } catch (NotSupportedException JavaDoc e) {
419                     throw new WorkException JavaDoc("Error starting a new transaction", e);
420                 } catch (SystemException JavaDoc e) {
421                     throw new WorkException JavaDoc("Error starting a new transaction", e);
422                 }
423             }
424         }
425
426         try {
427             work.run();
428             // Notify the listener that the work is completed.
429
if (workListener != null) {
430                 workListener.workCompleted(new WorkEvent JavaDoc(this, WorkEvent.WORK_COMPLETED, work, null));
431             }
432         } catch (Exception JavaDoc e) {
433             if (workListener != null) {
434                 workListener.workCompleted(new WorkEvent JavaDoc(this, WorkEvent.WORK_COMPLETED, work, null));
435             }
436             throw new WorkCompletedException JavaDoc(e);
437         } finally {
438             if (xid != null) {
439                 ((Current) transactionManager).clearThreadTx();
440             }
441         }
442     }
443
444     /**
445      * Do the next JWork object to be run.
446      * @throws WorkException if work is not done
447      * @throws InterruptedException if one object can't wait.
448      * @throws ResourceWorkManagerStoppedException if the manager is stopped.
449      */

450     public void nextWork() throws WorkException JavaDoc, InterruptedException JavaDoc, ResourceWorkManagerStoppedException {
451         ResourceWork run = null;
452         boolean haswait = false;
453         synchronized (workList) {
454             while (workList.isEmpty()) {
455                 if ((haswait && freeThreads > minpoolsz) || stopped) {
456                     poolsz--;
457                     throw new ResourceWorkManagerStoppedException("Manager is stopped");
458                 }
459                 try {
460                     freeThreads++;
461                     workList.wait(waitingTime);
462                     freeThreads--;
463                     haswait = true;
464                 } catch (InterruptedException JavaDoc e) {
465                     freeThreads--;
466                     poolsz--;
467                     throw e;
468                 }
469             }
470             run = workList.removeFirst();
471             // In case startWork() was called
472
synchronized (run) {
473                 logger.debug("Starting a new work");
474                 run.setStarted();
475                 run.notify();
476             }
477         }
478         doMyWork(run.getWork(), run.getTimeout(), run.getExecutionContext(), run.getWorkListener(), run
479                 .getCreationTime());
480     }
481
482     /**
483      * Remove this WorkManager : Stop all threads.
484      */

485     public synchronized void stopThreads() {
486         stopped = true;
487         notifyAll();
488         poolnumber--;
489     }
490
491 }
492
Popular Tags