1 64 65 package com.jcorporate.expresso.core.utility; 66 67 import com.jcorporate.expresso.core.db.DBConnection; 68 import com.jcorporate.expresso.core.db.DBConnectionPool; 69 import com.jcorporate.expresso.core.db.DBException; 70 import com.jcorporate.expresso.core.dbobj.NextNumber; 71 import com.jcorporate.expresso.core.dbobj.SecuredDBObject; 72 import com.jcorporate.expresso.core.job.Job; 73 import com.jcorporate.expresso.core.job.ServerException; 74 import com.jcorporate.expresso.core.misc.ConfigManager; 75 import com.jcorporate.expresso.core.misc.DateTime; 76 import com.jcorporate.expresso.core.misc.StringUtil; 77 import com.jcorporate.expresso.core.registry.ExpressoThread; 78 import com.jcorporate.expresso.core.registry.MutableRequestRegistry; 79 import com.jcorporate.expresso.core.security.SuperUser; 80 import com.jcorporate.expresso.kernel.LogManager; 81 import com.jcorporate.expresso.kernel.util.FastStringBuffer; 82 import com.jcorporate.expresso.services.crontab.Crontab; 83 import com.jcorporate.expresso.services.crontab.CrontabEntry; 84 import com.jcorporate.expresso.services.crontab.CrontabListenerI; 85 import com.jcorporate.expresso.services.dbobj.JobHandlerControl; 86 import com.jcorporate.expresso.services.dbobj.JobHandlerRegistry; 87 import com.jcorporate.expresso.services.dbobj.JobQueue; 88 import com.jcorporate.expresso.services.dbobj.Setup; 89 import org.apache.log4j.Logger; 90 91 import java.io.BufferedReader ; 92 import java.io.File ; 93 import java.io.IOException ; 94 import java.io.InputStreamReader ; 95 import java.util.Enumeration ; 96 import java.util.Hashtable ; 97 import java.util.Iterator ; 98 import java.util.StringTokenizer ; 99 import java.util.Vector ; 100 101 102 111 public class JobHandler extends ExpressoThread { 112 115 protected static Vector serverList = new Vector (3); 116 117 120 protected static final Logger log = Logger.getLogger(JobHandler.class); 121 122 125 private static JobHandlerRegistry m_jobHandlerRegistry = null; 126 127 130 private static JobHandlerControl m_jobHandlerControl = null; 131 132 135 protected Crontab m_cronMgr = null; 136 137 140 protected String m_jobHandlerStatus = "unknown"; 141 142 145 protected boolean running; 146 147 150 protected long m_serverID = -1; 151 152 155 private String dbName = null; 156 157 160 private String m_qScanCriteria = null; 161 162 166 private int interval = 30; 167 168 171 private int maxJobs = 3; 172 173 180 public JobHandler(String dbName) throws DBException, ServerException { 181 if (log.isDebugEnabled()) { 182 log.debug("Job Handler for context '" + dbName + "' instantiated."); 183 } 184 185 setDataContext(dbName); 186 187 188 String intervalString = Setup.getValueRequired(getDataContext(), 189 "TimerInterval"); 190 191 if (!intervalString.equals("")) { 192 try { 193 interval = new Integer (intervalString).intValue(); 194 } catch (NumberFormatException ne) { 195 throw new ServerException("Cannot convert " + intervalString + 196 " to a number"); 197 } 198 } 199 200 201 String maxJobsString = Setup.getValueRequired(getDataContext(), 202 "MaxJobs"); 203 204 if (!maxJobsString.equals("")) { 205 try { 206 maxJobs = new Integer (maxJobsString).intValue(); 207 } catch (NumberFormatException ne) { 208 throw new ServerException("Cannot convert " + maxJobsString + 209 " to a number"); 210 } 211 } 212 213 214 registerJobHandler(); 215 216 synchronized (JobHandler.class) { 218 m_cronMgr = new Crontab(); 219 m_jobHandlerControl = new JobHandlerControl(); 220 m_jobHandlerControl.setDataContext(dbName); 221 } 222 } 223 224 229 public void setDataContext(String newDBName) { 230 if (StringUtil.notNull(newDBName).equals("")) { 231 log.error("Must specify db/context"); 232 dbName = "default"; 233 } else { 234 dbName = newDBName; 235 } 236 } 237 238 243 public void setID(long num) { 244 m_serverID = num; 245 } 246 247 252 public long getID() { 253 return m_serverID; 254 } 255 256 257 264 public Crontab getCronManager() { 265 return this.m_cronMgr; 266 } 267 268 274 public Vector getServerList() throws ServerException { 275 return serverList; 276 } 277 278 284 public static void main(String [] args) { 285 System.out.println("JobHandler"); 286 287 Hashtable commandArgs = new Hashtable (10); 288 String paramCode = null; 289 String paramValue = null; 290 291 for (int i = 0; i < args.length; i++) { 292 StringTokenizer stk = new StringTokenizer (args[i], "="); 293 294 if (!stk.hasMoreTokens()) { 295 usage(); 296 } 297 298 paramCode = stk.nextToken(); 299 300 if (!stk.hasMoreTokens()) { 301 usage(); 302 } 303 304 paramValue = stk.nextToken(); 305 commandArgs.put(paramCode, paramValue); 306 } 307 308 309 try { 310 String logDir = (String ) commandArgs.get("logDir"); 311 String logConfig = (String ) commandArgs.get("logConfig"); 312 313 if (logConfig == null) { 314 logConfig = (String ) commandArgs.get("configDir") + 315 "/expressoLogging.xml"; 316 } 317 318 new LogManager(logConfig, logDir); 319 320 ConfigManager.load(getConfigDir(commandArgs)); 322 323 ConfigManager.setWebAppDir(getAppDir(commandArgs)); 325 326 ConfigManager.dbInitialize(); 328 329 String dbName = (String ) commandArgs.get("db"); 331 332 if (dbName == null) { 333 dbName = "default"; 334 } 335 336 resetQ(dbName); 337 log.info("Jobhandler running on database '" + dbName + "'"); 338 log.info("JobHandler running..."); 339 340 JobHandler ts = new JobHandler(dbName); 341 ts.run(); 342 } catch (Exception de) { 343 de.printStackTrace(System.err); 344 log.error("Server Error", de); 345 System.exit(1); 346 } 347 } 348 349 355 public static void resetQ(String dbName) throws DBException { 356 JobQueue jql = new JobQueue(SecuredDBObject.SYSTEM_ACCOUNT); 358 jql.setDataContext(dbName); 359 jql.setField("StatusCode", JobQueue.JOB_STATUS_RUNNING); 360 361 JobQueue jq = null; 362 363 for (Iterator e = jql.searchAndRetrieveList().iterator(); e.hasNext();) { 364 jq = (JobQueue) e.next(); 365 jq.setField("StatusCode", JobQueue.JOB_STATUS_AVAILABLE); 366 jq.setField("ServerID", 0); 367 jq.update(); 368 } 369 370 JobHandlerControl jhc = new JobHandlerControl(SecuredDBObject.SYSTEM_ACCOUNT); 372 jhc.setDataContext(dbName); 373 jhc.setField(JobHandlerControl.FLD_STATUS_CODE, 374 JobHandlerControl.JOB_STATUS_RUNNING); 375 376 JobHandlerControl oneEntry = null; 377 378 for (Iterator e = jhc.searchAndRetrieveList().iterator(); e.hasNext();) { 379 oneEntry = (JobHandlerControl) e.next(); 380 oneEntry.setField(JobHandlerControl.FLD_STATUS_CODE, 381 JobHandlerControl.JOB_STATUS_AVAILABLE); 382 oneEntry.setField(JobHandlerControl.FLD_SERVERID, 0); 383 oneEntry.update(); 384 } 385 } 386 387 394 public String getStatus() { 395 return m_jobHandlerStatus; 396 } 397 398 401 public synchronized void registerJobHandler() { 402 synchronized (JobHandler.class) { 403 if (m_jobHandlerRegistry == null) { 404 try { 405 String osName = SystemInfo.getOSName(); 406 String hostName = SystemInfo.getHostName(); 407 408 if (log.isInfoEnabled()) { 409 log.info("Registering Job Handler:"); 410 log.info("host: " + hostName + " serverID: " + getID() + 411 " OS: " + osName); 412 } 413 414 m_jobHandlerRegistry = new JobHandlerRegistry(); 415 m_jobHandlerRegistry.setDataContext(this.getDataContext()); 416 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_OSNAME, 417 osName); 418 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_HOSTNAME, 419 hostName); 420 421 boolean found = m_jobHandlerRegistry.find(); 422 String serverID = null; 423 long uniqNum = -1; 424 425 if (!found) { 426 NextNumber myNext = NextNumber.getInstance(); 427 uniqNum = myNext.getNext(getDataContext(), 428 m_jobHandlerRegistry, 429 JobHandlerRegistry.FLD_SERVERID); 430 setID(uniqNum); 431 serverID = String.valueOf(getID()); 432 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_SERVERID, 433 serverID); 434 m_jobHandlerRegistry.add(); 435 } else { 436 serverID = m_jobHandlerRegistry.getField(JobHandlerRegistry.FLD_SERVERID); 437 uniqNum = Long.valueOf(serverID).longValue(); 438 setID(uniqNum); 439 } 440 441 if (getID() <= 0) { 442 if (log.isInfoEnabled()) { 443 log.error("Unable to allocate server ID"); 444 } 445 } else { 446 if (log.isInfoEnabled()) { 447 log.info("Job Handler Server Instance ID:" + 448 uniqNum); 449 } 450 } 451 452 setStatus("Enabled"); 453 } catch (DBException dbe) { 454 log.error("Cannot registry Job Handler, reason: "); 455 dbe.printStackTrace(); 456 } 457 } 458 } 459 } 460 461 464 public void run() { 465 super.run(); 466 467 if (log.isDebugEnabled()) { 468 log.debug("Job Handler for '" + getDataContext() + "' started."); 469 } 470 471 this.running = true; 472 473 new MutableRequestRegistry(dbName, SuperUser.SUPER_USER); 476 477 try { 478 resetCurrentJobs(); 479 480 while (running) { 481 try { 482 updateJobHandlerStatus(); 483 checkQueue(); 484 checkControlQueue(); 485 } catch (ServerException ae) { 486 log.error("Unable to check queue or run entry. " + 488 "Entry retained for retry", ae); 489 490 log.error(ae); 494 495 } 499 500 yield(); 501 sleep(interval * 1000); 502 } 503 } catch (InterruptedException ie) { 504 log.info("JobHandler was interrupted."); } catch (Throwable t) { 506 log.error("Unable to start job handler correctly", t); 507 } 508 } 509 510 513 public void shutDown() { 514 m_cronMgr.removeAllCrontabEntries(); 515 this.running = false; 516 517 if (log.isInfoEnabled()) { 518 log.info("JobHandler for context '" + getDataContext() + 519 "' shutting down."); 520 } 521 interrupt(); 522 } 523 524 529 protected synchronized void resetCurrentJobs() throws DBException { 530 531 JobQueue jql = new JobQueue(SecuredDBObject.SYSTEM_ACCOUNT); 532 jql.setDataContext(getDataContext()); 533 534 try { 535 jql.setField(JobQueue.FLD_STATUS_CODE, JobQueue.JOB_STATUS_RUNNING); 536 } catch (DBException ex) { 537 try { 545 sleep(1000); 546 } catch (InterruptedException ex1) { 547 } 548 549 jql.setField(JobQueue.FLD_STATUS_CODE, JobQueue.JOB_STATUS_RUNNING); 550 } 551 552 JobQueue jq = null; 553 554 for (Iterator j = jql.searchAndRetrieveList().iterator(); j.hasNext();) { 555 jq = (JobQueue) j.next(); 556 jq.setField("StatusCode", JobQueue.JOB_STATUS_AVAILABLE); 557 jq.setField("ServerID", 0); 558 jq.update(); 559 } 560 } 561 562 569 private static String getAppDir(Hashtable commandArgs) 570 throws IOException { 571 String webAppDir = StringUtil.notNull((String ) commandArgs.get("webAppDir")); 572 573 if (webAppDir.equals("")) { 574 boolean noDir = true; 575 System.out.println("No webAppDir=xxx command line argument was found."); 576 577 while (noDir) { 578 System.out.print("Please enter Expresso web application root directory:"); 579 580 BufferedReader br = new BufferedReader (new InputStreamReader (System.in)); 581 webAppDir = br.readLine(); 582 583 if (StringUtil.notNull(webAppDir).equals("")) { 584 System.out.println("You must enter a directory name, not blank"); 585 586 continue; 587 } 588 589 File theDir = new File (webAppDir); 590 591 if (!theDir.isDirectory()) { 592 System.out.println(webAppDir + 593 " is not a valid directory. "); 594 595 continue; 596 } else { 597 noDir = false; 598 } 599 } 600 } 601 602 return webAppDir; 603 } 604 605 613 private static String getConfigDir(Hashtable commandArgs) 614 throws IOException { 615 String configDir = StringUtil.notNull((String ) commandArgs.get("configDir")); 616 617 if (configDir.equals("")) { 618 boolean noDir = true; 619 System.out.println("No configDir=xxx command line argument was found."); 620 621 while (noDir) { 622 System.out.print("Please enter Expresso configuration directory:"); 623 624 BufferedReader br = new BufferedReader (new InputStreamReader (System.in)); 625 configDir = br.readLine(); 626 627 if (StringUtil.notNull(configDir).equals("")) { 628 System.out.println("You must enter a directory name, not blank"); 629 630 continue; 631 } 632 633 File theDir = new File (configDir); 634 635 if (!theDir.isDirectory()) { 636 System.out.println(configDir + 637 " is not a valid directory. "); 638 639 continue; 640 } else { 641 noDir = false; 642 } 643 } 644 } 645 646 return configDir; 647 } 648 649 private int getCurrJobNum() { 650 int load = getServerListSize(); 651 652 return load; 653 } 654 655 658 private String getDataContext() { 659 if (StringUtil.notNull(dbName).equals("")) { 660 throw new IllegalArgumentException ("No db/context was set"); 661 } 662 663 return dbName; 664 } 665 666 672 private synchronized String getQueueScanCriteria() { 673 if (m_qScanCriteria == null) { 674 FastStringBuffer fsb = new FastStringBuffer(256); 675 fsb.append("StatusCode='"); 676 fsb.append(JobQueue.JOB_STATUS_AVAILABLE); 677 fsb.append("' AND ServerID=0 AND JobOSName in ("); 678 fsb.append("'"); 679 fsb.append(SystemInfo.getOSName()); 680 fsb.append("','"); 681 fsb.append(SystemInfo.getOSClass()); 682 fsb.append("','any')"); 683 684 m_qScanCriteria = fsb.toString(); 689 } 690 691 return m_qScanCriteria; 692 } 693 694 private int getServerListSize() { 695 int jobListSize = 0; 696 697 try { 698 jobListSize = getServerList().size(); 699 } catch (ServerException se) { 700 log.error("Cannot get size of server job list, reason:"); 701 se.printStackTrace(); 702 } 703 704 return jobListSize; 705 } 706 707 private void setStatus(String status) { 708 m_jobHandlerStatus = status; 709 } 710 711 private static String getTimeStamp() { 712 try { 713 String timeStamp = DateTime.getDateTimeForDB("default"); 714 715 return timeStamp; 716 } catch (DBException dbe) { 717 return ""; 718 } 719 } 720 721 726 private synchronized void checkControlQueue() throws DBException { 727 synchronized (JobHandler.class) { 728 String sID = String.valueOf(getID()); 729 730 try { 731 m_jobHandlerControl.setField(JobHandlerControl.FLD_SERVERID, sID); 732 m_jobHandlerControl.setField(JobHandlerControl.FLD_STATUS_CODE, 733 JobHandlerControl.JOB_STATUS_AVAILABLE); 734 m_jobHandlerControl.setDataContext(this.getDataContext()); 735 } catch (DBException ex) { 736 log.error("Error checking control queue", ex); 737 throw ex; 738 } 739 740 for (Iterator e = m_jobHandlerControl.searchAndRetrieveList(JobHandlerControl.FLD_REQUEST_TIME).iterator(); 741 e.hasNext();) { 742 JobHandlerControl ctrlEntry = (JobHandlerControl) e.next(); 743 String operation = ctrlEntry.getField(JobHandlerControl.FLD_COMMAND); 744 745 if (!operation.equals("")) { 746 ctrlEntry.setField(JobHandlerControl.FLD_STATUS_CODE, 748 JobHandlerControl.JOB_STATUS_RUNNING); 749 ctrlEntry.update(); 750 751 if (log.isInfoEnabled()) { 752 log.info("Going to proceed with Job Handler Operation: " + 753 operation); 754 } 755 756 doOperation(operation); 757 } 758 759 ctrlEntry.setField(JobHandlerControl.FLD_STATUS_CODE, 760 JobHandlerControl.JOB_STATUS_COMPLETED); 761 ctrlEntry.update(); 762 } 763 } 764 } 765 766 769 private synchronized void checkQueue() throws ServerException { 770 if (log.isDebugEnabled()) { 771 System.out.print("."); 772 } 773 774 try { 775 JobQueue myQueueList = new JobQueue(SecuredDBObject.SYSTEM_ACCOUNT); 776 myQueueList.setDataContext(getDataContext()); 777 778 JobQueue oneQueueEntry = null; 779 String crt = getQueueScanCriteria(); 780 myQueueList.setCustomWhereClause(crt); 781 782 for (Iterator e = myQueueList.searchAndRetrieveList("Priority|JobNumber").iterator(); e.hasNext();) { 783 oneQueueEntry = (JobQueue) e.next(); 784 785 if (log.isInfoEnabled()) { 786 log.info("Processing queue entry " + 787 oneQueueEntry.getField("JobNumber") + " from user " + 788 oneQueueEntry.getField(JobQueue.FLD_UID)); 789 } 790 791 String job = oneQueueEntry.getField("JobNumber"); 792 793 if (log.isInfoEnabled()) { 794 log.info("Processing Job " + job); 795 } 796 797 try { 798 oneQueueEntry.setField(JobQueue.FLD_JOBNUMBER, job); 799 oneQueueEntry.setField(JobQueue.FLD_STATUS_CODE, 800 JobQueue.JOB_STATUS_RUNNING); 801 oneQueueEntry.setField(JobQueue.FLD_SERVERID, 802 Long.toString(getID())); 803 } catch (DBException ex) { 804 log.error("Error setting Job Queue Entry fields", ex); 805 throw ex; 806 } 807 808 if (!markJob(job)) { 809 log.warn("Unable to lock job " + job); 810 811 return; 812 } 813 814 String myServerName = null; 815 816 try { 817 myServerName = oneQueueEntry.getField("JobCode"); 818 819 if (myServerName == null) { 820 throw new ServerException("Job name to run was null"); 821 } 822 823 if (log.isInfoEnabled()) { 824 log.info("Loading Job " + myServerName); 825 } 826 827 final Job myServer = Job.instantiate(myServerName); 828 829 if (log.isInfoEnabled()) { 830 log.info("Loaded Job " + myServerName); 831 } 832 833 if (myServer.multiThreaded()) { 834 if (getServerListSize() >= maxJobs) { 835 if (log.isInfoEnabled()) { 836 log.info("There are currently " + 837 getServerListSize() + 838 " jobs in job list, max jobs is " + 839 maxJobs + 840 ". Waiting for current jobs to complete"); 841 } 842 843 waitForAllJobs(); 844 } 845 846 myServer.setQueue(oneQueueEntry); 847 serverList.addElement(myServer); 848 849 if (oneQueueEntry.useCron()) { 852 String cronEntry = oneQueueEntry.getCronEntry(); 853 StringTokenizer st = new StringTokenizer (cronEntry, 854 ","); 855 856 int minute = 0; 857 int hour = 0; 858 int dayOfMonth = 0; 859 int month = 0; 860 int dayOfWeek = 0; 861 int year = 0; 862 minute = Integer.valueOf(st.nextToken()).intValue(); 863 hour = Integer.valueOf(st.nextToken()).intValue(); 864 dayOfMonth = Integer.valueOf(st.nextToken()) 865 .intValue(); 866 month = Integer.valueOf(st.nextToken()).intValue(); 867 dayOfWeek = Integer.valueOf(st.nextToken()) 868 .intValue(); 869 year = Integer.valueOf(st.nextToken()).intValue(); 870 871 myServer.setUseCron(true); 873 874 CrontabEntry cronID = m_cronMgr.addCrontabEntry(minute, 875 hour, dayOfMonth, month, dayOfWeek, year, myServer.getClass().getName(), 876 new CrontabListenerI() { 877 public void handleCrontabEntry(CrontabEntry entry) { 878 log.info("Starting Cron process"); 879 myServer.start(); 881 } 882 }, myServer.getJobNumber()); 883 884 myServer.setCronAlarmEntry(cronID); 885 886 if (log.isInfoEnabled()) { 887 log.info("Job '" + myServerName + 888 "' submitted to the CRON"); 889 } 890 } else { 891 if (log.isInfoEnabled()) { 892 log.info("Job '" + myServerName + "' begins"); 893 } 894 895 myServer.start(); 896 } 897 } else { 898 if (getServerListSize() > 0) { 899 if (log.isInfoEnabled()) { 900 log.info("Job class " + myServerName + 901 " is single-threaded, waiting for all " + 902 "current jobs to complete"); 903 } 904 905 waitForAllJobs(); 906 } 907 908 myServer.setQueue(oneQueueEntry); 909 serverList.addElement(myServer); 910 911 if (oneQueueEntry.useCron()) { 912 String cronEntry = oneQueueEntry.getCronEntry(); 913 StringTokenizer st = new StringTokenizer (cronEntry, 914 ","); 915 int minute = Integer.valueOf(st.nextToken()) 916 .intValue(); 917 int hour = Integer.valueOf(st.nextToken()).intValue(); 918 int dayOfMonth = Integer.valueOf(st.nextToken()) 919 .intValue(); 920 int month = Integer.valueOf(st.nextToken()) 921 .intValue(); 922 int dayOfWeek = Integer.valueOf(st.nextToken()) 923 .intValue(); 924 int year = Integer.valueOf(st.nextToken()).intValue(); 925 926 myServer.setUseCron(true); 928 929 CrontabEntry cronID = m_cronMgr.addCrontabEntry(minute, 931 hour, dayOfMonth, month, dayOfWeek, year, myServer.getClass().getName(), 932 new CrontabListenerI() { 933 public void handleCrontabEntry(CrontabEntry entry) { 934 log.info("Running Cron process"); 935 myServer.run(); 937 } 938 }, myServer.getJobNumber()); 939 940 myServer.setCronAlarmEntry(cronID); 941 942 if (log.isInfoEnabled()) { 943 log.info("Job '" + myServerName + 944 "' submitted to the CRON"); 945 } 946 } else { 947 try { 948 if (log.isInfoEnabled()) { 949 log.info("Running (and waiting for) Job " + 950 myServerName); 951 } 952 953 myServer.run(); 954 955 if (log.isInfoEnabled()) { 956 log.info("Job class " + myServerName + 957 " completed"); 958 } 959 } catch (Exception exp) { 960 log.error("Exception caught while running Job.", 961 exp); 962 } 963 } 964 } 965 966 967 yield(); 968 } catch (Throwable eo) { 969 log.error("Error loading job class", eo); 970 throw new ServerException("Exception loading " + 971 "job class '" + myServerName + 972 "'- see detailed message in log", eo); 973 } 974 } 975 976 977 if (!serverList.isEmpty()) { 978 cleanFinishedJobs(); 979 } 980 981 removeCronJobForRemovedQueueEntry(); 982 983 } catch (DBException de) { 984 log.error(de); 985 throw new ServerException("Database Exception checking" + "queue", 986 de); 987 } 988 } 989 990 private void cleanFinishedJobs() { 991 992 993 Job oneService = null; 994 995 synchronized (Job.class) { 996 for (Enumeration ee = serverList.elements(); ee.hasMoreElements();) { 997 oneService = (Job) ee.nextElement(); 998 999 if (!(oneService.isAlive() || oneService.useCron())) { 1002 log.info("Job " + oneService.getClass().getName() + 1003 " is completed"); 1004 serverList.removeElement(oneService); 1005 } 1006 } 1007 } 1008 } 1009 1010 1016 private void removeCronJobForRemovedQueueEntry() throws DBException { 1017 JobQueue jobQueue = new JobQueue(); 1018 for (Iterator i = m_cronMgr.getAllEntries().iterator(); i.hasNext();) { 1019 CrontabEntry oneCronEntry = (CrontabEntry) i.next(); 1020 1021 String jobNumber = oneCronEntry.getJobNumber(); 1022 if (jobNumber != null) { 1024 jobQueue.clear(); 1025 jobQueue.setField(JobQueue.FLD_JOBNUMBER, jobNumber); 1026 if (!jobQueue.find()) { 1027 m_cronMgr.removeCrontabEntry(oneCronEntry); 1028 } 1029 } 1030 } 1031 } 1032 1033 private void doOperation(String oper) throws DBException { 1034 if (log.isInfoEnabled()) { 1035 log.info("Starting operation: " + oper); 1036 } 1037 1038 StringTokenizer st = new StringTokenizer (oper, ", "); 1039 String cmd = st.nextToken(); 1040 1041 if (cmd.startsWith(JobHandlerControl.STOP_COMMAND)) { 1042 Job oneService = null; 1043 1044 for (Enumeration ee = serverList.elements(); ee.hasMoreElements();) { 1045 oneService = (Job) ee.nextElement(); 1046 1047 String pid = oneService.getJobNumber(); 1048 1049 if (oper.indexOf(pid) != -1) { 1050 oneService.interrupt(); 1051 1052 if (oneService.useCron()) { 1054 CrontabEntry cronID = (CrontabEntry) oneService.getCronAlarmEntry(); 1055 m_cronMgr.removeCrontabEntry(cronID); 1056 } 1057 1058 JobQueue jq = oneService.getJobQueueEntry(); 1060 jq.setJobStatus(JobQueue.JOB_STATUS_STOPPED); 1061 jq.update(); 1062 1063 serverList.removeElement(oneService); 1065 log.info("Job was killed by request, job number: " + pid); 1066 } 1067 } 1068 } else if (cmd.startsWith(JobHandlerControl.RESTART_COMMAND)) { 1069 Job oneService = null; 1070 1071 for (Enumeration ee = serverList.elements(); ee.hasMoreElements();) { 1072 oneService = (Job) ee.nextElement(); 1073 1074 String pid = oneService.getJobNumber(); 1075 1076 if (oper.indexOf(pid) != -1) { 1077 oneService.interrupt(); 1078 1079 if (oneService.useCron()) { 1081 CrontabEntry cronID = (CrontabEntry) oneService.getCronAlarmEntry(); 1082 m_cronMgr.removeCrontabEntry(cronID); 1083 } 1084 1085 serverList.removeElement(oneService); 1086 1087 JobQueue jq = oneService.getJobQueueEntry(); 1089 jq.setJobStatus(JobQueue.JOB_STATUS_AVAILABLE); 1090 jq.update(); 1091 1092 if (log.isInfoEnabled()) { 1094 log.info("Job was restarted by request, job number: " + 1095 pid); 1096 } 1097 } 1098 } 1099 } 1100 } 1101 1102 1108 private synchronized boolean markJob(String jobNumber) { 1109 DBConnectionPool myPool = null; 1110 DBConnection myConnection = null; 1111 1112 try { 1113 myPool = DBConnectionPool.getInstance(dbName); 1114 myConnection = myPool.getConnection(getClass().getName()); 1115 1116 FastStringBuffer fsb = new FastStringBuffer(128); 1117 fsb.append("UPDATE JOBQUEUE SET StatusCode ='"); 1118 fsb.append(JobQueue.JOB_STATUS_RUNNING); 1119 fsb.append("', ServerID = "); 1120 fsb.append(getID()); 1121 fsb.append(" WHERE JobNumber = "); 1122 fsb.append(jobNumber); 1123 fsb.append(" AND StatusCode = 'A'"); 1124 fsb.append(" AND ServerID = 0"); 1125 1126 myConnection.executeUpdate(fsb.toString()); 1132 1133 if (myConnection.getUpdateCount() != 1) { 1134 log.warn("Unable to process job " + jobNumber + 1135 ", job no longer available."); 1136 myPool.release(myConnection); 1137 1138 return false; 1139 } 1140 1141 } catch (DBException de) { 1142 log.warn("Unable to lock job", de); 1143 1144 return false; 1145 } finally { 1146 if (myConnection != null) { 1147 myPool.release(myConnection); 1148 } 1149 } 1150 1151 return true; 1152 } 1153 1154 1157 private synchronized void updateJobHandlerStatus() { 1158 try { 1159 String timeStamp = getTimeStamp(); 1160 String status = getStatus(); 1161 String id = String.valueOf(getID()); 1162 String freeMem = String.valueOf(SystemInfo.getFreeMem()); 1163 String totalMem = String.valueOf(SystemInfo.getTotalMem()); 1164 String usedMem = String.valueOf(SystemInfo.getUsedMem()); 1165 String powerF = String.valueOf(SystemInfo.getPowerFactor()); 1166 String loadAverage = String.valueOf(SystemInfo.getLoadAverage()); 1167 String jobNum = String.valueOf(getCurrJobNum()); 1168 1169 synchronized (JobHandler.class) { 1170 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_SERVERID, 1171 id); 1172 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_TIMESTAMP, 1173 timeStamp); 1174 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_STATUS, 1175 status); 1176 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_FREEMEM, 1177 freeMem); 1178 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_TOTALMEM, 1179 totalMem); 1180 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_USEDMEM, 1181 usedMem); 1182 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_POWERFACTOR, 1183 powerF); 1184 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_LOAD_AVERAGE, 1185 loadAverage); 1186 m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_CURR_JOBNUM, 1187 jobNum); 1188 m_jobHandlerRegistry.setCheckZeroUpdate(false); 1189 m_jobHandlerRegistry.update(); 1190 1191 if (log.isDebugEnabled()) { 1192 log.debug("JobHandler : " + SystemInfo.getHostName() + "/" + 1193 getID() + "/" + getDataContext() + " is alive"); 1194 } 1195 } 1196 } catch (DBException dbe) { 1197 log.error("Cannot update Registry for Job Handler, Reason: ", dbe); 1198 } 1199 } 1200 1201 1204 private static void usage() { 1205 System.out.println("Usage: JobHandler arg=value ... "); 1206 System.out.println("Where arguments are: configDir, db, [logDir], [logConfig], and webAppDir"); 1207 System.exit(1); 1208 } 1209 1210 1216 private void waitForAllJobs() throws InterruptedException { 1217 while (true) { 1218 1219 int aliveCount = 0; 1220 Job oneService = null; 1221 1222 for (Enumeration ee = serverList.elements(); ee.hasMoreElements();) { 1223 oneService = (Job) ee.nextElement(); 1224 1225 if (oneService.isAlive()) { 1226 aliveCount++; 1227 } 1228 } 1229 1230 1231 if (aliveCount == 0) { 1232 log.info("All jobs complete"); 1233 1234 return; 1235 } 1236 1237 if (log.isInfoEnabled()) { 1238 log.info("There are still " + aliveCount + 1239 " active jobs. Waiting for all jobs to complete"); 1240 } 1241 1242 yield(); 1243 sleep(interval * 1000); 1244 } 1245 } 1246} 1247 | Popular Tags |