1 17 package org.apache.servicemix.components.vfs; 18 19 import org.apache.commons.logging.Log; 20 import org.apache.commons.logging.LogFactory; 21 import org.apache.commons.vfs.FileContent; 22 import org.apache.commons.vfs.FileObject; 23 import org.apache.commons.vfs.FileSelector; 24 import org.apache.commons.vfs.FileSystemManager; 25 import org.apache.servicemix.components.util.DefaultFileMarshaler; 26 import org.apache.servicemix.components.util.FileMarshaler; 27 import org.apache.servicemix.components.util.PollingComponentSupport; 28 29 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; 30 31 import javax.jbi.JBIException; 32 import javax.jbi.messaging.RobustInOnly; 33 import javax.jbi.messaging.NormalizedMessage; 34 import javax.resource.spi.work.Work ; 35 36 import java.io.IOException ; 37 import java.io.InputStream ; 38 import java.util.Set ; 39 40 47 public class FilePoller extends PollingComponentSupport { 48 private static final Log log = LogFactory.getLog(FilePoller.class); 49 50 private FileMarshaler marshaler = new DefaultFileMarshaler(); 51 private FileObjectEditor editor = new FileObjectEditor(); 52 private FileObject directory; 53 private FileSelector selector; 54 private Set workingSet = new CopyOnWriteArraySet(); 55 private boolean deleteFile = true; 56 57 public void poll() throws Exception { 58 FileObject[] files = null; 59 directory.close(); 61 if (selector != null) { 62 files = directory.findFiles(selector); 63 } 64 else { 65 files = directory.getChildren(); 66 } 67 for (int i = 0; i < files.length; i++) { 68 final FileObject file = files[i]; 69 if (!workingSet.contains(file)) { 70 workingSet.add(file); 71 getWorkManager().scheduleWork(new Work () { 72 public void run() { 73 processFileAndDelete(file); 74 } 75 76 public void release() { 77 } 78 }); 79 } 80 } 81 } 82 83 84 public FileObject getDirectory() { 87 return directory; 88 } 89 90 public void setDirectory(FileObject directory) { 91 this.directory = directory; 92 } 93 94 public FileSelector getSelector() { 95 return selector; 96 } 97 98 public void setSelector(FileSelector selector) { 99 this.selector = selector; 100 } 101 102 public String getPath() { 103 return editor.getPath(); 104 } 105 106 public void setPath(String path) { 107 editor.setPath(path); 108 } 109 110 113 public boolean isDeleteFile() { 114 return deleteFile; 115 } 116 117 public void setDeleteFile(boolean deleteFile) { 118 this.deleteFile = deleteFile; 119 } 120 121 public FileSystemManager getFileSystemManager() { 122 return editor.getFileSystemManager(); 123 } 124 125 public void setFileSystemManager(FileSystemManager fileSystemManager) { 126 editor.setFileSystemManager(fileSystemManager); 127 } 128 129 public FileMarshaler getMarshaler() { 130 return marshaler; 131 } 132 133 public void setMarshaler(FileMarshaler marshaler) { 134 this.marshaler = marshaler; 135 } 136 137 142 public Set getWorkingSet() { 143 return workingSet; 144 } 145 146 protected void init() throws JBIException { 149 if (directory == null) { 150 directory = editor.getFileObject(); 151 } 152 super.init(); 153 } 154 155 protected void processFileAndDelete(FileObject file) { 156 try { 157 processFile(file); 158 if (isDeleteFile()) { 159 if (!file.delete()) { 160 throw new IOException ("Could not delete file " + file); 161 } 162 } 163 } 164 catch (Exception e) { 165 log.error("Failed to process file: " + file + ". Reason: " + e, e); 166 } 167 finally { 168 workingSet.remove(file); 169 } 170 } 171 172 protected void processFile(FileObject file) throws Exception { 173 file.close(); 175 String name = file.getName().getURI(); 176 FileContent content = file.getContent(); 177 content.close(); 178 InputStream in = content.getInputStream(); 179 if (in == null) { 180 throw new JBIException("No input available for file!"); 181 } 182 RobustInOnly exchange = getExchangeFactory().createRobustInOnlyExchange(); 183 NormalizedMessage message = exchange.createMessage(); 184 exchange.setInMessage(message); 185 marshaler.readMessage(exchange, message, in, name); 186 getDeliveryChannel().sendSync(exchange); 187 in.close(); 188 content.close(); 189 if (exchange.getError() != null) { 190 throw exchange.getError(); 191 } 192 } 193 } 194 | Popular Tags |