package net.nutch.fs;

import java.io.*;
import java.net.*;
import java.util.*;
import java.text.*;

import net.nutch.io.*;
import net.nutch.ndfs.*;

/**
 * A {@link NutchFileSystem} whose operations are delegated to an
 * {@link NDFSClient}. Paths are passed to the client as {@link UTF8}
 * wrappers around {@link File#getPath()}.
 */
public class NDFSFileSystem extends NutchFileSystem {
    /** Size, in bytes, of the buffer used when streaming file contents. */
    private static final int COPY_BUFFER_SIZE = 4096;

    /** Source of the random component of temp-file names. */
    Random r = new Random();

    /** Client through which every remote operation is performed. */
    NDFSClient ndfs;

    /**
     * Creates a file system that talks to the given namenode.
     *
     * @param namenode address handed to the {@link NDFSClient} constructor
     * @throws IOException if the client cannot be created
     */
    public NDFSFileSystem(InetSocketAddress namenode) throws IOException {
        this.ndfs = new NDFSClient(namenode);
    }

    /**
     * Opens the named remote file for reading.
     *
     * @param f path of the file to open
     * @return the stream returned by the client
     * @throws IOException if the client reports a failure
     */
    public NFSInputStream open(File f) throws IOException {
        return ndfs.open(new UTF8(f.getPath()));
    }

    /**
     * Creates the named remote file without requesting overwrite.
     *
     * @param f path of the file to create
     * @return the stream returned by the client
     * @throws IOException if the client reports a failure
     */
    public NFSOutputStream create(File f) throws IOException {
        return create(f, false);
    }

    /**
     * Creates the named remote file.
     *
     * @param f path of the file to create
     * @param overwrite forwarded unchanged to the client
     * @return the stream returned by the client
     * @throws IOException if the client reports a failure
     */
    public NFSOutputStream create(File f, boolean overwrite) throws IOException {
        return ndfs.create(new UTF8(f.getPath()), overwrite);
    }

    /**
     * Renames a remote file or directory.
     *
     * @param src existing path
     * @param dst new path
     * @return the result reported by the client
     * @throws IOException if the client reports a failure
     */
    public boolean rename(File src, File dst) throws IOException {
        return ndfs.rename(new UTF8(src.getPath()), new UTF8(dst.getPath()));
    }

    /**
     * Deletes a remote path.
     *
     * @param f path to delete
     * @return the result reported by the client
     * @throws IOException if the client reports a failure
     */
    public boolean delete(File f) throws IOException {
        return ndfs.delete(new UTF8(f.getPath()));
    }

    /**
     * Tests whether a remote path exists.
     *
     * @param f path to test
     * @return the result reported by the client
     * @throws IOException if the client reports a failure
     */
    public boolean exists(File f) throws IOException {
        return ndfs.exists(new UTF8(f.getPath()));
    }

    /**
     * Tests whether a remote path is a directory.
     *
     * @param f path to test
     * @return the result reported by the client
     * @throws IOException if the client reports a failure
     */
    public boolean isDirectory(File f) throws IOException {
        return ndfs.isDirectory(new UTF8(f.getPath()));
    }

    /**
     * Returns the length of the first entry the client lists for the path.
     *
     * @param f path to query
     * @return length reported by the first listing entry
     * @throws IOException if the client fails, or if it returns no listing
     *         entry for the path
     */
    public long getLength(File f) throws IOException {
        NDFSFileInfo info[] = ndfs.listFiles(new UTF8(f.getPath()));
        // NOTE(review): whether the client can return null/empty here is not
        // visible from this file; guard so callers get an IOException rather
        // than a NullPointerException / ArrayIndexOutOfBoundsException.
        if (info == null || info.length == 0) {
            throw new IOException("No file information available for " + f);
        }
        return info[0].getLen();
    }

    /**
     * Lists the entries the client reports for a path.
     *
     * @param f path to list
     * @return one {@link NDFSFile} per entry returned by the client
     * @throws IOException if the client reports a failure
     */
    public File[] listFiles(File f) throws IOException {
        NDFSFileInfo info[] = ndfs.listFiles(new UTF8(f.getPath()));
        File results[] = new NDFSFile[info.length];
        for (int i = 0; i < info.length; i++) {
            results[i] = new NDFSFile(info[i]);
        }
        return results;
    }

