KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > process > LinkedJavaProcessPollingAgent


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.process;
6
7 import java.io.BufferedReader JavaDoc;
8 import java.io.IOException JavaDoc;
9 import java.io.InputStreamReader JavaDoc;
10 import java.io.PrintWriter JavaDoc;
11 import java.net.ServerSocket JavaDoc;
12 import java.net.Socket JavaDoc;
13 import java.net.SocketException JavaDoc;
14 import java.util.ArrayList JavaDoc;
15 import java.util.Arrays JavaDoc;
16 import java.util.Date JavaDoc;
17 import java.util.Iterator JavaDoc;
18 import java.util.List JavaDoc;
19
/**
 * Links the lifetime of a child java process to the lifetime of its parent process.
 * <p>
 * SCENARIO: When a parent process creates a child the parent may monitor the child using various hooks. If the parent
 * process itself dies unexpectedly (kill -9) the child process will remain alive, unaware of its parent's fate.
 * <p>
 * The parent runs a heartbeat server thread (see {@link #startHeartBeatServer()}) and every child runs a watchdog
 * thread (see {@link #startClientWatchdogService(int, String)}) that is connected to the server over a localhost
 * socket. If the parent's heartbeat flatlines, or the connection breaks, the child's watchdog thread terminates the
 * child JVM with exit code <tt>42</tt>. A child that honors shutdown messages exits with code <tt>0</tt> when the
 * parent calls {@link #shutdown()}.
 * <p>
 * All public methods are static and synchronized on this class.
 */
public final class LinkedJavaProcessPollingAgent {

  /** Pause between two heartbeats sent by the parent, in milliseconds. */
  private static final int NORMAL_HEARTBEAT_INTERVAL = 15 * 1000;

  // One-line text messages exchanged over the socket.
  private static final String HEARTBEAT = "HEARTBEAT";
  private static final String SHUTDOWN = "SHUTDOWN";
  private static final String ARE_YOU_ALIVE = "ARE_YOU_ALIVE";

  /** Socket read timeout on both ends, in milliseconds: two missed heartbeats count as a dead peer. */
  private static final int MAX_HEARTBEAT_DELAY = 2 * NORMAL_HEARTBEAT_INTERVAL;

  /** Exit code used by the child watchdog when it kills its own JVM. */
  private static final int EXIT_CODE = 42;

  private static HeartbeatServer server = null;
  private static PingThread client = null;

  /**
   * Starts the heartbeat server thread in the parent process. Does nothing if a server already exists.
   */
  public static synchronized void startHeartBeatServer() {
    if (server == null) {
      server = new HeartbeatServer();
      server.start();
    }
  }

  /**
   * Blocks until a started server has finished its startup attempt.
   *
   * @return true if the heartbeat server exists and is accepting connections
   */
  public static synchronized boolean isServerRunning() {
    if (server == null) { return false; }
    return server.isRunning();
  }

  /**
   * Returns the port the heartbeat server listens on, waiting for the server to finish starting if necessary.
   *
   * @return server port - must be passed to {@link #startClientWatchdogService(int, String)}
   * @throws IllegalStateException if the server was never started or failed to start
   */
  public static synchronized int getChildProcessHeartbeatServerPort() {
    if (server == null) throw new IllegalStateException("Heartbeat Server has not started!");
    return server.getPort();
  }

  /**
   * Creates a watchdog service thread in the child process which receives a heartbeat from the parent process. Does
   * nothing if a watchdog already exists in this JVM.
   *
   * @param pingPort - this must come from {@link #getChildProcessHeartbeatServerPort()}
   * @param childClass - used for debugging; also sent back to the parent as the answer to an alive query
   * @param honorShutdownMsg - if true the child JVM exits when the parent broadcasts its shutdown message; if false
   *        the message is ignored and the child stays alive
   */
  public static synchronized void startClientWatchdogService(int pingPort, String childClass, boolean honorShutdownMsg) {
    if (client == null) {
      client = new PingThread(pingPort, childClass, honorShutdownMsg);
      client.start();
      System.err.println("Child-process watchdog for class " + childClass + " monitoring server on port: " + pingPort);
    }
  }

  /**
   * Same as {@link #startClientWatchdogService(int, String, boolean)} with shutdown messages ignored.
   */
  public static synchronized void startClientWatchdogService(int pingPort, String childClass) {
    startClientWatchdogService(pingPort, childClass, false);
  }

  /**
   * Shuts down the heartbeat server and sends a kill signal to the child processes.
   */
  public static synchronized void shutdown() {
    if (server != null) {
      try {
        server.shutdown();
      } finally {
        // Forget the server even if closing its socket failed, so a new one can be started.
        server = null;
      }
    }
  }

  /**
   * Asks every connected child whether it is alive.
   *
   * @return true if at least one child answered with a class name that does not end with "TCServerMain"
   */
  public static synchronized boolean isAnyAppServerAlive() {
    if (server == null) return false;
    return server.isAnyAppServerAlive();
  }

