package org.apache.servicemix.components.net;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.SocketClient;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.servicemix.components.util.DefaultFileMarshaler;
import org.apache.servicemix.components.util.FileMarshaler;
import org.apache.servicemix.components.util.PollingComponentSupport;

import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;

import javax.jbi.JBIException;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.NormalizedMessage;
import javax.resource.spi.work.Work;
import java.util.Set;
import java.io.IOException;
import java.io.InputStream;

/**
 * A polling component which lists a directory on an FTP server and, for every
 * entry found, sends the file content into the JBI bus as an InOnly exchange.
 * The remote file is deleted once it has been delivered.
 * <p>
 * FTP connections are borrowed from the configured {@link FTPClientPool} and
 * each file is processed by a separate {@link Work} item scheduled on the
 * work manager.
 */
public class FTPPoller extends PollingComponentSupport {
    private static final Log log = LogFactory.getLog(FTPPoller.class);

    private FTPClientPool clientPool;
    private String path;
    private FileMarshaler marshaler = new DefaultFileMarshaler();
    private Set workingSet = new CopyOnWriteArraySet();

    /**
     * Returns the directory to poll, defaulting to "." when no path is configured.
     */
    private String getWorkingPath() {
        return path == null ? "." : path;
    }

    /**
     * Builds the remote name of a file located in the polled directory.
     * A "/" separator is inserted unless the configured path already ends
     * with one, so both "inbox" and "inbox/" resolve to "inbox/name".
     *
     * @param name the simple file name as reported by the directory listing
     * @return the name to use in RETR / DELE commands
     */
    private String toRemotePath(String name) {
        String dir = getWorkingPath();
        if (dir.endsWith("/")) {
            return dir + name;
        }
        return dir + "/" + name;
    }

    /**
     * Lists the polled directory and schedules one unit of work per entry.
     *
     * @throws Exception if a client cannot be borrowed, the listing fails or
     *                   the work manager rejects a work item
     */
    public void poll() throws Exception {
        FTPClient ftp = (FTPClient) borrowClient();
        try {
            FTPFile[] files = ftp.listFiles(getWorkingPath());
            if (files == null) {
                // nothing could be listed
                return;
            }
            for (int i = 0; i < files.length; i++) {
                // the listing may contain null members for lines which could not be parsed
                if (files[i] != null) {
                    scheduleFile(files[i]);
                }
            }
        }
        finally {
            returnClient(ftp);
        }
    }

    /**
     * Registers the file in the working set and schedules its processing.
     * The file is removed from the working set as soon as processing ends,
     * or immediately if the work could not be scheduled.
     */
    private void scheduleFile(final FTPFile file) throws Exception {
        workingSet.add(file);
        boolean scheduled = false;
        try {
            getWorkManager().scheduleWork(new Work() {
                public void run() {
                    try {
                        processFile(file);
                    }
                    finally {
                        workingSet.remove(file);
                    }
                }

                public void release() {
                    // release() is only a hint to stop early, not a completion
                    // callback; the working set is cleaned up by run() itself
                }
            });
            scheduled = true;
        }
        finally {
            if (!scheduled) {
                workingSet.remove(file);
            }
        }
    }

    // Properties
    //-------------------------------------------------------------------------

    /**
     * @return the pool used to obtain FTP clients
     */
    public FTPClientPool getClientPool() {
        return clientPool;
    }

    /**
     * @param clientPool the pool used to obtain FTP clients; must be set before init
     */
    public void setClientPool(FTPClientPool clientPool) {
        this.clientPool = clientPool;
    }

    /**
     * @return the configured remote directory, or null if none was set
     */
    public String getPath() {
        return path;
    }

    /**
     * @param path the remote directory to poll; when null "." is used
     */
    public void setPath(String path) {
        this.path = path;
    }

    /**
     * @return the marshaler used to turn a file stream into a normalized message
     */
    public FileMarshaler getMarshaler() {
        return marshaler;
    }

    /**
     * @param marshaler the marshaler used to turn a file stream into a normalized message
     */
    public void setMarshaler(FileMarshaler marshaler) {
        this.marshaler = marshaler;
    }

