KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > enhydra > shark > toolagent > SchedulerToolAgent


1 package org.enhydra.shark.toolagent;
2
3 import java.util.HashMap JavaDoc;
4 import java.util.Iterator JavaDoc;
5 import java.util.List JavaDoc;
6 import java.util.Map JavaDoc;
7
8 import org.enhydra.shark.Shark;
9 import org.enhydra.shark.api.SharkTransaction;
10 import org.enhydra.shark.api.client.wfmodel.WfActivity;
11 import org.enhydra.shark.api.client.wfservice.AdminInterface;
12 import org.enhydra.shark.api.client.wfservice.ExecutionAdministration;
13 import org.enhydra.shark.api.internal.toolagent.AppParameter;
14 import org.enhydra.shark.api.internal.toolagent.ApplicationBusy;
15 import org.enhydra.shark.api.internal.toolagent.ApplicationNotDefined;
16 import org.enhydra.shark.api.internal.toolagent.ApplicationNotStarted;
17 import org.enhydra.shark.api.internal.toolagent.SessionHandle;
18 import org.enhydra.shark.api.internal.toolagent.ToolAgent;
19 import org.enhydra.shark.api.internal.toolagent.ToolAgentGeneralException;
20 import org.enhydra.shark.api.internal.working.CallbackUtilities;
21 import org.enhydra.shark.xpdl.XPDLConstants;
22 import org.enhydra.shark.xpdl.elements.ExtendedAttribute;
23 import org.enhydra.shark.xpdl.elements.ExtendedAttributes;
24
25 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
26 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
27
28 /**
29  * <P>
30  * Tool agent class to schedule a ToolAgent call in
31  * separate pool of threads
32  * </P>
33  * @author Abe Achkinaz, BDNACorp.com
34  */

35 public class SchedulerToolAgent extends AbstractToolAgent {
36
37    private final static String JavaDoc TOOL_AGENT_CLASS_EXT_ATTR_NAME="ToolAgentClass";
38    private final static String JavaDoc TOOL_AGENT_CLASS_PROXY_EXT_ATTR_NAME="ToolAgentClassProxy";
39
40    /*
41     * Helper methods for logging
42     */

43    private void info(String JavaDoc infoString) {
44       if (null != cus) {
45          cus.info(infoString);
46       }
47    }
48    private void error(String JavaDoc infoString) {
49       if (null != cus) {
50          cus.error(infoString);
51       }
52    }
53
54    public String JavaDoc getInfo (SharkTransaction t) throws ToolAgentGeneralException {
55       String JavaDoc i=
56          "Wraps a ToolAgent standard call and executes them a separate"+
57          "\nin thread-pool."+
58          "\n"+
59          "\nTo use define an ToolAgentClass extended attribute to the scheduler"+
60          "\nanother extended attribute 'ToolAgentClassProxy' to the actual"+
61          "\napplication to be called in a separet thread. For example: "+
62          "\n<ExtendedAttributes>"+
63          "\n <ExtendedAttribute Name=\"ToolAgentClass\" Value=\"org.enhydra.shark.toolagent.SchedulerToolAgent\"/>"+
64          "\n <ExtendedAttribute Name=\"ToolAgentClassProxy\" Value=\"org.enhydra.shark.toolagent.BshToolAgent\"/>"+
65          "\n <ExtendedAttribute Name=\"Script\" Value=\"System.out.println(\"I was called...\");\"/>"+
66          "\n</ExtendedAttributes>"+
67          "\n"+
68          "\nTo be able to work with SchedulerToolAgent, you must define some "+
69          "\nproperties, and here is a section from shark's configuration file \"Shark.conf\" "+
70          "\nthat defines these properties:"+
71          "\n# Configure number of threads to execute commands and admin user/password: "+
72          "\nSchedulerToolAgent.threadPoolSize=3"+
73          "\nSchedulerToolAgent.sharkUsername=admin"+
74          "\nSchedulerToolAgent.sharkPassword=enhydra"+
75          "\n"
76          ;
77       return i;
78    }
79
80    public void invokeApplication(SharkTransaction t, long handle,
81                                  String JavaDoc applicationName, String JavaDoc procInstId, String JavaDoc assId,
82                                  AppParameter[] parameters, Integer JavaDoc appMode)
83       throws ApplicationNotStarted, ApplicationNotDefined,
84       ApplicationBusy, ToolAgentGeneralException {
85
86       super.invokeApplication(t, handle, applicationName, procInstId, assId,
87                               parameters, appMode);
88       try {
89          /*
90           * Get proxy class name and replace in parameter[0]:
91           */

92          String JavaDoc extAttribs=(String JavaDoc)parameters[0].the_value;
93          ExtendedAttributes eas = readParamsFromExtAttributes(extAttribs);
94          ExtendedAttribute eaScheduler=eas.getFirstExtendedAttributeForName(TOOL_AGENT_CLASS_EXT_ATTR_NAME);
95          String JavaDoc schedulerClassName = eaScheduler.getVValue();
96          ExtendedAttribute eaProxy=eas.getFirstExtendedAttributeForName(TOOL_AGENT_CLASS_PROXY_EXT_ATTR_NAME);
97          String JavaDoc proxyClassName = eaProxy.getVValue();
98          int idxSchedulerClassName = extAttribs.indexOf(schedulerClassName);
99          String JavaDoc newExtAttribs = extAttribs.substring(0,idxSchedulerClassName)
100             + proxyClassName
101             + extAttribs.substring(idxSchedulerClassName+schedulerClassName.length());
102          parameters[0].the_value = newExtAttribs;
103          readParamsFromExtAttributes(newExtAttribs);
104
105          Class JavaDoc cls=Class.forName(proxyClassName);
106          ToolAgent ta=(ToolAgent)cls.newInstance();
107
108          /*
109           * Get shark's username and password to use to report back to engine
110           */

111          String JavaDoc sharkUsername = cus.getProperty("SchedulerToolAgent.sharkUsername", "admin");
112          String JavaDoc sharkPassword = cus.getProperty("SchedulerToolAgent.sharkPassword", "enhydra");
113
114          ta.configure(cus);
115
116          ToolAgentCmdProxy taCmdProxy = new ToolAgentCmdProxy(
117             cus, ta, proxyClassName,
118             username, password, engineName, scope,
119             handle, applicationName, procInstId, assId, parameters,
120             appMode);
121
122          SingletonPooledExecutor.getInstance(cus).execute(taCmdProxy);
123
124          /*
125           * Finished spawning the command, returned finished to engine
126           * otherwise it throws an exception!
127           */

128          status = AbstractToolAgent.APP_STATUS_FINISHED;
129       } catch (Throwable JavaDoc ex) {
130          error("SchedulerToolAgent terminated incorrectly: " + ex);
131          status = AbstractToolAgent.APP_STATUS_INVALID;
132          throw new ToolAgentGeneralException(ex);
133       }
134    }
135 }
136
137 /*
138  * <P>
139  * Wrap ToolAgent interface into runnable class used
140  * by SinglePooledExec
141  * </P>
142  * @author aachkinazi, BDNACorp.com
143  */

