1 31 package org.objectweb.proactive.core.group; 32 33 import java.lang.reflect.InvocationTargetException ; 34 import java.util.Collection ; 35 import java.util.Iterator ; 36 import java.util.ListIterator ; 37 import java.util.Vector ; 38 39 import org.apache.log4j.Logger; 40 import org.objectweb.proactive.Body; 41 import org.objectweb.proactive.ProActive; 42 import org.objectweb.proactive.core.UniqueID; 43 import org.objectweb.proactive.core.body.LocalBodyStore; 44 import org.objectweb.proactive.core.body.proxy.AbstractProxy; 45 import org.objectweb.proactive.core.component.ProActiveInterface; 46 import org.objectweb.proactive.core.group.spmd.MethodCallBarrier; 47 import org.objectweb.proactive.core.group.spmd.MethodCallSetSPMDGroup; 48 import org.objectweb.proactive.core.group.threadpool.ThreadPool; 49 import org.objectweb.proactive.core.mop.ConstructionOfReifiedObjectFailedException; 50 import org.objectweb.proactive.core.mop.ConstructorCall; 51 import org.objectweb.proactive.core.mop.MOP; 52 import org.objectweb.proactive.core.mop.MethodCall; 53 import org.objectweb.proactive.core.mop.StubObject; 54 55 56 public class ProxyForGroup extends AbstractProxy 57 implements org.objectweb.proactive.core.mop.Proxy, Group, 58 java.io.Serializable { 59 60 61 protected static Logger logger = Logger.getLogger(ProxyForGroup.class.getName()); 62 63 64 protected String className; 65 66 67 protected Vector memberList; 68 69 70 71 transient private UniqueID proxyForGroupID; 73 74 75 protected int waited = 0; 76 77 78 protected boolean dispatching = false; 79 80 81 protected boolean uniqueSerialization = false; 82 83 84 protected StubObject stub; 85 86 87 transient private ThreadPool threadpool; 88 89 90 public ProxyForGroup(String nameOfClass) 91 throws ConstructionOfReifiedObjectFailedException { 92 this.className = nameOfClass; 93 this.memberList = new Vector (); 94 this.proxyForGroupID = new UniqueID(); 95 this.threadpool = new ThreadPool(); 96 } 97 98 public ProxyForGroup(String nameOfClass, Integer size) 99 throws ConstructionOfReifiedObjectFailedException { 100 this.className = nameOfClass; 101 this.memberList = new Vector (size.intValue()); 102 this.proxyForGroupID = new UniqueID(); 103 this.threadpool = new ThreadPool(); 104 } 105 106 public ProxyForGroup() throws ConstructionOfReifiedObjectFailedException { 107 this.memberList = new Vector (); 108 this.proxyForGroupID = new UniqueID(); 109 this.threadpool = new ThreadPool(); 110 } 111 112 public ProxyForGroup(ConstructorCall c, Object [] p) 113 throws ConstructionOfReifiedObjectFailedException { 114 this.memberList = new Vector (); 115 this.proxyForGroupID = new UniqueID(); 116 this.threadpool = new ThreadPool(); 117 } 118 119 120 121 124 protected void setDispatchingOn() { 125 this.dispatching = true; 126 } 127 128 131 protected void setDispatchingOff() { 132 this.dispatching = false; 133 } 134 135 138 protected void setUniqueSerializationOn() { 139 this.uniqueSerialization = true; 140 } 141 142 145 protected void setUniqueSerializationOff() { 146 this.uniqueSerialization = false; 147 } 148 149 153 protected boolean isDispatchingOn() { 154 return this.dispatching; 155 } 156 157 private boolean isDispatchingCall(MethodCall mc) { 158 for (int i = 0; i < mc.getNumberOfParameter(); i++) 159 if (ProActiveGroup.isScatterGroupOn(mc.getParameter(i))) { 160 return true; 161 } 162 return false; 163 } 164 165 166 167 175 public Object reify(MethodCall mc) throws InvocationTargetException { 176 177 179 180 if ("toString".equals(mc.getName())) { 181 return this.toString(); 182 } 183 184 185 if ("hashCode".equals(mc.getName())) { 186 return new Integer (this.hashCode()); 187 } 188 189 190 Object result = null; 191 192 193 this.threadpool.checkNumberOfThreads(this.memberList.size()); 194 195 196 if (AbstractProxy.isOneWayCall(mc)) { 197 this.oneWayCallOnGroup(mc); 198 } 199 200 else if (mc.getReifiedMethod().getReturnType() == Void.TYPE) { 201 this.oneWayCallOnGroup(mc); 202 } 203 204 else { result = this.asynchronousCallOnGroup(mc); 206 } 207 208 209 this.threadpool.complete(); 210 211 return result; 212 } 213 214 215 216 221 protected synchronized Object asynchronousCallOnGroup(MethodCall mc) { 222 Object result; 223 Body body = ProActive.getBodyOnThis(); 224 225 try { 227 Object [] paramProxy = new Object [0]; 228 result = MOP.newInstance(mc.getReifiedMethod().getReturnType() 229 .getName(), null, 230 ProxyForGroup.class.getName(), paramProxy); 231 ((ProxyForGroup) ((StubObject) result).getProxy()).className = mc.getReifiedMethod() 232 .getReturnType() 233 .getName(); 234 } catch (Exception e) { 235 e.printStackTrace(); 236 return null; 237 } 238 239 int size = this.memberList.size(); 240 241 Vector memberListOfResultGroup = ((ProxyForGroup) ((StubObject) result).getProxy()).memberList; 243 for (int i = 0; i < size; i++) { 244 memberListOfResultGroup.add(null); 245 } 246 247 if (isDispatchingCall(mc) == false) { 249 if (uniqueSerialization) { 250 mc.transformEffectiveArgumentsIntoByteArray(); 251 } 252 for (int index = 0; index < this.memberList.size(); index++) 253 this.threadpool.addAJob(new ProcessForAsyncCall(this, 254 this.memberList, memberListOfResultGroup, index, mc, 255 body)); 256 } else { for (int index = 0; index < memberList.size(); index++) { 258 Object [] individualEffectiveArguments = new Object [mc.getNumberOfParameter()]; 259 for (int i = 0; i < mc.getNumberOfParameter(); i++) 260 if (ProActiveGroup.isScatterGroupOn(mc.getParameter(i))) { 261 individualEffectiveArguments[i] = ProActiveGroup.get(mc.getParameter( 262 i), 263 index % ProActiveGroup.size(mc.getParameter(i))); 264 } else { 265 individualEffectiveArguments[i] = mc.getParameter(i); 266 } 267 this.threadpool.addAJob(new ProcessForAsyncCall(this, 268 this.memberList, memberListOfResultGroup, index, 269 new MethodCall(mc.getReifiedMethod(), 270 individualEffectiveArguments), body)); 271 } 272 } 273 274 LocalBodyStore.getInstance().setCurrentThreadBody(body); 275 276 return result; 277 } 278 279 285 protected synchronized void addToListOfResult( 286 Vector memberListOfResultGroup, Object result, int index) { 287 memberListOfResultGroup.set(index, result); 288 } 289 290 291 292 296 protected synchronized void oneWayCallOnGroup(MethodCall mc) { 297 Body body = ProActive.getBodyOnThis(); 298 ExceptionList exceptionList = new ExceptionList(); 299 300 if (isDispatchingCall(mc) == false) { 302 if (uniqueSerialization) { 303 mc.transformEffectiveArgumentsIntoByteArray(); 304 } 305 for (int index = 0; index < this.memberList.size(); index++) 306 this.threadpool.addAJob(new ProcessForOneWayCall(this, 307 this.memberList, index, mc, body, exceptionList)); 308 } else { for (int index = 0; index < memberList.size(); index++) { 310 Object [] individualEffectiveArguments = new Object [mc.getNumberOfParameter()]; 311 for (int i = 0; i < mc.getNumberOfParameter(); i++) 312 if (ProActiveGroup.isScatterGroupOn(mc.getParameter(i))) { 313 individualEffectiveArguments[i] = ProActiveGroup.get(mc.getParameter( 314 i), 315 index % ProActiveGroup.size(mc.getParameter(i))); 316 } else { 317 individualEffectiveArguments[i] = mc.getParameter(i); 318 } 319 this.threadpool.addAJob(new ProcessForOneWayCall(this, 320 this.memberList, index, 321 new MethodCall(mc.getReifiedMethod(), 322 individualEffectiveArguments), body, exceptionList)); 323 } 324 } 325 326 LocalBodyStore.getInstance().setCurrentThreadBody(body); 327 328 if (exceptionList.size() != 0) { 329 throw exceptionList; 330 } 331 } 332 333 334 335 342 public boolean add(Object o) { 343 try { 344 if ((MOP.forName(this.className)).isAssignableFrom(o.getClass())) { 345 346 347 if (MOP.isReifiedObject(o)) { 348 boolean result = this.memberList.add(o); 349 350 351 if (o instanceof org.objectweb.proactive.core.group.GroupMember) { 352 ((org.objectweb.proactive.core.group.GroupMember) o).setMyGroup(stub); 353 ((org.objectweb.proactive.core.group.GroupMember) o).setMyRank(this.memberList.indexOf( 354 o)); 355 } 356 return result; 357 } 358 360 361 else if (o instanceof ProActiveInterface) { 362 return this.memberList.add(o); 363 } else if (o instanceof org.objectweb.proactive.core.group.ProxyForGroup) { 364 365 366 return this.memberList.addAll(((org.objectweb.proactive.core.group.ProxyForGroup) o).memberList); 367 } else { 368 return this.memberList.add(o); 369 } 370 } else { 371 if (logger.isInfoEnabled()) { 372 logger.info("uncompatible Object"); 373 } 374 return false; 375 } 376 } catch (java.lang.ClassNotFoundException e) { 377 if (logger.isInfoEnabled()) { 378 logger.info("Unknown class : " + this.className); 379 } 380 } 381 return true; 382 } 383 384 389 public boolean addAll(Collection c) { 390 boolean modified = false; 391 Iterator iterator = c.iterator(); 392 while (iterator.hasNext()) { 393 modified |= this.add(iterator.next()); 394 } 395 return modified; 396 } 397 398 402 public void clear() { 403 this.memberList.clear(); 404 } 405 406 410 public boolean contains(Object o) { 411 return this.memberList.contains(o); 412 } 413 414 419 public boolean containsAll(Collection c) { 420 boolean contained; 421 Iterator iterator = c.iterator(); 422 while (iterator.hasNext()) { 423 contained = this.contains(iterator.next()); 424 if (!contained) { 425 return false; 426 } 427 } 428 return true; 429 } 430 431 436 public boolean equals(Object o) { 437 if (o instanceof org.objectweb.proactive.core.group.ProxyForGroup) { 438 return this.proxyForGroupID.equals(((org.objectweb.proactive.core.group.ProxyForGroup) o).proxyForGroupID); 439 } else { 440 return false; 441 } 442 } 443 444 448 public int hashCode() { 449 return this.memberList.hashCode(); 450 } 451 452 456 public boolean isEmpty() { 457 return this.memberList.isEmpty(); 458 } 459 460 464 public Iterator iterator() { 465 return this.memberList.iterator(); 466 } 467 468 474 public boolean remove(Object o) { 475 return this.memberList.remove(o); 476 } 477 478 484 public boolean removeAll(Collection c) { 485 boolean modified = false; 486 Iterator iterator = c.iterator(); 487 while (iterator.hasNext()) { 488 modified |= this.remove(iterator.next()); 489 } 490 return modified; 491 } 492 493 499 public boolean retainAll(Collection c) { 500 boolean modified = false; 501 Iterator iterator = c.iterator(); 502 while (iterator.hasNext()) { 503 Object tmp = iterator.next(); 504 if (this.contains(tmp)) { 505 modified |= this.remove(tmp); 506 } 507 } 508 return modified; 509 } 510 511 515 public int size() { 516 return this.memberList.size(); 517 } 518 519 523 public Object [] toArray() { 524 return this.memberList.toArray(); 525 } 526 527 534 public Object [] toArray(Object [] a) { 535 return this.memberList.toArray(a); 536 } 537 538 539 540 548 public void addMerge(Object oGroup) { 549 try { 550 551 552 if ((MOP.isReifiedObject(oGroup)) && 553 ((MOP.forName(this.className)).isAssignableFrom( 554 oGroup.getClass()))) { 555 556 557 if (((StubObject) oGroup).getProxy() instanceof org.objectweb.proactive.core.group.ProxyForGroup) { 558 memberList.addAll(((ProxyForGroup) ((StubObject) oGroup).getProxy()).memberList); 559 } else { 560 this.add(oGroup); 561 } 562 } else if (oGroup instanceof org.objectweb.proactive.core.group.ProxyForGroup) { 563 memberList.addAll(((org.objectweb.proactive.core.group.ProxyForGroup) oGroup).memberList); 564 } 565 } catch (java.lang.ClassNotFoundException e) { 566 if (logger.isInfoEnabled()) { 567 logger.info("Unknown class : " + this.className); 568 } 569 } 570 } 571 572 578 public int indexOf(Object obj) { 579 return this.memberList.indexOf(obj); 580 } 581 582 586 public ListIterator listIterator() { 587 return this.memberList.listIterator(); 588 } 589 590 594 public void remove(int index) { 595 this.memberList.remove(index); 596 } 597 598 603 public Object get(int i) { 604 return this.memberList.get(i); 605 } 606 607 612 public Class getType() throws java.lang.ClassNotFoundException { 613 return MOP.forName(this.className); 614 } 615 616 620 public String getTypeName() { 621 return this.className; 622 } 623 624 628 public Object getGroupByType() { 629 Object result; 630 try { result = MOP.newInstance(this.className, null, 632 ProxyForGroup.class.getName(), null); 633 } catch (Exception e) { 634 e.printStackTrace(); 635 return null; 636 } 637 ProxyForGroup proxy = (ProxyForGroup) ((StubObject) result).getProxy(); 638 proxy.memberList = this.memberList; 639 proxy.className = this.className; 640 proxy.proxyForGroupID = this.proxyForGroupID; 641 proxy.waited = this.waited; 642 return result; 643 } 644 645 654 659 public Group union (Group g) { 660 try { 661 if ((MOP.forName(this.getTypeName())).isAssignableFrom(MOP.forName(g.getTypeName()))) { 662 ProxyForGroup result = new ProxyForGroup(this.getTypeName()); 663 Iterator it = this.iterator(); 665 while (it.hasNext()) { 666 result.add(it.next()); 667 } 668 it = g.iterator(); 670 while (it.hasNext()) { 671 result.add(it.next()); 672 } 673 return result; 674 } } 675 catch (ClassNotFoundException e) { e.printStackTrace(); } 676 catch (ConstructionOfReifiedObjectFailedException e) { e.printStackTrace(); } 677 return null; 679 } 680 681 686 public Group intersection (Group g) { 687 try { 688 if ((MOP.forName(this.getTypeName())).isAssignableFrom(MOP.forName(g.getTypeName()))) { 689 ProxyForGroup result = new ProxyForGroup(this.getTypeName()); 690 Object member; 691 Iterator it = this.iterator(); 692 while (it.hasNext()) { 694 member = it.next(); 695 if (g.indexOf(member) > -1) { 696 result.add(member); 697 } 698 } 699 return result; 700 } } 701 catch (ClassNotFoundException e) { e.printStackTrace(); } 702 catch (ConstructionOfReifiedObjectFailedException e) { e.printStackTrace(); } 703 return null; 705 } 706 707 712 public Group exclude (Group g) { 713 try { 714 if ((MOP.forName(this.getTypeName())).isAssignableFrom(MOP.forName(g.getTypeName()))) { 715 ProxyForGroup result = new ProxyForGroup(this.getTypeName()); 716 Object member; 717 Iterator it = this.iterator(); 718 while (it.hasNext()) { 719 member = it.next(); 720 if (g.indexOf(member) < 0) { 721 result.add(member); 722 } 723 } 724 return result; 725 } } 726 catch (ClassNotFoundException e) { e.printStackTrace(); } 727 catch (ConstructionOfReifiedObjectFailedException e) { e.printStackTrace(); } 728 return null; 730 } 731 732 737 public Group difference (Group g) { 738 try { 739 if ((MOP.forName(this.getTypeName())).isAssignableFrom(MOP.forName(g.getTypeName()))) { 740 ProxyForGroup result = new ProxyForGroup(this.getTypeName()); 741 Object member; 742 Iterator it = this.iterator(); 743 while (it.hasNext()) { 745 member = it.next(); 746 if (g.indexOf(member) < 0) { 747 result.add(member); 748 } 749 } 750 it = g.iterator(); 751 while (it.hasNext()) { 753 member = it.next(); 754 if (this.indexOf(member) < 0) { 755 result.add(member); 756 } 757 } 758 return result; 759 } } 760 catch (ClassNotFoundException e) { e.printStackTrace(); } 761 catch (ConstructionOfReifiedObjectFailedException e) { e.printStackTrace(); } 762 return null; 764 } 765 766 772 public Group range (int begin, int end) { 773 if (begin > end) { 775 return null; 776 } 777 if (begin < 0) { 778 begin = 0; 779 } 780 if (end > this.size()) { 781 end = this.size(); 782 } 783 try { 784 ProxyForGroup result = new ProxyForGroup(this.getTypeName()); 785 for (int i = begin ; i <= end ; i++) { 786 result.add(this.get(i)); 787 } 788 return result; 789 } 790 catch (ConstructionOfReifiedObjectFailedException e) { 791 e.printStackTrace(); 792 return null; 793 } 794 } 795 796 799 public void barrier () { 800 try { 801 this.reify(new MethodCallBarrier()); } 802 catch (InvocationTargetException e) { 803 logger.info("Unable to invoke the \"barrier\" method"); 804 e.printStackTrace(); 805 } 806 } 807 808 812 public void setSPMDGroup (Object spmdGroup) { 813 try { 814 this.reify(new MethodCallSetSPMDGroup(spmdGroup)); } 815 catch (InvocationTargetException e) { 816 logger.info("Unable to set the SPMD group"); 817 e.printStackTrace(); 818 } 819 } 820 821 824 public void display() { 825 logger.info("Number of member : " + memberList.size()); 826 for (int i = 0; i < memberList.size(); i++) 827 logger.info(" " + i + " : " + 828 memberList.get(i).getClass().getName()); 829 } 830 831 832 833 836 public void waitAll() { 837 ProActive.waitForAll(this.memberList); 838 } 839 840 843 public void waitOne() { 844 ProActive.waitForAny(this.memberList); 845 } 846 847 851 public void waitTheNth(int n) { 852 ProActive.waitFor(this.memberList.get(n)); 853 } 854 855 859 public void waitN(int n) { 860 for (int i = 0; i < n; i++) { 861 this.waitTheNth(i); 862 } 863 } 864 865 869 public Object waitAndGetOne() { 870 return this.memberList.get(ProActive.waitForAny(this.memberList)); 871 } 872 873 878 public Object waitAndGetTheNth(int n) { 879 ProActive.waitForTheNth(this.memberList, n); 880 return this.memberList.get(n); 881 } 882 883 887 public int waitOneAndGetIndex() { 888 int index = 0; 889 this.memberList.get(ProActive.waitForAny(this.memberList)); 890 while (ProActive.isAwaited(this.memberList.get(index))) { 891 index++; 892 } 893 return index; 894 } 895 896 900 public boolean allAwaited() { 901 for (int i = 0; i < this.memberList.size(); i++) 902 if (!(ProActive.isAwaited(this.memberList.get(i)))) { 903 return false; 904 } 905 return true; 906 } 907 908 912 public boolean allArrived() { 913 for (int i = 0; i < this.memberList.size(); i++) 914 if (ProActive.isAwaited(this.memberList.get(i))) { 915 return false; 916 } 917 return true; 918 } 919 920 925 public ExceptionList getExceptionList() { 926 ExceptionList exceptionList = new ExceptionList(); 927 for (int i = 0; i < this.memberList.size(); i++) { 928 if (this.memberList.get(i) instanceof Throwable ) { 929 exceptionList.add(new ExceptionInGroup(null, 930 (Throwable ) this.memberList.get(i))); 931 } 932 } 933 return exceptionList; 934 } 935 936 942 public void purgeExceptionAndNull() { 943 Iterator it = this.memberList.iterator(); 944 while (it.hasNext()) { 945 Object element = (Object ) it.next(); 946 if ((element instanceof Throwable ) || (element == null)) { 947 it.remove(); 948 } 949 } 950 } 951 952 956 public void setRatioNemberToThread(int i) { 957 this.threadpool.ratio(i); 958 } 959 960 961 962 968 protected void createMemberWithMultithread(String className, 969 Object [][] params, String [] nodeList) { 970 for (int i = 0; i < params.length; i++) { 972 this.memberList.add(null); 973 } 974 for (int i = 0; i < params.length; i++) { 975 this.threadpool.addAJob(new ProcessForGroupCreation(this, 976 className, params[i], nodeList[i % nodeList.length], i)); 977 } 978 this.threadpool.complete(); 979 } 980 981 986 protected void set(int index, Object o) { 987 this.memberList.set(index, o); 988 } 989 990 991 private void writeObject(java.io.ObjectOutputStream out) 992 throws java.io.IOException { 993 out.defaultWriteObject(); 995 } 996 997 private void readObject(java.io.ObjectInputStream in) 998 throws java.io.IOException , ClassNotFoundException { 999 in.defaultReadObject(); 1000 this.proxyForGroupID = new UniqueID(); 1001 this.threadpool = new ThreadPool(); 1002 } 1003 1004} 1005 | Popular Tags |