KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > jonas_lib > JWorkManager


/**
 * JOnAS: Java(TM) Open Application Server
 * Copyright (C) 1999 Bull S.A.
 * Contact: jonas-team@objectweb.org
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA
 *
 * --------------------------------------------------------------------------
 * $Id: JWorkManager.java,v 1.7 2005/07/28 12:00:42 durieuxp Exp $
 * --------------------------------------------------------------------------
 */

25
26
27 package org.objectweb.jonas_lib;
28
29 import java.util.LinkedList JavaDoc;
30
31 import javax.resource.spi.work.ExecutionContext JavaDoc;
32 import javax.resource.spi.work.Work JavaDoc;
33 import javax.resource.spi.work.WorkCompletedException JavaDoc;
34 import javax.resource.spi.work.WorkEvent JavaDoc;
35 import javax.resource.spi.work.WorkException JavaDoc;
36 import javax.resource.spi.work.WorkListener JavaDoc;
37 import javax.resource.spi.work.WorkManager JavaDoc;
38 import javax.resource.spi.work.WorkRejectedException JavaDoc;
39 import javax.transaction.NotSupportedException JavaDoc;
40 import javax.transaction.SystemException JavaDoc;
41 import javax.transaction.xa.Xid JavaDoc;
42
43 import org.objectweb.jonas.common.Log;
44 import org.objectweb.jotm.Current;
45 import org.objectweb.transaction.jta.TransactionManager;
46 import org.objectweb.util.monolog.api.BasicLevel;
47 import org.objectweb.util.monolog.api.Logger;
48
49
50
51 /**
52  * Jonas Implementation of the Resource Work Manager
53  * @author durieuxp
54  */

