KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > jonathan > libs > resources > JScheduler


/***
 * Jonathan: an Open Distributed Processing Environment
 * Copyright (C) 1999-2000 France Telecom R&D
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Release: 3.0
 *
 * Contact: jonathan@objectweb.org
 *
 * Author: Bruno Dumant
 *
 */

26
27
28 package org.objectweb.jonathan.libs.resources;
29
30 import org.objectweb.jonathan.apis.kernel.Context;
31 import org.objectweb.jonathan.apis.kernel.ContextFactory;
32 import org.objectweb.jonathan.apis.kernel.InternalException;
33 import org.objectweb.jonathan.apis.resources.Scheduler;
34 import org.objectweb.jonathan.apis.resources.Job;
35
36 import org.objectweb.util.monolog.api.BasicLevel;
37
38 /**
39  * Default {@link org.objectweb.jonathan.apis.resources.Scheduler scheduler}
40  * implementation.
41  * <p>
42  * This implementation has exactly the same behaviour as the default Java
43  * scheduling.
44  * <p>
45  * This scheduler manages a pool of re-usable threads. The size of this pool may
46  * be customized in the
47  * {@link org.objectweb.jonathan.apis.kernel.Kernel#newConfiguration(Class)
48  * bootstrap context}.
49  */

50 public class JScheduler implements Scheduler {
51
52    static int sched_nb = 0;
53
54    String JavaDoc sched_name;
55    int id = 0;
56
57    /** Max number of idle threads in the thread pool. */
58    int max_waiting;
59
60    /**
61     * Indicates whether warning message should be printed to stderr.
62     * <p>
63     * This value is defined under the name "/jonathan/JScheduler/verbose"
64     * in the {@link org.objectweb.jonathan.apis.kernel.Kernel#newConfiguration(Class)
65     * bootstrap context}.
66     * <p>
67     * This variable is not used in the current implementation.
68     */

69    public boolean verbose;
70
71    /** the pool of waiting jobs */
72    JJob[] pool;
73
74    /** number of threads waiting for a task. */
75    int waiting;
76
77    protected ContextFactory context_factory;
78    
79    /**
80     * Returns a new scheduler.
81     * @param context_factory a {@link ContextFactory context factory}.
82     */

83    public JScheduler(ContextFactory context_factory) {
84       this(5,false,context_factory);
85    }
86
87    /**
88     * Returns a new scheduler.
89     * @param max_waiting maximum number of idle threads
90     * @param verbose indicates whether warning messsages should be output on
91     stderr.
92     * @param context_factory a {@link ContextFactory context factory}.
93     */

94    public JScheduler(int max_waiting,boolean verbose,
95                      ContextFactory context_factory) {
96       this.max_waiting = max_waiting;
97       this.verbose = verbose;
98       this.context_factory = context_factory;
99       sched_name = "p" + (sched_nb++) + ".";
100       pool = new JJob[max_waiting];
101       waiting = 0;
102    }
103
104    /**
105     * Returns a new job created by the scheduler.
106     * @return a new job created by the scheduler.
107     */

108    public synchronized Job newJob() {
109       JJob job = null;
110       if (waiting == 0) {
111          // add a means to know whether there are more threads running than the
112
// pool size
113
job = new JJob(sched_name + (id++));
114       } else {
115          job = pool[--waiting];
116       }
117       return job;
118    }
119
120    /**
121     * Returns the currently executing job (the job performing the call).
122     * @return the currently executing job.
123     */

124    public Job getCurrent() {
125       if (Thread.currentThread() instanceof Job)
126          return (Job) Thread.currentThread();
127       else
128          return new KJob();
129    }
130
131    /**
132     * Calling this method gives the opportunity to the scheduler to re-schedule
133     * the currently executing jobs.
134     * <p>
135     * This implementation calls the static <code>yield()</code> method
136     * on the <code>Thread</code> class.
137     */

138    public void yield() {
139       Thread.yield();
140    }
141
142    /**
143     * Blocks the calling job until the {@link #notify(Object) <tt>notify</tt>} or
144     * {@link #notifyAll(Object) <tt>notifyAll</tt>} method is called providing the
145     * same lock identifier.
146     * <p>
147     * This implementation calls the standard <code>wait()</code> method
148     * on the provided <code>lock</code>.
149     *
150     * @param lock the lock identifier.
151     * @exception InterruptedException
152     */

153    public void wait(Object JavaDoc lock) throws InterruptedException JavaDoc {
154       lock.wait();
155    }
156
157    /**
158     * Blocks the calling job until the {@link #notify(Object) <tt>notify</tt>} or
159     * {@link #notifyAll(Object) <tt>notifyAll</tt>} method is called providing the
160     * same lock identifier.
161     * <p>
162     * This implementation calls the standard <code>wait()</code> method
163     * on the provided <code>lock</code>.
164     *
165     * @param lock the lock identifier.
166     * @param millis a time out in milliseconds.
167     * @exception InterruptedException
168     */

169    public void wait(Object JavaDoc lock,long millis) throws InterruptedException JavaDoc {
170       lock.wait(millis);
171    }
172
173    /**
174     * Unblocks a job {@link #wait(Object) waiting} on the lock.
175     * <p>
176     * This implementation calls the standard <code>notify()</code> method
177     * on the provided <code>lock</code>.
178     *
179     * @param lock the lock identifier.
180     */

181    public void notify(Object JavaDoc lock) {
182       lock.notify();
183    }
184
185    /**
186     * Unblocks all jobs {@link #wait(Object) waiting} on the lock.
187     * <p>
188     * This implementation calls the standard <code>notifyAll()</code> method
189     * on the provided <code>lock</code>.
190     *
191     * @param lock the lock identifier.
192     */

193    public void notifyAll(Object JavaDoc lock) {
194       lock.notifyAll();
195    }
196
197    /**
198     * Causes the calling job to be removed from the set of jobs managed by the
199     * target scheduler. It is necessary to call this method before every possibly
200     * blocking method so that the scheduler can give a chance to run to another job.
201     * <p>
202     * This implementation has nothing to do.
203     */

204    public void escape() {}
205
206    /**
207     * Causes a job {@link #escape() "escaped"} from the scheduler to be re-admitted
208     * in the set of jobs managed by the target scheduler.
209     * <p>
210     * This implementation has nothing to do.
211     */

212    public void enter() {}
213
214    boolean register(JJob job) {
215       if (waiting < pool.length) {
216          pool[waiting++] = job;
217          return true;
218       } else {
219          return false;
220       }
221    }
222
223    class JJob extends Thread JavaDoc implements Job {
224       Runnable JavaDoc task;
225       Context context;
226
227       JJob(String JavaDoc name) {
228          super(name);
229          setDaemon(true);
230          task = null;
231          context = null;
232          super.start();
233       }
234
235       /** Runs the task. */
236       public final void run() {
237          try {
238             synchronized (this) {
239                while (task == null) {
240                   this.wait();
241                }
242             }
243             while (true) {
244                //System.err.println(this + " running");
245
task.run();
246                if (context != null) {
247                   context.release();
248                   context = null;
249                }
250                //System.err.println(this + " done");
251
synchronized(JScheduler.this) {
252                   if (register(this)) {
253                      task = null;
254                   } else {
255                      return;
256                   }
257                }
258                if (task == null) {
259                   synchronized (this) {
260                      while (task == null) {
261                         this.wait();
262                      }
263                   }
264                }
265             }
266          } catch (InterruptedException JavaDoc e) {}
267       }
268
269       public synchronized void run(Runnable JavaDoc message) {
270          if (task != null) {
271             throw new InternalException("Can't use this job more than once");
272          } else {
273             task = message;
274             this.notify();
275          }
276       }
277
278       public Context getContext() {
279          if (context == null) {
280             context = context_factory.newContext();
281          }
282          return context;
283       }
284
285       public String JavaDoc toString() {
286          return getName();
287       }
288    }
289
290    private static ThreadLocal JavaDoc threadLocalContext = new ThreadLocal JavaDoc();
291
292    /**
293     * SylvainHack alert!
294     *
295     * In the case of servlet-driven communication the current thread
296     * is not a Job managed by this scheduler. In this case trying to
297     * get the current job context necessarily fails.
298     *
299     * This simply encapsulates an existing Thread into a Job. Thus,
300     * trying to access methods such as getCurrent() in JScheduler
301     * will simply create this Job...
302     *
303     * Note: set the context to be ThreadLocal or something...
304     */

305    class KJob implements Job {
306
307       public KJob() {
308       }
309
310       /*
311        * This is invalid. By definition a KJob encapsulates an
312        * already running thread, and should not be reused.
313        */

314       public void run(Runnable JavaDoc m) {
315          // System.err.println("KJob being asked to run a new task... Alert!");
316
// begin log
317
if ((LoggerProvider.logger != null)
318              &&(LoggerProvider.logger.isLoggable(BasicLevel.WARN))) {
319             LoggerProvider.logger.log(BasicLevel.WARN,"KJob being asked to run a new task... Alert!");
320          }
321          // end log
322
// sadly we can't throw an exception without changing the
323
// api...
324
}
325
326       public Context getContext() {
327          if (threadLocalContext.get() == null) {
328             threadLocalContext.set(context_factory.newContext());
329          }
330
331          return (Context)threadLocalContext.get();
332       }
333    }
334
335
336
337 }
338
339
340
Popular Tags