1 4 package org.oddjob.jmx.client; 5 6 import java.util.LinkedList ; 7 8 import org.apache.log4j.Logger; 9 10 17 public class NotificationProcessor extends Thread { 18 private static final Logger logger = Logger.getLogger(NotificationProcessor.class); 19 20 private final LinkedList notifications = new LinkedList (); 21 private final LinkedList resyncChecks = new LinkedList (); 22 23 synchronized public void enqueue(Runnable o) { 24 if (isInterrupted()) { 25 return; 27 } 28 notifications.add(o); 29 notify(); 30 } 31 32 class Pair { 33 final ClientNode clientNode; 34 final long time; 35 Pair(ClientNode clientNode) { 36 this.clientNode = clientNode; 37 this.time = System.currentTimeMillis(); 38 } 39 } 40 41 synchronized public void check(ClientNode clientNode) { 42 if (isInterrupted()) { 43 return; 45 } 46 logger.debug("Resync check added for [" + clientNode.toString() + "]"); 47 resyncChecks.add(new Pair(clientNode)); 48 notifyAll(); 49 } 50 51 public void run() { 52 while (!isInterrupted()) { 53 Runnable notification = null; 54 ClientNode resyncCheck = null; 55 int backlog = 0; 56 int checks = 0; 57 synchronized (this) { 58 backlog = notifications.size(); 59 if (backlog > 0) { 60 notification = (Runnable ) notifications.removeFirst(); 61 --backlog; 62 } else { 63 long sleep = 0; 64 checks = resyncChecks.size(); 65 if (checks > 0) { 66 Pair pair = (Pair) resyncChecks.getFirst(); 67 sleep = pair.time - System.currentTimeMillis() + 5000; 68 if (sleep < 1) { 69 pair = (Pair) resyncChecks.removeFirst(); 70 resyncCheck = pair.clientNode; 71 --checks; 72 } 73 } 74 if (resyncCheck == null) { 75 try { 76 wait(sleep); 77 } 78 catch (InterruptedException e) { 79 notifications.clear(); 80 resyncChecks.clear(); 81 return; 82 } 83 } 84 } 85 } 86 logger.debug("Processing, backlog is [" + backlog 87 + "], outstanding checks are [" + checks + "]"); 88 if (notification != null) { 89 try { 90 notification.run(); 91 } catch (Throwable t) { 92 logger.error("Failed processing:", t); 93 } 94 } 95 else if (resyncCheck != null) { 96 try { 97 resyncCheck.resync(); 98 } catch (Throwable t) { 99 logger.error("Failed check:", t); 100 } 101 } 102 } 103 logger.debug("Stopping."); 104 } 105 106 synchronized public int size() { 107 return notifications.size(); 108 } 109 110 } 111 | Popular Tags |