1 17 package org.apache.servicemix.components.file; 18 19 import org.apache.commons.logging.Log; 20 import org.apache.commons.logging.LogFactory; 21 import org.apache.servicemix.components.util.DefaultFileMarshaler; 22 import org.apache.servicemix.components.util.FileMarshaler; 23 import org.apache.servicemix.components.util.PollingComponentSupport; 24 25 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; 26 27 import javax.jbi.JBIException; 28 import javax.jbi.messaging.InOnly; 29 import javax.jbi.messaging.NormalizedMessage; 30 import javax.resource.spi.work.Work ; 31 import javax.resource.spi.work.WorkException ; 32 import java.io.BufferedInputStream ; 33 import java.io.File ; 34 import java.io.FileFilter ; 35 import java.io.FileInputStream ; 36 import java.io.IOException ; 37 import java.io.InputStream ; 38 import java.util.Set ; 39 40 47 public class FilePoller extends PollingComponentSupport { 48 private static final Log log = LogFactory.getLog(FilePoller.class); 49 50 private File file; 51 private FileFilter filter; 52 private boolean deleteFile = true; 53 private boolean recursive = true; 54 private boolean autoCreateDirectory = true; 55 private FileMarshaler marshaler = new DefaultFileMarshaler(); 56 private Set workingSet = new CopyOnWriteArraySet(); 57 58 public void poll() throws Exception { 59 pollFileOrDirectory(file); 60 } 61 62 public File getFile() { 65 return file; 66 } 67 68 73 public void setFile(File file) { 74 this.file = file; 75 } 76 77 public FileFilter getFilter() { 78 return filter; 79 } 80 81 84 public void setFilter(FileFilter filter) { 85 this.filter = filter; 86 } 87 88 91 public boolean isDeleteFile() { 92 return deleteFile; 93 } 94 95 public void setDeleteFile(boolean deleteFile) { 96 this.deleteFile = deleteFile; 97 } 98 99 public boolean isRecursive() { 100 return recursive; 101 } 102 103 public void setRecursive(boolean recursive) { 104 this.recursive = recursive; 105 } 106 107 public boolean isAutoCreateDirectory() { 108 return autoCreateDirectory; 109 } 110 111 public void setAutoCreateDirectory(boolean autoCreateDirectory) { 112 this.autoCreateDirectory = autoCreateDirectory; 113 } 114 115 public FileMarshaler getMarshaler() { 116 return marshaler; 117 } 118 119 public void setMarshaler(FileMarshaler marshaler) { 120 this.marshaler = marshaler; 121 } 122 123 128 public Set getWorkingSet() { 129 return workingSet; 130 } 131 132 protected void init() throws JBIException { 135 if (file == null) { 136 throw new IllegalArgumentException ("You must specify a file property"); 137 } 138 if (isAutoCreateDirectory() && !file.exists()) { 139 file.mkdirs(); 140 } 141 super.init(); 142 } 143 144 protected void pollFileOrDirectory(File fileOrDirectory) throws WorkException { 145 pollFileOrDirectory(fileOrDirectory, true); 146 } 147 148 protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) throws WorkException { 149 if (!fileOrDirectory.isDirectory()) { 150 pollFile(fileOrDirectory); } else if (processDir) { 152 log.debug("Polling directory " + fileOrDirectory); 153 File [] files = fileOrDirectory.listFiles(getFilter()); 154 for (int i = 0; i < files.length; i++) { 155 pollFileOrDirectory(files[i], isRecursive()); } 157 } else { 158 log.debug("Skipping directory " + fileOrDirectory); 159 } 160 } 161 162 protected void pollFile(final File aFile) throws WorkException { 163 if (!workingSet.contains(aFile)) { 164 workingSet.add(aFile); 165 if (log.isDebugEnabled()) { 166 log.debug("Scheduling file " + aFile + " for processing"); 167 } 168 getWorkManager().scheduleWork(new Work () { 169 public void run() { 170 processFileAndDelete(aFile); 171 } 172 173 public void release() { 174 } 175 }); 176 } 177 } 178 179 protected void processFileAndDelete(File aFile) { 180 try { 181 if (log.isDebugEnabled()) { 182 log.debug("Processing file " + aFile); 183 } 184 if (aFile.exists()) { 185 processFile(aFile); 186 if (isDeleteFile()) { 187 if (!aFile.delete()) { 188 throw new IOException ("Could not delete file " + aFile); 189 } 190 } 191 } 192 } 193 catch (Exception e) { 194 log.error("Failed to process file: " + aFile + ". Reason: " + e, e); 195 } 196 finally { 197 workingSet.remove(aFile); 198 } 199 } 200 201 protected void processFile(File aFile) throws Exception { 202 String name = aFile.getCanonicalPath(); 203 InputStream in = new BufferedInputStream (new FileInputStream (aFile)); 204 InOnly exchange = getExchangeFactory().createInOnlyExchange(); 205 NormalizedMessage message = exchange.createMessage(); 206 exchange.setInMessage(message); 207 marshaler.readMessage(exchange, message, in, name); 208 getDeliveryChannel().sendSync(exchange); 209 in.close(); 210 } 211 } 212 | Popular Tags |