1 40 41 package com.sun.jmx.examples.scandir; 42 43 import static com.sun.jmx.examples.scandir.ScanManagerMXBean.ScanState.*; 44 import com.sun.jmx.examples.scandir.ScanManagerMXBean.ScanState; 45 import com.sun.jmx.examples.scandir.config.DirectoryScannerConfig; 46 import com.sun.jmx.examples.scandir.config.ScanManagerConfig; 47 import java.io.File ; 48 49 import java.io.IOException ; 50 import java.lang.management.ManagementFactory ; 51 import java.util.ArrayList ; 52 import java.util.Collections ; 53 import java.util.EnumSet ; 54 import java.util.HashMap ; 55 import java.util.Map ; 56 import java.util.Map.Entry; 57 import java.util.Timer ; 58 import java.util.TimerTask ; 59 import java.util.concurrent.BlockingQueue ; 60 import java.util.concurrent.ConcurrentHashMap ; 61 import java.util.concurrent.ConcurrentLinkedQueue ; 62 import java.util.concurrent.LinkedBlockingQueue ; 63 import java.util.concurrent.Semaphore ; 64 import java.util.concurrent.TimeUnit ; 65 import java.util.logging.Level ; 66 import java.util.logging.Logger ; 67 import javax.management.AttributeChangeNotification ; 68 import javax.management.InstanceNotFoundException ; 69 import javax.management.JMException ; 70 import javax.management.JMX ; 71 import javax.management.ListenerNotFoundException ; 72 import javax.management.MBeanNotificationInfo ; 73 import javax.management.MBeanRegistration ; 74 import javax.management.MBeanServer ; 75 import javax.management.MBeanServerConnection ; 76 import javax.management.MalformedObjectNameException ; 77 import javax.management.Notification ; 78 import javax.management.NotificationBroadcasterSupport ; 79 import javax.management.NotificationEmitter ; 80 import javax.management.NotificationFilter ; 81 import javax.management.NotificationListener ; 82 import javax.management.ObjectInstance ; 83 import javax.management.ObjectName ; 84 85 103 public class ScanManager implements ScanManagerMXBean, 104 NotificationEmitter , MBeanRegistration { 105 106 109 private static final Logger LOG = 110 Logger.getLogger(ScanManager.class.getName()); 111 112 115 public final static ObjectName SCAN_MANAGER_NAME = 116 makeSingletonName(ScanManagerMXBean.class); 117 118 122 private static long seqNumber=0; 123 124 128 private final NotificationBroadcasterSupport broadcaster; 129 130 135 private volatile MBeanServer mbeanServer; 136 137 142 private final BlockingQueue <Notification > pendingNotifs; 143 144 147 private volatile ScanState state = STOPPED; 148 149 152 private final Map <ObjectName ,DirectoryScannerMXBean> scanmap; 153 154 157 private final Map <ObjectName , ScanDirConfigMXBean> configmap; 158 159 private final ResultLogManager log; 161 162 167 private final Semaphore sequencer = new Semaphore (1); 168 169 private volatile ScanDirConfigMXBean config = null; 173 174 private static <K, V> Map <K, V> newConcurrentHashMap() { 177 return new ConcurrentHashMap <K, V>(); 178 } 179 180 private static <K, V> Map <K, V> newHashMap() { 183 return new HashMap <K, V>(); 184 } 185 186 197 public final static ObjectName makeSingletonName(Class clazz) { 198 try { 199 final Package p = clazz.getPackage(); 200 final String packageName = (p==null)?null:p.getName(); 201 final String className = clazz.getSimpleName(); 202 final String domain; 203 if (packageName == null || packageName.length()==0) { 204 domain = ScanDirAgent.class.getSimpleName(); 207 } else { 208 domain = packageName; 209 } 210 final ObjectName name = new ObjectName (domain,"type",className); 211 return name; 212 } catch (Exception x) { 213 final IllegalArgumentException iae = 214 new IllegalArgumentException (String.valueOf(clazz),x); 215 throw iae; 216 } 217 } 218 219 231 public static final ObjectName makeMBeanName(Class clazz, String name) { 232 try { 233 return ObjectName. 234 getInstance(makeSingletonName(clazz) 235 .toString()+",name="+name); 236 } catch (MalformedObjectNameException x) { 237 final IllegalArgumentException iae = 238 new IllegalArgumentException (String.valueOf(name),x); 239 throw iae; 240 } 241 } 242 243 249 public static final ObjectName makeDirectoryScannerName(String name) { 250 return makeMBeanName(DirectoryScannerMXBean.class,name); 251 } 252 253 259 public static final ObjectName makeScanDirConfigName(String name) { 260 return makeMBeanName(ScanDirConfigMXBean.class,name); 261 } 262 263 275 public static ScanManagerMXBean register(MBeanServerConnection mbs) 276 throws IOException , JMException { 277 final ObjectInstance moi = 278 mbs.createMBean(ScanManager.class.getName(),SCAN_MANAGER_NAME); 279 final ScanManagerMXBean proxy = 280 JMX.newMXBeanProxy(mbs,moi.getObjectName(), 281 ScanManagerMXBean.class,true); 282 return proxy; 283 } 284 285 294 public static ScanManagerMXBean 295 newSingletonProxy(MBeanServerConnection mbs) { 296 final ScanManagerMXBean proxy = 297 JMX.newMXBeanProxy(mbs,SCAN_MANAGER_NAME, 298 ScanManagerMXBean.class,true); 299 return proxy; 300 } 301 302 308 public static ScanManagerMXBean newSingletonProxy() { 309 return newSingletonProxy(ManagementFactory.getPlatformMBeanServer()); 310 } 311 312 322 public static ScanManagerMXBean register() 323 throws IOException , JMException { 324 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); 325 return register(mbs); 326 } 327 328 331 public ScanManager() { 332 broadcaster = new NotificationBroadcasterSupport (); 333 pendingNotifs = new LinkedBlockingQueue <Notification >(100); 334 scanmap = newConcurrentHashMap(); 335 configmap = newConcurrentHashMap(); 336 log = new ResultLogManager(); 337 } 338 339 340 DirectoryScannerMXBean createDirectoryScanner(DirectoryScannerConfig config) { 342 return new DirectoryScanner(config,log); 343 } 344 345 private void applyConfiguration(ScanManagerConfig bean) 352 throws IOException , JMException { 353 if (bean == null) return; 354 if (!sequencer.tryAcquire()) { 355 throw new IllegalStateException ("Can't acquire lock"); 356 } 357 try { 358 unregisterScanners(); 359 final DirectoryScannerConfig[] scans = bean.getScanList(); 360 if (scans == null) return; 361 for (DirectoryScannerConfig scan : scans) { 362 addDirectoryScanner(scan); 363 } 364 log.setConfig(bean.getInitialResultLogConfig()); 365 } finally { 366 sequencer.release(); 367 } 368 } 369 370 public void applyConfiguration(boolean fromMemory) 372 throws IOException , JMException { 373 if (fromMemory == false) config.load(); 374 applyConfiguration(config.getConfiguration()); 375 } 376 377 public void applyCurrentResultLogConfig(boolean toMemory) 379 throws IOException , JMException { 380 final ScanManagerConfig bean = config.getConfiguration(); 381 bean.setInitialResultLogConfig(log.getConfig()); 382 config.setConfiguration(bean); 383 if (toMemory==false) config.save(); 384 } 385 386 public void setConfigurationMBean(ScanDirConfigMXBean config) { 388 this.config = config; 389 } 390 391 public ScanDirConfigMXBean getConfigurationMBean() { 393 return config; 394 } 395 396 private DirectoryScannerMXBean addDirectoryScanner( 402 DirectoryScannerConfig bean) 403 throws JMException { 404 try { 405 final DirectoryScannerMXBean scanner; 406 final ObjectName scanName; 407 synchronized (this) { 408 if (state != STOPPED && state != COMPLETED) 409 throw new IllegalStateException (state.toString()); 410 scanner = createDirectoryScanner(bean); 411 scanName = makeDirectoryScannerName(bean.getName()); 412 } 413 LOG.fine("server: "+mbeanServer); 414 LOG.fine("scanner: "+scanner); 415 LOG.fine("scanName: "+scanName); 416 final ObjectInstance moi = 417 mbeanServer.registerMBean(scanner,scanName); 418 final ObjectName moiName = moi.getObjectName(); 419 final DirectoryScannerMXBean proxy = 420 JMX.newMXBeanProxy(mbeanServer,moiName, 421 DirectoryScannerMXBean.class,true); 422 scanmap.put(moiName,proxy); 423 return proxy; 424 } catch (RuntimeException x) { 425 final String msg = "Operation failed: "+x; 426 if (LOG.isLoggable(Level.FINEST)) 427 LOG.log(Level.FINEST,msg,x); 428 else LOG.fine(msg); 429 throw x; 430 } catch (JMException x) { 431 final String msg = "Operation failed: "+x; 432 if (LOG.isLoggable(Level.FINEST)) 433 LOG.log(Level.FINEST,msg,x); 434 else LOG.fine(msg); 435 throw x; 436 } 437 } 438 439 public ScanDirConfigMXBean createOtherConfigurationMBean(String name, 441 String filename) 442 throws JMException { 443 final ScanDirConfig profile = new ScanDirConfig(filename); 444 final ObjectName profName = makeScanDirConfigName(name); 445 final ObjectInstance moi = mbeanServer.registerMBean(profile,profName); 446 final ScanDirConfigMXBean proxy = 447 JMX.newMXBeanProxy(mbeanServer,profName, 448 ScanDirConfigMXBean.class,true); 449 configmap.put(moi.getObjectName(),proxy); 450 return proxy; 451 } 452 453 454 public Map <String ,DirectoryScannerMXBean> getDirectoryScanners() { 456 final Map <String ,DirectoryScannerMXBean> proxyMap = newHashMap(); 457 for (Entry<ObjectName ,DirectoryScannerMXBean> item : scanmap.entrySet()){ 458 proxyMap.put(item.getKey().getKeyProperty("name"),item.getValue()); 459 } 460 return proxyMap; 461 } 462 463 467 474 private final static Map <String ,EnumSet <ScanState>> allowedStates; 475 static { 476 allowedStates = newHashMap(); 477 allowedStates.put("stop",EnumSet.allOf(ScanState.class)); 479 480 allowedStates.put("close",EnumSet.of(STOPPED,COMPLETED,CLOSED)); 482 483 allowedStates.put("schedule",EnumSet.of(STOPPED,COMPLETED)); 486 487 allowedStates.put("scan-running",EnumSet.of(SCHEDULED)); 490 491 allowedStates.put("scan-scheduled",EnumSet.of(RUNNING)); 495 496 allowedStates.put("scan-done",EnumSet.of(RUNNING)); 499 } 500 501 public ScanState getState() { 505 return state; 506 } 507 508 511 private void queueStateChangedNotification( 512 long sequence, 513 long time, 514 ScanState old, 515 ScanState current) { 516 final AttributeChangeNotification n = 517 new AttributeChangeNotification (SCAN_MANAGER_NAME,sequence,time, 518 "ScanManager State changed to "+current,"State", 519 ScanState.class.getName(),old.toString(),current.toString()); 520 try { 523 if (!pendingNotifs.offer(n,2,TimeUnit.SECONDS)) { 524 LOG.fine("Can't queue Notification: "+n); 525 } 526 } catch (InterruptedException x) { 527 LOG.fine("Can't queue Notification: "+x); 528 } 529 } 530 531 534 private void sendQueuedNotifications() { 535 Notification n; 536 while ((n = pendingNotifs.poll()) != null) { 537 broadcaster.sendNotification(n); 538 } 539 } 540 541 547 private ScanState switchState(ScanState desired,String forOperation) { 548 return switchState(desired,allowedStates.get(forOperation)); 549 } 550 551 557 private ScanState switchState(ScanState desired,EnumSet <ScanState> allowed) { 558 final ScanState old; 559 final long timestamp; 560 final long sequence; 561 synchronized(this) { 562 old = state; 563 if (!allowed.contains(state)) 564 throw new IllegalStateException (state.toString()); 565 state = desired; 566 timestamp = System.currentTimeMillis(); 567 sequence = getNextSeqNumber(); 568 } 569 LOG.fine("switched state: "+old+" -> "+desired); 570 if (old != desired) 571 queueStateChangedNotification(sequence,timestamp,old,desired); 572 return old; 573 } 574 575 576 581 private Timer timer = null; 585 586 public void schedule(long delay, long interval) { 588 if (!sequencer.tryAcquire()) { 589 throw new IllegalStateException ("Can't acquire lock"); 590 } 591 try { 592 LOG.fine("scheduling new task: state="+state); 593 final ScanState old = switchState(SCHEDULED,"schedule"); 594 final boolean scheduled = 595 scheduleSession(new SessionTask(interval),delay); 596 if (scheduled) 597 LOG.fine("new task scheduled: state="+state); 598 } finally { 599 sequencer.release(); 600 } 601 sendQueuedNotifications(); 602 } 603 604 private synchronized boolean scheduleSession(SessionTask task, long delay) { 612 if (state == STOPPED) return false; 613 if (timer == null) timer = new Timer ("ScanManager"); 614 tasklist.add(task); 615 timer.schedule(task,delay); 616 return true; 617 } 618 619 623 public void start() throws IOException , InstanceNotFoundException { 625 schedule(0,0); 626 } 627 628 635 public void stop() { 637 if (!sequencer.tryAcquire()) 638 throw new IllegalStateException ("Can't acquire lock"); 639 int errcount = 0; 640 final StringBuilder b = new StringBuilder (); 641 642 try { 643 switchState(STOPPED,"stop"); 644 645 errcount += cancelSessionTasks(b); 646 errcount += stopDirectoryScanners(b); 647 } finally { 648 sequencer.release(); 649 } 650 651 sendQueuedNotifications(); 652 if (errcount > 0) { 653 b.insert(0,"stop partially failed with "+errcount+" error(s):"); 654 throw new RuntimeException (b.toString()); 655 } 656 } 657 658 public void close() { 660 switchState(CLOSED,"close"); 661 sendQueuedNotifications(); 662 } 663 664 private void append(StringBuilder b,String prefix,Throwable t) { 667 final String first = (prefix==null)?"\n":"\n"+prefix; 668 b.append(first).append(String.valueOf(t)); 669 Throwable cause = t; 670 while ((cause = cause.getCause())!=null) { 671 b.append(first).append("Caused by:").append(first); 672 b.append('\t').append(String.valueOf(cause)); 673 } 674 } 675 676 private int cancelSessionTasks(StringBuilder b) { 679 int errcount = 0; 680 for (SessionTask task : tasklist) { 683 try { 684 task.cancel(); 685 tasklist.remove(task); 686 } catch (Exception ex) { 687 errcount++; 688 append(b,"\t",ex); 689 } 690 } 691 return errcount; 692 } 693 694 private int stopDirectoryScanners(StringBuilder b) { 697 int errcount = 0; 698 for (DirectoryScannerMXBean s : scanmap.values()) { 701 try { 702 s.stop(); 703 } catch (Exception ex) { 704 errcount++; 705 append(b,"\t",ex); 706 } 707 } 708 return errcount; 709 } 710 711 712 717 private void scanAllDirectories() 718 throws IOException , InstanceNotFoundException { 719 720 int errcount = 0; 721 final StringBuilder b = new StringBuilder (); 722 for (ObjectName key : scanmap.keySet()) { 723 final DirectoryScannerMXBean s = scanmap.get(key); 724 try { 725 if (state == STOPPED) return; 726 s.scan(); 727 } catch (Exception ex) { 728 LOG.log(Level.FINE,key + " failed to scan: "+ex,ex); 729 errcount++; 730 append(b,"\t",ex); 731 } 732 } 733 if (errcount > 0) { 734 b.insert(0,"scan partially performed with "+errcount+" error(s):"); 735 throw new RuntimeException (b.toString()); 736 } 737 } 738 739 private final ConcurrentLinkedQueue <SessionTask> tasklist = 744 new ConcurrentLinkedQueue <SessionTask>(); 745 746 private volatile static long taskcount = 0; 750 751 762 private class SessionTask extends TimerTask { 763 764 769 final long delayBeforeNext; 770 771 774 final long taskid; 775 776 779 volatile boolean cancelled=false; 780 781 784 SessionTask(long scheduleNext) { 785 delayBeforeNext = scheduleNext; 786 taskid = taskcount++; 787 } 788 789 803 private boolean notifyStateChange(ScanState newState,String condition) { 804 synchronized (ScanManager.this) { 805 if (state == STOPPED || state == CLOSED) return false; 806 switchState(newState,condition); 807 } 808 sendQueuedNotifications(); 809 return true; 810 } 811 812 public boolean cancel() { 814 cancelled=true; 815 return super.cancel(); 816 } 817 818 822 private boolean execute() { 823 final String tag = "Scheduled session["+taskid+"]"; 824 try { 825 if (cancelled) { 826 LOG.finer(tag+" cancelled: done"); 827 return false; 828 } 829 if (!notifyStateChange(RUNNING,"scan-running")) { 830 LOG.finer(tag+" stopped: done"); 831 return false; 832 } 833 scanAllDirectories(); 834 } catch (Exception x) { 835 if (LOG.isLoggable(Level.FINEST)) { 836 LOG.log(Level.FINEST, 837 tag+" failed to scan: "+x,x); 838 } else if (LOG.isLoggable(Level.FINE)) { 839 LOG.fine(tag+" failed to scan: "+x); 840 } 841 } 842 return true; 843 } 844 845 848 private boolean scheduleNext() { 849 final String tag = "Scheduled session["+taskid+"]"; 850 851 try { 853 LOG.finer(tag+": scheduling next session for "+ delayBeforeNext + "ms"); 854 if (cancelled || !notifyStateChange(SCHEDULED,"scan-scheduled")) { 855 LOG.finer(tag+" stopped: do not reschedule"); 856 return false; 857 } 858 final SessionTask nextTask = new SessionTask(delayBeforeNext); 859 if (!scheduleSession(nextTask,delayBeforeNext)) return false; 860 LOG.finer(tag+": next session successfully scheduled"); 861 } catch (Exception x) { 862 if (LOG.isLoggable(Level.FINEST)) { 863 LOG.log(Level.FINEST,tag+ 864 " failed to schedule next session: "+x,x); 865 } else if (LOG.isLoggable(Level.FINE)) { 866 LOG.fine(tag+" failed to schedule next session: "+x); 867 } 868 } 869 return true; 870 } 871 872 873 877 public void run() { 878 final String tag = "Scheduled session["+taskid+"]"; 879 LOG.entering(SessionTask.class.getName(),"run"); 880 LOG.finer(tag+" starting..."); 881 try { 882 if (execute()==false) return; 883 884 LOG.finer(tag+" terminating - state is "+state+ 885 ((delayBeforeNext >0)?(" next session is due in "+delayBeforeNext+" ms."): 886 " no additional session scheduled")); 887 888 if (delayBeforeNext <= 0) { 891 if (!notifyStateChange(COMPLETED,"scan-done")) 892 LOG.finer(tag+" stopped: done"); 893 else 894 LOG.finer(tag+" completed: done"); 895 return; 896 } 897 898 scheduleNext(); 900 901 } finally { 902 tasklist.remove(this); 903 LOG.finer(tag+" finished..."); 904 LOG.exiting(SessionTask.class.getName(),"run"); 905 } 906 } 907 } 908 909 912 917 921 public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws IllegalArgumentException { 922 broadcaster.addNotificationListener(listener, filter, handback); 923 } 924 925 926 930 public MBeanNotificationInfo [] getNotificationInfo() { 931 return new MBeanNotificationInfo [] { 932 new MBeanNotificationInfo (new String [] { 933 AttributeChangeNotification.ATTRIBUTE_CHANGE}, 934 AttributeChangeNotification .class.getName(), 935 "Emitted when the State attribute changes") 936 }; 937 } 938 939 943 public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException { 944 broadcaster.removeNotificationListener(listener); 945 } 946 947 951 public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws ListenerNotFoundException { 952 broadcaster.removeNotificationListener(listener, filter, handback); 953 } 954 955 961 static synchronized long getNextSeqNumber() { 962 return seqNumber++; 963 } 964 965 969 974 1001 public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { 1002 if (name != null) { 1003 if (!SCAN_MANAGER_NAME.equals(name)) 1004 throw new IllegalArgumentException (String.valueOf(name)); 1005 } 1006 mbeanServer = server; 1007 return SCAN_MANAGER_NAME; 1008 } 1009 1010 static String getDefaultConfigurationFileName() { 1012 final String user = System.getProperty("user.home"); 1015 final String defconf = user+File.separator+"jmx-scandir.xml"; 1016 return defconf; 1017 } 1018 1019 1041 public void postRegister(Boolean registrationDone) { 1042 if (!registrationDone) return; 1043 Exception test=null; 1044 try { 1045 mbeanServer.registerMBean(log, 1046 ResultLogManager.RESULT_LOG_MANAGER_NAME); 1047 final String defconf = getDefaultConfigurationFileName(); 1048 final String conf = System.getProperty("scandir.config.file",defconf); 1049 final String confname = ScanDirConfig.guessConfigName(conf,defconf); 1050 final ObjectName defaultProfileName = 1051 makeMBeanName(ScanDirConfigMXBean.class,confname); 1052 if (!mbeanServer.isRegistered(defaultProfileName)) 1053 mbeanServer.registerMBean(new ScanDirConfig(conf), 1054 defaultProfileName); 1055 config = JMX.newMXBeanProxy(mbeanServer,defaultProfileName, 1056 ScanDirConfigMXBean.class,true); 1057 configmap.put(defaultProfileName,config); 1058 } catch (Exception x) { 1059 LOG.config("Failed to populate MBeanServer: "+x); 1060 close(); 1061 return; 1062 } 1063 try { 1064 config.load(); 1065 } catch (Exception x) { 1066 LOG.finest("No config to load: "+x); 1067 test = x; 1068 } 1069 if (test == null) { 1070 try { 1071 applyConfiguration(config.getConfiguration()); 1072 } catch (Exception x) { 1073 if (LOG.isLoggable(Level.FINEST)) 1074 LOG.log(Level.FINEST,"Failed to apply config: "+x,x); 1075 LOG.config("Failed to apply config: "+x); 1076 } 1077 } 1078 } 1079 1080 private void unregisterScanners() throws JMException { 1082 unregisterMBeans(scanmap); 1083 } 1084 1085 private void unregisterConfigs() throws JMException { 1087 unregisterMBeans(configmap); 1088 } 1089 1090 private void unregisterMBeans(Map <ObjectName ,?> map) throws JMException { 1092 for (ObjectName key : map.keySet()) { 1093 if (mbeanServer.isRegistered(key)) 1094 mbeanServer.unregisterMBean(key); 1095 map.remove(key); 1096 } 1097 } 1098 1099 private void unregisterResultLogManager() throws JMException { 1101 final ObjectName name = ResultLogManager.RESULT_LOG_MANAGER_NAME; 1102 if (mbeanServer.isRegistered(name)) { 1103 mbeanServer.unregisterMBean(name); 1104 } 1105 } 1106 1107 1118 public void preDeregister() throws Exception { 1119 try { 1120 close(); 1121 if (!sequencer.tryAcquire()) 1122 throw new IllegalStateException ("can't acquire lock"); 1123 try { 1124 unregisterScanners(); 1125 unregisterConfigs(); 1126 unregisterResultLogManager(); 1127 } finally { 1128 sequencer.release(); 1129 } 1130 } catch (Exception x) { 1131 LOG.log(Level.FINEST,"Failed to unregister: "+x,x); 1132 throw x; 1133 } 1134 } 1135 1136 1141 public synchronized void postDeregister() { 1142 if (timer != null) { 1143 try { 1144 timer.cancel(); 1145 } catch (Exception x) { 1146 if (LOG.isLoggable(Level.FINEST)) 1147 LOG.log(Level.FINEST,"Failed to cancel timer",x); 1148 else if (LOG.isLoggable(Level.FINE)) 1149 LOG.fine("Failed to cancel timer: "+x); 1150 } finally { 1151 timer = null; 1152 } 1153 } 1154 } 1155 1156 1160} 1161 1162 | Popular Tags |