1 17 18 21 package org.quartz.simpl; 22 23 import java.util.ArrayList ; 24 import java.util.Comparator ; 25 import java.util.Date ; 26 import java.util.HashMap ; 27 import java.util.HashSet ; 28 import java.util.Iterator ; 29 import java.util.Set ; 30 import java.util.TreeSet ; 31 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 import org.quartz.Calendar; 35 import org.quartz.JobDataMap; 36 import org.quartz.JobDetail; 37 import org.quartz.JobPersistenceException; 38 import org.quartz.ObjectAlreadyExistsException; 39 import org.quartz.SchedulerException; 40 import org.quartz.Trigger; 41 import org.quartz.core.SchedulingContext; 42 import org.quartz.spi.ClassLoadHelper; 43 import org.quartz.spi.JobStore; 44 import org.quartz.spi.SchedulerSignaler; 45 import org.quartz.spi.TriggerFiredBundle; 46 47 64 public class RAMJobStore implements JobStore { 65 66 73 74 protected HashMap jobsByFQN = new HashMap (1000); 75 76 protected HashMap triggersByFQN = new HashMap (1000); 77 78 protected HashMap jobsByGroup = new HashMap (25); 79 80 protected HashMap triggersByGroup = new HashMap (25); 81 82 protected TreeSet timeTriggers = new TreeSet (new TriggerComparator()); 83 84 protected HashMap calendarsByName = new HashMap (25); 85 86 protected ArrayList triggers = new ArrayList (1000); 87 88 protected final Object jobLock = new Object (); 89 90 protected final Object triggerLock = new Object (); 91 92 protected HashSet pausedTriggerGroups = new HashSet (); 93 94 protected HashSet blockedJobs = new HashSet (); 95 96 protected long misfireThreshold = 5000l; 97 98 protected SchedulerSignaler signaler; 99 100 private final Log log = LogFactory.getLog(getClass()); 101 102 109 110 115 public RAMJobStore() { 116 } 117 118 125 126 protected Log getLog() { 127 return log; 128 } 129 130 136 public void initialize(ClassLoadHelper loadHelper, 137 SchedulerSignaler signaler) { 138 139 this.signaler = signaler; 140 141 getLog().info("RAMJobStore initialized."); 142 } 143 144 public void schedulerStarted() throws SchedulerException { 145 } 147 148 public long getMisfireThreshold() { 149 return misfireThreshold; 150 } 151 152 159 public void setMisfireThreshold(long misfireThreshold) { 160 if (misfireThreshold < 1) { 161 throw new IllegalArgumentException ("Misfirethreashold must be larger than 0"); 162 } 163 this.misfireThreshold = misfireThreshold; 164 } 165 166 173 public void shutdown() { 174 } 175 176 public boolean supportsPersistence() { 177 return false; 178 } 179 180 193 public void storeJobAndTrigger(SchedulingContext ctxt, JobDetail newJob, 194 Trigger newTrigger) throws JobPersistenceException { 195 storeJob(ctxt, newJob, false); 196 storeTrigger(ctxt, newTrigger, false); 197 } 198 199 214 public void storeJob(SchedulingContext ctxt, JobDetail newJob, 215 boolean replaceExisting) throws ObjectAlreadyExistsException { 216 JobWrapper jw = new JobWrapper((JobDetail)newJob.clone()); 217 218 boolean repl = false; 219 220 if (jobsByFQN.get(jw.key) != null) { 221 if (!replaceExisting) { 222 throw new ObjectAlreadyExistsException(newJob); 223 } 224 repl = true; 225 } 226 227 synchronized (jobLock) { 228 if (!repl) { 229 HashMap grpMap = (HashMap ) jobsByGroup.get(newJob.getGroup()); 231 if (grpMap == null) { 232 grpMap = new HashMap (100); 233 jobsByGroup.put(newJob.getGroup(), grpMap); 234 } 235 grpMap.put(newJob.getName(), jw); 237 jobsByFQN.put(jw.key, jw); 239 } else { 240 JobWrapper orig = (JobWrapper) jobsByFQN.get(jw.key); 242 orig.jobDetail = jw.jobDetail; } 244 } 245 } 246 247 261 public boolean removeJob(SchedulingContext ctxt, String jobName, 262 String groupName) { 263 String key = JobWrapper.getJobNameKey(jobName, groupName); 264 265 boolean found = false; 266 267 Trigger[] trigger = getTriggersForJob(ctxt, jobName, 268 groupName); 269 for (int i = 0; i < trigger.length; i++) { 270 Trigger trig = trigger[i]; 271 this.removeTrigger(ctxt, trig.getName(), trig.getGroup()); 272 found = true; 273 } 274 synchronized (jobLock) { 275 found = (jobsByFQN.remove(key) != null) | found; 276 if (found) { 277 278 HashMap grpMap = (HashMap ) jobsByGroup.get(groupName); 279 if (grpMap != null) { 280 grpMap.remove(jobName); 281 if (grpMap.size() == 0) { 282 jobsByGroup.remove(groupName); 283 } 284 } 285 } 286 } 287 288 return found; 289 } 290 291 308 public void storeTrigger(SchedulingContext ctxt, Trigger newTrigger, 309 boolean replaceExisting) throws JobPersistenceException { 310 TriggerWrapper tw = new TriggerWrapper((Trigger)newTrigger.clone()); 311 312 if (triggersByFQN.get(tw.key) != null) { 313 if (!replaceExisting) { 314 throw new ObjectAlreadyExistsException(newTrigger); 315 } 316 317 removeTrigger(ctxt, newTrigger.getName(), newTrigger.getGroup()); 318 } 319 320 if (retrieveJob(ctxt, newTrigger.getJobName(), newTrigger.getJobGroup()) == null) { 321 throw new JobPersistenceException("The job (" 322 + newTrigger.getFullJobName() 323 + ") referenced by the trigger does not exist."); 324 } 325 326 synchronized (triggerLock) { 327 triggers.add(tw); 329 HashMap grpMap = (HashMap ) triggersByGroup.get(newTrigger 331 .getGroup()); 332 if (grpMap == null) { 333 grpMap = new HashMap (100); 334 triggersByGroup.put(newTrigger.getGroup(), grpMap); 335 } 336 grpMap.put(newTrigger.getName(), tw); 337 triggersByFQN.put(tw.key, tw); 339 340 synchronized (pausedTriggerGroups) { 341 if (pausedTriggerGroups.contains(newTrigger.getGroup())) { 342 tw.state = TriggerWrapper.STATE_PAUSED; 343 if (blockedJobs.contains(tw.jobKey)) { 344 tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED; 345 } 346 } else if (blockedJobs.contains(tw.jobKey)) { 347 tw.state = TriggerWrapper.STATE_BLOCKED; 348 } else { 349 timeTriggers.add(tw); 350 } 351 } 352 } 353 } 354 355 368 public boolean removeTrigger(SchedulingContext ctxt, String triggerName, 369 String groupName) { 370 String key = TriggerWrapper.getTriggerNameKey(triggerName, groupName); 371 372 boolean found = false; 373 374 synchronized (triggerLock) { 375 found = (triggersByFQN.remove(key) == null) ? false : true; 377 if (found) { 378 TriggerWrapper tw = null; 379 HashMap grpMap = (HashMap ) triggersByGroup.get(groupName); 381 if (grpMap != null) { 382 grpMap.remove(triggerName); 383 if (grpMap.size() == 0) { 384 triggersByGroup.remove(groupName); 385 } 386 } 387 Iterator tgs = triggers.iterator(); 389 while (tgs.hasNext()) { 390 tw = (TriggerWrapper) tgs.next(); 391 if (key.equals(tw.key)) { 392 tgs.remove(); 393 break; 394 } 395 } 396 timeTriggers.remove(tw); 397 398 JobWrapper jw = (JobWrapper) jobsByFQN.get(JobWrapper 399 .getJobNameKey(tw.trigger.getJobName(), tw.trigger 400 .getJobGroup())); 401 Trigger[] trigs = getTriggersForJob(ctxt, tw.trigger 402 .getJobName(), tw.trigger.getJobGroup()); 403 if ((trigs == null || trigs.length == 0) && !jw.jobDetail.isDurable()) { 404 removeJob(ctxt, tw.trigger.getJobName(), tw.trigger 405 .getJobGroup()); 406 } 407 } 408 } 409 410 return found; 411 } 412 413 414 417 public boolean replaceTrigger(SchedulingContext ctxt, String triggerName, String groupName, Trigger newTrigger) throws JobPersistenceException { 418 String key = TriggerWrapper.getTriggerNameKey(triggerName, groupName); 419 420 boolean found = false; 421 422 synchronized (triggerLock) { 423 TriggerWrapper tw = (TriggerWrapper) triggersByFQN.remove(key); 425 found = ( tw == null) ? false : true; 426 427 if (found) { 428 429 if (!tw.getTrigger().getJobName().equals(newTrigger.getJobName()) || 430 !tw.getTrigger().getJobGroup().equals(newTrigger.getJobGroup())) { 431 throw new JobPersistenceException("New trigger is not related to the same job as the old trigger."); 432 } 433 434 tw = null; 435 HashMap grpMap = (HashMap ) triggersByGroup.get(groupName); 437 if (grpMap != null) { 438 grpMap.remove(triggerName); 439 if (grpMap.size() == 0) { 440 triggersByGroup.remove(groupName); 441 } 442 } 443 Iterator tgs = triggers.iterator(); 445 while (tgs.hasNext()) { 446 tw = (TriggerWrapper) tgs.next(); 447 if (key.equals(tw.key)) { 448 tgs.remove(); 449 break; 450 } 451 } 452 timeTriggers.remove(tw); 453 454 try { 455 storeTrigger(ctxt, newTrigger, false); 456 } catch(JobPersistenceException jpe) { 457 storeTrigger(ctxt, tw.getTrigger(), false); throw jpe; 459 } 460 } 461 } 462 463 return found; 464 } 465 466 478 public JobDetail retrieveJob(SchedulingContext ctxt, String jobName, 479 String groupName) { 480 JobWrapper jw = (JobWrapper) jobsByFQN.get(JobWrapper.getJobNameKey( 481 jobName, groupName)); 482 483 return (jw != null) ? (JobDetail)jw.jobDetail.clone() : null; 484 } 485 486 498 public Trigger retrieveTrigger(SchedulingContext ctxt, String triggerName, 499 String groupName) { 500 TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper 501 .getTriggerNameKey(triggerName, groupName)); 502 503 return (tw != null) ? (Trigger)tw.getTrigger().clone() : null; 504 } 505 506 518 public int getTriggerState(SchedulingContext ctxt, String triggerName, 519 String groupName) throws JobPersistenceException { 520 TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper 521 .getTriggerNameKey(triggerName, groupName)); 522 if (tw == null) { 523 return Trigger.STATE_NONE; 524 } 525 526 if (tw.state == TriggerWrapper.STATE_COMPLETE) { 527 return Trigger.STATE_COMPLETE; 528 } 529 530 if (tw.state == TriggerWrapper.STATE_PAUSED) { 531 return Trigger.STATE_PAUSED; 532 } 533 534 if (tw.state == TriggerWrapper.STATE_PAUSED_BLOCKED) { 535 return Trigger.STATE_PAUSED; 536 } 537 538 if (tw.state == TriggerWrapper.STATE_BLOCKED) { 539 return Trigger.STATE_BLOCKED; 540 } 541 542 if (tw.state == TriggerWrapper.STATE_ERROR) { 543 return Trigger.STATE_ERROR; 544 } 545 546 return Trigger.STATE_NORMAL; 547 } 548 549 569 public void storeCalendar(SchedulingContext ctxt, String name, 570 Calendar calendar, boolean replaceExisting, boolean updateTriggers) 571 throws ObjectAlreadyExistsException { 572 Object obj = calendarsByName.get(name); 573 574 if (obj != null && replaceExisting == false) { 575 throw new ObjectAlreadyExistsException( 576 "Calendar with name '" + name + "' already exists."); 577 } else if (obj != null) { 578 calendarsByName.remove(name); 579 } 580 581 calendarsByName.put(name, calendar); 582 583 if(obj != null && updateTriggers) { 584 synchronized (triggerLock) { 585 Iterator trigs = getTriggerWrappersForCalendar(name).iterator(); 586 while (trigs.hasNext()) { 587 TriggerWrapper tw = (TriggerWrapper) trigs.next(); 588 Trigger trig = tw.getTrigger(); 589 boolean removed = timeTriggers.remove(tw); 590 591 trig.updateWithNewCalendar(calendar, getMisfireThreshold()); 592 593 if(removed) { 594 timeTriggers.add(tw); 595 } 596 } 597 } 598 } 599 } 600 601 616 public boolean removeCalendar(SchedulingContext ctxt, String calName) 617 throws JobPersistenceException { 618 int numRefs = 0; 619 620 synchronized (triggerLock) { 621 Iterator itr = triggers.iterator(); 622 while (itr.hasNext()) { 623 Trigger trigg = ((TriggerWrapper) itr.next()).trigger; 624 if (trigg.getCalendarName() != null 625 && trigg.getCalendarName().equals(calName)) { 626 numRefs++; 627 } 628 } 629 } 630 631 if (numRefs > 0) { 632 throw new JobPersistenceException( 633 "Calender cannot be removed if it referenced by a Trigger!"); 634 } 635 636 return (calendarsByName.remove(calName) != null); 637 } 638 639 649 public Calendar retrieveCalendar(SchedulingContext ctxt, String calName) { 650 return (Calendar) calendarsByName.get(calName); 651 } 652 653 659 public int getNumberOfJobs(SchedulingContext ctxt) { 660 return jobsByFQN.size(); 661 } 662 663 669 public int getNumberOfTriggers(SchedulingContext ctxt) { 670 return triggers.size(); 671 } 672 673 679 public int getNumberOfCalendars(SchedulingContext ctxt) { 680 return calendarsByName.size(); 681 } 682 683 689 public String [] getJobNames(SchedulingContext ctxt, String groupName) { 690 String [] outList = null; 691 HashMap grpMap = (HashMap ) jobsByGroup.get(groupName); 692 if (grpMap != null) { 693 synchronized (jobLock) { 694 outList = new String [grpMap.size()]; 695 int outListPos = 0; 696 697 for (Iterator valueIter = grpMap.values().iterator(); valueIter.hasNext();) { 698 JobWrapper jw = (JobWrapper)valueIter.next(); 699 700 if (jw != null) { 701 outList[outListPos++] = jw.jobDetail.getName(); 702 } 703 } 704 } 705 } else { 706 outList = new String [0]; 707 } 708 709 return outList; 710 } 711 712 723 public String [] getCalendarNames(SchedulingContext ctxt) { 724 Set names = calendarsByName.keySet(); 725 return (String []) names.toArray(new String [names.size()]); 726 } 727 728 734 public String [] getTriggerNames(SchedulingContext ctxt, String groupName) { 735 String [] outList = null; 736 HashMap grpMap = (HashMap ) triggersByGroup.get(groupName); 737 if (grpMap != null) { 738 synchronized (triggerLock) { 739 outList = new String [grpMap.size()]; 740 int outListPos = 0; 741 742 for (Iterator valueIter = grpMap.values().iterator(); valueIter.hasNext();) { 743 TriggerWrapper tw = (TriggerWrapper) valueIter.next(); 744 745 if (tw != null) { 746 outList[outListPos++] = tw.trigger.getName(); 747 } 748 } 749 } 750 } else { 751 outList = new String [0]; 752 } 753 754 return outList; 755 } 756 757 763 public String [] getJobGroupNames(SchedulingContext ctxt) { 764 String [] outList = null; 765 766 synchronized (jobLock) { 767 outList = new String [jobsByGroup.size()]; 768 int outListPos = 0; 769 Iterator keys = jobsByGroup.keySet().iterator(); 770 while (keys.hasNext()) { 771 outList[outListPos++] = (String ) keys.next(); 772 } 773 } 774 775 return outList; 776 } 777 778 784 public String [] getTriggerGroupNames(SchedulingContext ctxt) { 785 String [] outList = null; 786 787 synchronized (triggerLock) { 788 outList = new String [triggersByGroup.size()]; 789 int outListPos = 0; 790 Iterator keys = triggersByGroup.keySet().iterator(); 791 while (keys.hasNext()) { 792 outList[outListPos++] = (String ) keys.next(); 793 } 794 } 795 796 return outList; 797 } 798 799 808 public Trigger[] getTriggersForJob(SchedulingContext ctxt, String jobName, 809 String groupName) { 810 ArrayList trigList = new ArrayList (); 811 812 String jobKey = JobWrapper.getJobNameKey(jobName, groupName); 813 synchronized (triggerLock) { 814 for (int i = 0; i < triggers.size(); i++) { 815 TriggerWrapper tw = (TriggerWrapper) triggers.get(i); 816 if (tw.jobKey.equals(jobKey)) { 817 trigList.add(tw.trigger.clone()); 818 } 819 } 820 } 821 822 return (Trigger[]) trigList.toArray(new Trigger[trigList.size()]); 823 } 824 825 protected ArrayList getTriggerWrappersForJob(String jobName, String groupName) { 826 ArrayList trigList = new ArrayList (); 827 828 String jobKey = JobWrapper.getJobNameKey(jobName, groupName); 829 synchronized (triggerLock) { 830 for (int i = 0; i < triggers.size(); i++) { 831 TriggerWrapper tw = (TriggerWrapper) triggers.get(i); 832 if (tw.jobKey.equals(jobKey)) { 833 trigList.add(tw); 834 } 835 } 836 } 837 838 return trigList; 839 } 840 841 protected ArrayList getTriggerWrappersForCalendar(String calName) { 842 ArrayList trigList = new ArrayList (); 843 844 synchronized (triggerLock) { 845 for (int i = 0; i < triggers.size(); i++) { 846 TriggerWrapper tw = (TriggerWrapper) triggers.get(i); 847 String tcalName = tw.getTrigger().getCalendarName(); 848 if (tcalName != null && tcalName.equals(calName)) { 849 trigList.add(tw); 850 } 851 } 852 } 853 854 return trigList; 855 } 856 857 863 public void pauseTrigger(SchedulingContext ctxt, String triggerName, 864 String groupName) { 865 866 TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper 867 .getTriggerNameKey(triggerName, groupName)); 868 869 if (tw == null || tw.trigger == null) { 871 return; 872 } 873 874 if (tw.state == TriggerWrapper.STATE_COMPLETE) { 876 return; 877 } 878 879 synchronized (triggerLock) { 880 if(tw.state == TriggerWrapper.STATE_BLOCKED) { 881 tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED; 882 } else { 883 tw.state = TriggerWrapper.STATE_PAUSED; 884 } 885 886 timeTriggers.remove(tw); 887 } 888 } 889 890 902 public void pauseTriggerGroup(SchedulingContext ctxt, String groupName) { 903 904 synchronized (pausedTriggerGroups) { 905 if (pausedTriggerGroups.contains(groupName)) { 906 return; 907 } 908 909 pausedTriggerGroups.add(groupName); 910 String [] names = getTriggerNames(ctxt, groupName); 911 912 for (int i = 0; i < names.length; i++) { 913 pauseTrigger(ctxt, names[i], groupName); 914 } 915 } 916 } 917 918 925 public void pauseJob(SchedulingContext ctxt, String jobName, 926 String groupName) { 927 synchronized (pausedTriggerGroups) { 928 Trigger[] triggers = getTriggersForJob(ctxt, jobName, groupName); 929 for (int j = 0; j < triggers.length; j++) { 930 pauseTrigger(ctxt, triggers[j].getName(), triggers[j].getGroup()); 931 } 932 } 933 } 934 935 948 public void pauseJobGroup(SchedulingContext ctxt, String groupName) { 949 synchronized (pausedTriggerGroups) { 950 String [] jobNames = getJobNames(ctxt, groupName); 951 952 for (int i = 0; i < jobNames.length; i++) { 953 Trigger[] triggers = getTriggersForJob(ctxt, jobNames[i], 954 groupName); 955 for (int j = 0; j < triggers.length; j++) { 956 pauseTrigger(ctxt, triggers[j].getName(), 957 triggers[j].getGroup()); 958 } 959 } 960 } 961 } 962 963 975 public void resumeTrigger(SchedulingContext ctxt, String triggerName, 976 String groupName) { 977 978 TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper 979 .getTriggerNameKey(triggerName, groupName)); 980 981 Trigger trig = tw.getTrigger(); 982 983 if (tw == null || tw.trigger == null) { 985 return; 986 } 987 if (tw.state != TriggerWrapper.STATE_PAUSED && 989 tw.state != TriggerWrapper.STATE_PAUSED_BLOCKED) { 990 return; 991 } 992 993 synchronized (triggerLock) { 994 if(blockedJobs.contains( JobWrapper.getJobNameKey(trig.getJobName(), trig.getJobGroup()) )) { 995 tw.state = TriggerWrapper.STATE_BLOCKED; 996 } else { 997 tw.state = TriggerWrapper.STATE_WAITING; 998 } 999 1000 applyMisfire(tw); 1001 1002 if (tw.state == TriggerWrapper.STATE_WAITING) { 1003 timeTriggers.add(tw); 1004 } 1005 } 1006 } 1007 1008 1020 public void resumeTriggerGroup(SchedulingContext ctxt, String groupName) { 1021 1022 synchronized (pausedTriggerGroups) { 1023 String [] names = getTriggerNames(ctxt, groupName); 1024 1025 for (int i = 0; i < names.length; i++) { 1026 resumeTrigger(ctxt, names[i], groupName); 1027 } 1028 pausedTriggerGroups.remove(groupName); 1029 } 1030 } 1031 1032 1045 public void resumeJob(SchedulingContext ctxt, String jobName, 1046 String groupName) { 1047 1048 synchronized (pausedTriggerGroups) { 1049 Trigger[] triggers = getTriggersForJob(ctxt, jobName, groupName); 1050 for (int j = 0; j < triggers.length; j++) { 1051 resumeTrigger(ctxt, triggers[j].getName(), triggers[j].getGroup()); 1052 } 1053 } 1054 } 1055 1056 1069 public void resumeJobGroup(SchedulingContext ctxt, String groupName) { 1070 synchronized (pausedTriggerGroups) { 1071 String [] jobNames = getJobNames(ctxt, groupName); 1072 1073 for (int i = 0; i < jobNames.length; i++) { 1074 Trigger[] triggers = getTriggersForJob(ctxt, jobNames[i], 1075 groupName); 1076 for (int j = 0; j < triggers.length; j++) { 1077 resumeTrigger(ctxt, triggers[j].getName(), 1078 triggers[j].getGroup()); 1079 } 1080 } 1081 } 1082 } 1083 1084 1098 public void pauseAll(SchedulingContext ctxt) { 1099 1100 synchronized (pausedTriggerGroups) { 1101 String [] names = getTriggerGroupNames(ctxt); 1102 1103 for (int i = 0; i < names.length; i++) { 1104 pauseTriggerGroup(ctxt, names[i]); 1105 } 1106 } 1107 } 1108 1109 1122 public void resumeAll(SchedulingContext ctxt) { 1123 1124 synchronized (pausedTriggerGroups) { 1125 String [] names = getTriggerGroupNames(ctxt); 1126 1127 for (int i = 0; i < names.length; i++) { 1128 resumeTriggerGroup(ctxt, names[i]); 1129 } 1130 } 1131 } 1132 1133 protected boolean applyMisfire(TriggerWrapper tw) { 1134 1135 long misfireTime = System.currentTimeMillis(); 1136 if (getMisfireThreshold() > 0) { 1137 misfireTime -= getMisfireThreshold(); 1138 } 1139 1140 java.util.Date tnft = tw.trigger.getNextFireTime(); 1141 if (tnft.getTime() > misfireTime) { return false; } 1142 1143 Calendar cal = null; 1144 if (tw.trigger.getCalendarName() != null) { 1145 cal = retrieveCalendar(null, tw.trigger.getCalendarName()); 1146 } 1147 1148 signaler.notifyTriggerListenersMisfired((Trigger)tw.trigger.clone()); 1149 1150 tw.trigger.updateAfterMisfire(cal); 1151 1152 if (tw.trigger.getNextFireTime() == null) { 1153 tw.state = TriggerWrapper.STATE_COMPLETE; 1154 synchronized (triggerLock) { 1155 timeTriggers.remove(tw); 1156 } 1157 } else if (tnft.equals(tw.trigger.getNextFireTime())) { 1158 return false; 1159 } 1160 1161 return true; 1162 } 1163 1164 private static long ftrCtr = System.currentTimeMillis(); 1165 1166 protected synchronized String getFiredTriggerRecordId() { 1167 return String.valueOf(ftrCtr++); 1168 } 1169 1170 1178 public Trigger acquireNextTrigger(SchedulingContext ctxt, long noLaterThan) { 1179 TriggerWrapper tw = null; 1180 1181 synchronized (triggerLock) { 1182 1183 while (tw == null) { 1184 try { 1185 tw = (TriggerWrapper) timeTriggers.first(); 1186 } catch (java.util.NoSuchElementException nsee) { 1187 return null; 1188 } 1189 1190 if (tw == null) { 1191 return null; 1192 } 1193 1194 if (tw.trigger.getNextFireTime() == null) { 1195 timeTriggers.remove(tw); 1196 tw = null; 1197 continue; 1198 } 1199 1200 timeTriggers.remove(tw); 1201 1202 if (applyMisfire(tw)) { 1203 if (tw.trigger.getNextFireTime() != null) { 1204 timeTriggers.add(tw); 1205 } 1206 tw = null; 1207 continue; 1208 } 1209 1210 if(tw.trigger.getNextFireTime().getTime() > noLaterThan) { 1211 timeTriggers.add(tw); 1212 return null; 1213 } 1214 1215 tw.state = TriggerWrapper.STATE_ACQUIRED; 1216 1217 tw.trigger.setFireInstanceId(getFiredTriggerRecordId()); 1218 Trigger trig = (Trigger) tw.trigger.clone(); 1219 return trig; 1220 } 1221 } 1222 1223 return null; 1224 } 1225 1226 1233 public void releaseAcquiredTrigger(SchedulingContext ctxt, Trigger trigger) { 1234 synchronized (triggerLock) { 1235 TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper 1236 .getTriggerNameKey(trigger)); 1237 if (tw != null && tw.state == TriggerWrapper.STATE_ACQUIRED) { 1238 tw.state = TriggerWrapper.STATE_WAITING; 1239 timeTriggers.add(tw); 1240 } 1241 } 1242 } 1243 1244 1251 public TriggerFiredBundle triggerFired(SchedulingContext ctxt, 1252 Trigger trigger) { 1253 1254 synchronized (triggerLock) { 1255 TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper 1256 .getTriggerNameKey(trigger)); 1257 if (tw == null || tw.trigger == null) { 1259 return null; 1260 } 1261 if (tw.state == TriggerWrapper.STATE_COMPLETE) { 1263 return null; 1264 } 1265 if (tw.state == TriggerWrapper.STATE_PAUSED) { 1267 return null; 1268 } 1269 if (tw.state == TriggerWrapper.STATE_BLOCKED) { 1271 return null; 1272 } 1273 if (tw.state == TriggerWrapper.STATE_PAUSED_BLOCKED) { 1275 return null; 1276 } 1277 1278 Calendar cal = null; 1279 if (tw.trigger.getCalendarName() != null) { 1280 cal = retrieveCalendar(ctxt, tw.trigger.getCalendarName()); 1281 } 1282 Date prevFireTime = trigger.getPreviousFireTime(); 1283 tw.trigger.triggered(cal); 1285 trigger.triggered(cal); 1286 tw.state = TriggerWrapper.STATE_WAITING; 1288 1289 TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(ctxt, 1290 trigger.getJobName(), trigger.getJobGroup()), trigger, cal, 1291 false, new Date (), trigger.getPreviousFireTime(), prevFireTime, 1292 trigger.getNextFireTime()); 1293 1294 JobDetail job = bndle.getJobDetail(); 1295 1296 if (job.isStateful()) { 1297 ArrayList trigs = getTriggerWrappersForJob(job.getName(), job 1298 .getGroup()); 1299 Iterator itr = trigs.iterator(); 1300 while (itr.hasNext()) { 1301 TriggerWrapper ttw = (TriggerWrapper) itr.next(); 1302 if(ttw.state == TriggerWrapper.STATE_WAITING) { 1303 ttw.state = TriggerWrapper.STATE_BLOCKED; 1304 } 1305 if(ttw.state == TriggerWrapper.STATE_PAUSED) { 1306 ttw.state = TriggerWrapper.STATE_PAUSED_BLOCKED; 1307 } 1308 timeTriggers.remove(ttw); 1309 } 1310 blockedJobs.add(JobWrapper.getJobNameKey(job)); 1311 } else if (tw.trigger.getNextFireTime() != null) { 1312 synchronized (triggerLock) { 1313 timeTriggers.add(tw); 1314 } 1315 } 1316 1317 return bndle; 1318 } 1319 } 1320 1321 1330 public void triggeredJobComplete(SchedulingContext ctxt, Trigger trigger, 1331 JobDetail jobDetail, int triggerInstCode) { 1332 1333 synchronized (triggerLock) { 1334 1335 String jobKey = JobWrapper.getJobNameKey(jobDetail.getName(), jobDetail 1336 .getGroup()); 1337 JobWrapper jw = (JobWrapper) jobsByFQN.get(jobKey); 1338 TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper 1339 .getTriggerNameKey(trigger)); 1340 1341 if (jw != null) { 1346 JobDetail jd = jw.jobDetail; 1347 1348 if (jd.isStateful()) { 1349 JobDataMap newData = jobDetail.getJobDataMap(); 1350 if (newData != null) { 1351 newData = (JobDataMap)newData.clone(); 1352 newData.clearDirtyFlag(); 1353 } 1354 jd.setJobDataMap(newData); 1355 blockedJobs.remove(JobWrapper.getJobNameKey(jd)); 1356 ArrayList trigs = getTriggerWrappersForJob(jd.getName(), jd 1357 .getGroup()); 1358 Iterator itr = trigs.iterator(); 1359 while (itr.hasNext()) { 1360 TriggerWrapper ttw = (TriggerWrapper) itr.next(); 1361 if (ttw.state == TriggerWrapper.STATE_BLOCKED) { 1362 ttw.state = TriggerWrapper.STATE_WAITING; 1363 timeTriggers.add(ttw); 1364 } 1365 if (ttw.state == TriggerWrapper.STATE_PAUSED_BLOCKED) { 1366 ttw.state = TriggerWrapper.STATE_PAUSED; 1367 } 1368 } 1369 } 1370 } else { blockedJobs.remove(JobWrapper.getJobNameKey(jobDetail)); 1372 } 1373 1374 if (tw != null) { 1376 if (triggerInstCode == Trigger.INSTRUCTION_DELETE_TRIGGER) { 1377 1378 if(trigger.getNextFireTime() == null) { 1379 if(tw.getTrigger().getNextFireTime() == null) { 1382 removeTrigger(ctxt, trigger.getName(), trigger.getGroup()); 1383 } 1384 } else { 1385 removeTrigger(ctxt, trigger.getName(), trigger.getGroup()); 1386 } 1387 } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_COMPLETE) { 1388 tw.state = TriggerWrapper.STATE_COMPLETE; 1389 timeTriggers.remove(tw); 1390 } else if(triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_ERROR) { 1391 getLog().info("Trigger " + trigger.getFullName() + " set to ERROR state."); 1392 tw.state = TriggerWrapper.STATE_ERROR; 1393 } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR) { 1394 getLog().info("All triggers of Job " 1395 + trigger.getFullJobName() + " set to ERROR state."); 1396 setAllTriggersOfJobToState( 1397 trigger.getJobName(), 1398 trigger.getJobGroup(), 1399 TriggerWrapper.STATE_ERROR); 1400 } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_COMPLETE) { 1401 setAllTriggersOfJobToState( 1402 trigger.getJobName(), 1403 trigger.getJobGroup(), 1404 TriggerWrapper.STATE_COMPLETE); 1405 } 1406 } 1407 } 1408 } 1409 1410 protected void setAllTriggersOfJobToState(String jobName, String jobGroup, int state) { 1411 ArrayList tws = getTriggerWrappersForJob(jobName, jobGroup); 1412 Iterator itr = tws.iterator(); 1413 while (itr.hasNext()) { 1414 TriggerWrapper tw = (TriggerWrapper) itr.next(); 1415 tw.state = state; 1416 if(state != TriggerWrapper.STATE_WAITING) { 1417 timeTriggers.remove(tw); 1418 } 1419 } 1420 } 1421 1422 protected String peekTriggers() { 1423 1424 StringBuffer str = new StringBuffer (); 1425 TriggerWrapper tw = null; 1426 synchronized (triggerLock) { 1427 for (Iterator valueIter = triggersByFQN.values().iterator(); valueIter.hasNext();) { 1428 tw = (TriggerWrapper)valueIter.next(); 1429 str.append(tw.trigger.getName()); 1430 str.append("/"); 1431 } 1432 } 1433 str.append(" | "); 1434 1435 synchronized (triggerLock) { 1436 Iterator itr = timeTriggers.iterator(); 1437 while (itr.hasNext()) { 1438 tw = (TriggerWrapper) itr.next(); 1439 str.append(tw.trigger.getName()); 1440 str.append("->"); 1441 } 1442 } 1443 1444 return str.toString(); 1445 } 1446 1447 1450 public Set getPausedTriggerGroups(SchedulingContext ctxt) throws JobPersistenceException { 1451 HashSet set = new HashSet (); 1452 1453 set.addAll(pausedTriggerGroups); 1454 1455 return set; 1456 } 1457 1458} 1459 1460 1465 1466class TriggerComparator implements Comparator { 1467 1468 public int compare(Object obj1, Object obj2) { 1469 TriggerWrapper trig1 = (TriggerWrapper) obj1; 1470 TriggerWrapper trig2 = (TriggerWrapper) obj2; 1471 1472 int comp = trig1.trigger.compareTo(trig2.trigger); 1473 if (comp != 0) { 1474 return comp; 1475 } 1476 1477 comp = trig2.trigger.getPriority() - trig1.trigger.getPriority(); 1478 if (comp != 0) { 1479 return comp; 1480 } 1481 1482 return trig1.trigger.getFullName().compareTo(trig2.trigger.getFullName()); 1483 } 1484 1485 public boolean equals(Object obj) { 1486 return (obj instanceof TriggerComparator); 1487 } 1488} 1489 1490class JobWrapper { 1491 1492 public String key; 1493 1494 public JobDetail jobDetail; 1495 1496 JobWrapper(JobDetail jobDetail) { 1497 this.jobDetail = jobDetail; 1498 key = getJobNameKey(jobDetail); 1499 } 1500 1501 JobWrapper(JobDetail jobDetail, String key) { 1502 this.jobDetail = jobDetail; 1503 this.key = key; 1504 } 1505 1506 static String getJobNameKey(JobDetail jobDetail) { 1507 return jobDetail.getGroup() + "_$x$x$_" + jobDetail.getName(); 1508 } 1509 1510 static String getJobNameKey(String jobName, String groupName) { 1511 return groupName + "_$x$x$_" + jobName; 1512 } 1513 1514 public boolean equals(Object obj) { 1515 if (obj instanceof JobWrapper) { 1516 JobWrapper jw = (JobWrapper) obj; 1517 if (jw.key.equals(this.key)) { 1518 return true; 1519 } 1520 } 1521 1522 return false; 1523 } 1524 1525 public int hashCode() { 1526 return key.hashCode(); 1527 } 1528 1529 1530} 1531 1532class TriggerWrapper { 1533 1534 public String key; 1535 1536 public String jobKey; 1537 1538 public Trigger trigger; 1539 1540 public int state = STATE_WAITING; 1541 1542 public static final int STATE_WAITING = 0; 1543 1544 public static final int STATE_ACQUIRED = 1; 1545 1546 public static final int STATE_EXECUTING = 2; 1547 1548 public static final int STATE_COMPLETE = 3; 1549 1550 public static final int STATE_PAUSED = 4; 1551 1552 public static final int STATE_BLOCKED = 5; 1553 1554 public static final int STATE_PAUSED_BLOCKED = 6; 1555 1556 public static final int STATE_ERROR = 7; 1557 1558 TriggerWrapper(Trigger trigger) { 1559 this.trigger = trigger; 1560 key = getTriggerNameKey(trigger); 1561 this.jobKey = JobWrapper.getJobNameKey(trigger.getJobName(), trigger 1562 .getJobGroup()); 1563 } 1564 1565 static String getTriggerNameKey(Trigger trigger) { 1566 return trigger.getGroup() + "_$x$x$_" + trigger.getName(); 1567 } 1568 1569 static String getTriggerNameKey(String triggerName, String groupName) { 1570 return groupName + "_$x$x$_" + triggerName; 1571 } 1572 1573 public boolean equals(Object obj) { 1574 if (obj instanceof TriggerWrapper) { 1575 TriggerWrapper tw = (TriggerWrapper) obj; 1576 if (tw.key.equals(this.key)) { 1577 return true; 1578 } 1579 } 1580 1581 return false; 1582 } 1583 1584 public int hashCode() { 1585 return key.hashCode(); 1586 } 1587 1588 1589 public Trigger getTrigger() { 1590 return this.trigger; 1591 } 1592} 1593 | Popular Tags |