package com.tc.process;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * Heartbeat link between two JVMs over a loopback TCP socket.
 * <p>
 * One side starts a {@link HeartbeatServer} (see {@link #startHeartBeatServer()}), which listens on an ephemeral
 * port and sends a <code>HEARTBEAT</code> line to every connected client every {@link #NORMAL_HEARTBEAT_INTERVAL}
 * milliseconds. The other side starts a {@link PingThread} (see
 * {@link #startClientWatchdogService(int, String, boolean)}), which connects to that port and terminates its own JVM
 * with exit code {@link #EXIT_CODE} as soon as the heartbeat stops, the connection breaks or an unknown message
 * arrives.
 * <p>
 * All public entry points are static and synchronized on this class. Note that {@link #log(String)} uses the same
 * class lock, so worker threads must not log while a caller may be blocked inside one of the entry points waiting
 * for them.
 */
public final class LinkedJavaProcessPollingAgent {

  /** Milliseconds between two heartbeat messages sent by the server. */
  private static final int NORMAL_HEARTBEAT_INTERVAL = 15 * 1000;

  // Line-based wire protocol: each message is exactly one of these tokens followed by a newline.
  private static final String HEARTBEAT = "HEARTBEAT";
  private static final String SHUTDOWN = "SHUTDOWN";
  private static final String ARE_YOU_ALIVE = "ARE_YOU_ALIVE";

  /** Socket read timeout (milliseconds) on both ends; two missed heartbeats count as a dead peer. */
  private static final int MAX_HEARTBEAT_DELAY = 2 * NORMAL_HEARTBEAT_INTERVAL;

  /** Exit code used by the watchdog when it kills its own JVM. */
  private static final int EXIT_CODE = 42;

  private static HeartbeatServer server = null;
  private static PingThread client = null;

  /**
   * Starts the heartbeat server thread if none is registered yet; otherwise does nothing.
   */
  public static synchronized void startHeartBeatServer() {
    if (server == null) {
      server = new HeartbeatServer();
      server.start();
    }
  }

  /**
   * Reports whether the heartbeat server is up. If the server is still starting, this call blocks until startup has
   * either succeeded or failed.
   *
   * @return <code>true</code> if a server exists and is accepting connections, <code>false</code> otherwise
   */
  public static synchronized boolean isServerRunning() {
    if (server == null) { return false; }
    return server.isRunning();
  }

  /**
   * Returns the port the heartbeat server listens on, blocking while the server is still starting.
   *
   * @return the local port of the heartbeat server socket
   * @throws IllegalStateException if the server was never started or failed to start
   */
  public static synchronized int getChildProcessHeartbeatServerPort() {
    if (server == null) throw new IllegalStateException("Heartbeat Server has not started!");
    return server.getPort();
  }

  /**
   * Starts the watchdog thread that connects to a heartbeat server on <code>localhost</code> and exits this JVM when
   * the heartbeat is lost. Only the first call has any effect.
   *
   * @param pingPort port of the heartbeat server; must be greater than zero
   * @param childClass name reported back to the server in response to <code>ARE_YOU_ALIVE</code>; must not be blank
   * @param honorShutdownMsg if <code>true</code>, a <code>SHUTDOWN</code> message makes this JVM exit with code 0; if
   *        <code>false</code>, such messages are ignored
   */
  public static synchronized void startClientWatchdogService(int pingPort, String childClass, boolean honorShutdownMsg) {
    if (client == null) {
      client = new PingThread(pingPort, childClass, honorShutdownMsg);
      client.start();
      System.err.println("Child-process watchdog for class " + childClass + " monitoring server on port: " + pingPort);
    }
  }

  /**
   * Same as {@link #startClientWatchdogService(int, String, boolean)} with <code>SHUTDOWN</code> messages ignored.
   */
  public static synchronized void startClientWatchdogService(int pingPort, String childClass) {
    startClientWatchdogService(pingPort, childClass, false);
  }

  /**
   * Shuts the heartbeat server down (closing its socket and sending <code>SHUTDOWN</code> to every connected client)
   * and forgets it, so that a later {@link #startHeartBeatServer()} creates a fresh one.
   */
  public static synchronized void shutdown() {
    if (server != null) {
      server.shutdown();
      server = null;
    }
  }

  /**
   * Sends <code>ARE_YOU_ALIVE</code> to every client that ever connected and reports whether at least one of them
   * answered with a name that does not end in <code>TCServerMain</code>.
   *
   * @return <code>true</code> if at least one such client answered, <code>false</code> otherwise or if no server exists
   */
  public static synchronized boolean isAnyAppServerAlive() {
    if (server == null) return false;
    return server.isAnyAppServerAlive();
  }

  /** Writes a time-stamped message to standard out. Holds the class lock while printing. */
  private static synchronized void log(String msg) {
    System.out.println("LJP: [" + new Date() + "] " + msg);
  }

  /**
   * Sleeps for at least <code>millis</code> milliseconds, going back to sleep after early wake-ups.
   *
   * @throws AssertionError if the thread is interrupted while sleeping
   */
  static void reallySleep(long millis) {
    try {
      long millisLeft = millis;
      while (millisLeft > 0) {
        long start = System.currentTimeMillis();
        Thread.sleep(millisLeft);
        millisLeft -= System.currentTimeMillis() - start;
      }
    } catch (InterruptedException ie) {
      throw new AssertionError(ie);
    }
  }

  /**
   * Watchdog side of the link: reads messages from the heartbeat server and kills this JVM when they stop.
   */
  private static class PingThread extends Thread {
    private final int pingPort;
    private final String forClass;
    private final boolean honorShutdownMsg;
    private BufferedReader in;
    private PrintWriter out;

    public PingThread(int port, String forClass, boolean honorShutdownMsg) {
      if (!(port > 0)) throw new RuntimeException("Port not > 0");
      if (forClass.trim().length() == 0) throw new RuntimeException("blank argument");

      this.pingPort = port;
      this.forClass = forClass;
      this.honorShutdownMsg = honorShutdownMsg;

      this.setDaemon(true);
    }

    public PingThread(int port, String forClass) {
      this(port, forClass, false);
    }

    /**
     * Message loop. Leaves the loop only through an exception (timeout, broken connection, unknown message) or
     * through <code>System.exit(0)</code> on an honored shutdown; every exceptional exit ends in
     * <code>System.exit(EXIT_CODE)</code>.
     */
    public void run() {
      int port = -1;
      Socket toServer = null;
      try {
        toServer = new Socket("localhost", this.pingPort);
        // A read that blocks longer than this throws SocketTimeoutException, which lands in the catch below.
        toServer.setSoTimeout(MAX_HEARTBEAT_DELAY);

        port = toServer.getLocalPort();

        in = new BufferedReader(new InputStreamReader(toServer.getInputStream()));
        out = new PrintWriter(toServer.getOutputStream(), true);

        while (true) {
          long start = System.currentTimeMillis();

          String data = in.readLine();
          if (HEARTBEAT.equals(data)) {
            log("Got heartbeat for main class " + this.forClass);
          } else if (SHUTDOWN.equals(data)) {
            if (!honorShutdownMsg) continue;
            log("Client received shutdown message from server. Shutting Down...");
            System.exit(0);
          } else if (ARE_YOU_ALIVE.equals(data)) {
            out.println(forClass);
            out.flush();
          } else {
            // Includes data == null, i.e. the server closed the connection.
            throw new Exception("Doesn't recognize data: " + data);
          }

          long elapsed = System.currentTimeMillis() - start;
          if (elapsed > MAX_HEARTBEAT_DELAY) { throw new Exception("Client took too long to response."); }
        }
      } catch (Exception e) {
        log(e.getClass() + ": " + Arrays.asList(e.getStackTrace()));
        log("Didn't get heartbeat for at least " + MAX_HEARTBEAT_DELAY + " milliseconds. Killing self (port " + port
            + ").");
      } finally {
        log("Ping thread exiting port (" + port + ")");
        try {
          if (toServer != null) {
            toServer.close();
          }
        } catch (IOException e) {
          // A failed close must never prevent the watchdog from killing the process.
          log("Failed to close socket to heartbeat server: " + e);
        } finally {
          System.exit(EXIT_CODE);
        }
      }
    }
  }

  /**
   * Accepts watchdog connections on an ephemeral port and starts one {@link HeartbeatThread} per connection.
   */
  private static class HeartbeatServer extends Thread {
    private int port;
    private List heartBeatThreads = new ArrayList();
    private ServerSocket serverSocket = null;
    private boolean running = false;
    // True from construction until run() has either bound the socket or failed to. Starting out as true closes the
    // window in which isRunning() could be called after start() but before run() was scheduled and wrongly report
    // "not running". Instances are expected to be start()ed right after construction.
    private volatile boolean isStarting = true;

    public HeartbeatServer() {
      this.port = -1;
      this.setDaemon(true);
    }

    /**
     * Pings every registered heartbeat thread (all of them, even after one answered).
     *
     * @return <code>true</code> if at least one {@link HeartbeatThread#isAppServerAlive()} call returned true
     */
    public synchronized boolean isAnyAppServerAlive() {
      boolean foundAlive = false;
      synchronized (heartBeatThreads) {
        for (Iterator it = heartBeatThreads.iterator(); it.hasNext();) {
          HeartbeatThread hb = (HeartbeatThread) it.next();
          boolean aliveStatus = hb.isAppServerAlive();
          log("pinging: " + hb.port + ", alive? = " + aliveStatus);
          foundAlive = foundAlive || aliveStatus;
        }
      }
      return foundAlive;
    }

    /**
     * Waits until startup has finished and returns the bound port.
     *
     * @throws IllegalStateException if startup finished without binding a port
     * @throws RuntimeException if the waiting thread is interrupted
     */
    public synchronized int getPort() {
      while (port == -1 && isStarting) {
        try {
          this.wait(5000);
        } catch (InterruptedException e) {
          throw new RuntimeException("Server might have not started yet", e);
        }
      }
      // Startup is over but no port was bound: fail instead of waiting forever.
      if (port == -1) throw new IllegalStateException("Heartbeat Server failed to start!");
      return port;
    }

    /**
     * Waits until startup has finished and returns whether the server is accepting connections.
     *
     * @throws RuntimeException if the waiting thread is interrupted
     */
    public synchronized boolean isRunning() {
      while (isStarting) {
        try {
          this.wait();
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      return running;
    }

    public synchronized void setRunning(boolean status) {
      running = status;
    }

    /**
     * Marks the server as stopped, closes the listening socket (which makes the accept loop in {@link #run()} fail
     * and terminate) and sends <code>SHUTDOWN</code> to every registered client.
     */
    private synchronized void shutdown() {
      setRunning(false);

      if (serverSocket != null) {
        try {
          serverSocket.close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

      synchronized (heartBeatThreads) {
        HeartbeatThread ht;
        for (Iterator i = heartBeatThreads.iterator(); i.hasNext();) {
          ht = (HeartbeatThread) i.next();
          ht.sendKillSignal();
        }
      }
    }

    public void run() {

      try {
        synchronized (this) {
          try {
            serverSocket = new ServerSocket(0);
            this.port = serverSocket.getLocalPort();
            setRunning(true);
          } finally {
            // Release waiters in getPort()/isRunning() on success AND on failure. This must happen before any call
            // to log(): a waiter may be holding the class lock that log() needs.
            isStarting = false;
            this.notifyAll();
          }
        }

        System.err.println("Child-process heartbeat server started on port: " + port);

        while (true) {
          Socket sock = serverSocket.accept();
          log("Got heartbeat connection from client; starting heartbeat.");
          synchronized (heartBeatThreads) {
            HeartbeatThread hbt = new HeartbeatThread(sock);
            heartBeatThreads.add(hbt);
            hbt.start();
          }
        }
      } catch (Exception e) {
        if (!running) log("Heartbeat server was shutdown.");
        else log("Got expcetion in heartbeat server: " + e.getMessage());
      } finally {
        setRunning(false);
        log("Heartbeat server terminated.");
      }
    }
  }

  /**
   * Server-side handler for one connected watchdog: sends the periodic heartbeat and relays kill / liveness requests.
   */
  private static class HeartbeatThread extends Thread {
    private final Socket socket;
    private final int port;

    // Assigned by run() on this thread, read by sendKillSignal()/isAppServerAlive() on other threads.
    private volatile BufferedReader in;
    private volatile PrintWriter out;

    public HeartbeatThread(Socket socket) {
      if (socket == null) throw new NullPointerException();
      this.socket = socket;
      try {
        this.socket.setSoTimeout(MAX_HEARTBEAT_DELAY);
      } catch (SocketException e) {
        throw new RuntimeException(e);
      }
      this.port = socket.getPort();
      this.setDaemon(true);
    }

    /** Sends <code>SHUTDOWN</code> to the client; failures are logged, never thrown. */
    public synchronized void sendKillSignal() {
      PrintWriter writer = out;
      if (writer == null) {
        log("Heartbeat stream to port " + port + " not initialized; cannot send shutdown.");
        return;
      }
      try {
        writer.println(SHUTDOWN);
        writer.flush();
      } catch (Exception e) {
        log("Socket Exception: client may have already shutdown.");
      }
    }

    /**
     * Sends <code>ARE_YOU_ALIVE</code> and waits (up to the socket timeout) for one line in reply.
     *
     * @return <code>false</code> if the streams are not initialized yet, no reply arrives, the read fails, or the
     *         reply ends with <code>TCServerMain</code>; <code>true</code> for any other reply
     */
    public boolean isAppServerAlive() {
      PrintWriter writer = out;
      BufferedReader reader = in;
      if (writer == null || reader == null) {
        log("heartbeat streams for port " + port + " not initialized yet");
        return false;
      }
      try {
        log("sending ARE_YOU_ALIVE...");
        writer.println(ARE_YOU_ALIVE);
        writer.flush();
        String result = reader.readLine();
        log("received: " + result);
        if (result == null || result.endsWith("TCServerMain")) {
          return false;
        } else {
          return true;
        }
      } catch (IOException e) {
        log("got exception: " + e.getMessage());
        return false;
      }
    }

    public void run() {
      try {
        out = new PrintWriter(this.socket.getOutputStream(), true);
        in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));

        while (true) {
          synchronized (this) {
            out.println(HEARTBEAT);
          }
          reallySleep(NORMAL_HEARTBEAT_INTERVAL);
        }
      } catch (SocketException e) {
        log("Socket Exception: client may have already shutdown.");
        log(e.getClass() + ": " + Arrays.asList(e.getStackTrace()));
      } catch (Exception e) {
        log("Heartbeat thread for child process (port " + port + ") got exception");
        log(e.getClass() + ": " + Arrays.asList(e.getStackTrace()));
      } finally {
        log("Heartbeat thread for child process (port " + port + ") terminating.");
        try {
          socket.close();
        } catch (IOException e) {
          // The thread is terminating anyway; report the failure instead of dying with an uncaught exception.
          log("Failed to close heartbeat socket (port " + port + "): " + e);
        }
      }
    }
  }
}