package scriptella.execution;

import scriptella.core.SystemException;
import scriptella.core.ThreadSafe;

import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.Date;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * JMX MBean that exposes a running ETL task for monitoring and cancellation.
 * <p>An instance is bound to one {@link EtlContext}. {@link #register()} publishes
 * it in the MBean server under a name derived from the script file URL and
 * remembers the calling thread, so that {@link #cancel()} can interrupt it.
 * <p>All instance state is guarded by the instance monitor.
 */
@ThreadSafe
public class JmxEtlManager implements JmxEtlManagerMBean {
    private static final Logger LOG = Logger.getLogger(JmxEtlManager.class.getName());
    private static final String MBEAN_NAME_PREFIX = "scriptella:type=etl";
    /** Maximum number of candidate names (n=0..999) probed for the same script URL. */
    private static final int MAX_EQUAL_TASKS = 1000;
    /**
     * Optional server override. Volatile because it is written by the
     * unsynchronized {@link #setMBeanServer(MBeanServer)} and read under other locks.
     */
    private static volatile MBeanServer mbeanServer;

    private final EtlContext ctx;
    private MBeanServer server;
    private Thread etlThread;
    private ObjectName name;
    private Date started;

    /**
     * Creates a manager for the given ETL context.
     *
     * @param ctx context of the ETL task to expose.
     */
    public JmxEtlManager(EtlContext ctx) {
        this.ctx = ctx;
    }

    /**
     * Returns the number of statements executed so far, as reported by the context session.
     *
     * @return executed statements count.
     */
    public synchronized long getExecutedStatementsCount() {
        return ctx.getSession().getExecutedStatementsCount();
    }

    /**
     * Returns the moment this MBean was successfully registered.
     *
     * @return a copy of the start date, or null if the MBean has not been registered yet.
     */
    public synchronized Date getStartDate() {
        // Defensive copy: Date is mutable and must not leak internal state.
        return started == null ? null : new Date(started.getTime());
    }

    /**
     * Returns the average number of executed statements per second since registration.
     *
     * @return statements per second, or 0 if nothing has been executed, the MBean has
     *         not been registered, or no measurable time has elapsed.
     */
    public synchronized double getThroughput() {
        long cnt = getExecutedStatementsCount();
        if (cnt > 0 && started != null) {
            double elapsedMillis = System.currentTimeMillis() - started.getTime();
            // Elapsed time may be 0 (same millisecond) or negative (clock adjusted);
            // avoid reporting Infinity or a negative rate.
            if (elapsedMillis > 0) {
                return 1000.0 * cnt / elapsedMillis;
            }
        }
        return 0;
    }

    /**
     * Sets the MBean server used by all managers.
     *
     * @param mbeanServer server to use, or null to use the platform MBean server.
     */
    public static void setMBeanServer(MBeanServer mbeanServer) {
        JmxEtlManager.mbeanServer = mbeanServer;
    }

    /**
     * Registers this manager in the MBean server and records the calling thread
     * as the ETL thread.
     * <p>The first free name among the candidates produced by
     * {@link #toObjectName(String, int)} for n=0..999 is used. Instance state is
     * updated only after the registration succeeds, so a failed attempt leaves
     * this manager unregistered and {@link #unregister()} a no-op.
     *
     * @throws IllegalStateException if this manager is already registered.
     * @throws SystemException if no free name is available or the registration fails.
     */
    public synchronized void register() {
        if (name != null) {
            throw new IllegalStateException("MBean already registered");
        }
        MBeanServer srv = getMBeanServer();
        String url = ctx.getScriptFileURL().toString();

        ObjectName candidate = null;
        for (int i = 0; i < MAX_EQUAL_TASKS; i++) {
            ObjectName probe = toObjectName(url, i);
            if (!srv.isRegistered(probe)) {
                candidate = probe;
                break;
            }
        }
        if (candidate == null) {
            throw new SystemException("Unable to register mbean for url " + url
                    + ": too many equal tasks already registered");
        }

        try {
            srv.registerMBean(this, candidate);
        } catch (Exception e) {
            // The name may have been taken concurrently; nothing has been published yet.
            throw new SystemException("Unable to register mbean " + candidate, e);
        }

        server = srv;
        name = candidate;
        etlThread = Thread.currentThread();
        started = new Date();
        LOG.info("Registered JMX mbean: " + name);
    }

    /**
     * Returns the explicitly configured MBean server or the platform one if none was set.
     */
    private static MBeanServer getMBeanServer() {
        MBeanServer configured = mbeanServer; // single volatile read
        return configured == null ? ManagementFactory.getPlatformMBeanServer() : configured;
    }

    /**
     * Invokes the "cancel" operation on every ETL MBean found in the MBean server.
     * Failures for individual MBeans are logged and do not stop the iteration.
     *
     * @return number of MBeans on which the cancel operation was invoked successfully.
     */
    public static synchronized int cancelAll() {
        Set<ObjectName> names = findEtlMBeans();
        MBeanServer srv = getMBeanServer();
        int cancelled = 0;
        for (ObjectName objectName : names) {
            try {
                srv.invoke(objectName, "cancel", null, null);
                cancelled++;
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Cannot cancel ETL, MBean " + objectName, e);
            }
        }
        return cancelled;
    }

    /**
     * Finds the names of all MBeans registered under the ETL name prefix.
     *
     * @return names matching the pattern {@code scriptella:type=etl,*}.
     * @throws IllegalStateException if the query pattern is rejected as malformed.
     */
    public static synchronized Set<ObjectName> findEtlMBeans() {
        try {
            return getMBeanServer().queryNames(new ObjectName(MBEAN_NAME_PREFIX + ",*"), null);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Builds the MBean name for a script URL.
     *
     * @param url script file URL; it is quoted before being put into the name.
     * @param n   sequence number; the {@code n} key is appended only when n &gt; 0.
     * @return object name for the given URL and sequence number.
     * @throws IllegalStateException if the resulting name is malformed.
     */
    static ObjectName toObjectName(String url, int n) {
        try {
            return new ObjectName(MBEAN_NAME_PREFIX + ",url=" + ObjectName.quote(url)
                    + (n > 0 ? ",n=" + n : ""));
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException("Cannot set MBean name", e);
        }
    }

    /**
     * Unregisters this manager from the MBean server it was registered in.
     * Does nothing if the manager is not registered; failures are logged, not thrown.
     */
    public synchronized void unregister() {
        if (name != null && server != null) {
            try {
                server.unregisterMBean(name);
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Unable to unregister mbean " + name, e);
            }
            name = null;
        }
    }

    /**
     * Interrupts the thread that called {@link #register()}, if it is still alive
     * and not already interrupted, and then forgets it.
     */
    public synchronized void cancel() {
        if (etlThread != null && etlThread.isAlive() && !etlThread.isInterrupted()) {
            etlThread.interrupt();
            etlThread = null;
        }
    }
}