1 31 package org.objectweb.proactive.core.body.future; 32 33 import org.objectweb.proactive.Body; 34 import org.objectweb.proactive.ProActive; 35 import org.objectweb.proactive.core.ProActiveRuntimeException; 36 import org.objectweb.proactive.core.UniqueID; 37 import org.objectweb.proactive.core.body.LocalBodyStore; 38 import org.objectweb.proactive.core.body.UniversalBody; 39 import org.objectweb.proactive.core.body.reply.Reply; 40 import org.objectweb.proactive.core.body.reply.ReplyImpl; 41 import org.objectweb.proactive.core.config.ProActiveConfiguration; 42 import org.objectweb.proactive.core.mop.Utils; 43 44 import org.objectweb.proactive.ext.security.ProActiveSecurityManager; 45 import org.objectweb.proactive.ext.security.SecurityNotAvailableException; 46 47 48 public class FuturePool extends Object implements java.io.Serializable { 49 50 protected boolean newState; 51 52 private FutureMap futures; 54 55 private UniqueID ownerBody; 57 58 private transient ActiveACQueue queueAC; 60 61 private boolean acEnabled; 63 64 private java.util.HashMap valuesForFutures; 67 68 72 public FuturePool() { 73 futures = new FutureMap(); 74 valuesForFutures = new java.util.HashMap (); 75 this.newState = false; 76 if ("enable".equals(ProActiveConfiguration.getACState())) 77 this.acEnabled = true; 78 else 79 this.acEnabled = false; 80 if (acEnabled) { 81 queueAC = new ActiveACQueue(); 82 queueAC.start(); 83 } 84 } 85 86 90 static private java.util.Hashtable bodyDestination; 94 95 static public void registerBodyDestination(UniversalBody dest) { 97 bodyDestination.put(Thread.currentThread(), dest); 98 } 99 100 static public void removeBodyDestination() { 102 bodyDestination.remove(Thread.currentThread()); 103 } 104 105 static public UniversalBody getBodyDestination() { 107 return (UniversalBody) (bodyDestination.get(Thread.currentThread())); 108 } 109 110 111 112 static private java.util.Hashtable incomingFutures; 115 116 public static void registerIncomingFuture(Future f) { 118 java.util.ArrayList listOfFutures = (java.util.ArrayList ) incomingFutures.get(Thread.currentThread()); 119 if (listOfFutures != null) { 120 listOfFutures.add(f); 121 } else { 122 java.util.ArrayList newListOfFutures = new java.util.ArrayList (); 123 newListOfFutures.add(f); 124 incomingFutures.put(Thread.currentThread(), newListOfFutures); 125 } 126 } 127 128 static public void removeIncomingFutures() { 130 incomingFutures.remove(Thread.currentThread()); 131 } 132 133 static public java.util.ArrayList getIncomingFutures() { 135 return (java.util.ArrayList ) (incomingFutures.get(Thread.currentThread())); 136 } 137 138 static { 140 bodyDestination = new java.util.Hashtable (); 141 incomingFutures = new java.util.Hashtable (); 142 } 143 144 148 152 public void setOwnerBody(UniqueID i) { 153 ownerBody = i; 154 } 155 156 159 public UniqueID getOwnerBody() { 160 return ownerBody; 161 } 162 163 167 public void enableAC() { 168 this.queueAC = new ActiveACQueue(); 169 this.queueAC.start(); 170 this.acEnabled = true; 171 } 172 173 177 public void disableAC() { 178 this.acEnabled = false; 179 this.queueAC.killMe(); 180 this.queueAC = null; 181 } 182 183 190 public synchronized void receiveFutureValue(long id, UniqueID creatorID, Object result) throws java.io.IOException { 191 192 java.util.ArrayList futuresToUpdate = futures.getFuturesToUpdate(id, creatorID); 194 195 if (futuresToUpdate != null) { 196 Future future = (Future) (futuresToUpdate.get(0)); 197 if (future != null) { 198 future.receiveReply(result); 199 } 200 setMigrationTag(); 205 for (int i = 1; i < futuresToUpdate.size(); i++) { 206 Future otherFuture = (Future) (futuresToUpdate.get(i)); 207 otherFuture.receiveReply(Utils.makeDeepCopy(result)); 208 } 209 unsetMigrationTag(); 210 stateChange(); 211 212 if (acEnabled) { 214 java.util.ArrayList bodiesToContinue = futures.getAutomaticContinuation(id, creatorID); 215 if ((bodiesToContinue != null) && (bodiesToContinue.size() != 0)) { 216 ProActiveSecurityManager psm = null; 217 try { 218 psm = ProActive.getBodyOnThis() 219 .getProActiveSecurityManager(); 220 } catch (SecurityNotAvailableException e) { 221 psm = null; 222 } 223 queueAC.addACRequest(new ACService(bodiesToContinue, new ReplyImpl(creatorID, id, null, result,psm))); 224 } 225 } 226 227 futures.removeFutures(id, creatorID); 229 } else { 230 this.valuesForFutures.put(""+id+creatorID, result); 232 } 233 } 234 235 236 242 public synchronized void receiveFuture(Future futureObject) { 243 futureObject.setSenderID(ownerBody); 244 futures.receiveFuture(futureObject); 245 long id = futureObject.getID(); 246 UniqueID creatorID = futureObject.getCreatorID(); 247 if (valuesForFutures.get(""+id+creatorID) != null) { 248 try { 249 this.receiveFutureValue(id, creatorID, valuesForFutures.remove(""+id+creatorID)); 250 } catch (java.io.IOException e) { 251 } 252 } 253 } 254 255 261 public void addAutomaticContinuation(long id, UniqueID creatorID, UniversalBody bodyDest) { 262 futures.addAutomaticContinuation(id, creatorID, bodyDest); 263 } 264 265 266 267 public synchronized void waitForReply() { 268 this.newState = false; 269 while (!newState) { 270 try { 271 wait(); 272 } catch (InterruptedException e) { 273 e.printStackTrace(); 274 } 275 } 276 277 } 278 279 283 public void registerDestination(UniversalBody dest){ 284 if (acEnabled) 285 FuturePool.registerBodyDestination(dest); 286 } 287 288 291 public void removeDestination(){ 292 if (acEnabled) 293 FuturePool.removeBodyDestination(); 294 } 295 296 297 public void setMigrationTag() { 298 futures.setMigrationTag(); 299 } 300 301 public void unsetMigrationTag() { 302 futures.unsetMigrationTag(); 303 } 304 305 309 private void stateChange() { 310 this.newState = true; 311 notifyAll(); 312 } 313 314 318 private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { 319 setMigrationTag(); 320 out.defaultWriteObject(); 321 if (acEnabled) { 322 out.writeObject(queueAC.getQueue()); 324 queueAC.killMe(); 326 } 327 } 328 329 private void readObject(java.io.ObjectInputStream in) throws java.io.IOException , ClassNotFoundException { 330 in.defaultReadObject(); 331 unsetMigrationTag(); 332 if (acEnabled) { 333 java.util.ArrayList queue = (java.util.ArrayList ) (in.readObject()); 335 queueAC = new ActiveACQueue(queue); 336 queueAC.start(); 337 } 338 339 } 340 341 342 344 351 private class ActiveACQueue extends Thread { 352 353 private java.util.ArrayList queue; 354 private int counter; 355 private boolean kill; 356 357 361 public ActiveACQueue() { 362 queue = new java.util.ArrayList (); 363 counter = 0; 364 kill = false; 365 this.setName("Thread for AC"); 366 } 367 368 public ActiveACQueue(java.util.ArrayList queue) { 369 this.queue = queue; 370 counter = queue.size(); 371 kill = false; 372 this.setName("Thread for AC"); 373 } 374 375 379 382 public java.util.ArrayList getQueue() { 383 return queue; 384 } 385 386 389 public synchronized void addACRequest(ACService r) { 390 queue.add(r); 391 counter++; 392 notifyAll(); 393 } 394 395 398 public synchronized ACService removeACRequest() { 399 counter--; 400 return (ACService) (queue.remove(0)); 401 } 402 403 406 public synchronized void killMe() { 407 kill = true; 408 notifyAll(); 409 } 410 411 public void run() { 412 Body owner = null; 416 while (owner == null) { 417 owner = LocalBodyStore.getInstance().getLocalBody(ownerBody); 418 if (owner == null) 420 owner = LocalBodyStore.getInstance().getLocalHalfBody(ownerBody); 421 } 422 423 while (true) { 424 waitForAC(); 426 427 if (kill) 428 break; 429 430 try { 432 owner.enterInThreadStore(); 434 435 if (kill) 437 break; 438 439 ACService toDo = this.removeACRequest(); 440 if (toDo != null) { 441 toDo.doAutomaticContinuation(); 442 } 443 444 owner.exitFromThreadStore(); 446 447 } catch (Exception e2) { 448 owner.exitFromThreadStore(); 450 throw new ProActiveRuntimeException("Error while sending reply for AC ", e2); 451 } 452 } 453 } 454 455 private synchronized void waitForAC() { 457 try { 458 while ((counter == 0) && !kill) { 459 wait(); 460 } 461 } catch (InterruptedException e) { 462 e.printStackTrace(); 463 } 464 } 465 466 } 467 468 472 private class ACService implements java.io.Serializable { 473 474 private java.util.ArrayList dests; 476 private Reply reply; 478 479 483 public ACService(java.util.ArrayList dests, Reply reply) { 484 this.dests = dests; 485 this.reply = reply; 486 } 487 488 492 public void doAutomaticContinuation() throws java.io.IOException { 493 if (dests != null) { 494 for (int i = 0; i < dests.size(); i++) { 495 UniversalBody dest = (UniversalBody) (dests.get(i)); 496 registerDestination(dest); 497 reply.send(dest); 498 removeDestination(); 499 } 500 } 501 } 502 } 504 } 505 | Popular Tags |