    /**
     * Asks the client to create the directory named by the path.
     *
     * @param f directory path
     * @throws IOException if the client reports a failure
     */
    public void mkdirs(File f) throws IOException {
        ndfs.mkdirs(new UTF8(f.getPath()));
    }

    /**
     * Requests a lock on a remote path.
     *
     * @param f path to lock
     * @param shared whether a shared lock is wanted; the negation is what is
     *        passed to the client (presumably its flag means "exclusive" --
     *        confirm against NDFSClient)
     * @throws IOException if the client reports a failure
     */
    public void lock(File f, boolean shared) throws IOException {
        ndfs.lock(new UTF8(f.getPath()), !shared);
    }

    /**
     * Releases a remote path via the client.
     *
     * @param f path to release
     * @throws IOException if the client reports a failure
     */
    public void release(File f) throws IOException {
        ndfs.release(new UTF8(f.getPath()));
    }

    /**
     * Copies a local file or directory tree into this file system and then
     * deletes the local source.
     *
     * @param src local source
     * @param dst remote destination
     * @throws IOException if the target already exists or the copy fails
     */
    public void moveFromLocalFile(File src, File dst) throws IOException {
        doFromLocalFile(src, dst, true);
    }

    /**
     * Copies a local file or directory tree into this file system, leaving
     * the local source in place.
     *
     * @param src local source
     * @param dst remote destination
     * @throws IOException if the target already exists or the copy fails
     */
    public void copyFromLocalFile(File src, File dst) throws IOException {
        doFromLocalFile(src, dst, false);
    }

    /**
     * Shared implementation of the local-to-remote copy and move.
     * If {@code dst} is an existing remote directory the source is placed
     * inside it under its own name; any other pre-existing target is an
     * error. Directories are copied recursively.
     */
    private void doFromLocalFile(File src, File dst, boolean deleteSource) throws IOException {
        if (exists(dst)) {
            if (!isDirectory(dst)) {
                throw new IOException("Target " + dst + " already exists");
            } else {
                dst = new File(dst, src.getName());
                if (exists(dst)) {
                    throw new IOException("Target " + dst + " already exists");
                }
            }
        }

        if (src.isDirectory()) {
            mkdirs(dst);
            File contents[] = src.listFiles();
            // File.listFiles() yields null on an I/O error or if the
            // directory vanished; report that instead of failing with an NPE.
            if (contents == null) {
                throw new IOException("Unable to list contents of " + src);
            }
            for (int i = 0; i < contents.length; i++) {
                doFromLocalFile(contents[i], new File(dst, contents[i].getName()), deleteSource);
            }
        } else {
            InputStream in = new BufferedInputStream(new FileInputStream(src));
            try {
                OutputStream out = create(dst);
                try {
                    copyBytes(in, out);
                } finally {
                    out.close();
                }
            } finally {
                in.close();
            }
        }

        // The result of delete() is ignored, so a source that cannot be
        // removed is left in place without an error.
        // NOTE(review): confirm this best-effort behaviour is intended.
        if (deleteSource) {
            src.delete();
        }
    }

    /**
     * Copies a remote file or directory tree to the local disk.
     * If {@code dst} is an existing local directory the source is placed
     * inside it under its own name; any other pre-existing target is an
     * error. Directories are copied recursively.
     *
     * @param src remote source
     * @param dst local destination
     * @throws IOException if the target already exists, a local directory
     *         cannot be created, or the copy fails
     */
    public void copyToLocalFile(File src, File dst) throws IOException {
        if (dst.exists()) {
            if (!dst.isDirectory()) {
                throw new IOException("Target " + dst + " already exists");
            } else {
                dst = new File(dst, src.getName());
                if (dst.exists()) {
                    throw new IOException("Target " + dst + " already exists");
                }
            }
        }

        if (isDirectory(src)) {
            // mkdirs() signals failure only through its return value; the
            // isDirectory() re-check tolerates a concurrent creation.
            if (!dst.mkdirs() && !dst.isDirectory()) {
                throw new IOException("Unable to create local directory " + dst);
            }
            File contents[] = listFiles(src);
            for (int i = 0; i < contents.length; i++) {
                copyToLocalFile(contents[i], new File(dst, contents[i].getName()));
            }
        } else {
            InputStream in = open(src);
            try {
                OutputStream out = new BufferedOutputStream(new FileOutputStream(dst));
                try {
                    copyBytes(in, out);
                } finally {
                    out.close();
                }
            } finally {
                in.close();
            }
        }
    }

    /** Streams everything readable from {@code in} to {@code out}; closes neither. */
    private static void copyBytes(InputStream in, OutputStream out) throws IOException {
        byte buf[] = new byte[COPY_BUFFER_SIZE];
        int bytesRead = in.read(buf);
        while (bytesRead >= 0) {
            out.write(buf, 0, bytesRead);
            bytesRead = in.read(buf);
        }
    }

    /**
     * Prepares a local file for writing output destined for a remote path.
     * If the remote path already exists it is first copied to the local file.
     *
     * @param nfsOutputFile remote path the output will eventually occupy
     * @param tmpLocalFile local file to write to
     * @return {@code tmpLocalFile}
     * @throws IOException if copying the existing remote content fails
     */
    public File startLocalOutput(File nfsOutputFile, File tmpLocalFile) throws IOException {
        if (exists(nfsOutputFile)) {
            copyToLocalFile(nfsOutputFile, tmpLocalFile);
        }
        return tmpLocalFile;
    }

    /**
     * Finishes local output by moving the local file to the remote path.
     *
     * @param nfsOutputFile remote destination
     * @param tmpLocalFile local file that was written
     * @throws IOException if the move fails
     */
    public void completeLocalOutput(File nfsOutputFile, File tmpLocalFile) throws IOException {
        moveFromLocalFile(tmpLocalFile, nfsOutputFile);
    }

    /**
     * Makes a remote path available locally by copying it to a local file.
     *
     * @param nfsInputFile remote source
     * @param tmpLocalFile local destination
     * @return {@code tmpLocalFile}
     * @throws IOException if the copy fails
     */
    public File startLocalInput(File nfsInputFile, File tmpLocalFile) throws IOException {
        copyToLocalFile(nfsInputFile, tmpLocalFile);
        return tmpLocalFile;
    }

    /**
     * Discards a local copy by passing it to {@code FileUtil.fullyDelete}.
     *
     * @param localFile local file or directory to remove
     * @throws IOException if the deletion fails
     */
    public void completeLocalInput(File localFile) throws IOException {
        FileUtil.fullyDelete(localFile);
    }

    /**
     * Creates a new, uniquely named file in this file system. Candidate
     * names are {@code prefix + randomNonNegativeInt + suffix}; the loop
     * retries until a candidate does not exist and {@code createNewFile}
     * succeeds for it.
     *
     * @param prefix leading part of the file name
     * @param suffix trailing part of the file name
     * @param directory parent directory, or {@code null} for {@code /tmp}
     * @return the file that was created
     * @throws IOException if an existence check or creation attempt fails
     */
    public File createTempFile(String prefix, String suffix, File directory) throws IOException {
        if (directory == null) {
            directory = new File("/tmp");
        }

        while (true) {
            // Clear the sign bit rather than calling Math.abs(): abs() returns
            // a negative value for Integer.MIN_VALUE, which would put a '-'
            // into the generated name.
            int randomPart = r.nextInt() & Integer.MAX_VALUE;
            File tmpResult = new File(directory, prefix + randomPart + suffix);
            if (!exists(tmpResult) && createNewFile(tmpResult)) {
                return tmpResult;
            }
        }
    }

    /**
     * Closes the underlying client.
     *
     * @throws IOException if the client reports a failure
     */
    public void close() throws IOException {
        ndfs.close();
    }

    /** Returns {@code "NDFS[" + client + "]"}. */
    public String toString() {
        return "NDFS[" + ndfs + "]";
    }

    /**
     * Returns the client this file system delegates to.
     *
     * @return the underlying {@link NDFSClient}
     */
    public NDFSClient getClient() {
        return ndfs;
    }
}