| 1 package org.enhydra.shark; 2 3 import java.util.ArrayList ; 4 import java.util.Collection ; 5 import java.util.HashMap ; 6 import java.util.HashSet ; 7 import java.util.Iterator ; 8 import java.util.List ; 9 import java.util.Map ; 10 import java.util.Set ; 11 12 import org.enhydra.shark.api.RootException; 13 import org.enhydra.shark.api.SharkTransaction; 14 import org.enhydra.shark.api.TransactionException; 15 import org.enhydra.shark.api.client.wfbase.BaseException; 16 import org.enhydra.shark.api.client.wfmodel.AlreadyRunning; 17 import org.enhydra.shark.api.client.wfmodel.AlreadySuspended; 18 import org.enhydra.shark.api.client.wfmodel.CannotResume; 19 import org.enhydra.shark.api.client.wfmodel.CannotStart; 20 import org.enhydra.shark.api.client.wfmodel.CannotStop; 21 import org.enhydra.shark.api.client.wfmodel.CannotSuspend; 22 import org.enhydra.shark.api.client.wfmodel.InvalidData; 23 import org.enhydra.shark.api.client.wfmodel.InvalidPerformer; 24 import org.enhydra.shark.api.client.wfmodel.InvalidState; 25 import org.enhydra.shark.api.client.wfmodel.NotRunning; 26 import org.enhydra.shark.api.client.wfmodel.NotSuspended; 27 import org.enhydra.shark.api.client.wfmodel.ResultNotAvailable; 28 import org.enhydra.shark.api.client.wfmodel.TransitionNotAllowed; 29 import org.enhydra.shark.api.client.wfmodel.UpdateNotAllowed; 30 import org.enhydra.shark.api.client.wfmodel.WfCreateProcessEventAudit; 31 import org.enhydra.shark.api.client.wfmodel.WfDataEventAudit; 32 import org.enhydra.shark.api.client.wfmodel.WfEventAudit; 33 import org.enhydra.shark.api.client.wfmodel.WfRequester; 34 import org.enhydra.shark.api.client.wfmodel.WfStateEventAudit; 35 import org.enhydra.shark.api.common.SharkConstants; 36 import org.enhydra.shark.api.internal.instancepersistence.ActivityPersistenceInterface; 37 import org.enhydra.shark.api.internal.instancepersistence.AndJoinEntryInterface; 38 import org.enhydra.shark.api.internal.instancepersistence.PersistenceException; 39 import org.enhydra.shark.api.internal.instancepersistence.PersistentManagerInterface; 40 import org.enhydra.shark.api.internal.instancepersistence.ProcessPersistenceInterface; 41 import org.enhydra.shark.api.internal.instancepersistence.ProcessVariablePersistenceInterface; 42 import org.enhydra.shark.api.internal.limitagent.LimitAgentException; 43 import org.enhydra.shark.api.internal.limitagent.LimitAgentManager; 44 import org.enhydra.shark.api.internal.scripting.Evaluator; 45 import org.enhydra.shark.api.internal.toolagent.ToolAgentGeneralException; 46 import org.enhydra.shark.api.internal.working.WfActivityInternal; 47 import org.enhydra.shark.api.internal.working.WfProcessInternal; 48 import org.enhydra.shark.api.internal.working.WfProcessMgrInternal; 49 import org.enhydra.shark.api.internal.working.WfRequesterInternal; 50 import org.enhydra.shark.utilities.MiscUtilities; 51 import org.enhydra.shark.xpdl.XMLCollectionElement; 52 import org.enhydra.shark.xpdl.XMLUtil; 53 import org.enhydra.shark.xpdl.XPDLConstants; 54 import org.enhydra.shark.xpdl.elements.Activities; 55 import org.enhydra.shark.xpdl.elements.Activity; 56 import org.enhydra.shark.xpdl.elements.ActivitySet; 57 import org.enhydra.shark.xpdl.elements.BlockActivity; 58 import org.enhydra.shark.xpdl.elements.Condition; 59 import org.enhydra.shark.xpdl.elements.ProcessHeader; 60 import org.enhydra.shark.xpdl.elements.Transition; 61 import org.enhydra.shark.xpdl.elements.Transitions; 62 import org.enhydra.shark.xpdl.elements.WorkflowProcess; 63 64 69 public class WfProcessImpl extends WfExecutionObjectImpl implements WfProcessInternal { 70 71 private String actRequesterId; 72 private String actRequestersProcessId; 73 74 private String resRequesterId; 75 private String managerName; 76 private String pkgId; 77 private String pDefId; 78 private String mgrVer; 79 80 protected Map processContext; 81 82 private Evaluator evaluator; 84 private List lastFinishedActivities=new ArrayList (); 85 private WfProcessMgrInternal manager; 86 private Map activeActivitiesMap; 87 private Map tmpActivitiesMap; 88 private boolean isRunning=false; 89 90 private long creationTime=Long.MAX_VALUE/2; 91 private long startTime=Long.MAX_VALUE/2; 92 93 protected WorkflowProcess xpdlProcess; 94 95 protected Set variableIdsToPersist=new HashSet (); 96 protected Map activityToFollowedTransitions=new HashMap (); 97 protected Map newActivityToFollowedTransitions=new HashMap (); 98 99 protected SharkTransaction initialTransaction; 100 protected Thread startingThread=null; 101 protected WfActivityInternal actRequester; 102 103 private boolean terminateOrAbortFromActivity=false; 104 105 private String externalRequesterClassName=null; 106 107 protected boolean justCreated=false; 108 protected boolean justCreatedVariables=false; 109 120 protected WfProcessImpl(SharkTransaction t, 121 WfProcessMgrInternal manager, 122 WfRequesterInternal requester, 123 String key) throws BaseException { 124 this.key=key; 125 this.manager=manager; 126 this.managerName=manager.name(t); 127 this.justCreated=true; 128 this.justCreatedVariables=true; 129 setXPDLAttribs(); 130 if (requester.getExternalRequester(t)!=null) { 131 this.externalRequesterClassName=requester.getExternalRequester(t).getClass().getName(); 132 } 133 if (requester !=null && requester instanceof WfActivityInternal) { 134 this.actRequesterId=((WfActivityInternal)requester).key(t); 135 this.actRequestersProcessId=((WfActivityInternal)requester).process_id(t); 136 this.actRequester=(WfActivityInternal)requester; 137 this.resRequesterId=this.actRequester.getResourceRequesterUsername(t); 138 this.initialTransaction=t; 139 } else if (requester !=null && requester instanceof WfDefaultRequester) { 140 this.resRequesterId=((WfDefaultRequester)requester).getResourceRequesterUsername(t); 141 } 142 143 SharkUtilities.addProcessToCache(t,this); 144 name=getProcessDefinition(t).getName(); 145 if (name.equals("")) { 146 name=getProcessDefinition(t).getId(); 147 } 148 ProcessHeader ph=((WorkflowProcess)getXPDLObject(t)).getProcessHeader(); 149 description=ph.getDescription(); 150 if (description!=null && description.length()>254) { 151 description=description.substring(0,253); 152 } 153 try { 154 priority=Integer.valueOf(ph.getPriority()).shortValue(); 155 } catch (Exception ex) { 156 priority=3; 157 } 158 159 lastStateTime = System.currentTimeMillis(); 160 creationTime= lastStateTime; 161 if (SharkEngineManager.getInstance().getEventAuditManager()!=null || externalRequesterClassName!=null) { 162 WfCreateProcessEventAudit cpea=SharkEngineManager 163 .getInstance() 164 .getObjectFactory() 165 .createCreateProcessEventAuditWrapper(t,this,requester); 166 if (externalRequesterClassName!=null) { 167 notifyRequester(t,cpea); 168 } 169 170 } 171 lastStateEventAudit=SharkEngineManager. 172 getInstance(). 173 getObjectFactory(). 174 createStateEventAuditWrapper(t, 175 this, 176 SharkConstants.EVENT_PROCESS_STATE_CHANGED, 177 null, 178 state); 179 if (externalRequesterClassName!=null) { 180 notifyRequester(t,lastStateEventAudit); 181 } 182 activeActivitiesMap=new HashMap (); 183 184 initializeProcessContext(t); 185 if (processContext.size()>0) { 186 variableIdsToPersist.addAll(getContext(t).keySet()); 187 188 if (SharkEngineManager.getInstance().getEventAuditManager()!=null || externalRequesterClassName!=null) { 189 WfDataEventAudit dea=SharkEngineManager 190 .getInstance() 191 .getObjectFactory() 192 .createDataEventAuditWrapper(t, 193 this, 194 SharkConstants.EVENT_PROCESS_CONTEXT_CHANGED, 195 null, 196 new HashMap (processContext)); 197 if (externalRequesterClassName!=null) { 198 notifyRequester(t, dea); 199 } 200 } 201 } 202 activityToFollowedTransitions=new HashMap (); 203 try { 204 persist(t); 205 persistProcessContext(t); 206 } catch (TransactionException tme) { 207 throw new BaseException(tme); 208 } 209 210 SharkEngineManager.getInstance().getCallbackUtilities().info("Process "+this+" is created"); 211 } 212 213 216 protected WfProcessImpl (ProcessPersistenceInterface po) { 217 restore(po); 218 } 219 220 229 public WfRequesterInternal requester (SharkTransaction t) throws BaseException { 230 WfRequesterInternal requester=null; 231 if (this.actRequesterId!=null) { 232 if (actRequester!=null && 233 ((initialTransaction!=null && initialTransaction.equals(t)) || 234 (startingThread!=null && startingThread.equals(Thread.currentThread())))) { 235 requester=actRequester; 236 } else { 237 WfProcessInternal pReq=SharkUtilities.getProcess(t,this.actRequestersProcessId); 238 if (pReq!=null) { 239 requester=pReq.getActivity(t,this.actRequesterId); 240 } 241 } 242 } 243 WfRequester extReq=null; 244 if (requester==null) { 245 if (externalRequesterClassName!=null) { 246 try { 247 extReq=(WfRequester)Class.forName(externalRequesterClassName).newInstance(); 248 } catch (Exception ex) { 249 SharkEngineManager.getInstance().getCallbackUtilities().warn("Can't create external requester - "+externalRequesterClassName+" is not in the classpath, or it doesn't have default constructor."); 250 } 251 } 252 if (this.resRequesterId!=null) { 253 requester=SharkEngineManager.getInstance().getObjectFactory().createDefaultRequester(this.resRequesterId,extReq); 254 } else { 255 System.err.println("Process Requester is missing - maybe the parent process is deleted. Empty requester will be returned !"); 257 requester=SharkEngineManager.getInstance().getObjectFactory().createDefaultRequester("",extReq); 258 259 } 260 } 261 return requester; 262 } 263 264 public void setExternalRequesterClassName (SharkTransaction t,String extReqClassName) throws BaseException { 265 this.externalRequesterClassName=extReqClassName; 266 try { 267 persist(t); 268 } catch (TransactionException tme) { 269 throw new BaseException(tme); 270 } 271 } 272 273 283 public int how_many_step (SharkTransaction t) throws BaseException { 284 return getActiveActivitiesMap(t).size(); 287 } 288 289 298 public WfProcessMgrInternal manager (SharkTransaction t) throws BaseException { 299 if (manager==null) { 300 manager=SharkUtilities.getProcessMgr(t, managerName); 301 if (manager==null) { 302 throw new BaseException("process "+this+" - can't find manager "+managerName); 303 } 304 } 305 return manager; 306 } 307 308 309 319 public Map result (SharkTransaction t) throws BaseException, ResultNotAvailable { 320 Map resultSigLHM = manager(t).result_signature(t); 321 Map results = new HashMap (); 322 323 if (resultSigLHM != null) { 324 Set resultKeys = resultSigLHM.keySet(); 325 Iterator i = resultKeys.iterator(); 326 while (i.hasNext()) { 327 String fpId =(String )i.next(); 328 try { 329 results.put(fpId,MiscUtilities.cloneWRD(getContext(t).get(fpId))); 330 } catch (Throwable thr) { 331 throw new BaseException(thr); 332 } 333 } 334 } 335 return results; 336 } 337 338 348 public void start (SharkTransaction t) throws BaseException, CannotStart, AlreadyRunning, ToolAgentGeneralException { 349 if (state(t).equals(SharkConstants.STATE_OPEN_RUNNING)) { 350 throw new AlreadyRunning("The process is already running - can't start again!"); 351 } 352 if (state(t).startsWith(SharkConstants.STATEPREFIX_CLOSED)) { 353 throw new CannotStart("The process is closed - can't start it!"); 354 } 355 356 if (getProcessDefinition(t).getStartingActivities().size()==0) { 357 throw new CannotStart("There are no starting activities in the process - can't start it!"); 358 } 359 360 try { 361 startingThread=Thread.currentThread(); 362 initialTransaction=null; 363 change_state(t,SharkConstants.STATE_OPEN_RUNNING); 364 SharkEngineManager.getInstance().getCallbackUtilities().info("Starting Process "+this); 365 run(t, null); 366 } catch (InvalidState is) { 367 throw new CannotStart(is); 368 } catch (TransitionNotAllowed tna) { 369 throw new CannotStart(tna); 370 } catch (ToolAgentGeneralException tage) { 371 try { 372 change_state(t,SharkConstants.STATE_CLOSED_TERMINATED); 373 } catch (Exception ex) { 374 throw new BaseException(ex); 375 } 376 throw tage; 377 } finally{ 378 actRequester=null; 379 startingThread=null; 380 } 381 382 } 383 384 394 protected void change_state (SharkTransaction t,String new_state) throws BaseException, InvalidState, TransitionNotAllowed { 395 if (!SharkUtilities.valid_process_states(state(t)).contains(new_state)) { 397 throw new TransitionNotAllowed("Can't change to state "+new_state+", from state "+state+" !"); 398 } 399 400 if (new_state.equals(SharkConstants.STATE_OPEN_RUNNING)) { 401 startTime = System.currentTimeMillis(); 402 if (SharkEngineManager.getInstance().getLimitAgentManager()!=null) { 408 this.activateLimitAgent(t); 409 } 410 } 411 String oldState=state; 413 state=new_state; 414 415 lastStateTime = System.currentTimeMillis(); 416 try { 417 persist(t); 418 } catch (TransactionException tme) { 419 throw new BaseException(tme); 420 } 421 422 String eventType=SharkConstants.EVENT_PROCESS_STATE_CHANGED; 423 lastStateEventAudit = SharkEngineManager 424 .getInstance() 425 .getObjectFactory() 426 .createStateEventAuditWrapper(t, this, eventType,oldState,new_state); 427 if (state.startsWith(SharkConstants.STATEPREFIX_CLOSED)) { 428 if (!terminateOrAbortFromActivity && (externalRequesterClassName!=null || actRequesterId!=null)) { 429 notifyRequester(t,lastStateEventAudit); 430 } 431 if (state.equals(SharkConstants.STATE_CLOSED_COMPLETED)) { 432 try { 433 delete(t); 434 } catch (TransactionException te) { 435 throw new BaseException(te); 436 } 437 } 438 439 LimitAgentManager mgr = SharkEngineManager.getInstance().getLimitAgentManager(); 441 if (mgr != null) { 442 try { 443 mgr.notifyStop(key,null); 444 } catch (LimitAgentException e) { 445 throw new BaseException(e); 446 } 447 } 448 } else { 449 if (externalRequesterClassName!=null) { 450 notifyRequester(t, lastStateEventAudit); 451 } 452 } 453 454 } 455 456 468 public void set_process_context (SharkTransaction t,Map newValue) throws BaseException, InvalidData, UpdateNotAllowed { 469 Map oldValues=new HashMap (); 470 Map newChanged=new HashMap (); 471 Iterator it=newValue.entrySet().iterator(); 472 while (it.hasNext()) { 473 Map.Entry me=(Map.Entry )it.next(); 474 String id=(String )me.getKey(); 475 Object val=me.getValue(); 476 if (getContext(t).containsKey(id)) { 479 Object oldVal=getContext(t).get(id); 481 if (SharkUtilities.checkDataType(t,getProcessDefinition(t),id,oldVal,val)) { 482 if ((oldVal!=null && !oldVal.equals(val)) || (oldVal==null && val!=null)) { 484 oldValues.put(id,oldVal); 485 newChanged.put(id,val); 486 } 487 } else { 488 throw new InvalidData("Invalid data type for process variable "+id); 489 } 490 } else { 491 throw new UpdateNotAllowed("Context attribute "+id+" does not exist in process context - adding new attributes to the process context is not allowed"); 492 } 493 } 494 495 if (newChanged.size()>0) { 496 getContext(t).putAll(newChanged); 497 variableIdsToPersist.addAll(newChanged.keySet()); 498 persistProcessContext(t); 499 if (SharkEngineManager.getInstance().getEventAuditManager()!=null || externalRequesterClassName!=null) { 500 boolean persistOldEventAuditData=new Boolean ( 501 SharkEngineManager 502 .getInstance() 503 .getCallbackUtilities() 504 .getProperty("PERSIST_OLD_EVENT_AUDIT_DATA","true")).booleanValue(); 505 if (!persistOldEventAuditData) { 506 oldValues=null; 507 } 508 WfDataEventAudit dea=SharkEngineManager.getInstance().getObjectFactory(). 509 createDataEventAuditWrapper(t, this, 510 SharkConstants.EVENT_PROCESS_CONTEXT_CHANGED, 511 oldValues, 512 newChanged); 513 if (externalRequesterClassName!=null) { 514 notifyRequester(t, dea); 515 } 516 } 517 } 518 } 519 520 523 public void resume(SharkTransaction t) throws BaseException, CannotResume, NotSuspended { 524 try { 525 if (!state(t).equals(SharkConstants.STATE_OPEN_NOT_RUNNING_SUSPENDED)) { 526 throw new NotSuspended("The process is not suspended - can't resume it!"); 527 } 528 WfRequesterInternal requester=requester(t); 531 if (requester!=null && (requester instanceof WfActivityInternal)) { 532 WfActivityInternal waImpl=(WfActivityInternal)requester; 533 if (waImpl.state(t).equals(SharkConstants.STATE_OPEN_NOT_RUNNING_SUSPENDED)) { 534 if (waImpl.isPerformerSynchronous(t)) { 536 SharkEngineManager.getInstance().getCallbackUtilities().error("Process"+toString()+" - Cannot resume because the requester activity is suspended"); 537 throw new CannotResume("Cannot resume - The requester activity is suspended"); 538 } 539 } 540 } 541 SharkEngineManager.getInstance().getCallbackUtilities().info("Resuming process "+this); 542 change_state(t, SharkConstants.STATE_OPEN_RUNNING); 543 544 Iterator it=getActiveActivities(t).iterator(); 545 while (it.hasNext()) { 546 WfActivityInternal act=(WfActivityInternal)it.next(); 547 if (act.state(t).equals(SharkConstants.STATE_OPEN_NOT_RUNNING_SUSPENDED)) { 548 if (act.block_activity_id(t)==null) { 549 act.resume(t); 550 } 551 } 552 } 553 } catch (InvalidState is) { 554 throw new CannotResume(is); 555 } catch (TransitionNotAllowed tna) { 556 throw new CannotResume(tna); 557 } 558 } 559 560 563 public void suspend(SharkTransaction t) throws BaseException, CannotSuspend, NotRunning, AlreadySuspended { 564 if (state(t).equals(SharkConstants.STATE_OPEN_NOT_RUNNING_SUSPENDED)) { 565 throw new AlreadySuspended("The process is already suspended - can't suspend it twice!"); 566 } 567 if (state(t).startsWith(SharkConstants.STATEPREFIX_OPEN_NOT_RUNNING)) { 568 throw new NotRunning("The process is not in the running state"); 569 } 570 try { 571 SharkEngineManager.getInstance().getCallbackUtilities().info("Suspending process "+this); 572 change_state(t,SharkConstants.STATE_OPEN_NOT_RUNNING_SUSPENDED); 573 574 Iterator it=getActiveActivities(t).iterator(); 575 while (it.hasNext()) { 576 WfActivityInternal act=(WfActivityInternal)it.next(); 577 String actState=act.state(t); 578 if (!actState.equals(SharkConstants.STATE_OPEN_NOT_RUNNING_SUSPENDED)) { 579 if (act.block_activity_id(t)==null) { 580 act.suspend(t); 581 } 582 } 583 } 584 } catch (InvalidState is) { 585 throw new CannotSuspend(is); 586 } catch (TransitionNotAllowed tna) { 587 throw new CannotSuspend(tna); 588 } 589 } 590 591 public void terminateFromActivity (SharkTransaction t) throws BaseException, CannotStop, NotRunning { 592 terminateOrAbortFromActivity=true; 593 terminate(t); 594 } 595 596 599 public void terminate(SharkTransaction t) throws BaseException, CannotStop, NotRunning { 600 try { 601 String stateStr = SharkConstants.STATE_CLOSED_TERMINATED; 602 if (!state(t).startsWith(SharkConstants.STATEPREFIX_OPEN)) { 603 throw new CannotStop("The process is already closed - can't terminate it!"); 604 } 605 SharkEngineManager.getInstance().getCallbackUtilities().info("Terminating process "+this); 606 607 change_state(t,stateStr); 608 609 Iterator it=getActiveActivities(t).iterator(); 610 while (it.hasNext()) { 611 WfActivityInternal act=(WfActivityInternal)it.next(); 612 if (act.block_activity_id(t)==null) { 613 act.terminateFromProcess(t); 614 } 615 } 616 lastFinishedActivities.clear(); 617 618 if (activeActivitiesMap!=null) { 619 activeActivitiesMap.clear(); 620 } 621 622 624 } catch (InvalidState is) { 625 throw new CannotStop(is); 626 } catch (TransitionNotAllowed tna) { 627 throw new CannotStop(tna); 628 } 631 } 632 633 public void abortFromActivity (SharkTransaction t) throws BaseException, CannotStop, NotRunning { 634 terminateOrAbortFromActivity=true; 635 abort(t); 636 } 637 638 647 public void abort(SharkTransaction t) throws BaseException, CannotStop, NotRunning { 648 String stateStr = SharkConstants.STATE_CLOSED_ABORTED; 649 if (!state(t).startsWith(SharkConstants.STATEPREFIX_OPEN)) { 650 throw new CannotStop("The process is already closed - can't abort it!"); 651 } 652 try { 653 SharkEngineManager.getInstance().getCallbackUtilities().info("Aborting process "+this); 654 change_state(t,stateStr); 655 656 Iterator it=getActiveActivities(t).iterator(); 657 658 while (it.hasNext()) { 659 WfActivityInternal act=(WfActivityInternal)it.next(); 660 if (act.block_activity_id(t)==null) { 661 act.abortFromProcess(t); 662 } 663 } 664 665 lastFinishedActivities.clear(); 666 667 if (activeActivitiesMap!=null) { 668 activeActivitiesMap.clear(); 669 } 670 671 673 } catch (InvalidState is) { 674 throw new CannotStop(is); 675 } catch (TransitionNotAllowed tna) { 676 throw new CannotStop(tna); 677 } 680 } 681 682 protected void run (SharkTransaction t, WfActivityInternal lastFinishedActivity) throws BaseException, ToolAgentGeneralException { 683 isRunning=true; 684 try { 685 if (lastFinishedActivity==null) { 686 List starts=getProcessDefinition(t).getStartingActivities(); 687 688 for (Iterator it=starts.iterator(); it.hasNext();) { 689 String asDefId=null; 690 Activity actDef=(Activity)it.next(); 691 Object owner=actDef.getParent().getParent(); 692 if (owner instanceof ActivitySet) { 693 asDefId=((ActivitySet)owner).getId(); 694 } 695 startActivity(t,asDefId,actDef,null); 696 } 697 } 699 while (lastFinishedActivities.size()>0) { 700 if (!state.equals(SharkConstants.STATE_OPEN_NOT_RUNNING_SUSPENDED)) { 701 if(!state.startsWith(SharkConstants.STATEPREFIX_CLOSED)) { 703 queueNext(t, (WfActivityInternal)lastFinishedActivities.get(0)); 704 } 706 lastFinishedActivities.remove(0); 707 } else { 708 return; 710 } 711 } 712 if (state.startsWith(SharkConstants.STATEPREFIX_CLOSED)) { 713 718 return; 719 } 720 } finally { 721 isRunning=false; 722 } 723 724 } 725 726 735 public void start_activity(SharkTransaction t, Str
|