1 31 package org.objectweb.proactive.core.process.lsf; 32 33 import org.objectweb.proactive.core.process.SimpleExternalProcess; 34 import org.objectweb.proactive.core.process.AbstractExternalProcessDecorator; 35 import org.objectweb.proactive.core.process.ExternalProcess; 36 import org.objectweb.proactive.core.process.MessageSink; 37 import org.objectweb.proactive.core.util.MessageLogger; 38 39 60 61 public class LSFBSubProcess extends AbstractExternalProcessDecorator { 62 63 private static final String FILE_SEPARATOR = System.getProperty("file.separator"); 64 65 private static final String DEFAULT_SCRIPT_LOCATION = System.getProperty("user.home")+FILE_SEPARATOR+"ProActive"+FILE_SEPARATOR+"scripts"+FILE_SEPARATOR+"unix"+FILE_SEPARATOR+"cluster"+FILE_SEPARATOR+"startRuntime.sh "; 66 67 public final static String DEFAULT_LSFPATH =FILE_SEPARATOR+"usr"+FILE_SEPARATOR+"local"+FILE_SEPARATOR+"lsf"+FILE_SEPARATOR+"bin"; 68 69 public final static String DEFAULT_BSUBPATH=DEFAULT_LSFPATH+FILE_SEPARATOR+"bsub"; 70 71 public final static String DEFAULT_BJOBPATH=DEFAULT_LSFPATH+FILE_SEPARATOR+"bjobs"; 72 73 public static final String DEFAULT_QUEUE_NAME = "normal"; 74 75 protected static final String DEFAULT_PROCESSOR_NUMBER = "1"; 76 77 protected int jobID; 78 79 protected String queueName = DEFAULT_QUEUE_NAME; 80 81 protected String hostList; 82 83 protected String scriptLocation = DEFAULT_SCRIPT_LOCATION; 84 85 protected String processor = DEFAULT_PROCESSOR_NUMBER; 86 87 protected String interactive = "false"; 88 89 protected String res_requirement=""; 90 91 98 public LSFBSubProcess() { 99 super(); 100 setCompositionType(GIVE_COMMAND_AS_PARAMETER); 101 this.hostname = null; 102 } 103 104 105 110 public LSFBSubProcess(ExternalProcess targetProcess) { 111 super(targetProcess); 112 this.hostname = null; 113 } 114 115 116 120 public void setInputMessageLogger(MessageLogger inputMessageLogger) { 121 super.setInputMessageLogger(new CompositeMessageLogger(new ParserMessageLogger(), inputMessageLogger)); 122 } 123 124 public void setOutputMessageSink(MessageSink outputMessageSink) { 125 if (outputMessageSink == null) { 126 super.setOutputMessageSink(new SimpleMessageSink()); 127 } else { 128 super.setOutputMessageSink(outputMessageSink); 129 } 130 } 131 132 133 138 public static ExternalProcess buildBKillProcess(int jobID) { 139 return new SimpleExternalProcess("bkill "+jobID); 140 } 141 142 143 public static void main(String [] args) { 144 try { 145 LSFBSubProcess p = new LSFBSubProcess(new SimpleExternalProcess("ls -lsa")); 146 p.startProcess(); 147 } catch (Exception e) { 148 e.printStackTrace(); 149 } 150 } 151 152 153 157 public int getJobID() { 158 return jobID; 159 } 160 161 162 166 public String getQueueName() { 167 return queueName; 168 } 169 170 171 175 public void setQueueName(String queueName) { 176 checkStarted(); 177 if (queueName == null) throw new NullPointerException (); 178 this.queueName = queueName; 179 } 180 181 182 186 public void setHostList(String hostList){ 187 checkStarted(); 188 this.hostList = hostList; 189 } 190 191 192 196 public String getHostList(){ 197 return hostList; 198 } 199 200 201 205 public String isInteractive(){ 206 return interactive; 207 } 208 209 210 214 public void setInteractive(String interactive){ 215 this.interactive = interactive; 216 } 217 218 219 223 public void setProcessorNumber(String processor){ 224 checkStarted(); 225 if(processor != null){ 226 this.processor = processor; 227 } 228 } 229 230 231 235 public String getProcessorNumber(){ 236 return processor; 237 } 238 239 240 241 public void setScriptLocation(String location){ 242 checkStarted(); 243 if(location != null){ 244 this.scriptLocation = location; 245 } 246 } 247 248 public String getScriptLocation(){ 249 return scriptLocation; 250 } 251 252 253 254 255 public String getRes_requirement() { 256 return res_requirement; 257 } 258 259 260 public void setRes_requirement(String res_requirement) { 261 this.res_requirement = "-R "+res_requirement+" "; 262 } 263 264 268 protected String internalBuildCommand() { 269 return buildEnvironmentCommand()+buildBSubCommand(); 270 } 271 272 273 protected String buildBSubCommand() { 274 StringBuffer bSubCommand = new StringBuffer (); 275 bSubCommand.append(DEFAULT_BSUBPATH); 276 if(interactive.equals("true")) bSubCommand.append(" -I"); 277 bSubCommand.append(" -n "+processor+" -q "+queueName+" "); 278 if(hostList != null){ 279 bSubCommand.append("-m "+hostList+" "); 280 } 281 if(getCompositionType() == GIVE_COMMAND_AS_PARAMETER){ 282 bSubCommand.append(getRes_requirement()+scriptLocation+" "+getTargetProcess().getCommand()); 283 } 284 285 return bSubCommand.toString(); 287 } 288 289 290 protected String buildBJobsCommand() { 291 return DEFAULT_BJOBPATH+" "+jobID; 292 } 293 294 301 protected int parseJobID(String message) { 302 logger.info("parseJobID analyzing "+message); 303 String beginJobIDMarkup = "Job <"; 304 String endJobIDMarkup = ">"; 305 int n1 = message.indexOf(beginJobIDMarkup); 306 if (n1 == -1) return 0; 307 int n2 = message.indexOf(endJobIDMarkup, n1+beginJobIDMarkup.length()); 308 if (n2 == -1) return 0; 309 String id = message.substring(n1+beginJobIDMarkup.length(), n2); 310 logger.info("!!!!!!!!!!!!!! JOBID = "+id); 311 try { 312 return Integer.parseInt(id); 313 } catch (NumberFormatException e) { 314 return 0; 315 } 316 } 317 318 319 333 protected String parseHostname(String message) { 334 logger.info("parseHostname analyzing "+message); 335 java.util.StringTokenizer st = new java.util.StringTokenizer (message); 336 if (st.countTokens() < 6) return null; try { 338 int currentJobID = Integer.parseInt(st.nextToken()); 339 if (currentJobID != jobID) return null; } catch (NumberFormatException e) { 341 return null; 342 } 343 st.nextToken(); String status = st.nextToken(); 345 if (status.equals("PEND")) { 346 return ""; } 348 st.nextToken(); st.nextToken(); String hostname = st.nextToken(); 351 logger.info("!!!!!!!!!!!!!! hostname = "+hostname); 352 logger.info("token "+st.countTokens()); 353 return hostname; 354 } 355 356 protected void sendJobDetailsCommand() { 357 outputMessageSink.setMessage(buildBJobsCommand()); 358 } 359 360 361 365 366 367 371 374 public class ParserMessageLogger implements MessageLogger,java.io.Serializable { 375 376 private boolean foundJobID; 377 private boolean foundHostname; 378 379 public ParserMessageLogger() { 380 } 381 382 public void log(String message) { 383 if (! foundJobID) { 386 jobID = parseJobID(message); 387 foundJobID = jobID != 0; 388 if (foundJobID) sendJobDetailsCommand(); 389 } else if (! foundHostname) { 390 hostname = parseHostname(message); 391 if (hostname != null) { 392 foundHostname = hostname.length() > 0; 394 if (foundHostname) { 399 outputMessageSink.setMessage(null); 401 } else { 402 try { 404 Thread.sleep(2000); 405 } catch (InterruptedException e) { 406 } 407 sendJobDetailsCommand(); 408 } 409 } 410 } 411 } 412 413 public void log(Throwable t) { 414 } 415 416 public void log(String message, Throwable t) { 417 } 418 419 } 421 422 423 424 } 425 | Popular Tags |