KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > javolution > realtime > ConcurrentContext


1 /*
2  * Javolution - Java(TM) Solution for Real-Time and Embedded Systems
3  * Copyright (C) 2005 - Javolution (http://javolution.org/)
4  * All rights reserved.
5  *
6  * Permission to use, copy, modify, and distribute this software is
7  * freely granted, provided that this notice is preserved.
8  */

9 package javolution.realtime;
10
11 import javolution.JavolutionError;
12
13 /**
14  * <p> This class represents a concurrent context; it is used to accelerate
15  * execution of concurrent algorithms on multi-processor systems.</p>
16  *
17  * <p> When a thread enters a concurrent context, it may execute multiple
18  * concurrent {@link Logic logics} by calling any of the
19  * <code>ConcurrentContext.execute(logic, arg0, arg1, ...)</code>
20  * static methods. Each logic is then executed by the current thread or by
21  * {@link ConcurrentThread concurrent threads} based upon availability.</p>
22  *
23  * <p> Only after all concurrent executions are completed, is the current
24  * thread allowed to exit the scope of the concurrent context
25  * (internal synchronization).</p>
26  *
27  * <p> Concurrent logics always execute within a {@link PoolContext} and
28  * do not generate garbage. {@link RealtimeObject Realtime objects} made
29  * available outside of the logic scope have therefore to be either
30  * {@link RealtimeObject#export exported} or {@link RealtimeObject#preserve
31  * preserved}.</p>
32  *
33  * <p> Concurrent contexts are easy to use, and provide automatic
34  * load-balancing between processors with almost no overhead. Here is
35  * an example of <b>concurrent/recursive/clean</b> (no garbage generated)
36  * implementation of the Karatsuba multiplication for large integers:<pre>
37  *
38  * public LargeInteger multiply(LargeInteger that) {
39  * if (that._size <= 1) {
40  * return multiply(that.longValue()); // Direct multiplication.
41  *
42  * } else { // Karatsuba multiplication in O(n<sup>Log(3)</sup>)
43  * int bitLength = this.bitLength();
44  * int n = (bitLength >> 1) + (bitLength & 1);
45  * LargeInteger b = this.shiftRight(n);
46  * LargeInteger a = this.minus(b.shiftLeft(n));
47  * LargeInteger d = that.shiftRight(n);
48  * LargeInteger c = that.minus(d.shiftLeft(n));
49  * StackReference&lt;LargeInteger&gt; ac = StackReference.newInstance();
50  * StackReference&lt;LargeInteger&gt; bd = StackReference.newInstance();
51  * StackReference&lt;LargeInteger&gt; abcd = StackReference.newInstance();
52  * ConcurrentContext.enter();
53  * try { // this = a + 2^n b, that = c + 2^n d
54  * ConcurrentContext.execute(MULTIPLY, a, c, ac);
55  * ConcurrentContext.execute(MULTIPLY, b, d, bd);
56  * ConcurrentContext.execute(MULTIPLY, a.plus(b), c.plus(d), abcd);
57  * } finally {
58  * ConcurrentContext.exit(); // Waits for all concurrent threads to complete.
59  * }
60  * return ac.get().plus(abcd.get().minus(ac.get()).minus(bd.get()).shiftLeft(n)).plus(bd.get().shiftLeft(2 * n));
61  * }
62  * }
63  * private static final Logic MULTIPLY = new Logic() {
64  * public void run(Object[] args) {
65  * LargeInteger left = (LargeInteger) args[0];
66  * LargeInteger right = (LargeInteger) args[1];
67  * StackReference result = (StackReference) args[2];
68  * result.set(left.multiply(right).export()); // Recursive.
69  * }
70  * };</pre>
71  *
72  * <p> Finally, it should be noted that concurrent contexts ensure the same
73  * behavior whether or not the execution is performed by the current
74  * thread or concurrent threads. In particular, the current {@link Context
75  * context} is inherited by concurrent threads and any exception raised
76  * during the concurrent logic executions is automatically propagated
77  * to the current thread.</p>
78  *
79  * @author <a HREF="mailto:jean-marie@dautelle.com">Jean-Marie Dautelle</a>
80  * @version 1.0, October 4, 2004
81  */

