1 16 package com.sun.slamd.example; 17 18 19 20 import java.io.*; 21 import java.text.*; 22 import java.util.*; 23 import com.sun.slamd.client.*; 24 import com.sun.slamd.job.*; 25 import com.sun.slamd.parameter.*; 26 import com.sun.slamd.stat.*; 27 28 29 30 39 public class LDIF2DBImportRateJobClass 40 extends JobClass 41 { 42 46 public static final String STAT_TRACKER_AVERAGE_RATE = 47 "Average Import Rate (Entries/Second)"; 48 49 50 51 55 public static final String STAT_TRACKER_RECENT_RATE = 56 "Recent Import Rate (Entries/Second)"; 57 58 59 60 64 public static final String STAT_TRACKER_ENTRIES_PROCESSED = 65 "Total Entries Processed"; 66 67 68 69 72 public static final String STAT_TRACKER_HIT_RATIO = "Hit Ratio (Percent)"; 73 74 75 76 79 public static final int READ_BUFFER_SIZE = 4096; 80 81 82 83 BooleanParameter logOutputParameter = 85 new BooleanParameter("log_output", "Log Command Output", 86 "Indicates whether the ldif2db output should be " + 87 "logged.", true); 88 89 StringParameter dbNameParameter = 92 new StringParameter("db_name", "Database Name", 93 "The name of the database into which the data is " + 94 "to be imported.", true, "userRoot"); 95 96 StringParameter ldif2dbCommandParmeter = 98 new StringParameter("ldif2db_command", "ldif2db Command", 99 "The path to the ldif2db command to be executed.", 100 true, ""); 101 102 StringParameter ldifFileParameter = 104 new StringParameter("ldif_file", "LDIF File", 105 "The path to the LDIF file to be imported.", true, 106 ""); 107 108 static boolean logOutput; 110 111 PlaceholderParameter placeholder = new PlaceholderParameter(); 113 114 static String dbName; 116 117 static String ldif2dbCommand; 119 120 static String ldifFile; 122 123 byte[] readBuffer; 125 126 127 PeriodicEventTracker entriesProcessed; 129 PeriodicEventTracker averageRate; 130 PeriodicEventTracker recentRate; 131 PeriodicEventTracker hitRatio; 132 133 134 SimpleDateFormat dateFormat; 137 138 139 140 141 147 public LDIF2DBImportRateJobClass() 148 { 149 super(); 150 } 151 152 153 154 159 public String getJobName() 160 { 161 return "ldif2db Import Rate"; 162 } 163 164 165 166 171 public String getJobDescription() 172 { 173 return "This job can be used to import data into a Sun ONE Directory " + 174 "Server using the ldif2db utility."; 175 } 176 177 178 179 185 public String getJobCategoryName() 186 { 187 return "LDAP"; 188 } 189 190 191 192 203 public int overrideNumClients() 204 { 205 return 1; 206 } 207 208 209 210 223 public int overrideThreadsPerClient() 224 { 225 return 1; 226 } 227 228 229 230 237 public ParameterList getParameterStubs() 238 { 239 Parameter[] parameters = new Parameter[] 240 { 241 placeholder, 242 ldif2dbCommandParmeter, 243 dbNameParameter, 244 ldifFileParameter, 245 logOutputParameter 246 }; 247 248 return new ParameterList(parameters); 249 } 250 251 252 253 275 public StatTracker[] getStatTrackerStubs(String clientID, String threadID, 276 int collectionInterval) 277 { 278 return new StatTracker[] 279 { 280 new PeriodicEventTracker(clientID, threadID, STAT_TRACKER_AVERAGE_RATE, 281 collectionInterval), 282 new PeriodicEventTracker(clientID, threadID, STAT_TRACKER_RECENT_RATE, 283 collectionInterval), 284 new PeriodicEventTracker(clientID, threadID, 285 STAT_TRACKER_ENTRIES_PROCESSED, 286 collectionInterval), 287 new PeriodicEventTracker(clientID, threadID, STAT_TRACKER_HIT_RATIO, 288 collectionInterval), 289 }; 290 } 291 292 293 294 299 public StatTracker[] getStatTrackers() 300 { 301 return new StatTracker[] 302 { 303 averageRate, 304 recentRate, 305 entriesProcessed, 306 hitRatio 307 }; 308 } 309 310 311 312 322 public void initializeClient(String clientID, ParameterList parameters) 323 throws UnableToRunException 324 { 325 ldif2dbCommand = null; 326 ldif2dbCommandParmeter = 327 parameters.getStringParameter(ldif2dbCommandParmeter.getName()); 328 if (ldif2dbCommandParmeter != null) 329 { 330 ldif2dbCommand = ldif2dbCommandParmeter.getStringValue(); 331 } 332 333 334 ldifFile = null; 335 ldifFileParameter = 336 parameters.getStringParameter(ldifFileParameter.getName()); 337 if (ldifFileParameter != null) 338 { 339 ldifFile = ldifFileParameter.getStringValue(); 340 } 341 342 343 dbName = null; 344 dbNameParameter = parameters.getStringParameter(dbNameParameter.getName()); 345 if (dbNameParameter != null) 346 { 347 dbName = dbNameParameter.getStringValue(); 348 } 349 350 351 logOutput = true; 352 logOutputParameter = 353 parameters.getBooleanParameter(logOutputParameter.getName()); 354 if (logOutputParameter != null) 355 { 356 logOutput = logOutputParameter.getBooleanValue(); 357 } 358 } 359 360 361 362 378 public void initializeThread(String clientID, String threadID, 379 int collectionInterval, ParameterList parameters) 380 throws UnableToRunException 381 { 382 averageRate = new PeriodicEventTracker(clientID, threadID, 384 STAT_TRACKER_AVERAGE_RATE, 385 collectionInterval); 386 recentRate = new PeriodicEventTracker(clientID, threadID, 387 STAT_TRACKER_RECENT_RATE, 388 collectionInterval); 389 entriesProcessed = new PeriodicEventTracker(clientID, threadID, 390 STAT_TRACKER_ENTRIES_PROCESSED, 391 collectionInterval); 392 hitRatio = new PeriodicEventTracker(clientID, threadID, 393 STAT_TRACKER_HIT_RATIO, 394 collectionInterval); 395 396 averageRate.setFlatBetweenPoints(false); 397 recentRate.setFlatBetweenPoints(false); 398 entriesProcessed.setFlatBetweenPoints(false); 399 hitRatio.setFlatBetweenPoints(false); 400 401 402 readBuffer = new byte[READ_BUFFER_SIZE]; 404 405 406 dateFormat = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]"); 408 } 409 410 411 412 415 public void runJob() 416 { 417 Runtime runtime = Runtime.getRuntime(); 418 Process process = null; 419 420 try 421 { 422 String [] commandArray = { ldif2dbCommand, "-n", dbName, "-i", ldifFile }; 423 process = runtime.exec(commandArray); 424 } 425 catch (IOException ioe) 426 { 427 logMessage("Unable to execute ldif2db command \"" + ldif2dbCommand + 428 " -n " + dbName + " -i " + ldifFile + "\": " + ioe); 429 indicateStoppedDueToError(); 430 return; 431 } 432 433 434 BufferedInputStream stdOutStream = 435 new BufferedInputStream(process.getInputStream()); 436 BufferedInputStream stdErrStream = 437 new BufferedInputStream(process.getErrorStream()); 438 439 averageRate.startTracker(); 440 recentRate.startTracker(); 441 entriesProcessed.startTracker(); 442 hitRatio.startTracker(); 443 444 445 while (true) 446 { 447 try 448 { 449 if (logOutput) 450 { 451 if (stdOutStream.available() > 0) 452 { 453 while ((! shouldStop()) && (stdOutStream.available() > 0)) 454 { 455 int bytesRead = stdOutStream.read(readBuffer); 456 String [] outputStrs = byteArrayToStrings(readBuffer, bytesRead); 457 for (int i=0; i < outputStrs.length; i++) 458 { 459 logMessage("STDOUT: " + outputStrs[i]); 460 processLine(outputStrs[i]); 461 } 462 } 463 } 464 465 if (stdErrStream.available() > 0) 466 { 467 while ((! shouldStop()) && (stdErrStream.available() > 0)) 468 { 469 int bytesRead = stdErrStream.read(readBuffer); 470 String [] errorStrs = byteArrayToStrings(readBuffer, bytesRead); 471 for (int i=0; i < errorStrs.length; i++) 472 { 473 logMessage("STDERR: " + errorStrs[i]); 474 processLine(errorStrs[i]); 475 } 476 } 477 } 478 } 479 480 if (shouldStop()) 481 { 482 try 483 { 484 stdOutStream.close(); 485 stdErrStream.close(); 486 } catch (Exception e) {} 487 488 process.destroy(); 489 logMessage("Terminated process because the client determined it " + 490 "should stop running."); 491 break; 492 } 493 494 try 495 { 496 int returnCode = process.exitValue(); 497 if (returnCode == 0) 498 { 499 logMessage("Command completed successfully (exit code 0)"); 500 } 501 else 502 { 503 logMessage("Command completed abnormally (exit code " + 504 returnCode + ")"); 505 indicateCompletedWithErrors(); 506 } 507 508 try 509 { 510 stdOutStream.close(); 511 stdErrStream.close(); 512 } catch (Exception e) {} 513 514 break; 515 } catch (IllegalThreadStateException itse) {} 516 517 try 518 { 519 Thread.sleep(100); 520 } catch (InterruptedException ie) {} 521 } 522 catch (IOException ioe) 523 { 524 boolean completedSuccessfully = false; 527 try 528 { 529 int returnCode = process.exitValue(); 530 completedSuccessfully = (returnCode == 0); 531 if (completedSuccessfully) 532 { 533 logMessage("Command completed successfully (exit code 0)"); 534 } 535 else 536 { 537 logMessage("Command completed abnormally (exit code " + returnCode + 538 ")"); 539 indicateCompletedWithErrors(); 540 } 541 } 542 catch (IllegalThreadStateException itse) 543 { 544 logMessage("Attempt to read process output failed: " + ioe); 545 indicateCompletedWithErrors(); 546 } 547 548 break; 549 } 550 } 551 552 553 averageRate.stopTracker(); 554 recentRate.stopTracker(); 555 entriesProcessed.stopTracker(); 556 hitRatio.stopTracker(); 557 } 558 559 560 561 572 private static String [] byteArrayToStrings(byte[] byteArray, int length) 573 { 574 ArrayList stringList = new ArrayList(); 575 576 String byteStr = new String (byteArray, 0, length); 577 StringTokenizer tokenizer = new StringTokenizer(byteStr, "\r\n"); 578 while (tokenizer.hasMoreTokens()) 579 { 580 stringList.add(tokenizer.nextToken()); 581 } 582 583 String [] returnStrings = new String [stringList.size()]; 584 stringList.toArray(returnStrings); 585 return returnStrings; 586 } 587 588 589 590 597 private void processLine(String line) 598 { 599 try 600 { 601 if (line.indexOf(" -- average rate ") < 0) 602 { 603 return; 604 } 605 606 int closeBracketPos = line.indexOf(']'); 607 String dateStr = line.substring(0, closeBracketPos+1); 608 Date logDate = dateFormat.parse(dateStr); 609 long logTime = logDate.getTime(); 610 611 612 int entriesStartPos = line.indexOf(": Processed ", closeBracketPos) + 12; 613 int entriesEndPos = line.indexOf(" entries", entriesStartPos); 614 int numEntries = Integer.parseInt(line.substring(entriesStartPos, 615 entriesEndPos)); 616 617 int avgStartPos = line.indexOf(" -- average rate ", entriesEndPos) + 618 17; 619 int avgEndPos = line.indexOf("/sec", avgStartPos); 620 double avgRate = Double.parseDouble(line.substring(avgStartPos, 621 avgEndPos)); 622 623 int rctStartPos = line.indexOf(", recent rate ", avgEndPos) + 14; 624 int rctEndPos = line.indexOf("/sec", rctStartPos); 625 double rctRate = Double.parseDouble(line.substring(rctStartPos, 626 rctEndPos)); 627 628 int hitStartPos = line.indexOf(", hit ratio ", rctEndPos) + 12; 629 int hitEndPos = line.indexOf("%", hitStartPos); 630 double hitRate = Double.parseDouble(line.substring(hitStartPos, 631 hitEndPos)); 632 633 writeVerbose("Line is \"" + line + "\""); 634 writeVerbose("Timestamp is " + logDate); 635 writeVerbose("Average rate is " + avgRate); 636 writeVerbose("Recent rate is " + rctRate); 637 writeVerbose("Entries processed is " + numEntries); 638 writeVerbose("Hit ratio is " + hitRate); 639 averageRate.update(logTime, avgRate); 640 recentRate.update(logTime, rctRate); 641 entriesProcessed.update(logTime, numEntries); 642 hitRatio.update(logTime, hitRate); 643 } 644 catch (Exception e) 645 { 646 writeVerbose("Unable to parse line \"" + line + "\" -- " + 647 stackTraceToString(e)); 648 } 649 } 650 } 651 652 | Popular Tags |