144 class ToolAgentCmdProxy implements Runnable JavaDoc {
145    CallbackUtilities m_cus;
146    String JavaDoc m_taName;
147    ToolAgent m_ta;
148    String JavaDoc m_username;
149    String JavaDoc m_password;
150    String JavaDoc m_engineName;
151    String JavaDoc m_scope;
152    long m_handle;
153    String JavaDoc m_applicationName;
154    String JavaDoc m_procInstId;
155    String JavaDoc m_assId;
156    AppParameter[] m_parameters;
157    AppParameter[] m_proxyParameters;
158    Integer JavaDoc m_appMode;
159
160    ToolAgentCmdProxy(CallbackUtilities cus,
161                      ToolAgent ta, String JavaDoc taName,
162                      String JavaDoc username, String JavaDoc password,
163                      String JavaDoc engineName, String JavaDoc scope,
164                      long handle,
165                      String JavaDoc applicationName, String JavaDoc procInstId, String JavaDoc assId,
166                      AppParameter[] parameters, Integer JavaDoc appMode) {
167       m_cus = cus;
168       m_ta = ta;
169       m_taName = taName;
170       m_username = username;
171       m_password = password;
172       m_engineName = engineName;
173       m_scope = scope;
174       m_handle = handle;
175       m_applicationName = applicationName;
176       m_procInstId = procInstId;
177       m_assId = assId;
178       m_parameters = parameters;
179       m_appMode = appMode;
180    }
181
182    /*
183     * Helper logging methods
184     */

185    private void info(String JavaDoc infoString) {
186       if (null != m_cus) {
187          m_cus.info(infoString);
188       }
189    }
190    private void error(String JavaDoc infoString) {
191       if (null != m_cus) {
192          m_cus.error(infoString);
193       }
194    }
195
196    /**
197     * Return IN_OUT and OUT values to
198     * the calling activity
199     * @return
200     */

201    private Map JavaDoc getResults() throws Exception JavaDoc {
202       /*
203        * Build activity result map,
204        */

205       Map JavaDoc results = new HashMap JavaDoc();
206       for (int i=0; i<m_parameters.length; ++i) {
207          AppParameter p = m_parameters[i];
208          if (p.the_mode.equals(XPDLConstants.FORMAL_PARAMETER_MODE_INOUT)
209              || p.the_mode.equals(XPDLConstants.FORMAL_PARAMETER_MODE_OUT)) {
210             results.put(p.the_actual_name,
211                         convertToProperType(p.the_value, p.the_class));
212          }
213       }
214
215       return results;
216    }
217
218    private Object JavaDoc convertToProperType(Object JavaDoc toConvert, Class JavaDoc desiredType)
219       throws Exception JavaDoc {
220       if (null == toConvert || desiredType.isInstance(toConvert))
221          return toConvert;
222
223       if (desiredType.equals(Integer JavaDoc.class)) {
224          return new Integer JavaDoc((new Integer JavaDoc(toConvert.toString())).intValue());
225       } else if (desiredType.equals(Long JavaDoc.class)) {
226          return new Long JavaDoc((new Double JavaDoc(toConvert.toString())).longValue());
227       } else if (desiredType.equals(Boolean JavaDoc.class)) {
228          return new Boolean JavaDoc(toConvert.toString());
229       } else if (desiredType.equals(Double JavaDoc.class)) {
230          return new Double JavaDoc(toConvert.toString());
231       } else if (desiredType.equals(java.util.Date JavaDoc.class)) {
232          return new java.util.Date JavaDoc(toConvert.toString());
233       }
234       return toConvert;
235    }
236
237    /* (non-Javadoc)
238     * @see java.lang.Runnable#run()
239     */

240    public void run() {
241       Thread JavaDoc curThread = Thread.currentThread();
242       String JavaDoc oldThreadName = curThread.getName();
243       /*
244        * Invoke query application and set state of activity in proxy
245        * thread context:
246        */

247       long status = AbstractToolAgent.APP_STATUS_INVALID;
248
249       SingletonPooledExecutor spe=null;
250       try {
251          spe = SingletonPooledExecutor.getInstance(m_cus);
252       } catch (Exception JavaDoc _) {}
253       if (null == spe) {
254          error("Unable to get thread-pool!");
255          return;
256       }
257
258       Shark shark = Shark.getInstance();
259       if (null == shark) {
260          error("Unable to get Shark engine instance!");
261          spe.updateCompleteCount(this);
262          return;
263       }
264
265       SharkTransaction trans = null;
266       try {
267          curThread.setName(oldThreadName+"->"+m_taName);
268          SessionHandle taShandle;
269
270          trans = shark.createTransaction();
271
272          taShandle=m_ta.connect(trans, m_username,m_password,m_engineName,m_scope);
273
274          /*
275           * Use null for appName because ToolAgent implementations check for appName
276           * being null to parse extended attributes and look for the name! This has
277           * the side-effect of initializing other ToolAgent specific extended attributes
278           * such as Script for BshToolAgent.
279           */

280          m_ta.invokeApplication(trans, taShandle.getHandle(), null,
281                                 m_procInstId, m_assId, m_parameters, m_appMode);
282
283          status=m_ta.requestAppStatus(trans,taShandle.getHandle(),m_procInstId,
284                                       m_assId,m_parameters);
285          m_ta.disconnect(trans,taShandle);
286
287          int finishCount = spe.updateCompleteCount(this);
288
289          /*
290           * Find activity and set results
291           */

292          AdminInterface adminInterface = shark.getAdminInterface();
293          ExecutionAdministration execAdmin =
294             adminInterface.getExecutionAdministration();
295          execAdmin.connect(trans, spe.getSharkUsername(), spe.getSharkPassword(),
296                            m_engineName, m_scope);
297          String JavaDoc activityId = adminInterface.getAdminMisc()
298             .getAssignmentActivityId(m_procInstId, m_assId);
299
300          WfActivity wfActivity = execAdmin.getActivity(trans, m_procInstId, activityId);
301          wfActivity.set_result(trans, getResults());
302
303          if (0 == finishCount) {
304             wfActivity.complete(trans);
305          }
306
307          execAdmin.disconnect(trans);
308          trans.commit();
309       } catch (Throwable JavaDoc ex) {
310          error("SchedulerToolAgent -> applicationProxy " + m_taName
311                   + " terminated incorrectly: " + ex);
312          status = AbstractToolAgent.APP_STATUS_INVALID;
313          try {
314             shark.emptyCaches(trans);
315             trans.rollback();
316          } catch (Exception JavaDoc _) { /* left blank */ }
317       } finally {
318          try { shark.unlockProcesses(trans);} catch (Exception JavaDoc _){}
319          try {trans.release(); } catch (Exception JavaDoc _) {}
320          trans = null;
321       }
322
323       /*
324        * Update activity with results and set complete state
325        */

326       curThread.setName(oldThreadName);
327    }
328
329    /**
330     * Cancel an activity step that never got scheduled
331     */

332    public void cancel() {
333       /*
334        * There is no generic way to handle cancelling a pending
335        * application that never got a thread to run. Do nothing for now!
336        */

337    }
338
339    /**
340     * @return
341     */

342    public Object JavaDoc getAssId() {
343       return m_assId;
344    }
345 }
346
347 /*
348  * Define SingletonPoolExecutor to manager threads used by
349  * SchedulerToolAgent and controlling wfActivityComplete.
350  * @author aachkinazi, BDNACorp.com
351  */

