|                                                                                                              1   package org.enhydra.shark.toolagent;
 2
 3   import java.util.HashMap
  ; 4   import java.util.Iterator
  ; 5   import java.util.List
  ; 6   import java.util.Map
  ; 7
 8   import org.enhydra.shark.Shark;
 9   import org.enhydra.shark.api.SharkTransaction;
 10  import org.enhydra.shark.api.client.wfmodel.WfActivity;
 11  import org.enhydra.shark.api.client.wfservice.AdminInterface;
 12  import org.enhydra.shark.api.client.wfservice.ExecutionAdministration;
 13  import org.enhydra.shark.api.internal.toolagent.AppParameter;
 14  import org.enhydra.shark.api.internal.toolagent.ApplicationBusy;
 15  import org.enhydra.shark.api.internal.toolagent.ApplicationNotDefined;
 16  import org.enhydra.shark.api.internal.toolagent.ApplicationNotStarted;
 17  import org.enhydra.shark.api.internal.toolagent.SessionHandle;
 18  import org.enhydra.shark.api.internal.toolagent.ToolAgent;
 19  import org.enhydra.shark.api.internal.toolagent.ToolAgentGeneralException;
 20  import org.enhydra.shark.api.internal.working.CallbackUtilities;
 21  import org.enhydra.shark.xpdl.XPDLConstants;
 22  import org.enhydra.shark.xpdl.elements.ExtendedAttribute;
 23  import org.enhydra.shark.xpdl.elements.ExtendedAttributes;
 24
 25  import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 26  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
 27
 28
 35  public class SchedulerToolAgent extends AbstractToolAgent {
 36
 37     private final static String
  TOOL_AGENT_CLASS_EXT_ATTR_NAME="ToolAgentClass"; 38     private final static String
  TOOL_AGENT_CLASS_PROXY_EXT_ATTR_NAME="ToolAgentClassProxy"; 39
 40
 43     private void info(String
  infoString) { 44        if (null != cus) {
 45           cus.info(infoString);
 46        }
 47     }
 48     private void error(String
  infoString) { 49        if (null != cus) {
 50           cus.error(infoString);
 51        }
 52     }
 53
 54     public String
  getInfo (SharkTransaction t) throws ToolAgentGeneralException { 55        String
  i= 56           "Wraps a ToolAgent standard call and executes them a separate"+
 57           "\nin thread-pool."+
 58           "\n"+
 59           "\nTo use define an ToolAgentClass extended attribute to the scheduler"+
 60           "\nanother extended attribute 'ToolAgentClassProxy' to the actual"+
 61           "\napplication to be called in a separet thread. For example: "+
 62           "\n<ExtendedAttributes>"+
 63           "\n  <ExtendedAttribute Name=\"ToolAgentClass\" Value=\"org.enhydra.shark.toolagent.SchedulerToolAgent\"/>"+
 64           "\n  <ExtendedAttribute Name=\"ToolAgentClassProxy\" Value=\"org.enhydra.shark.toolagent.BshToolAgent\"/>"+
 65           "\n  <ExtendedAttribute Name=\"Script\" Value=\"System.out.println(\"I was called...\");\"/>"+
 66           "\n</ExtendedAttributes>"+
 67           "\n"+
 68           "\nTo be able to work with SchedulerToolAgent, you must define some "+
 69           "\nproperties, and here is a section from shark's configuration file \"Shark.conf\" "+
 70           "\nthat defines these properties:"+
 71           "\n# Configure number of threads to execute commands and admin user/password: "+
 72           "\nSchedulerToolAgent.threadPoolSize=3"+
 73           "\nSchedulerToolAgent.sharkUsername=admin"+
 74           "\nSchedulerToolAgent.sharkPassword=enhydra"+
 75           "\n"
 76           ;
 77        return i;
 78     }
 79
 80     public void invokeApplication(SharkTransaction t, long handle,
 81                                   String
  applicationName, String  procInstId, String  assId, 82                                   AppParameter[] parameters, Integer
  appMode) 83        throws ApplicationNotStarted, ApplicationNotDefined,
 84        ApplicationBusy, ToolAgentGeneralException {
 85
 86        super.invokeApplication(t, handle, applicationName, procInstId, assId,
 87                                parameters, appMode);
 88        try {
 89
 92           String
  extAttribs=(String  )parameters[0].the_value; 93           ExtendedAttributes eas = readParamsFromExtAttributes(extAttribs);
 94           ExtendedAttribute eaScheduler=eas.getFirstExtendedAttributeForName(TOOL_AGENT_CLASS_EXT_ATTR_NAME);
 95           String
  schedulerClassName = eaScheduler.getVValue(); 96           ExtendedAttribute eaProxy=eas.getFirstExtendedAttributeForName(TOOL_AGENT_CLASS_PROXY_EXT_ATTR_NAME);
 97           String
  proxyClassName = eaProxy.getVValue(); 98           int idxSchedulerClassName = extAttribs.indexOf(schedulerClassName);
 99           String
  newExtAttribs = extAttribs.substring(0,idxSchedulerClassName) 100             + proxyClassName
 101             + extAttribs.substring(idxSchedulerClassName+schedulerClassName.length());
 102          parameters[0].the_value = newExtAttribs;
 103          readParamsFromExtAttributes(newExtAttribs);
 104
 105          Class
  cls=Class.forName(proxyClassName); 106          ToolAgent ta=(ToolAgent)cls.newInstance();
 107
 108
 111          String
  sharkUsername = cus.getProperty("SchedulerToolAgent.sharkUsername", "admin"); 112          String
  sharkPassword = cus.getProperty("SchedulerToolAgent.sharkPassword", "enhydra"); 113
 114          ta.configure(cus);
 115
 116          ToolAgentCmdProxy taCmdProxy = new ToolAgentCmdProxy(
 117             cus, ta, proxyClassName,
 118             username, password, engineName, scope,
 119             handle, applicationName, procInstId, assId, parameters,
 120             appMode);
 121
 122          SingletonPooledExecutor.getInstance(cus).execute(taCmdProxy);
 123
 124
 128          status = AbstractToolAgent.APP_STATUS_FINISHED;
 129       } catch (Throwable
  ex) { 130          error("SchedulerToolAgent terminated incorrectly: " + ex);
 131          status = AbstractToolAgent.APP_STATUS_INVALID;
 132          throw new ToolAgentGeneralException(ex);
 133       }
 134    }
 135 }
 136
 137
 144 class ToolAgentCmdProxy implements Runnable
  { 145    CallbackUtilities   m_cus;
 146    String
  m_taName; 147    ToolAgent           m_ta;
 148    String
  m_username; 149    String
  m_password; 150    String
  m_engineName; 151    String
  m_scope; 152    long                m_handle;
 153    String
  m_applicationName; 154    String
  m_procInstId; 155    String
  m_assId; 156    AppParameter[]      m_parameters;
 157    AppParameter[]      m_proxyParameters;
 158    Integer
  m_appMode; 159
 160    ToolAgentCmdProxy(CallbackUtilities cus,
 161                      ToolAgent ta, String
  taName, 162                      String
  username, String  password, 163                      String
  engineName, String  scope, 164                      long handle,
 165                      String
  applicationName, String  procInstId, String  assId, 166                      AppParameter[] parameters, Integer
  appMode) { 167       m_cus = cus;
 168       m_ta = ta;
 169       m_taName = taName;
 170       m_username = username;
 171       m_password = password;
 172       m_engineName = engineName;
 173       m_scope = scope;
 174       m_handle = handle;
 175       m_applicationName = applicationName;
 176       m_procInstId = procInstId;
 177       m_assId = assId;
 178       m_parameters = parameters;
 179       m_appMode = appMode;
 180    }
 181
 182
 185    private void info(String
  infoString) { 186       if (null != m_cus) {
 187          m_cus.info(infoString);
 188       }
 189    }
 190    private void error(String
  infoString) { 191       if (null != m_cus) {
 192          m_cus.error(infoString);
 193       }
 194    }
 195
 196
 201    private Map
  getResults() throws Exception  { 202
 205       Map
  results = new HashMap  (); 206       for (int i=0; i<m_parameters.length; ++i) {
 207          AppParameter p = m_parameters[i];
 208          if (p.the_mode.equals(XPDLConstants.FORMAL_PARAMETER_MODE_INOUT)
 209              || p.the_mode.equals(XPDLConstants.FORMAL_PARAMETER_MODE_OUT)) {
 210             results.put(p.the_actual_name,
 211                         convertToProperType(p.the_value, p.the_class));
 212          }
 213       }
 214
 215       return results;
 216    }
 217
 218    private Object
  convertToProperType(Object  toConvert, Class  desiredType) 219       throws Exception
  { 220       if (null == toConvert || desiredType.isInstance(toConvert))
 221          return toConvert;
 222
 223       if (desiredType.equals(Integer
  .class)) { 224          return new Integer
  ((new Integer  (toConvert.toString())).intValue()); 225       } else if (desiredType.equals(Long
  .class)) { 226          return new Long
  ((new Double  (toConvert.toString())).longValue()); 227       } else if (desiredType.equals(Boolean
  .class)) { 228          return new Boolean
  (toConvert.toString()); 229       } else if (desiredType.equals(Double
  .class)) { 230          return new Double
  (toConvert.toString()); 231       } else if (desiredType.equals(java.util.Date
  .class)) { 232          return new java.util.Date
  (toConvert.toString()); 233       }
 234       return toConvert;
 235    }
 236
 237
 240    public void run() {
 241       Thread
  curThread = Thread.currentThread(); 242       String
  oldThreadName = curThread.getName(); 243
 247       long status = AbstractToolAgent.APP_STATUS_INVALID;
 248
 249       SingletonPooledExecutor spe=null;
 250       try {
 251          spe = SingletonPooledExecutor.getInstance(m_cus);
 252       } catch (Exception
  _) {} 253       if (null == spe) {
 254          error("Unable to get thread-pool!");
 255          return;
 256       }
 257
 258       Shark shark = Shark.getInstance();
 259       if (null == shark) {
 260          error("Unable to get Shark engine instance!");
 261          spe.updateCompleteCount(this);
 262          return;
 263       }
 264
 265       SharkTransaction trans = null;
 266       try {
 267          curThread.setName(oldThreadName+"->"+m_taName);
 268          SessionHandle taShandle;
 269
 270          trans = shark.createTransaction();
 271
 272          taShandle=m_ta.connect(trans, m_username,m_password,m_engineName,m_scope);
 273
 274
 280          m_ta.invokeApplication(trans, taShandle.getHandle(), null,
 281                                 m_procInstId, m_assId, m_parameters, m_appMode);
 282
 283          status=m_ta.requestAppStatus(trans,taShandle.getHandle(),m_procInstId,
 284                                       m_assId,m_parameters);
 285          m_ta.disconnect(trans,taShandle);
 286
 287          int finishCount = spe.updateCompleteCount(this);
 288
 289
 292          AdminInterface adminInterface = shark.getAdminInterface();
 293          ExecutionAdministration execAdmin =
 294             adminInterface.getExecutionAdministration();
 295          execAdmin.connect(trans, spe.getSharkUsername(), spe.getSharkPassword(),
 296                            m_engineName, m_scope);
 297          String
  activityId = adminInterface.getAdminMisc() 298             .getAssignmentActivityId(m_procInstId, m_assId);
 299
 300          WfActivity wfActivity = execAdmin.getActivity(trans, m_procInstId, activityId);
 301          wfActivity.set_result(trans, getResults());
 302
 303          if (0 == finishCount) {
 304             wfActivity.complete(trans);
 305          }
 306
 307          execAdmin.disconnect(trans);
 308          trans.commit();
 309       } catch (Throwable
  ex) { 310          error("SchedulerToolAgent -> applicationProxy " + m_taName
 311                   + " terminated incorrectly: " + ex);
 312          status = AbstractToolAgent.APP_STATUS_INVALID;
 313          try {
 314             shark.emptyCaches(trans);
 315             trans.rollback();
 316          } catch (Exception
  _) {  } 317       } finally {
 318          try { shark.unlockProcesses(trans);} catch (Exception
  _){} 319          try {trans.release(); } catch (Exception
  _) {} 320          trans = null;
 321       }
 322
 323
 326       curThread.setName(oldThreadName);
 327    }
 328
 329
 332    public void cancel() {
 333
 337    }
 338
 339
 342    public Object
  getAssId() { 343       return m_assId;
 344    }
 345 }
 346
 347
 352 class SingletonPooledExecutor extends PooledExecutor {
 353    private static final Object
  classLock = SingletonPooledExecutor.class; 354
 355
 356    private static final String
  SHUTDOWN_HOOK_THREAD_NAME = "SingletonPooledExecShutdownHook"; 357
 358
 366    private static boolean              s_shutdown = false;
 367
 368
 369    private static Thread
  s_shutdownHook; 370
 371
 374    private static SingletonPooledExecutor m_spe = null;
 375
 376    private static CallbackUtilities m_cus = null;
 377
 378
 386    private Map
  m_assIdToCount = new HashMap  (); 387
 388
 391    private String
  m_sharkUsername; 392
 393
 396    private String
  m_sharkPassword; 397
 398
 402    private SingletonPooledExecutor(LinkedQueue queue, int threadPoolSize) {
 403       super(queue,threadPoolSize);
 404    }
 405
 406
 409    public String
  getSharkUsername() { 410       return m_sharkUsername;
 411    }
 412
 413
 416    public String
  getSharkPassword() { 417       return m_sharkPassword;
 418    }
 419
 420
 423    private void info(String
  infoString) { 424       if (null != m_cus) {
 425          m_cus.info(infoString);
 426       } else {
 427          System.out.println(infoString);
 428       }
 429    }
 430    private void error(String
  infoString) { 431       if (null != m_cus) {
 432          m_cus.error(infoString);
 433       } else {
 434          System.out.println(infoString);
 435       }
 436    }
 437
 438    private static void shutdown() {
 439       s_shutdown = true;
 440       if (null != m_spe) {
 441          m_spe.shutdownNow();
 442          try {
 443             final long      timeout = 2*1000L;
 444             if (!m_spe.awaitTerminationAfterShutdown(timeout)) {
 445                final int   remainingThreads = m_spe.getPoolSize();
 446                if (0!= remainingThreads) {
 447                   m_spe.info("Threads remaining during shutdown: "
 448                                 +remainingThreads);
 449                }
 450             }
 451
 452             List
  pendingTasks = m_spe.drain(); 453             for (Iterator
  itr = pendingTasks.iterator(); itr.hasNext(); ) { 454                ToolAgentCmdProxy obj = (ToolAgentCmdProxy)itr.next();
 455                m_spe.info("Pending task: "+obj.toString());
 456                obj.cancel();
 457             }
 458          } catch (Exception
  ex) { 459             m_spe.error("Exception during thread pool shutdown: "+ex.toString());
 460          }
 461          m_spe = null;
 462       }
 463    }
 464
 465    public static SingletonPooledExecutor getInstance(CallbackUtilities cus) throws Exception
  { 466       synchronized(classLock) {
 467          if (null == m_spe) {
 468             try {
 469
 472                m_cus = cus;
 473                int threadPoolSize = 3;
 474                try {
 475                   String
  threadPoolSizeStr = 476                      cus.getProperty("SchedulerToolAgent.threadPoolSize",Integer.toString(3));
 477                   threadPoolSize = Integer.parseInt(threadPoolSizeStr);
 478                } catch (Exception
  ex) { 479                                  }
 481                String
  sharkUsername = cus.getProperty("SchedulerToolAgent.sharkUsername", "admin"); 482                String
  sharkPassword = cus.getProperty("SchedulerToolAgent.sharkPassword", "enhydra"); 483
 484
 487                s_shutdownHook =
 488                   new Thread
  () { 489                   public void run() {
 490                      SingletonPooledExecutor.shutdown();
 491                   }
 492
 493                };
 494                s_shutdownHook.setName(
 495                   SingletonPooledExecutor.SHUTDOWN_HOOK_THREAD_NAME);
 496                Runtime.getRuntime().addShutdownHook(s_shutdownHook);
 497
 498                m_spe = new SingletonPooledExecutor(new LinkedQueue(),threadPoolSize);
 499                m_spe.waitWhenBlocked();
 500                m_spe.createThreads(threadPoolSize);
 501                m_spe.m_sharkUsername = sharkUsername;
 502                m_spe.m_sharkPassword = sharkPassword;
 503
 504             } catch (Exception
  ex) { 505                String
  errorMsg = "Exception during thread pool init "+ex.toString(); 506                if (null != m_cus) {
 507                   m_cus.error(errorMsg);
 508                } else {
 509                   System.out.println(errorMsg);
 510                }
 511                if (null != s_shutdownHook) {
 512                   Runtime.getRuntime().removeShutdownHook(s_shutdownHook);
 513                }
 514                s_shutdownHook = null;
 515                if (null != m_spe) {
 516                   SingletonPooledExecutor.shutdown();
 517                }
 518                throw ex;
 519             }
 520          }
 521          return m_spe;
 522       }
 523    }
 524
 525
 526
 529    public void execute(ToolAgentCmdProxy cmdProxy) throws InterruptedException
  { 530       synchronized (m_assIdToCount) {
 531
 535          Object
  assId = cmdProxy.getAssId(); 536          Integer
  curCnt = null == assId ? null 537             : (Integer
  )m_assIdToCount.get(assId); 538          int assIdCnt = null == curCnt ? 1 : curCnt.intValue()+1;
 539          m_assIdToCount.put(assId, new Integer
  (assIdCnt)); 540       }
 541       super.execute(cmdProxy);
 542    }
 543
 544
 548    public int updateCompleteCount(ToolAgentCmdProxy cmdProxy) {
 549       int returnCnt;
 550       synchronized (m_assIdToCount) {
 551          Object
  assId = cmdProxy.getAssId(); 552          Integer
  curCnt = null == assId ? null 553             : (Integer
  )m_assIdToCount.get(assId); 554          if (null == curCnt) {
 555             error("Unable to find cmd count for assId "+assId.toString());
 556             returnCnt=-1;
 557          }
 558
 559
 562          returnCnt = Math.max(curCnt.intValue()-1,0);
 563          m_assIdToCount.put(assId, new Integer
  (returnCnt)); 564       }
 565
 566       return returnCnt;
 567    }
 568
 569 }
 570
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |