package org.openi.stat.r.rserve;

import org.apache.log4j.*;
import org.openi.stat.r.RClient;
import org.rosuda.JRclient.*;
import java.util.*;
import java.io.*;
import java.io.IOException;
import org.openi.stat.r.RFunction;
import org.openi.stat.r.RFunctionParam;

/**
 * {@link RClient} implementation that runs an {@link RFunction} on an Rserve
 * server through the JRclient library.
 *
 * <p>For every call to {@link #execute(RFunction, String)} a new connection is
 * opened, the function's R script file is evaluated on the server, an optional
 * data set file is uploaded, the function is invoked through a generated
 * script, and every file name returned by that call is downloaded into the
 * output path and then removed from the server.</p>
 */
public class RServeClient implements RClient {
    private static final Logger logger = Logger.getLogger(RServeClient.class);

    /** Size in bytes of the buffer used when copying files to and from the server. */
    private static final int BUFFER_SIZE = 512;

    private final String serverIP;
    private final int serverPort;

    /**
     * Stores the address of the R server. No connection is opened here; one is
     * created per {@link #execute(RFunction, String)} call.
     *
     * @param rServerIP  host name or IP address of the R server
     * @param serverPort port of the R server
     * @throws RSrvException declared for backward compatibility with existing
     *                       callers; this constructor does not throw it itself
     */
    public RServeClient(String rServerIP, int serverPort) throws RSrvException {
        logger.debug("IP:" + rServerIP);
        this.serverIP = rServerIP;
        this.serverPort = serverPort;
    }

    /**
     * Executes the given function on the R server and saves the files it
     * reports into {@code outputPath}.
     *
     * @param function   function to run; its script file is read locally and
     *                   evaluated on the server before the function is called
     * @param outputPath local directory the resulting files are written to
     * @throws Exception if reading local files, talking to the server or
     *                   writing the results fails
     */
    public void execute(RFunction function, String outputPath) throws Exception {
        String script = generateScript(function);
        logger.debug("Generated Script: " + script);
        boolean hasDataSet = hasDataSetFile(function);

        Rconnection rConnection = new Rconnection(serverIP, serverPort);
        try {
            String scriptContent = getContent(function.getFile());
            logger.debug("Executing R Script file");
            rConnection.voidEval(scriptContent);

            if (hasDataSet) {
                logger.debug("Sending dataSet file " + function.getDataSetFile() +
                             " to the R server");
                sendFileToServer(rConnection, function.getDataSetFile());
            }

            logger.debug(
                "Executing generated R Script to call method on uploaded R script");
            REXP xp = rConnection.eval(script);

            if (hasDataSet) {
                logger.debug("Removing dataSet file " + function.getDataSetFile() +
                             " from the R server");
                removeFileFromServer(rConnection, function.getDataSetFile());
            }

            Vector files = extractFileNames(xp);
            logger.debug("Saving " + files.size() +
                         " file(s) are generated by the R script to path " +
                         outputPath);

            for (Iterator iter = files.iterator(); iter.hasNext();) {
                String file = (String) iter.next();
                logger.info("downloading '" + file + "' from r server");
                saveFileFromServer(rConnection, file, outputPath);
                removeFileFromServer(rConnection, file);
            }
        } finally {
            // Always release the connection, including when any step above fails.
            logger.debug("closing R");
            rConnection.close();
        }
    }

    /**
     * Collects the file names contained in the result of the generated script.
     *
     * @param xp value returned by evaluating the generated script
     * @return vector of {@code String} file names, possibly empty
     */
    private Vector extractFileNames(REXP xp) {
        Vector names = new Vector();
        Vector output = xp.asVector();

        if (output != null) {
            for (Iterator iter = output.iterator(); iter.hasNext();) {
                names.add(((REXP) iter.next()).asString());
            }
        } else {
            // NOTE(review): presumably asVector() yields null when the result is
            // not a vector (e.g. a single file name) - confirm against JRclient.
            // Falling back to asString() avoids a NullPointerException here.
            String single = xp.asString();
            if (single != null) {
                names.add(single);
            }
        }

        return names;
    }

    /**
     * Tells whether the function names a data set file to upload.
     *
     * @param function function to inspect
     * @return {@code true} if the data set file is neither {@code null} nor empty
     */
    private static boolean hasDataSetFile(RFunction function) {
        return function.getDataSetFile() != null &&
               !"".equals(function.getDataSetFile());
    }

