KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > oddjob > jmx > client > NotificationProcessor


1 /*
2  * Copyright © 2004, Rob Gordon.
3  */

4 package org.oddjob.jmx.client;
5
6 import java.util.LinkedList JavaDoc;
7
8 import org.apache.log4j.Logger;
9
10 /**
11  * Process notifications from the server. Additionally this will asynchronously
12  * perform a resync check. This is because it appears possible to
13  * occasionally miss the first few notifications when creating a new node.
14  *
15  * @author Rob Gordon.
16  */

17 public class NotificationProcessor extends Thread JavaDoc {
18     private static final Logger logger = Logger.getLogger(NotificationProcessor.class);
19     
20     private final LinkedList JavaDoc notifications = new LinkedList JavaDoc();
21     private final LinkedList JavaDoc resyncChecks = new LinkedList JavaDoc();
22
23     synchronized public void enqueue(Runnable JavaDoc o) {
24         if (isInterrupted()) {
25             // late notifications
26
return;
27         }
28         notifications.add(o);
29         notify();
30     }
31
32     class Pair {
33         final ClientNode clientNode;
34         final long time;
35         Pair(ClientNode clientNode) {
36             this.clientNode = clientNode;
37             this.time = System.currentTimeMillis();
38         }
39     }
40     
41     synchronized public void check(ClientNode clientNode) {
42         if (isInterrupted()) {
43             // late notifications
44
return;
45         }
46         logger.debug("Resync check added for [" + clientNode.toString() + "]");
47         resyncChecks.add(new Pair(clientNode));
48         notifyAll();
49     }
50     
51     public void run() {
52         while (!isInterrupted()) {
53             Runnable JavaDoc notification = null;
54             ClientNode resyncCheck = null;
55             int backlog = 0;
56             int checks = 0;
57             synchronized (this) {
58                 backlog = notifications.size();
59                 if (backlog > 0) {
60                     notification = (Runnable JavaDoc) notifications.removeFirst();
61                     --backlog;
62                 } else {
63                     long sleep = 0;
64                     checks = resyncChecks.size();
65                     if (checks > 0) {
66                         Pair pair = (Pair) resyncChecks.getFirst();
67                         sleep = pair.time - System.currentTimeMillis() + 5000;
68                         if (sleep < 1) {
69                             pair = (Pair) resyncChecks.removeFirst();
70                             resyncCheck = pair.clientNode;
71                             --checks;
72                         }
73                     }
74                     if (resyncCheck == null) {
75                         try {
76                             wait(sleep);
77                         }
78                         catch (InterruptedException JavaDoc e) {
79                             notifications.clear();
80                             resyncChecks.clear();
81                             return;
82                         }
83                     }
84                 }
85             }
86             logger.debug("Processing, backlog is [" + backlog
87                     + "], outstanding checks are [" + checks + "]");
88             if (notification != null) {
89                 try {
90                     notification.run();
91                 } catch (Throwable JavaDoc t) {
92                     logger.error("Failed processing:", t);
93                 }
94             }
95             else if (resyncCheck != null) {
96                 try {
97                     resyncCheck.resync();
98                 } catch (Throwable JavaDoc t) {
99                     logger.error("Failed check:", t);
100                 }
101             }
102         }
103         logger.debug("Stopping.");
104     }
105     
106     synchronized public int size() {
107         return notifications.size();
108     }
109     
110 }
111
Popular Tags