  // Deliberately not synchronized on the class: worker threads must not queue up behind a slow public call such as
  // isAnyAppServerAlive() just to write a log line. println() already emits each line atomically.
  private static void log(String msg) {
    System.out.println("LJP: [" + new Date() + "] " + msg);
  }

  /**
   * Sleeps for at least the given time, going back to sleep if woken up early.
   *
   * @param millis time to sleep in milliseconds; zero or negative returns immediately
   * @throws AssertionError if the thread is interrupted; the interrupt status is restored first
   */
  static void reallySleep(long millis) {
    try {
      long millisLeft = millis;
      while (millisLeft > 0) {
        long start = System.currentTimeMillis();
        Thread.sleep(millisLeft);
        millisLeft -= System.currentTimeMillis() - start;
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new AssertionError(ie);
    }
  }

  /**
   * Watchdog running in the child process. It reads the parent's messages and terminates the JVM as soon as the
   * connection breaks, times out, or delivers something unexpected.
   */
  private static class PingThread extends Thread {
    private final int pingPort;
    private final String forClass;
    private final boolean honorShutdownMsg;

    public PingThread(int port, String forClass, boolean honorShutdownMsg) {
      if (!(port > 0)) throw new RuntimeException("Port not > 0");
      if (forClass == null || forClass.trim().length() == 0) throw new RuntimeException("blank argument");

      this.pingPort = port;
      this.forClass = forClass;
      this.honorShutdownMsg = honorShutdownMsg;

      this.setDaemon(true);
    }

    public PingThread(int port, String forClass) {
      this(port, forClass, false);
    }

    public void run() {
      int port = -1;
      Socket toServer = null;
      try {
        toServer = new Socket("localhost", this.pingPort);
        // A read that blocks longer than this throws, which ends up in the catch block below.
        toServer.setSoTimeout(MAX_HEARTBEAT_DELAY);

        port = toServer.getLocalPort();

        BufferedReader in = new BufferedReader(new InputStreamReader(toServer.getInputStream()));
        PrintWriter out = new PrintWriter(toServer.getOutputStream(), true);

        while (true) {
          long start = System.currentTimeMillis();

          // readLine() returns null once the parent has closed the connection; that falls into the last branch.
          String data = in.readLine();
          if (HEARTBEAT.equals(data)) {
            log("Got heartbeat for main class " + this.forClass);
          } else if (SHUTDOWN.equals(data)) {
            if (!honorShutdownMsg) continue;
            log("Client received shutdown message from server. Shutting Down...");
            System.exit(0);
          } else if (ARE_YOU_ALIVE.equals(data)) {
            out.println(forClass);
            out.flush();
          } else {
            throw new Exception("Doesn't recognize data: " + data);
          }

          long elapsed = System.currentTimeMillis() - start;
          if (elapsed > MAX_HEARTBEAT_DELAY) { throw new Exception("Client took too long to response."); }
        }
      } catch (Exception e) {
        // e.toString() carries both the exception class and its message.
        log(e + ": " + Arrays.asList(e.getStackTrace()));
        log("Didn't get heartbeat for at least " + MAX_HEARTBEAT_DELAY + " milliseconds. Killing self (port " + port
            + ").");
      } finally {
        log("Ping thread exiting port (" + port + ")");
        if (toServer != null) {
          try {
            toServer.close();
          } catch (IOException e) {
            // Must not propagate: an exception here would skip the System.exit() below and leave the child alive.
            log("Could not close connection to heartbeat server: " + e.getMessage());
          }
        }
        System.exit(EXIT_CODE);
      }
    }
  }

  /**
   * Accept loop running in the parent process. Every child that connects gets its own {@link HeartbeatThread}.
   */
  private static class HeartbeatServer extends Thread {
    private final List heartBeatThreads = new ArrayList();
    private int port;
    private ServerSocket serverSocket = null;
    private volatile boolean running = false;
    // True from construction until the server thread has either bound its socket or given up. Starting out as true
    // keeps isRunning() from answering "false" in the window between start() and the first line of run().
    private volatile boolean isStarting = true;
    private volatile boolean shutdownRequested = false;

    public HeartbeatServer() {
      this.port = -1;
      this.setDaemon(true);
    }

    public synchronized boolean isAnyAppServerAlive() {
      boolean foundAlive = false;
      synchronized (heartBeatThreads) {
        for (Iterator it = heartBeatThreads.iterator(); it.hasNext();) {
          HeartbeatThread hb = (HeartbeatThread) it.next();
          boolean aliveStatus = hb.isAppServerAlive();
          log("pinging: " + hb.port + ", alive? = " + aliveStatus);
          foundAlive = foundAlive || aliveStatus;
        }
      }
      return foundAlive;
    }

    /**
     * Waits for the startup attempt to finish and returns the bound port.
     *
     * @throws IllegalStateException if the server ended its startup attempt without binding a port
     */
    public synchronized int getPort() {
      while (port == -1 && isStarting) {
        try {
          this.wait(5000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException("Server might have not started yet", e);
        }
      }
      if (port == -1) throw new IllegalStateException("Heartbeat Server failed to start!");
      return port;
    }

    public synchronized boolean isRunning() {
      while (isStarting) {
        try {
          this.wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
      return running;
    }

    public synchronized void setRunning(boolean status) {
      running = status;
    }

    private synchronized void shutdown() {
      // Remembered so that a server thread which has not bound its socket yet does not start up afterwards.
      shutdownRequested = true;
      setRunning(false);

      try {
        if (serverSocket != null) {
          serverSocket.close(); // this effectively interrupts the thread and force it to exit
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      } finally {
        // The children are told to shut down even when closing the listening socket failed.
        synchronized (heartBeatThreads) {
          HeartbeatThread ht;
          for (Iterator i = heartBeatThreads.iterator(); i.hasNext();) {
            ht = (HeartbeatThread) i.next();
            ht.sendKillSignal();
          }
        }
      }
    }

    public void run() {

      try {
        synchronized (this) {
          try {
            if (!shutdownRequested) {
              serverSocket = new ServerSocket(0);
              this.port = serverSocket.getLocalPort();
              setRunning(true);
            }
          } finally {
            // Always release the threads waiting in getPort() / isRunning(), also when binding the socket failed.
            isStarting = false;
            this.notifyAll();
          }
        }

        if (shutdownRequested) {
          log("Heartbeat server was shutdown.");
          return;
        }

        System.err.println("Child-process heartbeat server started on port: " + port);

        while (true) {
          Socket sock = serverSocket.accept();
          log("Got heartbeat connection from client; starting heartbeat.");

          HeartbeatThread hbt;
          try {
            hbt = new HeartbeatThread(sock);
          } catch (RuntimeException e) {
            // One broken client connection must not take the whole server down.
            log("Could not start heartbeat for client: " + e.getMessage());
            try {
              sock.close();
            } catch (IOException ce) {
              log("Could not close client socket: " + ce.getMessage());
            }
            continue;
          }

          synchronized (heartBeatThreads) {
            heartBeatThreads.add(hbt);
            hbt.start();
          }
        }
      } catch (Exception e) {
        if (shutdownRequested) log("Heartbeat server was shutdown.");
        else log("Got expcetion in heartbeat server: " + e.getMessage());
      } finally {
        setRunning(false);
        // Closing an already closed server socket is a no-op, so this is safe after shutdown() as well.
        ServerSocket listener = serverSocket;
        if (listener != null) {
          try {
            listener.close();
          } catch (IOException e) {
            log("Could not close heartbeat server socket: " + e.getMessage());
          }
        }
        log("Heartbeat server terminated.");
      }
    }
  }

  /**
   * Sends the periodic heartbeat to one connected child and relays shutdown / alive queries to it.
   */
  private static class HeartbeatThread extends Thread {
    private final Socket socket;
    private final int port;

    // Created in the constructor (not in run()) because sendKillSignal() and isAppServerAlive() are called from
    // other threads and may run before this thread does.
    private final BufferedReader in;
    private final PrintWriter out;

    public HeartbeatThread(Socket socket) {
      if (socket == null) throw new NullPointerException();
      this.socket = socket;
      try {
        this.socket.setSoTimeout(MAX_HEARTBEAT_DELAY);
        this.out = new PrintWriter(this.socket.getOutputStream(), true);
        this.in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      this.port = socket.getPort();
      this.setDaemon(true);
    }

    public synchronized void sendKillSignal() {
      out.println(SHUTDOWN);
      // PrintWriter never throws on I/O failure; checkError() flushes and reports it instead.
      if (out.checkError()) {
        log("Socket Exception: client may have already shutdown.");
      }
    }

    /**
     * Not synchronized on purpose: the read below may block for up to MAX_HEARTBEAT_DELAY and must not hold up the
     * heartbeat that is sent under this object's monitor.
     */
    public boolean isAppServerAlive() {
      try {
        log("sending ARE_YOU_ALIVE...");
        out.println(ARE_YOU_ALIVE);
        if (out.checkError()) {
          log("got exception: could not write to client");
          return false;
        }
        String result = in.readLine();
        log("received: " + result);
        // No answer at all, or an answer from a TCServerMain process: not an app server.
        return result != null && !result.endsWith("TCServerMain");
      } catch (IOException e) {
        log("got exception: " + e.getMessage());
        return false;
      }
    }

    public void run() {
      try {
        while (true) {
          boolean writeFailed;
          synchronized (this) {
            out.println(HEARTBEAT);
            writeFailed = out.checkError();
          }
          if (writeFailed) {
            // Without this check the loop would keep "sending" to a dead client forever.
            log("Socket Exception: client may have already shutdown.");
            break;
          }
          reallySleep(NORMAL_HEARTBEAT_INTERVAL);
        }
      } catch (Exception e) {
        log("Heartbeat thread for child process (port " + port + ") got exception");
        log(e + ": " + Arrays.asList(e.getStackTrace()));
      } finally {
        log("Heartbeat thread for child process (port " + port + ") terminating.");
        try {
          socket.close();
        } catch (IOException e) {
          log("Could not close client socket (port " + port + "): " + e.getMessage());
        }
      }
    }
  }
}
368
Popular Tags