    /**
     * Builds the R script that calls the function and assigns its result to
     * {@code filenames}. When a data set file is present, the script first
     * reads it (by base file name) into {@code data} with {@code read.delim}
     * and passes it as the {@code dataFrame} argument.
     *
     * <p>Parameter values are inserted verbatim; values of type
     * {@code "String"} are wrapped in double quotes but not escaped, so they
     * must not contain characters that would break the R expression.</p>
     *
     * @param function function to generate the call for
     * @return generated R script
     */
    private String generateScript(RFunction function) {
        StringBuffer generatedScript = new StringBuffer();
        String functionName = function.getName();
        StringBuffer paramString = new StringBuffer();
        Iterator iterator = function.getParams().iterator();

        if (hasDataSetFile(function)) {
            String dataSetFileName = new File(function.getDataSetFile()).getName();
            generatedScript.append("data <- read.delim(\"" + dataSetFileName +
                                   "\", ");
            generatedScript.append("header=TRUE, sep=\"\\t\")");
            paramString.append("dataFrame=data");
            paramString.append(", ");
            generatedScript.append("\n");
        }

        while (iterator.hasNext()) {
            RFunctionParam param = (RFunctionParam) iterator.next();
            String paramVal = "String".equals(param.getType()) ?
                "\"" + param.getValue() + "\"" : param.getValue();
            String paramWithVal = param.getName() + " = " + paramVal;
            paramString.append(paramWithVal);
            paramString.append(",");
        }

        // Every argument appended above is followed by a comma; drop the last one.
        int lastComma = paramString.lastIndexOf(",");
        if (lastComma != -1) {
            paramString.deleteCharAt(lastComma);
        }

        generatedScript.append("filenames <- " + functionName + "(");
        generatedScript.append(paramString.toString());
        generatedScript.append(")");

        return generatedScript.toString();
    }

    /**
     * Reads a local text file line by line, terminating every line with
     * {@code "\n"}.
     *
     * @param file path of the file to read
     * @return file content
     * @throws IOException if the file cannot be opened or read
     */
    private String getContent(String file) throws IOException {
        StringBuffer buffer = new StringBuffer();
        BufferedReader input = new BufferedReader(new InputStreamReader(
                new FileInputStream(file)));
        try {
            String line;
            // readLine() returning null is the end-of-stream signal; ready()
            // only says whether a read would block.
            while ((line = input.readLine()) != null) {
                buffer.append(line);
                buffer.append("\n");
            }
        } finally {
            input.close();
        }
        return buffer.toString();
    }

    /**
     * Uploads a local file to the server under its base file name.
     *
     * @param rConnection open connection to the R server
     * @param fileName    path of the local file to upload
     * @throws IOException if the local file cannot be read or the upload fails
     */
    private void sendFileToServer(Rconnection rConnection, String fileName) throws
        IOException {
        File file = new File(fileName);
        // Open the local file first so a missing file does not leave a remote
        // stream open.
        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(
                file));
        try {
            RFileOutputStream ros = rConnection.createFile(file.getName());
            try {
                copy(bis, ros);
                ros.flush();
            } finally {
                ros.close();
            }
        } finally {
            bis.close();
        }
    }

    /**
     * Downloads a file from the server into {@code pathToSave}, keeping only
     * the base name of {@code fileName} for the local file.
     *
     * @param rConnection open connection to the R server
     * @param fileName    name of the file on the server
     * @param pathToSave  local directory to write the file into
     * @throws Exception if the remote file cannot be opened or the copy fails
     */
    private void saveFileFromServer(Rconnection rConnection, String fileName,
                                    String pathToSave) throws Exception {
        RFileInputStream ris = rConnection.openFile(fileName);
        try {
            FileOutputStream outs = new FileOutputStream(pathToSave + "/" +
                    new File(fileName).getName());
            try {
                copy(ris, outs);
                outs.flush();
            } finally {
                outs.close();
            }
        } finally {
            ris.close();
        }
    }

    /**
     * Removes a file from the server, addressed by its base file name.
     *
     * @param rConnection open connection to the R server
     * @param file        path whose base name identifies the remote file
     * @throws RSrvException if the server reports a failure
     */
    private void removeFileFromServer(Rconnection rConnection, String file) throws
        RSrvException {
        rConnection.removeFile(new File(file).getName());
    }

    /**
     * Copies all remaining bytes from {@code in} to {@code out}. Neither
     * stream is flushed or closed.
     *
     * @param in  stream to read from
     * @param out stream to write to
     * @throws IOException if reading or writing fails
     */
    private static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[BUFFER_SIZE];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
    }
}