1 22 package org.jboss.varia.scheduler; 23 24 import java.util.ArrayList ; 25 import java.util.Arrays ; 26 import java.util.Collections ; 27 import java.util.List ; 28 import java.util.Date ; 29 import java.util.Iterator ; 30 import java.util.Map ; 31 32 import javax.management.InstanceNotFoundException ; 33 import javax.management.JMException ; 34 import javax.management.MBeanException ; 35 import javax.management.MBeanServer ; 36 import javax.management.MBeanServerInvocationHandler ; 37 import javax.management.MalformedObjectNameException ; 38 import javax.management.Notification ; 39 import javax.management.NotificationListener ; 40 import javax.management.NotificationEmitter ; 41 import javax.management.ObjectName ; 42 import javax.management.timer.TimerNotification ; 43 import javax.management.timer.TimerMBean ; 44 import javax.management.timer.Timer ; 45 46 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 47 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 48 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; 49 import org.jboss.logging.Logger; 50 import org.jboss.system.ServiceMBeanSupport; 51 52 66 public class ScheduleManager extends ServiceMBeanSupport 67 implements ScheduleManagerMBean 68 { 69 70 74 77 public static String DEFAULT_TIMER_NAME = "jboss:service=Timer"; 78 79 82 private static SynchronizedInt sCounter = new SynchronizedInt(0); 83 84 private static final int NOTIFICATION = 0; 85 private static final int DATE = 1; 86 private static final int REPETITIONS = 2; 87 private static final int SCHEDULER_NAME = 3; 88 private static final int NULL = 4; 89 private static final int ID = 5; 90 private static final int NEXT_DATE = 6; 91 92 96 private String mTimerName = DEFAULT_TIMER_NAME; 97 private ObjectName mTimerObjectName; 98 private TimerMBean mTimer; 99 private NotificationEmitter mTimerEmitter; 100 101 private boolean mStartOnStart = true; 102 private boolean mFixedRate = false; 103 private SynchronizedBoolean mIsPaused = new SynchronizedBoolean(false); 104 105 109 private List mProviders = Collections.synchronizedList(new ArrayList ()); 110 111 114 private Map mSchedules = new ConcurrentReaderHashMap(); 115 116 120 123 public ScheduleManager() 124 { 125 } 126 127 131 136 public void startSchedules() 137 { 138 log.debug("startSchedules()"); 139 if (!isStarted()) 141 { 142 Iterator i = mSchedules.values().iterator(); 144 while (i.hasNext()) 145 { 146 ScheduleInstance lInstance = (ScheduleInstance) i.next(); 147 try 148 { 149 lInstance.start(); 150 } 151 catch (JMException e) 152 { 153 log.error("Could not start: " + lInstance, e); 154 } 155 } 156 } 157 } 158 159 168 public void stopSchedules(boolean pDoItNow) 169 { 170 if (isStarted()) 172 { 173 Iterator i = mSchedules.values().iterator(); 175 while (i.hasNext()) 176 { 177 ScheduleInstance lInstance = (ScheduleInstance) i.next(); 178 try 179 { 180 lInstance.stop(); 181 } 182 catch (JMException e) 183 { 184 log.error("Could not stop: " + lInstance, e); 185 } 186 } 187 } 188 } 189 190 195 public void restartSchedule() 196 { 197 stopSchedules(true); 198 startSchedules(); 199 } 200 201 210 public void registerProvider(String pProviderObjectName) 211 { 212 try 213 { 214 registerProvider(new ObjectName (pProviderObjectName)); 215 } 216 catch (JMException jme) 217 { 218 log.error("Could not call startProviding() on " + pProviderObjectName, jme); 219 } 220 } 221 222 231 public void registerProvider(ObjectName pProviderObjectName) 232 throws JMException 233 { 234 if (pProviderObjectName == null) 235 { 236 throw new MalformedObjectNameException ("Provider must not be null"); 237 } 238 synchronized (mProviders) { 239 if (mProviders.contains(pProviderObjectName)) 240 throw new JMException ("Already registered: " + pProviderObjectName); 241 mProviders.add(pProviderObjectName); 242 } 243 server.invoke( 244 pProviderObjectName, 245 "startProviding", 246 new Object []{}, 247 new String []{} 248 ); 249 } 250 251 259 public void unregisterProvider(String pProviderObjectName) 260 { 261 try 262 { 263 unregisterProvider(new ObjectName (pProviderObjectName)); 264 } 265 catch (JMException jme) 266 { 267 log.error("Could not call stopProviding() on " + pProviderObjectName, jme); 268 } 269 } 270 271 279 public void unregisterProvider(ObjectName pProviderObjectName) 280 throws JMException 281 { 282 if (!mProviders.remove(pProviderObjectName)) 283 return; 284 server.invoke( 285 pProviderObjectName, 286 "stopProviding", 287 new Object []{}, 288 new String []{} 289 ); 290 } 291 292 307 public int addSchedule( 308 ObjectName pProvider, 309 ObjectName pTarget, 310 String pMethodName, 311 String [] pMethodSignature, 312 Date pStartDate, 313 long pPeriod, 314 int pRepetitions 315 ) 316 { 317 ScheduleInstance lInstance = new ScheduleInstance( 318 pProvider, 319 pTarget, 320 pMethodName, 321 pMethodSignature, 322 pStartDate, 323 pRepetitions, 324 pPeriod 325 ); 326 if (isStarted()) 327 { 328 try 329 { 330 lInstance.start(); 331 } 332 catch (JMException jme) 333 { 334 log.error("Could not start " + lInstance, jme); 335 } 336 } 337 int lID = lInstance.getID(); 338 mSchedules.put(new Integer (lID), lInstance); 339 return lID; 340 } 341 342 351 public void removeSchedule(int pIdentification) 352 { 353 ScheduleInstance lInstance = (ScheduleInstance) mSchedules.get(new Integer (pIdentification)); 354 try 355 { 356 if (lInstance == null) 357 throw new InstanceNotFoundException (); 358 lInstance.stop(); 359 } 360 catch (JMException e) 361 { 362 log.error("Could not stop " + lInstance, e); 363 } 364 mSchedules.remove(new Integer (pIdentification)); 365 } 366 367 372 public String getSchedules() 373 { 374 Iterator i = mSchedules.values().iterator(); 375 StringBuffer lReturn = new StringBuffer (); 376 boolean lFirst = true; 377 while (i.hasNext()) 378 { 379 ScheduleInstance lInstance = (ScheduleInstance) i.next(); 380 if (lFirst) 381 { 382 lReturn.append(lInstance.mIdentification); 383 lFirst = false; 384 } 385 else 386 { 387 lReturn.append(",").append(lInstance.mIdentification); 388 } 389 } 390 return lReturn.toString(); 391 } 392 393 398 public boolean isPaused() 399 { 400 return mIsPaused.get(); 401 } 402 403 409 public void setPaused(boolean pIsPaused) 410 { 411 mIsPaused.set(pIsPaused); 412 } 413 414 417 public boolean isStarted() 418 { 419 return getState() == STARTED; 420 } 421 422 426 435 public void setStartAtStartup(boolean pStartAtStartup) 436 { 437 mStartOnStart = pStartAtStartup; 438 } 439 440 445 public boolean isStartAtStartup() 446 { 447 return mStartOnStart; 448 } 449 450 457 public void setTimerName(String pTimerName) 458 { 459 mTimerName = pTimerName; 460 } 461 462 467 public String getTimerName() 468 { 469 return mTimerName; 470 } 471 472 477 public void setFixedRate(boolean fixedRate) 478 { 479 mFixedRate = fixedRate; 480 } 481 482 487 public boolean getFixedRate() 488 { 489 return mFixedRate; 490 } 491 492 496 public ObjectName getObjectName(MBeanServer pServer, ObjectName pName) 497 throws MalformedObjectNameException 498 { 499 return pName == null ? OBJECT_NAME : pName; 500 } 501 502 510 protected void startService() throws Exception 511 { 512 mTimerObjectName = new ObjectName (mTimerName); 513 if (!getServer().isRegistered(mTimerObjectName)) 514 { 515 getServer().createMBean(Timer .class.getName(), mTimerObjectName); 516 } 517 mTimer = (TimerMBean )MBeanServerInvocationHandler.newProxyInstance(getServer(), 518 mTimerObjectName, TimerMBean .class, true); 519 mTimerEmitter = (NotificationEmitter )mTimer; 520 if (!mTimer.isActive()) 521 { 522 mTimer.start(); 523 } 524 startSchedules(); 525 } 526 527 530 protected void stopService() 531 { 532 stopSchedules(true); 533 } 534 535 540 protected void destroyService() 541 { 542 Iterator i = mSchedules.values().iterator(); 544 while (i.hasNext()) 545 { 546 ScheduleInstance lInstance = (ScheduleInstance) i.next(); 547 unregisterProvider(lInstance.mProvider.toString()); 548 } 549 } 550 551 555 560 public class MBeanListener implements NotificationListener 561 { 562 private final Logger log = Logger.getLogger(MBeanListener.class); 563 564 private ScheduleInstance mSchedule; 565 566 public MBeanListener(ScheduleInstance pSchedule) 567 { 568 mSchedule = pSchedule; 569 } 570 571 public void handleNotification(Notification pNotification, Object pHandback) 572 { 573 boolean trace = log.isTraceEnabled(); 574 if (trace) { 575 log.trace("MBeanListener.handleNotification: " + pNotification); 576 } 577 578 try 579 { 580 if (!isStarted()) { 581 log.trace("Scheduler not started"); 582 mSchedule.stop(); 583 return; 584 } 585 if (mSchedule.mRemainingRepetitions == 0) 586 { 587 log.trace("No more repetitions"); 588 mSchedule.stop(); 589 return; 590 } 591 if (mIsPaused.get()) 592 { 593 log.trace("Paused"); 594 return; 595 } 596 if (mSchedule.mRemainingRepetitions > 0) 597 { 598 mSchedule.mRemainingRepetitions--; 599 if (trace) 600 log.trace("Remaining repetitions: " + mSchedule.mRemainingRepetitions); 601 } 602 Object [] lArguments = getArguments(pNotification); 603 if (trace) 604 { 605 log.trace("invoke " + mSchedule); 606 log.trace("arguments are: " + Arrays.asList(lArguments)); 607 } 608 609 ObjectName on = mSchedule.mTarget; 610 String mn = mSchedule.mMethodName; 611 getServer().invoke(on, mn, lArguments, 612 mSchedule.mSchedulableMBeanArgumentTypes 613 ); 614 } 615 catch (Exception e) 616 { 617 log.error("Invoke failed: " + mSchedule.getTargetString(), e); 618 } 619 } 620 621 private Object [] getArguments(Notification pNotification) 622 { 623 Object [] lArguments = new Object [mSchedule.mSchedulableMBeanArguments.length]; 624 Date lTimeStamp = new Date (pNotification.getTimeStamp()); 625 for (int i = 0; i < lArguments.length; i++) 626 { 627 switch (mSchedule.mSchedulableMBeanArguments[i]) 628 { 629 case ID: 630 lArguments[i] = pNotification.getUserData(); 631 break; 632 case NOTIFICATION: 633 lArguments[i] = pNotification; 634 break; 635 case DATE: 636 lArguments[i] = lTimeStamp; 637 break; 638 case REPETITIONS: 639 lArguments[i] = new Long (mSchedule.mRemainingRepetitions); 640 break; 641 case SCHEDULER_NAME: 642 lArguments[i] = getServiceName(); 643 break; 644 case NEXT_DATE: 645 lArguments[i] = new Date (lTimeStamp.getTime() + mSchedule.mPeriod); 646 break; 647 default: 648 lArguments[i] = null; 649 } 650 } 651 return lArguments; 652 } 653 654 } 655 656 659 static class IdNotificationFilter implements javax.management.NotificationFilter 660 { 661 private static final Logger log = Logger.getLogger(IdNotificationFilter.class); 662 663 private Integer filterId; 664 665 669 public IdNotificationFilter(int filterId) 670 { 671 this.filterId = new Integer (filterId); 672 } 673 674 677 public boolean isNotificationEnabled(Notification pNotification) 678 { 679 if (!(pNotification instanceof TimerNotification )) 680 return false; 681 TimerNotification lTimerNotification = (TimerNotification ) pNotification; 682 if (log.isTraceEnabled()) 683 log.trace("isNotificationEnabled(), filterId=" + filterId + 684 ", notification=" + pNotification + 685 ", notificationId=" + lTimerNotification.getNotificationID() + 686 ", timestamp=" + lTimerNotification.getTimeStamp() + 687 ", message=" + lTimerNotification.getMessage() 688 ); 689 return lTimerNotification.getNotificationID().equals(filterId); 690 } 691 } 692 693 697 private class ScheduleInstance 698 { 699 700 private final Logger log = Logger.getLogger(ScheduleInstance.class); 701 private int mIdentification; 702 private MBeanListener mListener; 703 704 public int mNotificationID; 705 public ObjectName mProvider; 706 public ObjectName mTarget; 707 public int mInitialRepetitions; 708 public int mRemainingRepetitions = 0; 709 public Date mStartDate; 710 public long mPeriod; 711 public String mMethodName; 712 public int[] mSchedulableMBeanArguments; 713 public String [] mSchedulableMBeanArgumentTypes; 714 715 public ScheduleInstance( 716 ObjectName pProvider, 717 ObjectName pTarget, 718 String pMethodName, 719 String [] pMethodArguments, 720 Date pStartDate, 721 int pRepetitions, 722 long pPeriod 723 ) 724 { 725 mProvider = pProvider; 726 mTarget = pTarget; 727 mInitialRepetitions = pRepetitions; 728 mStartDate = pStartDate; 729 mPeriod = pPeriod; 730 mMethodName = pMethodName; 731 mSchedulableMBeanArguments = new int[pMethodArguments.length]; 732 mSchedulableMBeanArgumentTypes = new String [pMethodArguments.length]; 733 for (int i = 0; i < pMethodArguments.length; i++) 734 { 735 String lToken = pMethodArguments[i]; 736 if (lToken.equals("ID")) 737 { 738 mSchedulableMBeanArguments[i] = ID; 739 mSchedulableMBeanArgumentTypes[i] = Integer .class.getName(); 740 } 741 else if (lToken.equals("NOTIFICATION")) 742 { 743 mSchedulableMBeanArguments[i] = NOTIFICATION; 744 mSchedulableMBeanArgumentTypes[i] = Notification .class.getName(); 745 } 746 else if (lToken.equals("NEXT_DATE")) 747 { 748 mSchedulableMBeanArguments[i] = NEXT_DATE; 749 mSchedulableMBeanArgumentTypes[i] = Date .class.getName(); 750 } 751 else if (lToken.equals("DATE")) 752 { 753 mSchedulableMBeanArguments[i] = DATE; 754 mSchedulableMBeanArgumentTypes[i] = Date .class.getName(); 755 } 756 else if (lToken.equals("REPETITIONS")) 757 { 758 mSchedulableMBeanArguments[i] = REPETITIONS; 759 mSchedulableMBeanArgumentTypes[i] = Long.TYPE.getName(); 760 } 761 else if (lToken.equals("SCHEDULER_NAME")) 762 { 763 mSchedulableMBeanArguments[i] = SCHEDULER_NAME; 764 mSchedulableMBeanArgumentTypes[i] = ObjectName .class.getName(); 765 } 766 else 767 { 768 mSchedulableMBeanArguments[i] = NULL; 769 mSchedulableMBeanArgumentTypes[i] = lToken; 771 } 772 } 773 mIdentification = sCounter.increment(); 774 } 775 776 781 public void start() throws JMException 782 { 783 Date lStartDate = null; 784 if (mStartDate.getTime() < new Date ().getTime() && mPeriod > 0) 786 { 787 long lNow = new Date ().getTime() + 100; 789 int lSkipRepeats = (int) ((lNow - mStartDate.getTime()) / mPeriod) + 1; 790 log.debug("Old start date: " + mStartDate + ", now: " + new Date (lNow) + ", Skip repeats: " + lSkipRepeats); 791 if (mInitialRepetitions > 0) 792 { 793 if (lSkipRepeats >= mInitialRepetitions) 795 { 796 log.warn("No repetitions left because start date is in the past and could " + 798 "not be reached by Initial Repetitions * Schedule Period"); 799 return; 800 } 801 else 802 { 803 mRemainingRepetitions = mInitialRepetitions - lSkipRepeats; 805 } 806 } 807 else 808 { 809 if (mInitialRepetitions == 0) 810 { 811 mRemainingRepetitions = 0; 812 } 813 else 814 { 815 mRemainingRepetitions = -1; 816 } 817 } 818 lStartDate = new Date (mStartDate.getTime() + (lSkipRepeats * mPeriod)); 819 } 820 else 821 { 822 lStartDate = mStartDate; 823 mRemainingRepetitions = mInitialRepetitions; 824 } 825 mNotificationID = mTimer.addNotification( 826 "Schedule", "Scheduler Notification", 827 new Integer (getID()), lStartDate, 829 new Long (mPeriod), 830 mRemainingRepetitions < 0 ? new Long (0) : new Long (mRemainingRepetitions), 831 Boolean.valueOf(mFixedRate) 832 ); 833 mListener = new MBeanListener(this); 834 mTimerEmitter.addNotificationListener( 835 mListener, 836 new IdNotificationFilter(mNotificationID), 837 null 839 ); 840 log.debug("start(), add Notification to Timer with ID: " + mNotificationID); 841 } 842 843 847 public void stop() 848 throws JMException 849 { 850 log.debug("stopSchedule(), notification id: " + mNotificationID); 851 mTimerEmitter.removeNotificationListener(mListener); 852 try 853 { 854 mTimer.removeNotification(mNotificationID); 855 } 856 catch (InstanceNotFoundException e) 857 { 858 log.trace(e); 859 } 860 } 861 862 public int getID() 863 { 864 return mIdentification; 865 } 866 867 public String toString() 868 { 869 return "Schedule target=" + getTargetString(); 870 } 871 872 public String getTargetString() 873 { 874 return mTarget + " " + mMethodName + "" + Arrays.asList(mSchedulableMBeanArgumentTypes); 875 } 876 877 } 878 } 879 | Popular Tags |