    /**
     * @return the set of {@link FTPFile} objects which are scheduled or currently being processed
     */
    public Set getWorkingSet() {
        return workingSet;
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Checks that the mandatory clientPool property is set before delegating
     * to the parent initialisation.
     *
     * @throws IllegalArgumentException if no client pool has been configured
     */
    protected void init() throws JBIException {
        if (clientPool == null) {
            throw new IllegalArgumentException("You must initialise the clientPool property");
        }
        super.init();
    }

    /**
     * Processes a single directory entry: borrows a client, delivers the file
     * and deletes it from the server. The "." and ".." entries are ignored.
     * Failures are logged and not propagated, so the file stays on the server.
     *
     * @param file the directory entry to process
     */
    protected void processFile(FTPFile file) {
        String name = file.getName();
        if (".".equals(name) || "..".equals(name)) {
            // ignore the current and parent directory entries
            return;
        }
        FTPClient client = null;
        try {
            client = (FTPClient) borrowClient();
            processFile(client, file);
            if (!client.deleteFile(toRemotePath(name))) {
                throw new IOException("Could not delete file " + file);
            }
        }
        catch (Exception e) {
            log.error("Failed to process file: " + file + ". Reason: " + e, e);
        }
        finally {
            // returnClient ignores a null client
            returnClient(client);
        }
    }

    /**
     * Retrieves the file content and sends it synchronously as the in message
     * of a new InOnly exchange.
     *
     * @param client the FTP client to use for the transfer
     * @param file   the directory entry to retrieve
     * @throws Exception if the file cannot be retrieved, marshaled or sent, or
     *                   if the server does not confirm the transfer
     */
    protected void processFile(FTPClient client, FTPFile file) throws Exception {
        String name = file.getName();
        String remoteName = toRemotePath(name);
        InputStream in = client.retrieveFileStream(remoteName);
        if (in == null) {
            // the data connection could not be opened, e.g. the entry is not a retrievable file
            throw new IOException("Could not retrieve file " + remoteName
                    + ". Server reply: " + client.getReplyString());
        }
        boolean delivered = false;
        try {
            InOnly exchange = getExchangeFactory().createInOnlyExchange();
            NormalizedMessage message = exchange.createMessage();
            exchange.setInMessage(message);
            marshaler.readMessage(exchange, message, in, name);
            // the stream must stay open until the exchange has been sent,
            // as the marshaler may hand the stream over to the message
            getDeliveryChannel().sendSync(exchange);
            delivered = true;
        }
        finally {
            completeTransfer(client, in, remoteName, delivered);
        }
    }

    /**
     * Closes the data stream and then consumes the server's completion reply.
     * The reply is only available once the stream is closed, and it must be
     * read even on failure so the pooled client stays usable.
     *
     * @param delivered whether the exchange was sent; when false a failure is
     *                  already being propagated, so problems here are only logged
     */
    private void completeTransfer(FTPClient client, InputStream in, String remoteName, boolean delivered)
            throws IOException {
        try {
            in.close();
            if (!client.completePendingCommand() && delivered) {
                throw new IOException("File transfer did not complete for " + remoteName
                        + ". Server reply: " + client.getReplyString());
            }
        }
        catch (IOException e) {
            if (delivered) {
                throw e;
            }
            // do not mask the exception which made the delivery fail
            log.warn("Failed to complete transfer of " + remoteName + ": " + e, e);
        }
    }

    /**
     * Borrows a client from the pool.
     *
     * @throws JBIException wrapping any failure reported by the pool
     */
    protected SocketClient borrowClient() throws JBIException {
        try {
            return getClientPool().borrowClient();
        }
        catch (Exception e) {
            throw new JBIException(e);
        }
    }

    /**
     * Returns a client to the pool; a null client is ignored and pool
     * failures are logged rather than propagated.
     *
     * @param client the client to return, may be null
     */
    protected void returnClient(SocketClient client) {
        if (client != null) {
            try {
                getClientPool().returnClient(client);
            }
            catch (Exception e) {
                log.error("Failed to return client to pool: " + e, e);
            }
        }
    }
}