1 22 package org.jboss.monitor.services; 23 24 import java.util.LinkedList ; 25 import java.util.List ; 26 27 import javax.management.MBeanServer ; 28 import javax.management.Notification ; 29 import javax.management.ObjectName ; 30 31 import org.apache.bsf.BSFException; 32 import org.apache.bsf.BSFManager; 33 import org.jboss.logging.Logger; 34 import org.jboss.monitor.alarm.AlarmManager; 35 import org.jboss.system.ListenerServiceMBeanSupport; 36 37 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 38 39 66 public class ScriptingListener extends ListenerServiceMBeanSupport 67 implements ScriptingListenerMBean 68 { 69 71 72 private String script; 73 74 75 private String language; 76 77 78 private boolean dynamicSubscriptions; 79 80 81 private ObjectName targetListener; 82 83 84 private SynchronizedLong notificationsReceived; 85 86 87 private SynchronizedLong notificationsProcessed; 88 89 90 private SynchronizedLong totalProcessingTime; 91 92 93 private BSFManager manager; 94 95 96 private List queue; 97 98 99 private boolean stopRequested; 100 101 102 private Thread processorThread; 103 104 105 private AlarmManager alm = new AlarmManager(this); 106 107 109 112 public ScriptingListener() 113 { 114 queue = new LinkedList (); 115 116 notificationsReceived = new SynchronizedLong(0); 117 notificationsProcessed = new SynchronizedLong(0); 118 totalProcessingTime = new SynchronizedLong(0); 119 } 120 121 123 126 public void setScript(String script) 127 { 128 this.script = script; 129 } 130 131 134 public String getScript() 135 { 136 return script; 137 } 138 139 142 public void setScriptLanguage(String language) 143 { 144 this.language = language; 145 } 146 147 150 public String getScriptLanguage() 151 { 152 return language; 153 } 154 155 158 public void setDynamicSubscriptions(boolean dynamicSubscriptions) 159 { 160 this.dynamicSubscriptions = dynamicSubscriptions; 161 } 162 163 166 public boolean getDynamicSubscriptions() 167 { 168 return this.dynamicSubscriptions; 169 } 170 171 174 public long getNotificationsReceived() 175 { 176 return notificationsReceived.get(); 177 } 178 179 182 public long getNotificationsProcessed() 183 { 184 return notificationsProcessed.get(); 185 } 186 187 190 public long getTotalProcessingTime() 191 { 192 return totalProcessingTime.get(); 193 } 194 195 198 public long getAverageProcessingTime() 199 { 200 long processed = notificationsProcessed.get(); 201 202 return (processed == 0) ? 0 : totalProcessingTime.get() / processed; 203 } 204 205 207 210 public void startService() throws Exception 211 { 212 log.debug("Initializing BSFManager for language '" + language + "'"); 213 214 BSFManager.registerScriptingEngine( 216 "groovy", 217 "org.codehaus.groovy.bsf.GroovyEngine", 218 new String [] { "groovy", "gy" } 219 ); 220 221 manager = new BSFManager(); 223 224 manager.setClassLoader(Thread.currentThread().getContextClassLoader()); 225 manager.loadScriptingEngine(language); 226 manager.declareBean("log", log, Logger.class); 227 manager.declareBean("server", server, MBeanServer .class); 228 manager.declareBean("manager", alm, AlarmManager.class); 229 230 Notification testNotification = new Notification ("jboss.script.test", serviceName, 0); 232 manager.declareBean("notification", testNotification, Notification .class); 233 manager.declareBean("handback", "", Object .class); 234 235 manager.exec(language, "in-memory-script", 0, 0, script); 236 237 processorThread = new Thread (new ScriptProcessor(), "ScriptProcessor[" + serviceName + "]"); 239 processorThread.start(); 240 241 super.subscribe(dynamicSubscriptions); 243 } 244 245 248 public void stopService() throws Exception 249 { 250 super.unsubscribe(); 252 253 log.debug("Stopping " + processorThread.getName()); 254 255 stopRequested = true; 257 258 synchronized (queue) 260 { 261 queue.notify(); 262 } 263 264 try 265 { 266 processorThread.join(5000); 268 } 269 catch (InterruptedException e) 270 { 271 Thread.currentThread().interrupt(); 273 } 274 275 queue.clear(); 277 manager.terminate(); 278 } 279 280 282 285 public void handleNotification2(Notification notification, Object handback) 286 { 287 notificationsReceived.increment(); 289 290 synchronized (queue) 293 { 294 queue.add(new QueueEntry(notification, handback)); 295 296 queue.notify(); 298 } 299 } 300 301 303 306 private static class QueueEntry 307 { 308 public Notification notification; 309 public Object handback; 310 311 public QueueEntry(Notification notification, Object handback) 312 { 313 this.notification = notification; 314 this.handback = handback; 315 } 316 } 317 318 321 private class ScriptProcessor implements Runnable 322 { 323 public void run() 324 { 325 String name = Thread.currentThread().getName(); 326 log.debug("Started thread: " + name); 327 328 while (!stopRequested) 329 { 330 QueueEntry entry; 331 332 synchronized (queue) 333 { 334 while (queue.isEmpty() && !stopRequested) 335 { 336 try 337 { 338 queue.wait(); 339 } 340 catch (InterruptedException e) 341 { 342 } 344 } 345 346 if (stopRequested) 347 { 348 break; 350 } 351 else 352 { 353 entry = (QueueEntry)queue.remove(0); 355 } 356 } 357 358 long start = System.currentTimeMillis(); 360 361 try 363 { 364 manager.declareBean("notification", entry.notification, Notification .class); 365 manager.declareBean("handback", entry.handback == null ? "" : entry.handback, Object .class); 366 367 manager.exec(language, "in-memory-script", 0, 0, script); 368 } 369 catch (BSFException e) 370 { 371 log.warn("Caught exception", e); 372 } 373 374 long stop = System.currentTimeMillis(); 376 totalProcessingTime.add(stop - start); 377 notificationsProcessed.increment(); 378 } 379 log.debug("Stopped thread: " + name); 380 } 381 } 382 383 } 384 | Popular Tags |