1 31 package org.objectweb.proactive.core.process; 32 33 import org.apache.log4j.Logger; 34 35 import org.objectweb.proactive.core.util.MessageLogger; 36 37 38 public abstract class AbstractExternalProcess extends AbstractUniversalProcess 39 implements ExternalProcess { 40 protected static Logger clogger = Logger.getLogger(AbstractExternalProcess.class.getName()); 41 protected static final boolean IS_WINDOWS_SYSTEM = System.getProperty( 42 "os.name").toLowerCase().startsWith("win"); 43 protected Process externalProcess; 44 private boolean shouldRun; 45 protected MessageLogger inputMessageLogger; 46 protected MessageLogger errorMessageLogger; 47 protected MessageSink outputMessageSink; 48 private ThreadActivityMonitor inThreadMonitor; 49 private ThreadActivityMonitor errThreadMonitor; 50 51 protected AbstractExternalProcess() { 55 } 56 57 public AbstractExternalProcess(MessageLogger messageLogger) { 58 this(messageLogger, messageLogger, null); 59 } 60 61 public AbstractExternalProcess(MessageLogger inputMessageLogger, 62 MessageLogger errorMessageLogger) { 63 this(inputMessageLogger, errorMessageLogger, null); 64 } 65 66 public AbstractExternalProcess(MessageLogger inputMessageLogger, 67 MessageLogger errorMessageLogger, MessageSink outputMessageSink) { 68 this.inputMessageLogger = inputMessageLogger; 69 this.errorMessageLogger = errorMessageLogger; 70 this.outputMessageSink = outputMessageSink; 71 } 72 73 public MessageLogger getInputMessageLogger() { 80 return inputMessageLogger; 81 } 82 83 public MessageLogger getErrorMessageLogger() { 84 return errorMessageLogger; 85 } 86 87 public MessageSink getOutputMessageSink() { 88 return outputMessageSink; 89 } 90 91 public void setInputMessageLogger(MessageLogger inputMessageLogger) { 92 checkStarted(); 93 this.inputMessageLogger = inputMessageLogger; 94 } 95 96 public void setErrorMessageLogger(MessageLogger errorMessageLogger) { 97 checkStarted(); 98 this.errorMessageLogger = errorMessageLogger; 99 } 100 101 public void setOutputMessageSink(MessageSink outputMessageSink) { 102 checkStarted(); 103 this.outputMessageSink = outputMessageSink; 104 } 105 106 protected abstract String buildCommand(); 110 111 protected String buildEnvironmentCommand() { 112 if (environment == null) { 113 return ""; 114 } 115 if (IS_WINDOWS_SYSTEM) { 116 return buildWindowsEnvironmentCommand(); 117 } else { 118 return buildUnixEnvironmentCommand(); 119 } 120 } 121 122 protected String buildWindowsEnvironmentCommand() { 123 StringBuffer sb = new StringBuffer (); 124 for (int i = 0; i < environment.length; i++) { 125 inputMessageLogger.log(" exporting variable " + 126 environment[i]); 127 sb.append("set "); 128 sb.append(environment[i]); 129 sb.append(" ; "); 130 } 131 return sb.toString(); 132 } 133 134 protected String buildUnixEnvironmentCommand() { 135 StringBuffer sb = new StringBuffer (); 136 for (int i = 0; i < environment.length; i++) { 137 inputMessageLogger.log(" exporting variable " + 138 environment[i]); 139 sb.append("export "); 140 sb.append(environment[i]); 141 sb.append(" ; "); 142 } 143 return sb.toString(); 144 } 145 146 protected void internalStartProcess(String commandToExecute) 147 throws java.io.IOException { 148 try { 149 externalProcess = Runtime.getRuntime().exec(commandToExecute); 151 java.io.BufferedReader in = new java.io.BufferedReader (new java.io.InputStreamReader ( 152 externalProcess.getInputStream())); 153 java.io.BufferedReader err = new java.io.BufferedReader (new java.io.InputStreamReader ( 154 externalProcess.getErrorStream())); 155 java.io.BufferedWriter out = new java.io.BufferedWriter (new java.io.OutputStreamWriter ( 156 externalProcess.getOutputStream())); 157 handleProcess(in, out, err); 158 } catch (java.io.IOException e) { 159 isFinished = true; 160 throw e; 161 } 162 } 163 164 protected void internalStopProcess() { 165 if (externalProcess != null) { 167 externalProcess.destroy(); 168 } 169 if (outputMessageSink != null) { 170 outputMessageSink.setMessage(null); 171 } 172 } 173 174 protected int internalWaitFor() throws InterruptedException { 175 return externalProcess.waitFor(); 176 } 177 178 protected void handleProcess(java.io.BufferedReader in, 179 java.io.BufferedWriter out, java.io.BufferedReader err) { 180 handleInput(in); 181 handleOutput(out); 182 handleError(err); 183 } 184 185 protected void handleInput(java.io.BufferedReader in) { 186 if (inputMessageLogger == null) { 187 return; 188 } 189 inThreadMonitor = new ThreadActivityMonitor(); 190 Runnable r = new ProcessInputHandler(in, inputMessageLogger, 191 inThreadMonitor); 192 Thread t = new Thread (r, "IN -> " + getShortName(getCommand(), 20)); 193 t.start(); 194 } 195 196 protected void handleError(java.io.BufferedReader err) { 197 if (errorMessageLogger == null) { 198 return; 199 } 200 errThreadMonitor = new ThreadActivityMonitor(); 201 Runnable r = new ProcessInputHandler(err, errorMessageLogger, 202 errThreadMonitor); 203 Thread t = new Thread (r, "ERR -> " + getShortName(getCommand(), 20)); 204 t.start(); 205 } 206 207 protected void handleOutput(java.io.BufferedWriter out) { 208 if (outputMessageSink == null) { 209 return; 210 } 211 212 Runnable r = new ProcessOutputHandler(out, outputMessageSink); 214 Thread t = new Thread (r, "OUT -> " + getShortName(getCommand(), 20)); 215 t.start(); 216 } 217 218 private final String getShortName(String name, int length) { 222 return name.substring(0, Math.min(name.length(), length)); 223 } 224 225 private final void waitForMonitoredThread() { 226 do { 227 try { 228 Thread.sleep(300); 229 } catch (InterruptedException e) { 230 } 231 } while (errThreadMonitor.isActive() || inThreadMonitor.isActive()); 232 } 233 234 private static class ThreadActivityMonitor implements java.io.Serializable { 238 private boolean isActive; 239 240 public boolean isActive() { 241 return isActive; 242 } 243 244 public void setActive(boolean b) { 245 isActive = b; 246 } 247 } 248 249 252 public static class StandardOutputMessageLogger implements MessageLogger, 253 java.io.Serializable { 254 public StandardOutputMessageLogger() { 255 } 257 258 public void log(String message) { 259 messageLogger.info(message); 260 } 261 262 public void log(Throwable t) { 263 t.printStackTrace(); 264 } 265 266 public void log(String message, Throwable t) { 267 messageLogger.info(message); 268 t.printStackTrace(); 269 } 270 } 271 272 274 277 public static class NullMessageLogger implements MessageLogger, 278 java.io.Serializable { 279 public NullMessageLogger() { 280 } 281 282 public void log(String message) { 283 } 284 285 public void log(Throwable t) { 286 } 287 288 public void log(String message, Throwable t) { 289 } 290 } 291 292 294 297 public static class SimpleMessageSink implements MessageSink, 298 java.io.Serializable { 299 private String message; 300 private boolean isActive = true; 301 302 public synchronized String getMessage() { 303 if (!isActive) { 304 return null; 305 } 306 while ((message == null) && isActive) { 307 try { 308 wait(); 309 } catch (InterruptedException e) { 310 } 311 } 312 String messageToSend = message; 313 message = null; 314 notifyAll(); 315 return messageToSend; 316 } 317 318 public synchronized void setMessage(String messageToPost) { 319 if (!isActive) { 320 return; 321 } 322 while ((message != null) && isActive) { 323 try { 324 wait(); 325 } catch (InterruptedException e) { 326 } 327 } 328 if (messageToPost == null) { 329 isActive = false; 330 } 331 this.message = messageToPost; 332 notifyAll(); 333 } 334 335 public synchronized boolean hasMessage() { 336 return message != null; 337 } 338 339 public synchronized boolean isActive() { 340 return isActive; 341 } 342 } 343 344 346 350 protected class ProcessInputHandler implements Runnable { 351 private java.io.BufferedReader in; 352 private MessageLogger logger; 353 private ThreadActivityMonitor threadMonitor; 354 355 public ProcessInputHandler(java.io.BufferedReader in, 356 MessageLogger logger, ThreadActivityMonitor threadMonitor) { 357 this.in = in; 358 this.logger = logger; 359 this.threadMonitor = threadMonitor; 360 } 361 362 public void run() { 363 if (AbstractExternalProcess.clogger.isDebugEnabled()) { 364 AbstractExternalProcess.clogger.debug("Process started Thread=" + 365 Thread.currentThread().getName()); 366 } 367 368 try { 370 while (true) { 371 377 String s = in.readLine(); 378 if (AbstractExternalProcess.clogger.isDebugEnabled()) { 379 AbstractExternalProcess.clogger.debug(s); 382 } 383 384 threadMonitor.setActive(true); 386 if (s == null) { 387 break; 388 } else { 389 logger.log(s); 390 } 391 } 392 } catch (java.io.IOException e) { 393 logger.log(e); 394 } finally { 395 isFinished = true; 396 threadMonitor.setActive(false); 397 try { 398 in.close(); 399 } catch (java.io.IOException e) { 400 e.printStackTrace(); 401 } 402 logger.log("Process finished Thread=" + 403 Thread.currentThread().getName()); 404 } 405 } 406 } 407 408 410 414 protected class ProcessOutputHandler implements Runnable { 415 private java.io.BufferedWriter out; 416 private MessageSink messageSink; 417 418 public ProcessOutputHandler(java.io.BufferedWriter out, 419 MessageSink messageSink) { 420 this.out = out; 421 this.messageSink = messageSink; 422 } 423 424 public void run() { 425 try { 426 while (true) { 427 waitForMonitoredThread(); 428 String message = messageSink.getMessage(); 430 if (message == null) { 431 break; 432 } 433 try { 434 out.write(message); 435 out.newLine(); 436 out.flush(); 437 } catch (java.io.IOException e) { 439 break; 440 } 441 message = null; 442 } 443 } finally { 444 isFinished = true; 445 waitForMonitoredThread(); 446 try { 447 out.close(); 448 } catch (java.io.IOException e) { 449 e.printStackTrace(); 450 } 451 } 452 } 453 } 454 455 } 457 | Popular Tags |