1 package org.enhydra.shark.toolagent; 2 3 import java.util.HashMap ; 4 import java.util.Iterator ; 5 import java.util.List ; 6 import java.util.Map ; 7 8 import org.enhydra.shark.Shark; 9 import org.enhydra.shark.api.SharkTransaction; 10 import org.enhydra.shark.api.client.wfmodel.WfActivity; 11 import org.enhydra.shark.api.client.wfservice.AdminInterface; 12 import org.enhydra.shark.api.client.wfservice.ExecutionAdministration; 13 import org.enhydra.shark.api.internal.toolagent.AppParameter; 14 import org.enhydra.shark.api.internal.toolagent.ApplicationBusy; 15 import org.enhydra.shark.api.internal.toolagent.ApplicationNotDefined; 16 import org.enhydra.shark.api.internal.toolagent.ApplicationNotStarted; 17 import org.enhydra.shark.api.internal.toolagent.SessionHandle; 18 import org.enhydra.shark.api.internal.toolagent.ToolAgent; 19 import org.enhydra.shark.api.internal.toolagent.ToolAgentGeneralException; 20 import org.enhydra.shark.api.internal.working.CallbackUtilities; 21 import org.enhydra.shark.xpdl.XPDLConstants; 22 import org.enhydra.shark.xpdl.elements.ExtendedAttribute; 23 import org.enhydra.shark.xpdl.elements.ExtendedAttributes; 24 25 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 26 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 27 28 35 public class SchedulerToolAgent extends AbstractToolAgent { 36 37 private final static String TOOL_AGENT_CLASS_EXT_ATTR_NAME="ToolAgentClass"; 38 private final static String TOOL_AGENT_CLASS_PROXY_EXT_ATTR_NAME="ToolAgentClassProxy"; 39 40 43 private void info(String infoString) { 44 if (null != cus) { 45 cus.info(infoString); 46 } 47 } 48 private void error(String infoString) { 49 if (null != cus) { 50 cus.error(infoString); 51 } 52 } 53 54 public String getInfo (SharkTransaction t) throws ToolAgentGeneralException { 55 String i= 56 "Wraps a ToolAgent standard call and executes them a separate"+ 57 "\nin thread-pool."+ 58 "\n"+ 59 "\nTo use define an ToolAgentClass extended attribute to the scheduler"+ 60 "\nanother extended attribute 'ToolAgentClassProxy' to the actual"+ 61 "\napplication to be called in a separet thread. For example: "+ 62 "\n<ExtendedAttributes>"+ 63 "\n <ExtendedAttribute Name=\"ToolAgentClass\" Value=\"org.enhydra.shark.toolagent.SchedulerToolAgent\"/>"+ 64 "\n <ExtendedAttribute Name=\"ToolAgentClassProxy\" Value=\"org.enhydra.shark.toolagent.BshToolAgent\"/>"+ 65 "\n <ExtendedAttribute Name=\"Script\" Value=\"System.out.println(\"I was called...\");\"/>"+ 66 "\n</ExtendedAttributes>"+ 67 "\n"+ 68 "\nTo be able to work with SchedulerToolAgent, you must define some "+ 69 "\nproperties, and here is a section from shark's configuration file \"Shark.conf\" "+ 70 "\nthat defines these properties:"+ 71 "\n# Configure number of threads to execute commands and admin user/password: "+ 72 "\nSchedulerToolAgent.threadPoolSize=3"+ 73 "\nSchedulerToolAgent.sharkUsername=admin"+ 74 "\nSchedulerToolAgent.sharkPassword=enhydra"+ 75 "\n" 76 ; 77 return i; 78 } 79 80 public void invokeApplication(SharkTransaction t, long handle, 81 String applicationName, String procInstId, String assId, 82 AppParameter[] parameters, Integer appMode) 83 throws ApplicationNotStarted, ApplicationNotDefined, 84 ApplicationBusy, ToolAgentGeneralException { 85 86 super.invokeApplication(t, handle, applicationName, procInstId, assId, 87 parameters, appMode); 88 try { 89 92 String extAttribs=(String )parameters[0].the_value; 93 ExtendedAttributes eas = readParamsFromExtAttributes(extAttribs); 94 ExtendedAttribute eaScheduler=eas.getFirstExtendedAttributeForName(TOOL_AGENT_CLASS_EXT_ATTR_NAME); 95 String schedulerClassName = eaScheduler.getVValue(); 96 ExtendedAttribute eaProxy=eas.getFirstExtendedAttributeForName(TOOL_AGENT_CLASS_PROXY_EXT_ATTR_NAME); 97 String proxyClassName = eaProxy.getVValue(); 98 int idxSchedulerClassName = extAttribs.indexOf(schedulerClassName); 99 String newExtAttribs = extAttribs.substring(0,idxSchedulerClassName) 100 + proxyClassName 101 + extAttribs.substring(idxSchedulerClassName+schedulerClassName.length()); 102 parameters[0].the_value = newExtAttribs; 103 readParamsFromExtAttributes(newExtAttribs); 104 105 Class cls=Class.forName(proxyClassName); 106 ToolAgent ta=(ToolAgent)cls.newInstance(); 107 108 111 String sharkUsername = cus.getProperty("SchedulerToolAgent.sharkUsername", "admin"); 112 String sharkPassword = cus.getProperty("SchedulerToolAgent.sharkPassword", "enhydra"); 113 114 ta.configure(cus); 115 116 ToolAgentCmdProxy taCmdProxy = new ToolAgentCmdProxy( 117 cus, ta, proxyClassName, 118 username, password, engineName, scope, 119 handle, applicationName, procInstId, assId, parameters, 120 appMode); 121 122 SingletonPooledExecutor.getInstance(cus).execute(taCmdProxy); 123 124 128 status = AbstractToolAgent.APP_STATUS_FINISHED; 129 } catch (Throwable ex) { 130 error("SchedulerToolAgent terminated incorrectly: " + ex); 131 status = AbstractToolAgent.APP_STATUS_INVALID; 132 throw new ToolAgentGeneralException(ex); 133 } 134 } 135 } 136 137 144 class ToolAgentCmdProxy implements Runnable { 145 CallbackUtilities m_cus; 146 String m_taName; 147 ToolAgent m_ta; 148 String m_username; 149 String m_password; 150 String m_engineName; 151 String m_scope; 152 long m_handle; 153 String m_applicationName; 154 String m_procInstId; 155 String m_assId; 156 AppParameter[] m_parameters; 157 AppParameter[] m_proxyParameters; 158 Integer m_appMode; 159 160 ToolAgentCmdProxy(CallbackUtilities cus, 161 ToolAgent ta, String taName, 162 String username, String password, 163 String engineName, String scope, 164 long handle, 165 String applicationName, String procInstId, String assId, 166 AppParameter[] parameters, Integer appMode) { 167 m_cus = cus; 168 m_ta = ta; 169 m_taName = taName; 170 m_username = username; 171 m_password = password; 172 m_engineName = engineName; 173 m_scope = scope; 174 m_handle = handle; 175 m_applicationName = applicationName; 176 m_procInstId = procInstId; 177 m_assId = assId; 178 m_parameters = parameters; 179 m_appMode = appMode; 180 } 181 182 185 private void info(String infoString) { 186 if (null != m_cus) { 187 m_cus.info(infoString); 188 } 189 } 190 private void error(String infoString) { 191 if (null != m_cus) { 192 m_cus.error(infoString); 193 } 194 } 195 196 201 private Map getResults() throws Exception { 202 205 Map results = new HashMap (); 206 for (int i=0; i<m_parameters.length; ++i) { 207 AppParameter p = m_parameters[i]; 208 if (p.the_mode.equals(XPDLConstants.FORMAL_PARAMETER_MODE_INOUT) 209 || p.the_mode.equals(XPDLConstants.FORMAL_PARAMETER_MODE_OUT)) { 210 results.put(p.the_actual_name, 211 convertToProperType(p.the_value, p.the_class)); 212 } 213 } 214 215 return results; 216 } 217 218 private Object convertToProperType(Object toConvert, Class desiredType) 219 throws Exception { 220 if (null == toConvert || desiredType.isInstance(toConvert)) 221 return toConvert; 222 223 if (desiredType.equals(Integer .class)) { 224 return new Integer ((new Integer (toConvert.toString())).intValue()); 225 } else if (desiredType.equals(Long .class)) { 226 return new Long ((new Double (toConvert.toString())).longValue()); 227 } else if (desiredType.equals(Boolean .class)) { 228 return new Boolean (toConvert.toString()); 229 } else if (desiredType.equals(Double .class)) { 230 return new Double (toConvert.toString()); 231 } else if (desiredType.equals(java.util.Date .class)) { 232 return new java.util.Date (toConvert.toString()); 233 } 234 return toConvert; 235 } 236 237 240 public void run() { 241 Thread curThread = Thread.currentThread(); 242 String oldThreadName = curThread.getName(); 243 247 long status = AbstractToolAgent.APP_STATUS_INVALID; 248 249 SingletonPooledExecutor spe=null; 250 try { 251 spe = SingletonPooledExecutor.getInstance(m_cus); 252 } catch (Exception _) {} 253 if (null == spe) { 254 error("Unable to get thread-pool!"); 255 return; 256 } 257 258 Shark shark = Shark.getInstance(); 259 if (null == shark) { 260 error("Unable to get Shark engine instance!"); 261 spe.updateCompleteCount(this); 262 return; 263 } 264 265 SharkTransaction trans = null; 266 try { 267 curThread.setName(oldThreadName+"->"+m_taName); 268 SessionHandle taShandle; 269 270 trans = shark.createTransaction(); 271 272 taShandle=m_ta.connect(trans, m_username,m_password,m_engineName,m_scope); 273 274 280 m_ta.invokeApplication(trans, taShandle.getHandle(), null, 281 m_procInstId, m_assId, m_parameters, m_appMode); 282 283 status=m_ta.requestAppStatus(trans,taShandle.getHandle(),m_procInstId, 284 m_assId,m_parameters); 285 m_ta.disconnect(trans,taShandle); 286 287 int finishCount = spe.updateCompleteCount(this); 288 289 292 AdminInterface adminInterface = shark.getAdminInterface(); 293 ExecutionAdministration execAdmin = 294 adminInterface.getExecutionAdministration(); 295 execAdmin.connect(trans, spe.getSharkUsername(), spe.getSharkPassword(), 296 m_engineName, m_scope); 297 String activityId = adminInterface.getAdminMisc() 298 .getAssignmentActivityId(m_procInstId, m_assId); 299 300 WfActivity wfActivity = execAdmin.getActivity(trans, m_procInstId, activityId); 301 wfActivity.set_result(trans, getResults()); 302 303 if (0 == finishCount) { 304 wfActivity.complete(trans); 305 } 306 307 execAdmin.disconnect(trans); 308 trans.commit(); 309 } catch (Throwable ex) { 310 error("SchedulerToolAgent -> applicationProxy " + m_taName 311 + " terminated incorrectly: " + ex); 312 status = AbstractToolAgent.APP_STATUS_INVALID; 313 try { 314 shark.emptyCaches(trans); 315 trans.rollback(); 316 } catch (Exception _) { } 317 } finally { 318 try { shark.unlockProcesses(trans);} catch (Exception _){} 319 try {trans.release(); } catch (Exception _) {} 320 trans = null; 321 } 322 323 326 curThread.setName(oldThreadName); 327 } 328 329 332 public void cancel() { 333 337 } 338 339 342 public Object getAssId() { 343 return m_assId; 344 } 345 } 346 347 352 class SingletonPooledExecutor extends PooledExecutor { 353 private static final Object classLock = SingletonPooledExecutor.class; 354 355 356 private static final String SHUTDOWN_HOOK_THREAD_NAME = "SingletonPooledExecShutdownHook"; 357 358 366 private static boolean s_shutdown = false; 367 368 369 private static Thread s_shutdownHook; 370 371 374 private static SingletonPooledExecutor m_spe = null; 375 376 private static CallbackUtilities m_cus = null; 377 378 386 private Map m_assIdToCount = new HashMap (); 387 388 391 private String m_sharkUsername; 392 393 396 private String m_sharkPassword; 397 398 402 private SingletonPooledExecutor(LinkedQueue queue, int threadPoolSize) { 403 super(queue,threadPoolSize); 404 } 405 406 409 public String getSharkUsername() { 410 return m_sharkUsername; 411 } 412 413 416 public String getSharkPassword() { 417 return m_sharkPassword; 418 } 419 420 423 private void info(String infoString) { 424 if (null != m_cus) { 425 m_cus.info(infoString); 426 } else { 427 System.out.println(infoString); 428 } 429 } 430 private void error(String infoString) { 431 if (null != m_cus) { 432 m_cus.error(infoString); 433 } else { 434 System.out.println(infoString); 435 } 436 } 437 438 private static void shutdown() { 439 s_shutdown = true; 440 if (null != m_spe) { 441 m_spe.shutdownNow(); 442 try { 443 final long timeout = 2*1000L; 444 if (!m_spe.awaitTerminationAfterShutdown(timeout)) { 445 final int remainingThreads = m_spe.getPoolSize(); 446 if (0!= remainingThreads) { 447 m_spe.info("Threads remaining during shutdown: " 448 +remainingThreads); 449 } 450 } 451 452 List pendingTasks = m_spe.drain(); 453 for (Iterator itr = pendingTasks.iterator(); itr.hasNext(); ) { 454 ToolAgentCmdProxy obj = (ToolAgentCmdProxy)itr.next(); 455 m_spe.info("Pending task: "+obj.toString()); 456 obj.cancel(); 457 } 458 } catch (Exception ex) { 459 m_spe.error("Exception during thread pool shutdown: "+ex.toString()); 460 } 461 m_spe = null; 462 } 463 } 464 465 public static SingletonPooledExecutor getInstance(CallbackUtilities cus) throws Exception { 466 synchronized(classLock) { 467 if (null == m_spe) { 468 try { 469 472 m_cus = cus; 473 int threadPoolSize = 3; 474 try { 475 String threadPoolSizeStr = 476 cus.getProperty("SchedulerToolAgent.threadPoolSize",Integer.toString(3)); 477 threadPoolSize = Integer.parseInt(threadPoolSizeStr); 478 } catch (Exception ex) { 479 } 481 String sharkUsername = cus.getProperty("SchedulerToolAgent.sharkUsername", "admin"); 482 String sharkPassword = cus.getProperty("SchedulerToolAgent.sharkPassword", "enhydra"); 483 484 487 s_shutdownHook = 488 new Thread () { 489 public void run() { 490 SingletonPooledExecutor.shutdown(); 491 } 492 493 }; 494 s_shutdownHook.setName( 495 SingletonPooledExecutor.SHUTDOWN_HOOK_THREAD_NAME); 496 Runtime.getRuntime().addShutdownHook(s_shutdownHook); 497 498 m_spe = new SingletonPooledExecutor(new LinkedQueue(),threadPoolSize); 499 m_spe.waitWhenBlocked(); 500 m_spe.createThreads(threadPoolSize); 501 m_spe.m_sharkUsername = sharkUsername; 502 m_spe.m_sharkPassword = sharkPassword; 503 504 } catch (Exception ex) { 505 String errorMsg = "Exception during thread pool init "+ex.toString(); 506 if (null != m_cus) { 507 m_cus.error(errorMsg); 508 } else { 509 System.out.println(errorMsg); 510 } 511 if (null != s_shutdownHook) { 512 Runtime.getRuntime().removeShutdownHook(s_shutdownHook); 513 } 514 s_shutdownHook = null; 515 if (null != m_spe) { 516 SingletonPooledExecutor.shutdown(); 517 } 518 throw ex; 519 } 520 } 521 return m_spe; 522 } 523 } 524 525 526 529 public void execute(ToolAgentCmdProxy cmdProxy) throws InterruptedException { 530 synchronized (m_assIdToCount) { 531 535 Object assId = cmdProxy.getAssId(); 536 Integer curCnt = null == assId ? null 537 : (Integer )m_assIdToCount.get(assId); 538 int assIdCnt = null == curCnt ? 1 : curCnt.intValue()+1; 539 m_assIdToCount.put(assId, new Integer (assIdCnt)); 540 } 541 super.execute(cmdProxy); 542 } 543 544 548 public int updateCompleteCount(ToolAgentCmdProxy cmdProxy) { 549 int returnCnt; 550 synchronized (m_assIdToCount) { 551 Object assId = cmdProxy.getAssId(); 552 Integer curCnt = null == assId ? null 553 : (Integer )m_assIdToCount.get(assId); 554 if (null == curCnt) { 555 error("Unable to find cmd count for assId "+assId.toString()); 556 returnCnt=-1; 557 } 558 559 562 returnCnt = Math.max(curCnt.intValue()-1,0); 563 m_assIdToCount.put(assId, new Integer (returnCnt)); 564 } 565 566 return returnCnt; 567 } 568 569 } 570 | Popular Tags |