1 51 52 package org.objectweb.jass.as; 53 54 import java.io.Serializable ; 55 import java.util.HashMap ; 56 import java.util.HashSet ; 57 import java.util.Iterator ; 58 import java.util.LinkedList ; 59 import java.util.Set ; 60 61 import javax.activity.ActionErrorException ; 62 import javax.activity.ActionNotFoundException ; 63 import javax.activity.ActivityCoordinator ; 64 import javax.activity.ActivityNotProcessedException ; 65 import javax.activity.ActivityPendingException ; 66 import javax.activity.CompletionStatus ; 67 import javax.activity.ContextPendingException ; 68 import javax.activity.CoordinationInformation ; 69 import javax.activity.GlobalId ; 70 import javax.activity.InvalidActivityException ; 71 import javax.activity.InvalidStateException ; 72 import javax.activity.NotOriginatorException ; 73 import javax.activity.Outcome ; 74 import javax.activity.Signal ; 75 import javax.activity.SignalSetInactiveException ; 76 import javax.activity.SignalSetUnknownException ; 77 import javax.activity.Status ; 78 import javax.activity.SystemException ; 79 import javax.activity.coordination.Action; 80 import javax.activity.coordination.SignalSet; 81 import javax.transaction.Transaction ; 82 83 import org.apache.log4j.Logger; 84 85 import org.jboss.util.timeout.Timeout; 86 import org.jboss.util.timeout.TimeoutFactory; 87 import org.jboss.util.timeout.TimeoutTarget; 88 89 import org.objectweb.jass.as.exceptions.SignalSetNotRegisteredException; 90 import org.objectweb.jass.as.util.SignalSetInfo; 91 92 101 public class ActivityImpl 102 implements ActivityCoordinator , TimeoutTarget, Serializable { 103 104 106 108 private static Logger log = Logger.getLogger(ActivityImpl.class); 109 private ActivityIdImpl activityId; 111 private ActivityImpl superior = null; 113 private LinkedList childs = new LinkedList (); 115 private int status = Status.StatusNoActivity; 117 private int completionStatus = CompletionStatus.CompletionStatusFail; 119 private long start; 121 private Timeout timeout = null; 123 private boolean timedOut = false; 124 private HashMap signalSets = new HashMap (); 127 private String completionSSName = null; 129 private ActivityService manager = null; 131 private boolean locked = false; 134 private boolean done = false; 136 private HashSet threads = new HashSet (); 137 138 140 public ActivityImpl( 141 ActivityService activityService, 142 ActivityIdImpl activityId, 143 ActivityImpl superior, 144 int timeout) { 145 146 log.info(getThreadName() + " STARTING ACTIVITY " + activityId.print()); 147 this.manager = activityService; 148 this.activityId = activityId; 149 this.superior = superior; 150 start = System.currentTimeMillis(); 153 if (timeout != 0) { 155 this.timeout = 156 TimeoutFactory.createTimeout(start + (timeout * 1000), this); 157 log.info("\tTimeout = " + timeout); 158 } else 159 log.info("\tTimeout NOT set to this activity"); 160 status = Status.StatusActive; 162 log.info(getThreadName() + activityId.print() + " STARTED!!!"); 163 } 164 165 167 169 184 public void addAction(Action action, String signalSetName, int priority) 185 throws SignalSetUnknownException , SystemException , IllegalStateException { 186 187 SignalSetInfo ssInfo = null; 188 SignalSet signalSet = null; 189 190 log.info(activityId.print()); 191 if (!signalSets.containsKey(signalSetName)) { 193 194 log.info("\tSignalSet " + signalSetName + " not yet registered."); 195 log.info("\tRegistering SignalSet and adding action..."); 196 197 try { 198 signalSet = manager.getService().getSignalSet(signalSetName); 200 signalSet.setActivityCoordinator(this); 201 ssInfo = new SignalSetInfo(signalSet); 202 ssInfo.addAction(action, priority, false); 203 signalSets.put(signalSetName, ssInfo); 204 log.info("\tSignalSet " + signalSetName + " registered"); 205 } catch (SignalSetUnknownException e) { 206 log.warn("Null SignalSet"); 207 e.printStackTrace(); 208 } catch (Exception e) { 209 e.printStackTrace(); 210 } 211 } else { log.info("\tSignalSet "+ signalSetName + " already registered. Adding action..."); 213 ssInfo = (SignalSetInfo) signalSets.get(signalSetName); 214 ssInfo.addAction(action, priority, false); 215 signalSets.put(signalSetName, ssInfo); 216 } 217 log.info("\tAction added!!!"); 218 log.info("\tActions registered = " + ssInfo.getNumberRegisteredActions()); 219 } 220 221 238 public void addGlobalAction(Action action, int priority) 239 throws SystemException , java.lang.IllegalStateException { 240 241 Set keys = signalSets.keySet(); 243 Iterator it = keys.iterator(); 244 while (it.hasNext()) { 246 SignalSetInfo ssInfo = (SignalSetInfo) it.next(); 247 ssInfo.addAction(action, priority, true); 248 } 249 } 250 251 262 public void removeAction(Action action, String signalSetName) 263 throws ActionNotFoundException , SystemException , IllegalStateException { 264 265 boolean result = false; 266 267 if ((status == Status.StatusCompleting) 268 || (status == Status.StatusCompleted) 269 || (status == Status.StatusCompletingHeuristic)) 270 throw new IllegalStateException (); 271 272 if (!signalSets.containsKey(signalSetName)) 273 throw new SystemException ( 274 "SignalSet " + signalSetName + " not registered"); 275 result = 276 ((SignalSetInfo) signalSets.get(signalSetName)).removeAction( 277 action, 278 false); 279 280 if (!result) 281 throw new ActionNotFoundException (); 282 } 283 284 292 public void removeGlobalAction(Action action) 293 throws ActionNotFoundException , SystemException , IllegalStateException { 294 295 if ((status == Status.StatusCompleting) 296 || (status == Status.StatusCompleted) 297 || (status == Status.StatusCompletingHeuristic)) 298 throw new IllegalStateException (); 299 300 Set keys = signalSets.keySet(); 302 Iterator it = keys.iterator(); 303 while (it.hasNext()) { 305 SignalSetInfo ssInfo = (SignalSetInfo) it.next(); 306 ssInfo.removeAction(action, true); 307 } 308 } 309 310 316 public Action[] getActions(java.lang.String signalSetName) 317 throws SignalSetUnknownException , SystemException { 318 319 if (!signalSets.containsKey(signalSetName)) 320 throw new SignalSetUnknownException (); 321 322 return ((SignalSetInfo) signalSets.get(signalSetName)).getActions(); 323 } 324 325 330 public int getNumberRegisteredActions(String signalSetName) 331 throws SignalSetUnknownException , SystemException { 332 333 if (!signalSets.containsKey(signalSetName)) 334 throw new SignalSetUnknownException (); 335 336 return ((SignalSetInfo) signalSets.get(signalSetName)) 337 .getNumberRegisteredActions(); 338 } 339 340 348 public void setCompletionSignalSetName(String signalSetName) 349 throws SignalSetUnknownException , SystemException , IllegalStateException { 350 351 completionSSName = signalSetName; 352 } 353 354 358 public String getCompletionSignalSetName() throws SystemException { 359 360 return completionSSName; 361 } 362 363 367 public ActivityCoordinator getParent() throws SystemException { 368 369 return superior; 370 } 371 372 375 public GlobalId getGlobalId() throws SystemException { 376 377 return activityId; 378 } 379 380 383 public int getStatus() throws SystemException { 384 385 return status; 386 } 387 388 393 public int getParentStatus() throws SystemException { 394 395 if (superior != null) 396 return superior.getStatus(); 397 else 398 return status; 399 } 400 401 405 public String getName() throws SystemException { 406 return activityId.print(); 407 } 408 409 413 public boolean isSameActivity(ActivityCoordinator coord) 414 throws SystemException { 415 416 if (coord != null && coord instanceof ActivityImpl) 417 return this.activityId.equals(coord.getGlobalId()); 418 419 return false; 420 } 421 422 437 public Outcome completeActivity(int completionStatus) 438 throws 439 ActivityPendingException , 440 ContextPendingException , 441 NotOriginatorException , 442 InvalidStateException , 443 ActivityNotProcessedException , 444 SystemException { 445 this.completionStatus = completionStatus; 447 return completeActivity(); 449 } 450 451 454 public Outcome heuristicComplete(int completionStatus) 455 throws 456 ActivityPendingException , 457 ContextPendingException , 458 InvalidStateException , 459 ActivityNotProcessedException , 460 SystemException { 461 return null; 462 } 463 464 470 public Outcome processSignalSet(String signalSetName, int completionStatus) 471 throws 472 SignalSetUnknownException , 473 ActivityNotProcessedException , 474 InvalidActivityException , 475 SystemException { 476 477 SignalSet processingSS = null; 478 Signal signal = null; 479 Outcome out = null; 480 Action[] actions = null; 481 482 if (done) 484 throw new InvalidActivityException (); 485 if (!signalSets.containsKey(signalSetName)) 487 throw new SignalSetUnknownException (); 488 489 try { 491 processingSS = getSignalSet(signalSetName); 492 processingSS.setCompletionStatus(completionStatus, status); 493 } catch (SignalSetNotRegisteredException e) { 494 status = Status.StatusError; 495 throw new ActivityNotProcessedException (); 496 } 497 498 log.info("Processing " + signalSetName); 499 500 if ((actions = getActions(signalSetName)) == null) 501 log.info("\tNO actions registered with " + signalSetName); 502 else { try { 504 deliverSignals(processingSS, actions); 506 out = processingSS.getOutcome(); 508 } catch (Exception e) { 509 e.printStackTrace(); 510 } 511 } 512 513 return out; 514 } 515 516 518 521 public void timedOut(Timeout timeout) { 522 try { 523 lock(); 524 log.warn("Activity " + activityId.print() + " TIMED OUT !!!!!"); 525 log.warn(" Status = " + getStringStatus(status)); 526 if (this.timeout == null) 527 return; this.timeout = null; 529 timedOut = true; 530 switch (status) { 531 case Status.StatusCompleting : 532 case Status.StatusCompleted : 533 return; 535 case Status.StatusActive : 536 case Status.StatusError : 537 markChildActivitiesWithFail(); 540 return; 541 default : 542 log.warn("Unknown status at timeout, Activity = " + toString()); 543 status = Status.StatusUnknown; 544 return; 545 } 546 } finally { 547 unlock(); 548 } 549 } 550 551 553 556 public void associateCurrentThread() { 557 558 log.info( 559 "Associating thread " 560 + getThreadName() 561 + " with activity " 562 + activityId.print()); 563 threads.add(Thread.currentThread()); 564 log.warn("Associated threads = " + threads.size()); 565 } 566 567 570 public void disassociateCurrentThread() { 571 572 log.warn( 573 "Disassociating thread " 574 + getThreadName() 575 + " from activity " 576 + activityId.print()); 577 threads.remove(Thread.currentThread()); 578 log.warn("Associated threads = " + threads.size()); 579 Thread.interrupted(); 580 } 581 582 588 public String getStringStatus(int status) { 589 590 switch (status) { 591 case Status.StatusActive : 592 return "STATUS_ACTIVE"; 593 case Status.StatusCompleted : 594 return "STATUS_COMPLETED"; 595 case Status.StatusCompleting : 596 return "STATUS_COMPLETING"; 597 case Status.StatusCompletingHeuristic : 598 return "STATUS_COMPLETING_HEURISTIC"; 599 case Status.StatusError : 600 return "STATUS_ERROR"; 601 case Status.StatusNoActivity : 602 return "STATUS_NO_ACTIVITY"; 603 case Status.StatusUnknown : 604 return "STATUS_UNKNOWN"; 605 606 default : 607 return "STATUS_UNKNOWN(" + status + ")"; 608 } 609 } 610 616 public String getStringCompletionStatus(int completionStatus) { 617 618 switch (completionStatus) { 619 case CompletionStatus.CompletionStatusFail : 620 return "COMPLETION_STATUS_FAIL"; 621 case CompletionStatus.CompletionStatusFailOnly : 622 return "COMPLETION_STATUS_FAIL_ONLY"; 623 case CompletionStatus.CompletionStatusSuccess : 624 return "COMPLETION_STATUS_SUCCESS"; 625 626 default : 627 return "COMPLETION_STATUS_UNKNOWN(" + completionStatus + ")"; 628 } 629 } 630 631 633 636 protected Outcome completeActivity() 637 throws 638 ActivityPendingException , 639 ContextPendingException , 640 NotOriginatorException , 641 InvalidStateException , 642 ActivityNotProcessedException , 643 SystemException { 644 645 SignalSet processingSS = null; 646 Outcome out = null; 647 Action[] actions = null; 648 649 log.info(getThreadName() + " COMPLETING ACTIVITY " + activityId.print()); 650 651 if ((completionStatus == CompletionStatus.CompletionStatusSuccess) 652 && (childs.size() > 0)) 653 throw new ContextPendingException (); 654 if (threads.size() > 1) 655 throw new ActivityPendingException (); 656 657 status = Status.StatusCompleting; 658 659 if (timedOut) completionStatus = CompletionStatus.CompletionStatusFail; 661 if (timeout != null) 662 timeout.cancel(); 663 664 log.info("\tWITH " + getStringCompletionStatus(completionStatus)); 665 666 if (completionSSName == null) { 667 completionSSName = manager.getService().getCompletionSignalSetName(); 669 if (completionSSName == null) { 671 log.info("\tThere is no completion SignalSet to process."); 672 log.info( 674 getThreadName() + " " + activityId.print() + " COMPLETED!!!"); 675 instanceDone(); 676 return out; 677 } 678 } 679 680 try { 682 if (signalSets.containsKey(completionSSName)) { 683 log.debug("SignalSet " + completionSSName + " already registered"); 684 processingSS = getSignalSet(completionSSName); 685 actions = getActions(completionSSName); 686 } else { 687 log.debug("SignalSet " + completionSSName + " not registered"); 688 processingSS = manager.getService().getSignalSet(completionSSName); 689 processingSS.setActivityCoordinator(this); 690 } 691 } catch (Exception e) { 692 status = Status.StatusError; 695 throw new ActivityNotProcessedException (); 696 } 697 698 log.info("\tProcessing Completion SignalSet -> " + completionSSName); 699 processingSS.setCompletionStatus(completionStatus, status); 700 701 try { 702 if (actions == null) 703 log.info("\tNO actions registered"); 704 else { 705 deliverSignals(processingSS, actions); 706 int signalSetCompStat = processingSS.getCompletionStatus(); 708 if ((signalSetCompStat 709 != CompletionStatus.CompletionStatusFailOnly) 710 && (completionStatus == CompletionStatus.CompletionStatusFailOnly)) 711 throw new InvalidStateException (); 712 completionStatus = signalSetCompStat; 713 out = processingSS.getOutcome(); 715 } 716 processingSS.destroy(); 717 log.info("\tCompletion SignalSet processing completed."); 718 log.info( 719 "\tFinal CompletionStatus = " 720 + getStringCompletionStatus(completionStatus)); 721 status = Status.StatusCompleted; 722 } catch (Exception e) { 723 e.printStackTrace(); 724 } finally { 725 log.info(getThreadName() + " " + activityId.print() + " COMPLETED!!!"); 727 instanceDone(); 728 } 729 return out; 730 } 731 732 738 private void deliverSignals(SignalSet signalSet, Action[] actions) 739 throws ActionErrorException { 740 741 Signal signal = null; 742 CoordinationInformation info = null; 743 Outcome out = null; 744 745 while ((signal = signalSet.getSignal()) != null) { 746 int i = 0; 747 boolean nextSignal = false; 748 log.info("\tSending " + signal.getName() + " signal..."); 749 log.info("\t# of registered actions = " + actions.length); 750 while ((i < actions.length) && (nextSignal == false)) { 751 if (actions[i] != null) { 752 log.debug("\tSending number: " + i); 753 out = actions[i].processSignal(signal); 754 try { 756 info = signalSet.setResponse(out); 757 } catch (SignalSetInactiveException e) { 758 e.printStackTrace(); 759 } 760 if ((info != null) && (info.isActionInterested() == false)) { 761 SignalSetInfo ssi = 764 ((SignalSetInfo) signalSets 765 .get(signalSet.getSignalSetName())); 766 ssi.removeAction(actions[i], false); 767 actions[i] = null; 769 } 770 if ((info != null) && (info.useNextSignal() == true)) { 771 nextSignal = true; 772 break; } 774 } 775 i = i + 1; 776 } 777 } 778 } 779 780 783 protected void setCompletionStatus(int completionStatus) 784 throws InvalidStateException { 785 786 if (this.completionStatus == CompletionStatus.CompletionStatusFailOnly) 787 throw new InvalidStateException (); 788 this.completionStatus = completionStatus; 789 } 790 791 794 protected int getCompletionStatus() { 795 return completionStatus; 796 } 797 798 802 protected void addChild(ActivityImpl child) { 803 childs.add(child); 804 } 805 806 810 protected void removeChild(ActivityImpl child) { 811 childs.remove(child); 812 } 813 814 818 protected ActivityImpl[] getChilds() { 819 ActivityImpl[] array = null; 820 821 if (childs.size() > 0) 822 array = (ActivityImpl[]) childs.toArray(new ActivityImpl[childs.size()]); 823 return array; 824 } 825 826 828 835 private SignalSet getSignalSet(String signalSetName) 836 throws SignalSetNotRegisteredException { 837 838 SignalSetInfo signalSet = (SignalSetInfo) signalSets.get(signalSetName); 839 if (signalSet == null) 840 throw new SignalSetNotRegisteredException(); 841 return signalSet.getSignalSet(); 842 } 843 844 848 private void markChildActivitiesWithFail() { 849 ActivityImpl child = null; 850 if (childs != null) { 851 try { 852 log.debug("Marking with FAIL childs... of " + getName()); 853 Iterator it = childs.iterator(); 854 while (it.hasNext()) { 855 child = (ActivityImpl) it.next(); 856 child.setCompletionStatus(CompletionStatus.CompletionStatusFail); 857 log.debug("Child " + child.getName() + " marked!!!"); 858 child.markChildActivitiesWithFail(); 859 } 860 } catch (SystemException e) { 861 e.printStackTrace(); 862 } catch (InvalidStateException e) { 863 log.debug("Child " + child + "is already marked as FAIL ONLY"); 864 } 865 } 866 } 867 868 871 private synchronized void lock() { 872 if (done) 873 throw new IllegalStateException ("Activity has terminated"); 874 if (locked) { 875 log.warn("Lock contention, activity=" + toString()); 876 while (locked) { 878 try { 879 wait(); 883 } catch (InterruptedException ex) { 884 } 886 if (done) 887 throw new IllegalStateException ("Activity has now terminated"); 888 } 889 } 890 locked = true; 891 } 892 893 896 private synchronized void unlock() { 897 if (!locked) { 898 log.warn( 899 "Unlocking, but not locked, activity=" + toString(), 900 new Throwable ("[Stack trace]")); 901 } 902 locked = false; 903 notify(); 904 } 905 906 909 private synchronized void instanceDone() { 910 if (superior != null) 912 superior.removeChild(this); 913 manager.releaseActivityImpl(this); 915 status = Status.StatusNoActivity; 917 notifyAll(); 919 done = true; 921 } 922 923 927 private String getThreadName() { 928 return Thread.currentThread().getName(); 929 } 930 931 933 935 private Transaction associatedTransaction = null; 936 937 public void setTransaction(Transaction tx) { 938 associatedTransaction = tx; 939 } 940 941 public Transaction getTransaction() { 942 return associatedTransaction; 943 } 944 945 } | Popular Tags |