KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > proactive > core > body > future > FuturePool


1 /*
2 * ################################################################
3 *
4 * ProActive: The Java(TM) library for Parallel, Distributed,
5 * Concurrent computing with Security and Mobility
6 *
7 * Copyright (C) 1997-2002 INRIA/University of Nice-Sophia Antipolis
8 * Contact: proactive-support@inria.fr
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; either
13 * version 2.1 of the License, or any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; if not, write to the Free Software
22 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
23 * USA
24 *
25 * Initial developer(s): The ProActive Team
26 * http://www.inria.fr/oasis/ProActive/contacts.html
27 * Contributor(s):
28 *
29 * ################################################################
30 */

31 package org.objectweb.proactive.core.body.future;
32
33 import org.objectweb.proactive.Body;
34 import org.objectweb.proactive.ProActive;
35 import org.objectweb.proactive.core.ProActiveRuntimeException;
36 import org.objectweb.proactive.core.UniqueID;
37 import org.objectweb.proactive.core.body.LocalBodyStore;
38 import org.objectweb.proactive.core.body.UniversalBody;
39 import org.objectweb.proactive.core.body.reply.Reply;
40 import org.objectweb.proactive.core.body.reply.ReplyImpl;
41 import org.objectweb.proactive.core.config.ProActiveConfiguration;
42 import org.objectweb.proactive.core.mop.Utils;
43
44 import org.objectweb.proactive.ext.security.ProActiveSecurityManager;
45 import org.objectweb.proactive.ext.security.SecurityNotAvailableException;
46
47
48 public class FuturePool extends Object JavaDoc implements java.io.Serializable JavaDoc {
49
50     protected boolean newState;
51
52     // table of future and ACs
53
private FutureMap futures;
54
55     // ID of the body corresponding to this futurePool
56
private UniqueID ownerBody;
57
58     // Active queue of AC services
59
private transient ActiveACQueue queueAC;
60
61     // toggle for enabling or disabling automatic continuation
62
private boolean acEnabled;
63
64     // table used for storing values which arrive in the futurePool BEFORE the registration
65
// of its corresponding future.
66
private java.util.HashMap JavaDoc valuesForFutures;
67
//
// -- CONSTRUCTORS -----------------------------------------------
//

/**
 * Creates an empty pool. Automatic continuation is switched on only when
 * ProActiveConfiguration.getACState() returns exactly "enable"; in that case
 * the AC queue thread is created and started right away.
 */
public FuturePool() {
    this.futures = new FutureMap();
    this.valuesForFutures = new java.util.HashMap();
    this.newState = false;
    this.acEnabled = "enable".equals(ProActiveConfiguration.getACState());
    if (this.acEnabled) {
        this.queueAC = new ActiveACQueue();
        this.queueAC.start();
    }
}
85
//
// -- STATIC ------------------------------------------------------
//

90     // this table is used to register destination before sending.
91
// So, a future could retreive its destination during serialization
92
// this table indexed by the thread which perform the registration.
93
static private java.util.Hashtable JavaDoc bodyDestination;
94
95     // to register in the table
96
static public void registerBodyDestination(UniversalBody dest) {
97         bodyDestination.put(Thread.currentThread(), dest);
98     }
99
100     // to clear an entry in the table
101
static public void removeBodyDestination() {
102         bodyDestination.remove(Thread.currentThread());
103     }
104
105     // to get a destination
106
static public UniversalBody getBodyDestination() {
107         return (UniversalBody) (bodyDestination.get(Thread.currentThread()));
108     }
109
110
111
112     // this table is used to register deserialized futures after receive
113
// So, futures to add in the local futurePool could be retreived
114
static private java.util.Hashtable JavaDoc incomingFutures;
115
116     // to register an incoming future in the table
117
public static void registerIncomingFuture(Future f) {
118         java.util.ArrayList JavaDoc listOfFutures = (java.util.ArrayList JavaDoc) incomingFutures.get(Thread.currentThread());
119         if (listOfFutures != null) {
120             listOfFutures.add(f);
121         } else {
122             java.util.ArrayList JavaDoc newListOfFutures = new java.util.ArrayList JavaDoc();
123             newListOfFutures.add(f);
124             incomingFutures.put(Thread.currentThread(), newListOfFutures);
125         }
126     }
127
128     // to remove an entry from the table
129
static public void removeIncomingFutures() {
130         incomingFutures.remove(Thread.currentThread());
131     }
132
133     // to get a list of incomingFutures
134
static public java.util.ArrayList JavaDoc getIncomingFutures() {
135         return (java.util.ArrayList JavaDoc) (incomingFutures.get(Thread.currentThread()));
136     }
137
// static init block: creates the two thread-indexed registration tables
static {
    incomingFutures = new java.util.Hashtable();
    bodyDestination = new java.util.Hashtable();
}
143
//
// -- PUBLIC METHODS -----------------------------------------------
//

148     /**
149      * Setter of the ID of the body corresonding to this FuturePool
150      * @param i ID of the owner body.
151      */

152     public void setOwnerBody(UniqueID i) {
153         ownerBody = i;
154     }
155
156     /**
157      * Getter of the ID of the body corresonding to this FuturePool
158      */

159     public UniqueID getOwnerBody() {
160         return ownerBody;
161     }
162
163     /**
164      * To enable the automatic continuation behaviour for all futures in
165      * this FuturePool
166      * */

167     public void enableAC() {
168         this.queueAC = new ActiveACQueue();
169         this.queueAC.start();
170         this.acEnabled = true;
171     }
172
173     /**
174      * To disable the automatic continuation behaviour for all futures in
175      * this FuturePool
176      * */

177     public void disableAC() {
178         this.acEnabled = false;
179         this.queueAC.killMe();
180         this.queueAC = null;
181     }
182
183     /**
184      * Method called when a reply is recevied, ie a value is available for a future.
185      * This method perform local futures update, and put an ACService in the activeACqueue.
186      * @param id sequence id of the future to update
187      * @param creatorID ID of the body creator of the future to update
188      * @param result value to update with the futures
189      */

190     public synchronized void receiveFutureValue(long id, UniqueID creatorID, Object JavaDoc result) throws java.io.IOException JavaDoc {
191         
192         // get all aiwated futures
193
java.util.ArrayList JavaDoc futuresToUpdate = futures.getFuturesToUpdate(id, creatorID);
194
195         if (futuresToUpdate != null) {
196             Future future = (Future) (futuresToUpdate.get(0));
197             if (future != null) {
198                 future.receiveReply(result);
199             }
200             // if there are more than one future to update, we "give" deep copy
201
// of the result to the other futures to respect ProActive model
202
// We use here the migration tag to perform a simple serialization (ie
203
// without continuation side-effects)
204
setMigrationTag();
205             for (int i = 1; i < futuresToUpdate.size(); i++) {
206                 Future otherFuture = (Future) (futuresToUpdate.get(i));
207                 otherFuture.receiveReply(Utils.makeDeepCopy(result));
208             }
209             unsetMigrationTag();
210             stateChange();
211
212             // 2) create and put ACservices
213
if (acEnabled) {
214                 java.util.ArrayList JavaDoc bodiesToContinue = futures.getAutomaticContinuation(id, creatorID);
215                 if ((bodiesToContinue != null) && (bodiesToContinue.size() != 0)) {
216                     ProActiveSecurityManager psm = null;
217                      try {
218                          psm = ProActive.getBodyOnThis()
219                                         .getProActiveSecurityManager();
220                      } catch (SecurityNotAvailableException e) {
221                          psm = null;
222                      }
223                     queueAC.addACRequest(new ACService(bodiesToContinue, new ReplyImpl(creatorID, id, null, result,psm)));
224                 }
225             }
226
227             // 3) Remove futures from the futureMap
228
futures.removeFutures(id, creatorID);
229         } else {
230             // we have to store the result until future arrive
231
this.valuesForFutures.put(""+id+creatorID, result);
232         }
233     }
234
235
236     /**
237      * To put a future in the FutureMap
238      * @param id sequence id of the future
239      * @param creatorID UniqueID of the body which creates futureObject
240      * @param futureObject future to register
241      */

242     public synchronized void receiveFuture(Future futureObject) {
243         futureObject.setSenderID(ownerBody);
244         futures.receiveFuture(futureObject);
245         long id = futureObject.getID();
246         UniqueID creatorID = futureObject.getCreatorID();
247         if (valuesForFutures.get(""+id+creatorID) != null) {
248             try {
249                 this.receiveFutureValue(id, creatorID, valuesForFutures.remove(""+id+creatorID));
250             } catch (java.io.IOException JavaDoc e) {
251             }
252         }
253     }
254
255     /**
256      * To add an automatic contiunation, ie a destination body, for a particular future.
257      * @param id sequence id of the corresponding future
258      * @param creatorID UniqueID of the body which creates futureObject
259      * @param bodyDest body destination of this continuation
260      */

261     public void addAutomaticContinuation(long id, UniqueID creatorID, UniversalBody bodyDest) {
262         futures.addAutomaticContinuation(id, creatorID, bodyDest);
263     }
264
265
266
267     public synchronized void waitForReply() {
268         this.newState = false;
269         while (!newState) {
270             try {
271                 wait();
272             } catch (InterruptedException JavaDoc e) {
273                 e.printStackTrace();
274             }
275         }
276
277     }
278
279     /**
280      * To register a destination before sending a reques or a reply
281      * Registration key is the calling thread.
282      */

283     public void registerDestination(UniversalBody dest){
284         if (acEnabled)
285             FuturePool.registerBodyDestination(dest);
286     }
287
288     /**
289      * To clear registred destination for the calling thread.
290      */

291     public void removeDestination(){
292         if (acEnabled)
293             FuturePool.removeBodyDestination();
294     }
295
296
297     public void setMigrationTag() {
298         futures.setMigrationTag();
299     }
300
301     public void unsetMigrationTag() {
302         futures.unsetMigrationTag();
303     }
304
//
// -- PRIVATE METHODS -----------------------------------------------
//

309     private void stateChange() {
310         this.newState = true;
311         notifyAll();
312     }
313
//
// -- PRIVATE METHODS FOR SERIALIZATION -----------------------------------------------
//

318     private void writeObject(java.io.ObjectOutputStream JavaDoc out) throws java.io.IOException JavaDoc {
319         setMigrationTag();
320         out.defaultWriteObject();
321         if (acEnabled) {
322             // send the queue of AC requests
323
out.writeObject(queueAC.getQueue());
324             // stop the ActiveQueue thread
325
queueAC.killMe();
326         }
327     }
328
329     private void readObject(java.io.ObjectInputStream JavaDoc in) throws java.io.IOException JavaDoc, ClassNotFoundException JavaDoc {
330         in.defaultReadObject();
331         unsetMigrationTag();
332         if (acEnabled) {
333             // create a new ActiveACQueue
334
java.util.ArrayList JavaDoc queue = (java.util.ArrayList JavaDoc) (in.readObject());
335             queueAC = new ActiveACQueue(queue);
336             queueAC.start();
337         }
338         
339     }
340
341     
//--------------------------------INNER CLASS------------------------------------//
343

344     /**
345      * Active Queue for AC. This queue has his own thread to perform ACservices
346      * available in the queue. This thread is compliant with migration by using
347      * the threadStore of the body correponding to this FutureMap.
348      * Note that the ACServices are served in FIFO manner.
349      * @see ACservice
350      */

351     private class ActiveACQueue extends Thread JavaDoc {
352
353         private java.util.ArrayList JavaDoc queue;
354         private int counter;
355         private boolean kill;
356
357         //
358
// -- CONSTRUCTORS -----------------------------------------------
359
//
360

361         public ActiveACQueue() {
362             queue = new java.util.ArrayList JavaDoc();
363             counter = 0;
364             kill = false;
365             this.setName("Thread for AC");
366         }
367
368         public ActiveACQueue(java.util.ArrayList JavaDoc queue) {
369             this.queue = queue;
370             counter = queue.size();
371             kill = false;
372             this.setName("Thread for AC");
373         }
374
375         //
376
// -- PUBLIC METHODS -----------------------------------------------
377
//
378

379         /**
380          * return the current queue of ACServices to perform
381          */

382         public java.util.ArrayList JavaDoc getQueue() {
383             return queue;
384         }
385
386         /**
387          * Add a ACservice in the active queue.
388          */

389         public synchronized void addACRequest(ACService r) {
390             queue.add(r);
391             counter++;
392             notifyAll();
393         }
394
395         /**
396          * Return the oldest request in queue and remove it from the queue
397          */

398         public synchronized ACService removeACRequest() {
399             counter--;
400             return (ACService) (queue.remove(0));
401         }
402
403         /**
404          * To stop the thread.
405          */

406         public synchronized void killMe() {
407             kill = true;
408             notifyAll();
409         }
410
411         public void run() {
412             // get a reference on the owner body
413
// try until it's not null because deserialization of the body
414
// may be not finished when we restart the thread.
415
Body owner = null;
416             while (owner == null) {
417                 owner = LocalBodyStore.getInstance().getLocalBody(ownerBody);
418                 // it's a halfbody...
419
if (owner == null)
420                     owner = LocalBodyStore.getInstance().getLocalHalfBody(ownerBody);
421             }
422
423             while (true) {
424                 // if there is no AC to do, wait...
425
waitForAC();
426                 
427                 if (kill)
428                     break;
429                 
430                 // there are ACs to do !
431
try {
432                     // enter in the threadStore
433
owner.enterInThreadStore();
434
435                     // if body has migrated, kill the thread
436
if (kill)
437                         break;
438
439                     ACService toDo = this.removeACRequest();
440                     if (toDo != null) {
441                         toDo.doAutomaticContinuation();
442                     }
443
444                     // exit from the threadStore
445
owner.exitFromThreadStore();
446
447                 } catch (Exception JavaDoc e2) {
448                     // to unblock active object
449
owner.exitFromThreadStore();
450                     throw new ProActiveRuntimeException("Error while sending reply for AC ", e2);
451                 }
452             }
453         }
454
455         // synchronized wait on ACRequest queue
456
private synchronized void waitForAC() {
457             try {
458                 while ((counter == 0) && !kill) {
459                     wait();
460                 }
461             } catch (InterruptedException JavaDoc e) {
462                 e.printStackTrace();
463             }
464         }
465
466     }
467
468     /**
469      * A simple object for a request for an automatic continuation
470      * @see ActiveACQueue
471      */

472     private class ACService implements java.io.Serializable JavaDoc {
473
474         // bodies that have to be updated
475
private java.util.ArrayList JavaDoc dests;
476         // reply to send
477
private Reply reply;
478
479         //
480
// -- CONSTRUCTORS -----------------------------------------------
481
//
482

483         public ACService(java.util.ArrayList JavaDoc dests, Reply reply) {
484             this.dests = dests;
485             this.reply = reply;
486         }
487
488         //
489
// -- PUBLIC METHODS -----------------------------------------------
490
//
491

492         public void doAutomaticContinuation() throws java.io.IOException JavaDoc {
493             if (dests != null) {
494                 for (int i = 0; i < dests.size(); i++) {
495                     UniversalBody dest = (UniversalBody) (dests.get(i));
496                     registerDestination(dest);
497                     reply.send(dest);
498                     removeDestination();
499                 }
500             }
501         }
502     } //ACService
503

504 }
505
Popular Tags