1 31 32 package org.objectweb.proactive.core.body.future; 33 34 import org.objectweb.proactive.core.Constants; 35 import org.objectweb.proactive.core.ProActiveRuntimeException; 36 import org.objectweb.proactive.core.mop.ConstructionOfReifiedObjectFailedException; 37 import org.objectweb.proactive.core.mop.ConstructorCall; 38 import org.objectweb.proactive.core.mop.MOP; 39 import org.objectweb.proactive.core.mop.MethodCall; 40 import org.objectweb.proactive.core.mop.MethodCallExecutionFailedException; 41 import org.objectweb.proactive.core.mop.Proxy; 42 import org.objectweb.proactive.core.mop.StubObject; 43 import java.lang.reflect.InvocationTargetException ; 44 import org.objectweb.proactive.core.UniqueID; 45 import org.objectweb.proactive.Body; 46 import org.objectweb.proactive.ProActive; 47 import org.objectweb.proactive.core.body.LocalBodyStore; 48 import org.objectweb.proactive.core.body.UniversalBody; 49 import org.objectweb.proactive.core.event.FutureEvent; 50 51 52 59 public class FutureProxy implements Future, Proxy, java.io.Serializable { 60 61 65 68 public static final int RECYCLE_POOL_SIZE = 1000; 69 private static FutureProxy[] recyclePool; 70 71 74 private static boolean shouldPoolFutureProxyObjects; 75 76 private static int index; 77 78 80 private static FutureEventProducerImpl futureEventProducer; 81 82 83 87 90 protected Object target; 91 92 97 protected boolean migration; 98 99 102 protected boolean continuation; 103 104 107 protected UniqueID creatorID; 108 109 113 protected long ID; 114 115 118 protected UniqueID senderID; 119 120 123 protected boolean isAvailable; 124 125 128 protected boolean isException; 129 130 134 139 140 public FutureProxy() throws ConstructionOfReifiedObjectFailedException { 141 } 142 143 148 public FutureProxy(ConstructorCall c, Object [] p) throws ConstructionOfReifiedObjectFailedException { 149 this(); 151 } 152 153 157 161 public static boolean isAwaited(Object obj) { 162 if ((MOP.isReifiedObject(obj)) == false) 164 return false; 165 Proxy theProxy = ((StubObject) obj).getProxy(); 166 if (!(theProxy instanceof Future)) 168 return false; 169 return ((Future) theProxy).isAwaited(); 170 } 171 172 public synchronized static FutureProxy getFutureProxy() { 173 FutureProxy result; 174 if (shouldPoolFutureProxyObjects && (index > 0)) { 175 index--; 177 result = recyclePool[index]; 178 recyclePool[index] = null; 179 } else { 180 try { 181 result = new FutureProxy(); 182 } catch (ConstructionOfReifiedObjectFailedException e) { 183 result = null; 184 } 185 } 186 return result; 187 } 188 189 191 public static FutureEventProducer getFutureEventProducer() { 192 if (futureEventProducer == null) futureEventProducer = new FutureEventProducerImpl(); 193 return futureEventProducer; 194 } 195 196 200 public boolean equals(Object obj) { 201 if (isFutureObject(obj)) { 203 return (((StubObject) obj).getProxy().hashCode() == this.hashCode()); 204 } 205 return false; 206 } 207 208 212 220 221 public synchronized void receiveReply(Object obj) throws java.io.IOException { 222 if (target != null) { 223 throw new java.io.IOException ("FutureProxy receives a reply and this target field is not null"); 224 } 225 target = obj; 226 if (target != null) { 227 isException = (target instanceof Throwable ); 228 } 229 isAvailable = true; 230 this.notifyAll(); 231 } 232 233 238 public synchronized Throwable getRaisedException() { 239 waitFor(); 240 if (isException) 241 return (Throwable ) target; 242 return null; 243 } 244 245 249 public synchronized Object getResult() { 250 waitFor(); 251 return target; 252 } 253 254 255 public synchronized void setResult(Object o) { 256 target = o; 257 isAvailable = true; 258 } 259 260 264 public synchronized boolean isAwaited() { 265 return !isAvailable; 266 } 267 268 271 public synchronized void waitFor() { 272 273 if (isAvailable) return; 274 275 UniqueID id = null; 276 277 if (futureEventProducer != null) { 279 id = ProActive.getBodyOnThis().getID(); 280 if (LocalBodyStore.getInstance().getLocalBody(id) != null) { 281 futureEventProducer.notifyListeners(id, 283 getCreatorID(), FutureEvent.WAIT_BY_NECESSITY); 284 } else id = null; 285 } 286 while (!isAvailable) { 287 try { 288 this.wait(); 289 } catch (InterruptedException e) { 290 } 291 } 292 if (id != null) { 294 futureEventProducer.notifyListeners(id, 295 getCreatorID(), FutureEvent.RECEIVED_FUTURE_RESULT); 296 } 297 298 } 299 300 301 302 public long getID() { 303 return ID; 304 } 305 306 public void setID(long l) { 307 ID = l; 308 } 309 310 public void setCreatorID(UniqueID i) { 311 creatorID = i; 312 } 313 314 public UniqueID getCreatorID() { 315 return creatorID; 316 } 317 318 public void setSenderID(UniqueID i) { 319 senderID = i; 320 } 321 322 323 324 328 342 public Object reify(MethodCall c) throws InvocationTargetException { 343 Object result = null; 344 waitFor(); 355 356 if (this.isException) { 358 throw ((InvocationTargetException ) this.target); 359 } else { 360 try { 361 result = c.execute(this.target); 362 } catch (MethodCallExecutionFailedException e) { 363 throw new ProActiveRuntimeException("FutureProxy: Illegal arguments in call " + c.getName()); 364 } 365 } 366 367 if (target instanceof StubObject) { 369 Proxy p =((StubObject)target).getProxy(); 370 if ( p instanceof FutureProxy ) { 371 target = ((FutureProxy)p).target; 372 } 373 } 374 375 return result; 376 } 377 378 382 protected void finalize() { 383 returnFutureProxy(this); 384 } 385 386 protected void setMigrationTag() { 387 migration = true; 388 } 389 390 protected void unsetMigrationTag() { 391 migration = false; 392 } 393 394 public synchronized void setContinuationTag(){ 395 continuation = true; 396 } 397 398 public synchronized void unsetContinuationTag(){ 399 continuation = false; 400 } 401 402 403 404 405 409 private synchronized void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { 410 411 412 if (!continuation) { 415 continuation = (FuturePool.getBodyDestination()!=null); 416 } 417 418 if ((!migration) && (!continuation)) { 420 waitFor(); 421 } 422 423 if (continuation && isAwaited()) { 425 Body sender = LocalBodyStore.getInstance().getLocalBody(senderID); 427 if (sender == null) { 429 sender = LocalBodyStore.getInstance().getLocalHalfBody(senderID); 430 } 431 if (sender != null) { 432 UniversalBody dest = FuturePool.getBodyDestination(); 433 if (dest != null) { 434 sender.getFuturePool().addAutomaticContinuation(ID, creatorID, dest); 435 } 436 } } 438 439 out.writeObject(target); 441 out.writeBoolean(continuation); 443 out.writeLong(ID); 445 out.writeObject(creatorID); 447 448 out.writeBoolean(isException); 452 out.writeBoolean(isAvailable); 453 454 this.continuation = false; 456 } 457 458 459 private synchronized void readObject(java.io.ObjectInputStream in) throws java.io.IOException , ClassNotFoundException { 460 target = (Object ) in.readObject(); 461 continuation = (boolean) in.readBoolean(); 462 ID = (long) in.readLong(); 463 creatorID = (UniqueID) in.readObject(); 464 isException = (boolean) in.readBoolean(); 465 isAvailable = (boolean) in.readBoolean(); 466 467 if (continuation && isAwaited()) { 468 continuation = false; 469 FuturePool.registerIncomingFuture(this); 470 471 } 472 473 migration = false; 475 } 476 477 481 private static boolean isFutureObject(Object obj) { 482 if (!(MOP.isReifiedObject(obj))) 484 return false; 485 Class proxyclass = ((StubObject) obj).getProxy().getClass(); 490 Class [] ints = proxyclass.getInterfaces(); 491 for (int i = 0; i < ints.length; i++) { 492 if (Constants.FUTURE_PROXY_INTERFACE.isAssignableFrom(ints[i])) 493 return true; 494 } 495 return false; 496 } 497 498 private static synchronized void setShouldPoolFutureProxyObjects(boolean value) { 499 if (shouldPoolFutureProxyObjects == value) 500 return; 501 shouldPoolFutureProxyObjects = value; 502 if (shouldPoolFutureProxyObjects) { 503 recyclePool = new FutureProxy[RECYCLE_POOL_SIZE]; 505 index = 0; 506 } else { 507 recyclePool = null; 511 } 512 } 513 514 private static synchronized void returnFutureProxy(FutureProxy futureProxy) { 515 if (!shouldPoolFutureProxyObjects) 516 return; 517 if (recyclePool[index] == null) { 519 futureProxy.target = null; 524 futureProxy.isAvailable = false; 525 futureProxy.isException = false; 526 527 recyclePool[index] = futureProxy; 529 index++; 530 if (index == RECYCLE_POOL_SIZE) 531 index = RECYCLE_POOL_SIZE - 1; 532 } 533 } 534 535 536 537 538 public synchronized static int futureLength(Object future) { 544 int res = 0; 545 if ((MOP.isReifiedObject(future)) 546 && ((((StubObject) future).getProxy()) instanceof Future)) { 547 res++; 548 Future f = (Future) (((StubObject) future).getProxy()); 549 Object gna = f.getResult(); 550 while ((MOP.isReifiedObject(gna)) 551 && ((((StubObject) gna).getProxy()) instanceof Future)) { 552 f = (Future) (((StubObject) gna).getProxy()); 553 gna = f.getResult(); 554 res++; 555 } 556 } 557 return res; 558 } 559 560 561 } 562 | Popular Tags |