82 public final class ConcurrentContext extends Context {
83
84     /**
85      * Holds the queue size for concurrent logic executions.
86      */

87     private static final int QUEUE_SIZE = 256;
88
89     /**
90      * Holds the maximum number of arguments.
91      */

92     private static final int ARGS_SIZE = 6;
93
94     /**
95      * Holds the pending logics.
96      */

97     private Logic[] _logics = new Logic[QUEUE_SIZE];
98
99     /**
100      * Holds the pending logics arguments.
101      */

102     private Object[][] _args = new Object[QUEUE_SIZE][];
103
104     /**
105      * Holds the arguments pool.
106      */

107     private Object[][][] _argsPool = new Object[QUEUE_SIZE][ARGS_SIZE][];
108
109     /**
110      * Holds the number of pending logics.
111      */

112     private int _logicsCount;
113
114     /**
115      * Indicates if local concurrency is enabled.
116      */

117     private static final LocalReference ENABLED
118         = new LocalReference(new Boolean(true));
119
120     /**
121      * Holds the concurrency of this context (number of concurrent thread
122      * executing).
123      */

124     private int _concurrency;
125
126     /**
127      * Holds the number of threads having completed their execution
128      * (including the current thread).
129      */

130     private int _threadsDone;
131
132     /**
133      * Holds any error occuring during concurrent execution.
134      */

135     private Throwable _error;
136
137     /**
138      * Default constructor.
139      */

140     ConcurrentContext() {
141     }
142
143     /**
144      * Enables/disables {@link LocalContext local} concurrency.
145      *
146      * @param enabled <code>true</code> if concurrency is locally enabled;
147      * <code>false</code> otherwise.
148      */

149     public static void setEnabled(boolean enabled) {
150         ENABLED.set(new Boolean(enabled));
151     }
152
153     /**
154      * Indicates if concurrency is {@link LocalContext locally} enabled
155      * (default <code>true</code>).
156      *
157      * @return <code>true</code> if concurrency is locally enabled;
158      * <code>false</code> otherwise.
159      */

160     public static boolean isEnabled() {
161         return ((Boolean) ENABLED.get()).booleanValue();
162     }
163
164     /**
165      * Enters a {@link ConcurrentContext}.
166      */

167     public static void enter() {
168         ConcurrentContext ctx = (ConcurrentContext) push(CONCURRENT_CONTEXT_CLASS);
169         if (ctx == null) {
170             ctx = new ConcurrentContext();
171             push(ctx);
172         }
173     }
174     private static final Class CONCURRENT_CONTEXT_CLASS = new ConcurrentContext().getClass();
175
176     /**
177      * Executes the specified logic by a {@link ConcurrentThread} when possible.
178      * The specified logic is always executed within a {@link PoolContext}
179      * and inherits the context of the parent thread. Any exception or error
180      * during execution will be propagated to the current thread upon
181      * {@link #exit} of the concurrent context.
182      *
183      * @param logic the logic to execute concurrently when possible.
184      * @throws ClassCastException if the current context is not a
185      * {@link ConcurrentContext}.
186      */

187     public static void execute(Logic logic) {
188         ConcurrentContext ctx = (ConcurrentContext) currentContext();
189         if (ctx._logicsCount >= QUEUE_SIZE) {
190             ctx.flush();
191         }
192         ctx._args[ctx._logicsCount] = Logic.NO_ARG;
193         ctx._logics[ctx._logicsCount++] = logic;
194     }
195
196     /**
197      * Executes the specified logic with the specified argument.
198      *
199      * @param logic the logic to execute concurrently when possible.
200      * @param arg0 the logic argument.
201      * @throws ClassCastException if the current context is not a
202      * {@link ConcurrentContext}.
203      * @see #execute(ConcurrentContext.Logic)
204      */

205     public static void execute(Logic logic, Object arg0) {
206         ConcurrentContext ctx = (ConcurrentContext) currentContext();
207         if (ctx._logicsCount >= QUEUE_SIZE) {
208             ctx.flush();
209         }
210         Object[] args = ctx.getArgs(1);
211         args[0] = arg0;
212         ctx._logics[ctx._logicsCount++] = logic;
213     }
214
215     /**
216      * Executes the specified logic with the specified two arguments.
217      *
218      * @param logic the logic to execute concurrently when possible.
219      * @param arg0 the first argument.
220      * @param arg1 the second argument.
221      * @throws ClassCastException if the current context is not a
222      * {@link ConcurrentContext}.
223      * @see #execute(ConcurrentContext.Logic)
224      */

225     public static void execute(Logic logic, Object arg0, Object arg1) {
226         ConcurrentContext ctx = (ConcurrentContext) currentContext();
227         if (ctx._logicsCount >= QUEUE_SIZE) {
228             ctx.flush();
229         }
230         Object[] args = ctx.getArgs(2);
231         args[0] = arg0;
232         args[1] = arg1;
233         ctx._logics[ctx._logicsCount++] = logic;
234     }
235
236     /**
237      * Executes the specified logic with the specified three arguments.
238      *
239      * @param logic the logic to execute concurrently when possible.
240      * @param arg0 the first argument.
241      * @param arg1 the second argument.
242      * @param arg2 the third argument.
243      * @throws ClassCastException if the current context is not a
244      * {@link ConcurrentContext}.
245      * @see #execute(ConcurrentContext.Logic)
246      */

247     public static void execute(Logic logic, Object arg0, Object arg1,
248             Object arg2) {
249         ConcurrentContext ctx = (ConcurrentContext) currentContext();
250         if (ctx._logicsCount >= QUEUE_SIZE) {
251             ctx.flush();
252         }
253         Object[] args = ctx.getArgs(3);
254         args[0] = arg0;
255         args[1] = arg1;
256         args[2] = arg2;
257         ctx._logics[ctx._logicsCount++] = logic;
258     }
259
260     /**
261      * Executes the specified logic with the specified four arguments.
262      *
263      * @param logic the logic to execute concurrently when possible.
264      * @param arg0 the first argument.
265      * @param arg1 the second argument.
266      * @param arg2 the third argument.
267      * @param arg3 the fourth argument.
268      * @throws ClassCastException if the current context is not a
269      * {@link ConcurrentContext}.
270      * @see #execute(ConcurrentContext.Logic)
271      */

272     public static void execute(Logic logic, Object arg0, Object arg1,
273             Object arg2, Object arg3) {
274         ConcurrentContext ctx = (ConcurrentContext) currentContext();
275         if (ctx._logicsCount >= QUEUE_SIZE) {
276             ctx.flush();
277         }
278         Object[] args = ctx.getArgs(4);
279         args[0] = arg0;
280         args[1] = arg1;
281         args[2] = arg2;
282         args[3] = arg3;
283         ctx._logics[ctx._logicsCount++] = logic;
284     }
285
286     /**
287      * Executes the specified logic with the specified five arguments.
288      *
289      * @param logic the logic to execute concurrently when possible.
290      * @param arg0 the first argument.
291      * @param arg1 the second argument.
292      * @param arg2 the third argument.
293      * @param arg3 the fourth argument.
294      * @param arg4 the fifth argument.
295      * @throws ClassCastException if the current context is not a
296      * {@link ConcurrentContext}.
297      * @see #execute(ConcurrentContext.Logic)
298      */

299     public static void execute(Logic logic, Object arg0, Object arg1,
300             Object arg2, Object arg3, Object arg4) {
301         ConcurrentContext ctx = (ConcurrentContext) currentContext();
302         if (ctx._logicsCount >= QUEUE_SIZE) {
303             ctx.flush();
304         }
305         Object[] args = ctx.getArgs(5);
306         args[0] = arg0;
307         args[1] = arg1;
308         args[2] = arg2;
309         args[3] = arg3;
310         args[4] = arg4;
311         ctx._logics[ctx._logicsCount++] = logic;
312     }
313
314     /**
315      * Executes the specified logic with the specified six arguments.
316      *
317      * @param logic the logic to execute concurrently when possible.
318      * @param arg0 the first argument.
319      * @param arg1 the second argument.
320      * @param arg2 the third argument.
321      * @param arg3 the fourth argument.
322      * @param arg4 the fifth argument.
323      * @param arg5 the sixth argument.
324      * @throws ClassCastException if the current context is not a
325      * {@link ConcurrentContext}.
326      * @see #execute(ConcurrentContext.Logic)
327      */

328     public static void execute(Logic logic, Object arg0, Object arg1,
329             Object arg2, Object arg3, Object arg4, Object arg5) {
330         ConcurrentContext ctx = (ConcurrentContext) currentContext();
331         if (ctx._logicsCount >= QUEUE_SIZE) {
332             ctx.flush();
333         }
334         Object[] args = ctx.getArgs(6);
335         args[0] = arg0;
336         args[1] = arg1;
337         args[2] = arg2;
338         args[3] = arg3;
339         args[4] = arg4;
340         args[5] = arg5;
341         ctx._logics[ctx._logicsCount++] = logic;
342     }
343
344     /**
345      * Exits the {@link ConcurrentContext}. This method blocks until all
346      * concurrent executions within the current context are completed.
347      * Errors and exceptions raised in concurrent threads are propagated here.
348      *
349      * @throws ClassCastException if the current context is not a
350      * {@link ConcurrentContext}.
351      * @throws ConcurrentException propagates any error or exception raised
352      * during the execution of a concurrent logic.
353      */

354     public static void exit() throws ConcurrentException {
355         ConcurrentContext ctx = (ConcurrentContext) Context.currentContext();
356         ctx.flush(); // Executes remaining logics.
357
Context.pop();
358
359         // Propagates any concurrent error to current thread.
360
if (ctx._error != null) {
361             ConcurrentException error = new ConcurrentException(ctx._error);
362             ctx._error = null; // Resets error flag.
363
throw error;
364         }
365     }
366
367     /**
368      * Stores the first error occuring during a concurrent execution.
369      *
370      * @param error an error raised while executing some concurrent logic.
371      */

372     synchronized void setError(Throwable error) {
373         if (_error == null) { // First error.
374
_error = error;
375         } // Else ignores subsequent errors.
376
}
377
378     /**
379      * Executes the next pending logic.
380      *
381      * @return <code>true</code> if some logics has been executed;
382      * <code>false</code> if there is no pending logic to execute.
383      */

384     boolean executeNext() {
385         int index;
386         synchronized (this) {
387             if (_logicsCount > 0) {
388                 index = --_logicsCount;
389             } else {
390                 _threadsDone++;
391                 this.notify();
392                 return false;
393             }
394         }
395         try {
396             Object[] args = _args[index];
397             _logics[index].run(args);
398             _logics[index] = null;
399             for (int j = args.length; j > 0;) {
400                 args[--j] = null;
401             }
402         } catch (Throwable error) {
403             setError(error);
404         }
405         return true;
406     }
407
408     /**
409      * Executes all pending logics (blocking).
410      */

411     private void flush() {
412         // Current thread enters inner pool context in order to ensure
413
// that concurrent threads have no access to its local pools.
414
PoolContext.enter();
415         try {
416             _concurrency = ConcurrentContext.isEnabled() ? ConcurrentThread
417                     .execute(this) : 0;
418             while (executeNext()) {
419                 ((PoolContext) Context.currentContext()).recyclePools();
420             }
421             synchronized (this) {
422                 while (_threadsDone <= _concurrency) {
423                     this.wait();
424                 } // Exit when _threadsDone = _concurrency + 1 (current thread)
425
}
426         } catch (InterruptedException e) {
427             throw new JavolutionError(e);
428         } finally {
429             _threadsDone = 0;
430             PoolContext.exit();
431         }
432     }
433
434     /**
435      * Gets the next arguments array.
436      *
437      * @param length the array length (> 0).
438      * @return the next arguments array available.
439      */

440     private Object[] getArgs(int length) {
441         Object[] args = _argsPool[_logicsCount][length - 1];
442         if (args == null) {
443             args = new Object[length];
444             _argsPool[_logicsCount][length - 1] = args;
445         }
446         _args[_logicsCount] = args;
447         return args;
448     }
449
450     // Implements abstract method.
451
protected void dispose() {
452         _logics = null;
453         _args = null;
454         _argsPool = null;
455     }
456
457     /**
458      * <p> This abstract class represents some parameterized code which may be
459      * executed concurrently.</p>
460      */

461     public static abstract class Logic implements Runnable {
462
463         /**
464          * Executes this logic with no arguments.
465          */

466         public final void run() {
467             run(NO_ARG);
468         }
469
470         private static final Object[] NO_ARG = new Object[0];
471
472         /**
473          * Executes this logic with the specified arguments.
474          *
475          * @param args the arguments. The number of arguments depends upon
476          * the <code>ConcurrentContext.execute</code> method which
477          * has been called (e.g. if {@link ConcurrentContext#execute(
478          * ConcurrentContext.Logic, Object, Object)},
479          * has been called, then <code>(args.length == 2)</code>).
480          */

481         public abstract void run(Object[] args);
482     }
483
484 }
Popular Tags