352 class SingletonPooledExecutor extends PooledExecutor {
353    private static final Object JavaDoc classLock = SingletonPooledExecutor.class;
354
355    /** Shutdown hook thread name. */
356    private static final String JavaDoc SHUTDOWN_HOOK_THREAD_NAME = "SingletonPooledExecShutdownHook";
357
358    /**
359     * Global shutdown flag to expedite shutdown process. There
360     * is no need to synchronize access to this flag because the
361     * shutdown sequence does not rely or coordinate on the state
362     * of this flag. It simply indicates that the shutdown
363     * sequence has begun so the threads that happen to be in
364     * a good state to do so can shutdown early.
365     */

366    private static boolean s_shutdown = false;
367
368    /** Thread to run as shutdown hook. */
369    private static Thread JavaDoc s_shutdownHook;
370
371    /**
372     * Sigleton pool of threads
373     */

374    private static SingletonPooledExecutor m_spe = null;
375
376    private static CallbackUtilities m_cus = null;
377
378    /**
379     * Keep track of pending scheduled commands per assignment
380     * only call WfActivity.complete() when the count reaches 0. This
381     * handles the case where an activity has multiple SchedulerToolAgent
382     * steps.
383     * @see SingletonPooledExecutor#execute(ToolAgentCmdProxy)
384     * @see SingletonPooledExecutor#completeCmd(ToolAgentCmdProxy)
385     */

386    private Map JavaDoc m_assIdToCount = new HashMap JavaDoc();
387
388    /**
389     * Use this name to connect to Shark engine
390     */

391    private String JavaDoc m_sharkUsername;
392
393    /**
394     * Use this password to connect to Shark engine
395     */

396    private String JavaDoc m_sharkPassword;
397
398    /**
399     * @param queue
400     * @param threadPoolSize
401     */

402    private SingletonPooledExecutor(LinkedQueue queue, int threadPoolSize) {
403       super(queue,threadPoolSize);
404    }
405
406    /**
407     * @return
408     */

409    public String JavaDoc getSharkUsername() {
410       return m_sharkUsername;
411    }
412
413    /**
414     * @return
415     */

416    public String JavaDoc getSharkPassword() {
417       return m_sharkPassword;
418    }
419
420    /*
421     * Helper logging methods
422     */

423    private void info(String JavaDoc infoString) {
424       if (null != m_cus) {
425          m_cus.info(infoString);
426       } else {
427          System.out.println(infoString);
428       }
429    }
430    private void error(String JavaDoc infoString) {
431       if (null != m_cus) {
432          m_cus.error(infoString);
433       } else {
434          System.out.println(infoString);
435       }
436    }
437
438    private static void shutdown() {
439       s_shutdown = true;
440       if (null != m_spe) {
441          m_spe.shutdownNow();
442          try {
443             final long timeout = 2*1000L;
444             if (!m_spe.awaitTerminationAfterShutdown(timeout)) {
445                final int remainingThreads = m_spe.getPoolSize();
446                if (0!= remainingThreads) {
447                   m_spe.info("Threads remaining during shutdown: "
448                                 +remainingThreads);
449                }
450             }
451
452             List JavaDoc pendingTasks = m_spe.drain();
453             for (Iterator JavaDoc itr = pendingTasks.iterator(); itr.hasNext(); ) {
454                ToolAgentCmdProxy obj = (ToolAgentCmdProxy)itr.next();
455                m_spe.info("Pending task: "+obj.toString());
456                obj.cancel();
457             }
458          } catch (Exception JavaDoc ex) {
459             m_spe.error("Exception during thread pool shutdown: "+ex.toString());
460          }
461          m_spe = null;
462       }
463    }
464
465    public static SingletonPooledExecutor getInstance(CallbackUtilities cus) throws Exception JavaDoc {
466       synchronized(classLock) {
467          if (null == m_spe) {
468             try {
469                /*
470                 * Configure for the first time,
471                 */

472                m_cus = cus;
473                int threadPoolSize = 3;
474                try {
475                   String JavaDoc threadPoolSizeStr =
476                      cus.getProperty("SchedulerToolAgent.threadPoolSize",Integer.toString(3));
477                   threadPoolSize = Integer.parseInt(threadPoolSizeStr);
478                } catch (Exception JavaDoc ex) {
479                   // Left blank!
480
}
481                String JavaDoc sharkUsername = cus.getProperty("SchedulerToolAgent.sharkUsername", "admin");
482                String JavaDoc sharkPassword = cus.getProperty("SchedulerToolAgent.sharkPassword", "enhydra");
483
484                /*
485                 * Provide a shutdown hook to clean up thread pools
486                 */

487                s_shutdownHook =
488                   new Thread JavaDoc() {
489                   public void run() {
490                      SingletonPooledExecutor.shutdown();
491                   }
492
493                };
494                s_shutdownHook.setName(
495                   SingletonPooledExecutor.SHUTDOWN_HOOK_THREAD_NAME);
496                Runtime.getRuntime().addShutdownHook(s_shutdownHook);
497
498                m_spe = new SingletonPooledExecutor(new LinkedQueue(),threadPoolSize);
499                m_spe.waitWhenBlocked();
500                m_spe.createThreads(threadPoolSize);
501                m_spe.m_sharkUsername = sharkUsername;
502                m_spe.m_sharkPassword = sharkPassword;
503
504             } catch (Exception JavaDoc ex) {
505                String JavaDoc errorMsg = "Exception during thread pool init "+ex.toString();
506                if (null != m_cus) {
507                   m_cus.error(errorMsg);
508                } else {
509                   System.out.println(errorMsg);
510                }
511                if (null != s_shutdownHook) {
512                   Runtime.getRuntime().removeShutdownHook(s_shutdownHook);
513                }
514                s_shutdownHook = null;
515                if (null != m_spe) {
516                   SingletonPooledExecutor.shutdown();
517                }
518                throw ex;
519             }
520          }
521          return m_spe;
522       }
523    }
524
525
526    /* (non-Javadoc)
527     * @see EDU.oswego.cs.dl.util.concurrent.Executor#execute(java.lang.Runnable)
528     */

529    public void execute(ToolAgentCmdProxy cmdProxy) throws InterruptedException JavaDoc {
530       synchronized (m_assIdToCount) {
531          /*
532           * Keep track of outstanding SchedulerToolAgent calls
533           * per assId/Activity.
534           */

535          Object JavaDoc assId = cmdProxy.getAssId();
536          Integer JavaDoc curCnt = null == assId ? null
537             : (Integer JavaDoc)m_assIdToCount.get(assId);
538          int assIdCnt = null == curCnt ? 1 : curCnt.intValue()+1;
539          m_assIdToCount.put(assId, new Integer JavaDoc(assIdCnt));
540       }
541       super.execute(cmdProxy);
542    }
543
544    /**
545     * @param proxy
546     * @return
547     */

548    public int updateCompleteCount(ToolAgentCmdProxy cmdProxy) {
549       int returnCnt;
550       synchronized (m_assIdToCount) {
551          Object JavaDoc assId = cmdProxy.getAssId();
552          Integer JavaDoc curCnt = null == assId ? null
553             : (Integer JavaDoc)m_assIdToCount.get(assId);
554          if (null == curCnt) {
555             error("Unable to find cmd count for assId "+assId.toString());
556             returnCnt=-1;
557          }
558
559          /*
560           * Only complete the last scheduled activity
561           */

562          returnCnt = Math.max(curCnt.intValue()-1,0);
563          m_assIdToCount.put(assId, new Integer JavaDoc(returnCnt));
564       }
565
566       return returnCnt;
567    }
568
569 }
570
Popular Tags