55 public class JWorkManager implements WorkManager JavaDoc {
56
57     protected LinkedList JavaDoc workList = new LinkedList JavaDoc();
58
59     protected static int poolnumber = 0;
60     protected static int threadnumber = 0;
61
62     protected int maxpoolsz;
63     protected int minpoolsz;
64     protected int poolsz; // current size of thread pool
65
protected int freeThreads; // free threads ready to work
66
protected long waitingTime; // in millisec
67

68     protected boolean valid = true; // set to false when WorkManager is removed.
69

70     protected static final long FEW_MORE_SECONDS = 3000;
71
72     private static Logger logger = null;
73
74     private TransactionManager tm;
75
76     /**
77      * Constructor
78      * @param threadwait max time in seconds a thread will wait
79      */

80     public JWorkManager(int minsz, int maxsz, TransactionManager tm, long threadwait) {
81         minpoolsz = minsz;
82         maxpoolsz = maxsz;
83         waitingTime = threadwait * 1000L;
84         this.tm = tm;
85         poolnumber++;
86         logger = Log.getLogger(Log.JONAS_WORK_MGR_PREFIX);
87         if (logger.isLoggable(BasicLevel.DEBUG)) {
88             logger.log(BasicLevel.DEBUG, "thread pool #" + poolnumber);
89             logger.log(BasicLevel.DEBUG, "minpoolsz = " + minsz + " maxpoolsz = " + maxsz);
90         }
91         for (poolsz = 0; poolsz < minsz; poolsz++) {
92             WorkThread st = new WorkThread(this, threadnumber++, poolnumber);
93             st.start();
94         }
95     }
96
97     // --------------------------------------------------------------------------
98
// Management
99
// --------------------------------------------------------------------------
100

101     /**
102      * @return current pool size
103      */

104     public int getCurrentPoolSize() {
105         return poolsz;
106     }
107
108     /**
109      * @return min pool size
110      */

111     public int getMinPoolSize() {
112         return minpoolsz;
113     }
114
115     /**
116      * @return max pool size
117      */

118     public int getMaxPoolSize() {
119         return maxpoolsz;
120     }
121
122     /**
123      * Set the min pool size
124      * @param minsz
125      */

126     public void setMinPoolSize(int minsz) {
127         minpoolsz = minsz;
128     }
129
130     /**
131      * Set the max pool size
132      * @param maxsz
133      */

134     public void setMaxPoolSize(int maxsz) {
135         maxpoolsz = maxsz;
136     }
137
138     // --------------------------------------------------------------------------
139
// WorkManager implementation
140
// --------------------------------------------------------------------------
141

142     /**
143      * Accepts a Work instance for processing.
144      * This call blocks until the Work instance completes execution.
145      * There is no guarantee on when the accepted Work instance would start execution ie.,
146      * there is no time constraint to start execution.
147      * @param work The unit of work to be done. Could be long or short-lived.
148      * @throws WorkRejectedException a Work instance has been rejected from further processing.
149      * @throws WorkCompletedException a Work instance has completed execution with an exception.
150      * @throws WorkException
151      */

152     public void doWork(Work JavaDoc work) throws WorkException JavaDoc {
153         doMyWork(work, INDEFINITE, null, null, 0);
154     }
155
156     /**
157      * Accepts a Work instance for processing. This call blocks until the Work
158      * instance completes execution.
159      * @param work The unit of work to be done. Could be long or short-lived.
160      * @param timeout a time duration (in milliseconds) within which the
161      * execution of the Work instance must start. Otherwise, the Work
162      * instance is rejected with a WorkRejectedException set to an
163      * appropriate error code (WorkRejectedException.TIMED_OUT).
164      * @param ectx an object containing the execution context with which the
165      * submitted Work instance must be executed.
166      * @param listener an object which would be notified when the various Work
167      * processing events (work accepted, work rejected, work started,
168      * work completed) occur.
169      * @throws WorkRejectedException a Work instance has been rejected from
170      * further processing.
171      * @throws WorkCompletedException a Work instance has completed execution
172      * with an exception.
173      * @throws WorkException
174      */

175     public void doWork(Work JavaDoc work, long timeout, ExecutionContext JavaDoc ectx, WorkListener JavaDoc listener) throws WorkException JavaDoc {
176         if (logger.isLoggable(BasicLevel.DEBUG)) {
177             logger.log(BasicLevel.DEBUG, "");
178         }
179         if (listener != null) {
180             listener.workAccepted(new WorkEvent JavaDoc(this, WorkEvent.WORK_ACCEPTED, work, null));
181         }
182         doMyWork(work, timeout, ectx, listener, System.currentTimeMillis());
183     }
184
185     /**
186      * Accepts a Work instance for processing. This call blocks until the Work
187      * instance starts execution but not until its completion. There is no
188      * guarantee on when the accepted Work instance would start execution ie.,
189      * there is no time constraint to start execution.
190      * @param work The unit of work to be done. Could be long or short-lived.
191      * @return the time elapsed (in milliseconds) from Work acceptance until
192      * start of execution. Note, this does not offer real-time
193      * guarantees. It is valid to return -1, if the actual start delay
194      * duration is unknown.
195      * @throws WorkRejectedException a Work instance has been rejected from
196      * further processing.
197      * @throws WorkException
198      */

199     public long startWork(Work JavaDoc work) throws WorkException JavaDoc {
200         return startWork(work, INDEFINITE, null, null);
201     }
202
203     /**
204      * Accepts a Work instance for processing. This call blocks until the Work
205      * instance starts execution but not until its completion. There is no
206      * guarantee on when the accepted Work instance would start execution ie.,
207      * there is no time constraint to start execution.
208      * @param work The unit of work to be done. Could be long or short-lived.
209      * @param timeout a time duration (in milliseconds) within which the
210      * execution of the Work instance must start. Otherwise, the Work
211      * instance is rejected with a WorkRejectedException set to an
212      * appropriate error code (WorkRejectedException.TIMED_OUT).
213      * @param ectx an object containing the execution context with which the
214      * submitted Work instance must be executed.
215      * @param listener an object which would be notified when the various Work
216      * processing events (work accepted, work rejected, work started,
217      * work completed) occur.
218      * @return the time elapsed (in milliseconds) from Work acceptance until
219      * start of execution. Note, this does not offer real-time
220      * guarantees. It is valid to return -1, if the actual start delay
221      * duration is unknown.
222      * @throws WorkRejectedException a Work instance has been rejected from
223      * further processing.
224      * @throws WorkException
225      */

226     public long startWork(Work JavaDoc work, long timeout, ExecutionContext JavaDoc ectx, WorkListener JavaDoc listener) throws WorkException JavaDoc {
227         if (logger.isLoggable(BasicLevel.DEBUG)) {
228             logger.log(BasicLevel.DEBUG, "");
229         }
230         JWork mywork = new JWork(work, timeout, ectx, listener);
231         if (listener != null) {
232             listener.workAccepted(new WorkEvent JavaDoc(this, WorkEvent.WORK_ACCEPTED, work, null));
233         }
234         long starttime = System.currentTimeMillis();
235         long duration = 0;
236         synchronized (workList) {
237             workList.add(mywork);
238             if (poolsz < maxpoolsz && workList.size() > freeThreads) {
239                 // We need one more thread.
240
poolsz++;
241                 WorkThread st = new WorkThread(this, threadnumber++, poolnumber);
242                 st.start();
243             } else {
244                 workList.notify();
245             }
246         }
247         // Wait until my work is started.
248
boolean started = false;
249         synchronized (mywork) {
250             if (! mywork.isStarted()) {
251                 try {
252                     // No need to wait after timeout is elapsed
253
long waittime = waitingTime;
254                     if (timeout < waittime) {
255                         waittime = timeout + FEW_MORE_SECONDS;
256                     }
257                     mywork.wait(waittime);
258                 } catch (InterruptedException JavaDoc e) {
259                     throw new WorkRejectedException JavaDoc("Interrupted");
260                 }
261             }
262             started = mywork.isStarted();
263         }
264         duration = System.currentTimeMillis() - starttime;
265         if (! started) {
266             synchronized (workList) {
267                 // Remove the work in the list
268
if (! workList.remove(mywork)) {
269                     if (logger.isLoggable(BasicLevel.DEBUG)) {
270                         logger.log(BasicLevel.DEBUG, "cannot remove work");
271                     }
272                 }
273                 throw new WorkRejectedException JavaDoc(WorkException.START_TIMED_OUT);
274             }
275         }
276         return duration;
277     }
278
279     /**
280      * Accepts a Work instance for processing. This call does not block and
281      * returns immediately once a Work instance has been accepted for
282      * processing. There is no guarantee on when the submitted Work instance
283      * would start execution ie., there is no time constraint to start
284      * execution.
285      * @param work The unit of work to be done. Could be long or short-lived.
286      * @param timeout a time duration (in milliseconds) within which the
287      * execution of the Work instance must start. Otherwise, the Work
288      * instance is rejected with a WorkRejectedException set to an
289      * appropriate error code (WorkRejectedException.TIMED_OUT).
290      * @param ectx an object containing the execution context with which the
291      * submitted Work instance must be executed.
292      * @param listener an object which would be notified when the various Work
293      * processing events (work accepted, work rejected, work started,
294      * work completed) occur.
295      * @throws WorkRejectedException a Work instance has been rejected from
296      * further processing.
297      * @throws WorkException
298      */

299     public void scheduleWork(Work JavaDoc work) throws WorkException JavaDoc {
300         scheduleWork(work, INDEFINITE, null, null);
301     }
302
303     /**
304      * Accepts a Work instance for processing. This call does not block and
305      * returns immediately once a Work instance has been accepted for
306      * processing. There is no guarantee on when the submitted Work instance
307      * would start execution ie., there is no time constraint to start
308      * execution.
309      * @param work The unit of work to be done. Could be long or short-lived.
310      * @param timeout a time duration (in milliseconds) within which the
311      * execution of the Work instance must start. Otherwise, the Work
312      * instance is rejected with a WorkRejectedException set to an
313      * appropriate error code (WorkRejectedException.TIMED_OUT).
314      * @param ectx an object containing the execution context with which the
315      * submitted Work instance must be executed.
316      * @param listener an object which would be notified when the various Work
317      * processing events (work accepted, work rejected, work started,
318      * work completed) occur.
319      * @throws WorkRejectedException a Work instance has been rejected from
320      * further processing.
321      * @throws WorkException
322      */

323     public void scheduleWork(Work JavaDoc work, long timeout, ExecutionContext JavaDoc ectx, WorkListener JavaDoc listener) throws WorkException JavaDoc {
324         if (logger.isLoggable(BasicLevel.DEBUG)) {
325             logger.log(BasicLevel.DEBUG, "");
326         }
327         JWork mywork = new JWork(work, timeout, ectx, listener);
328         if (listener != null) {
329             listener.workAccepted(new WorkEvent JavaDoc(this, WorkEvent.WORK_ACCEPTED, work, null));
330         }
331          synchronized (workList) {
332             workList.add(mywork);
333             if (poolsz < maxpoolsz && workList.size() > freeThreads) {
334                 // We need one more thread.
335
poolsz++;
336                 WorkThread st = new WorkThread(this, threadnumber++, poolnumber);
337                 st.start();
338             } else {
339                 // Just wake up a thread waiting for work.
340
workList.notify();
341             }
342         }
343     }
344
345     /**
346      * Internal method doing the work.
347      */

348     private void doMyWork(Work JavaDoc work, long timeout, ExecutionContext JavaDoc ectx, WorkListener JavaDoc listener, long creationTime) throws WorkException JavaDoc {
349         if (logger.isLoggable(BasicLevel.DEBUG)) {
350             logger.log(BasicLevel.DEBUG, "timeout=" + timeout);
351         }
352         // Notify the listener that the work is started or rejected by timeout.
353
if (listener != null) {
354             long duration = System.currentTimeMillis() - creationTime;
355             if (duration > timeout) {
356                 // This can occur only in case of scheduleWork
357
logger.log(BasicLevel.WARN, "REJECTED: duration=" + duration);
358                listener.workRejected(new WorkEvent JavaDoc(this, WorkEvent.WORK_REJECTED, work, null));
359                 return;
360             }
361             listener.workStarted(new WorkEvent JavaDoc(this, WorkEvent.WORK_STARTED, work, null));
362         }
363
364         // Setup ExecutionContext
365
// TODO: Check this if doWork (same thread)
366
Xid JavaDoc xid = null;
367         if (ectx != null) {
368              xid = ectx.getXid();
369              if (xid != null) {
370                  long txtimeout = ectx.getTransactionTimeout();
371                  try {
372                      if (txtimeout != WorkManager.UNKNOWN) {
373                          ((Current) tm).begin(xid, txtimeout);
374                      } else {
375                          ((Current) tm).begin(xid);
376                      }
377                  } catch (NotSupportedException JavaDoc e) {
378                      throw new WorkException JavaDoc("Error starting a new transaction", e);
379                  } catch (SystemException JavaDoc e) {
380                      throw new WorkException JavaDoc("Error starting a new transaction", e);
381                  }
382              }
383         }
384
385         try {
386             work.run();
387             // Notify the listener that the work is completed.
388
if (listener != null) {
389                 listener.workCompleted(new WorkEvent JavaDoc(this, WorkEvent.WORK_COMPLETED, work, null));
390             }
391         } catch (Exception JavaDoc e) {
392             if (listener != null) {
393                 listener.workCompleted(new WorkEvent JavaDoc(this, WorkEvent.WORK_COMPLETED, work, null));
394             }
395             throw new WorkCompletedException JavaDoc(e);
396         } finally {
397             if (xid != null) {
398                 ((Current) tm).clearThreadTx();
399             }
400         }
401     }
402
403     /**
404      * Get the next JWork object to be run.
405      * @return next JWork object to be run, or null if thread must end.
406      */

407     public void nextWork() throws WorkException JavaDoc, InterruptedException JavaDoc {
408         JWork run = null;
409         boolean haswait = false;
410         synchronized (workList) {
411             while (workList.isEmpty()) {
412                 if ((haswait && freeThreads > minpoolsz) || !valid) {
413                     poolsz--;
414                     throw new InterruptedException JavaDoc("Thread ending");
415                 }
416                 try {
417                     freeThreads++;
418                     if (logger.isLoggable(BasicLevel.DEBUG)) {
419                         logger.log(BasicLevel.DEBUG, "waiting");
420                     }
421                     workList.wait(waitingTime);
422                     if (logger.isLoggable(BasicLevel.DEBUG)) {
423                         logger.log(BasicLevel.DEBUG, "notified");
424                     }
425                     freeThreads--;
426                     haswait = true;
427                 } catch (InterruptedException JavaDoc e) {
428                     freeThreads--;
429                     poolsz--;
430                     throw e;
431                 }
432             }
433             run = (JWork) workList.removeFirst();
434             // In case startWork() was called
435
synchronized(run) {
436                 if (logger.isLoggable(BasicLevel.DEBUG)) {
437                     logger.log(BasicLevel.DEBUG, "start new work");
438                 }
439                 run.setStarted();
440                 run.notify();
441             }
442         }
443         doMyWork(run.getWork(), run.getTimeout(), run.getExecutionContext(), run.getWorkListener(),
444                 run.getCreationTime());
445     }
446
447     /**
448      * Remove this WorkManager : Stop all threads
449      */

450     public synchronized void stopThreads() {
451         if (logger.isLoggable(BasicLevel.DEBUG)) {
452             logger.log(BasicLevel.DEBUG, "");
453         }
454         valid = false;
455         notifyAll();
456         poolnumber--;
457     }
458
459     class JWork {
460         private Work JavaDoc work;
461         private long timeout;
462         private ExecutionContext JavaDoc ectx;
463         private WorkListener JavaDoc listener;
464         private long creationTime;
465         private boolean started = false;
466
467         public JWork(Work JavaDoc work, long timeout, ExecutionContext JavaDoc ectx, WorkListener JavaDoc listener) {
468             this.work = work;
469             this.timeout = timeout;
470             this.ectx = ectx;
471             this.listener = listener;
472             creationTime = System.currentTimeMillis();
473             if (logger.isLoggable(BasicLevel.DEBUG)) {
474                 logger.log(BasicLevel.DEBUG, "timeout=" + timeout);
475             }
476         }
477
478         public Work JavaDoc getWork() {
479             return work;
480         }
481
482         public long getTimeout() {
483             return timeout;
484         }
485
486         public ExecutionContext JavaDoc getExecutionContext() {
487             return ectx;
488         }
489
490         public WorkListener JavaDoc getWorkListener() {
491             return listener;
492         }
493
494         public long getCreationTime() {
495             return creationTime;
496         }
497
498         public boolean isStarted() {
499             return started;
500         }
501
502         public void setStarted() {
503             if (logger.isLoggable(BasicLevel.DEBUG)) {
504                 logger.log(BasicLevel.DEBUG, "");
505             }
506             started = true;
507         }
508     }
509
510     /**
511      * Thread executing works for the work manager.
512      */

513     class WorkThread extends Thread JavaDoc {
514
515         private JWorkManager mgr;
516         private int number;
517
518         /**
519          * Constructor
520          * @param m The WorkManager
521          * @param num thread number
522          * @param wm workManager number
523          */

524         WorkThread(JWorkManager m, int num, int wm) {
525             mgr = m;
526             number = num;
527             setName("WorkThread-" + wm + "/" + num);
528         }
529
530         public void run() {
531             if (logger.isLoggable(BasicLevel.DEBUG)) {
532                 logger.log(BasicLevel.DEBUG, "running");
533             }
534             while (true) {
535                 try {
536                     mgr.nextWork();
537                 } catch (InterruptedException JavaDoc e) {
538                     if (logger.isLoggable(BasicLevel.DEBUG)) {
539                         logger.log(BasicLevel.DEBUG, "Exiting: ", e);
540                     }
541                     return;
542                 } catch (WorkException JavaDoc e) {
543                     logger.log(BasicLevel.ERROR, "Exception during work run: ", e);
544                 }
545             }
546         }
547
548     }
549
550 }
551
552
Popular Tags