1 10 11 package org.mule.providers.ftp; 12 13 import java.io.FilenameFilter ; 14 import java.io.IOException ; 15 import java.util.ArrayList ; 16 import java.util.Collections ; 17 import java.util.HashSet ; 18 import java.util.List ; 19 import java.util.Set ; 20 21 import javax.resource.spi.work.Work ; 22 23 import org.apache.commons.io.output.ByteArrayOutputStream; 24 import org.apache.commons.net.ftp.FTPClient; 25 import org.apache.commons.net.ftp.FTPFile; 26 import org.apache.commons.net.ftp.FTPReply; 27 import org.mule.impl.MuleMessage; 28 import org.mule.providers.PollingMessageReceiver; 29 import org.mule.providers.file.FileConnector; 30 import org.mule.umo.UMOComponent; 31 import org.mule.umo.UMOMessage; 32 import org.mule.umo.endpoint.UMOEndpoint; 33 import org.mule.umo.endpoint.UMOEndpointURI; 34 import org.mule.umo.lifecycle.InitialisationException; 35 import org.mule.umo.provider.UMOConnector; 36 37 public class FtpMessageReceiver extends PollingMessageReceiver 38 { 39 protected final FtpConnector connector; 40 protected final FilenameFilter filenameFilter; 41 42 protected final Set scheduledFiles = Collections.synchronizedSet(new HashSet ()); 46 protected final Set currentFiles = Collections.synchronizedSet(new HashSet ()); 47 48 public FtpMessageReceiver(UMOConnector connector, 49 UMOComponent component, 50 UMOEndpoint endpoint, 51 Long frequency) throws InitialisationException 52 { 53 super(connector, component, endpoint, frequency); 54 this.connector = (FtpConnector)connector; 55 56 if (endpoint.getFilter() instanceof FilenameFilter ) 57 { 58 this.filenameFilter = (FilenameFilter )endpoint.getFilter(); 59 } 60 else 61 { 62 this.filenameFilter = null; 63 } 64 } 65 66 public void poll() throws Exception 67 { 68 FTPFile[] files = listFiles(); 69 70 synchronized (scheduledFiles) 71 { 72 for (int i = 0; i < files.length; i++) 73 { 74 final FTPFile file = files[i]; 75 final String fileName = file.getName(); 76 77 if (!scheduledFiles.contains(fileName) && !currentFiles.contains(fileName)) 78 { 79 scheduledFiles.add(fileName); 80 getWorkManager().scheduleWork(new FtpWork(fileName, file)); 81 } 82 } 83 } 84 } 85 86 protected FTPFile[] listFiles() throws Exception 87 { 88 final UMOEndpointURI uri = endpoint.getEndpointURI(); 89 FTPClient client = null; 90 91 try 92 { 93 client = connector.getFtp(uri); 94 connector.enterActiveOrPassiveMode(client, endpoint); 95 connector.setupFileType(client, endpoint); 96 97 if (!client.changeWorkingDirectory(uri.getPath())) 98 { 99 throw new IOException ("Ftp error: " + client.getReplyCode()); 100 } 101 102 FTPFile[] files = client.listFiles(); 103 104 if (!FTPReply.isPositiveCompletion(client.getReplyCode())) 105 { 106 throw new IOException ("Ftp error: " + client.getReplyCode()); 107 } 108 109 if (files == null || files.length == 0) 110 { 111 return files; 112 } 113 114 List v = new ArrayList (); 115 116 for (int i = 0; i < files.length; i++) 117 { 118 if (files[i].isFile()) 119 { 120 if (filenameFilter == null || filenameFilter.accept(null, files[i].getName())) 121 { 122 v.add(files[i]); 123 } 124 } 125 } 126 127 return (FTPFile[])v.toArray(new FTPFile[v.size()]); 128 } 129 finally 130 { 131 connector.releaseFtp(uri, client); 132 } 133 } 134 135 protected void processFile(FTPFile file) throws Exception 136 { 137 FTPClient client = null; 138 UMOEndpointURI uri = endpoint.getEndpointURI(); 139 140 try 141 { 142 client = connector.getFtp(uri); 143 connector.enterActiveOrPassiveMode(client, endpoint); 144 connector.setupFileType(client, endpoint); 145 146 if (!client.changeWorkingDirectory(endpoint.getEndpointURI().getPath())) 147 { 148 throw new IOException ("Ftp error: " + client.getReplyCode()); 149 } 150 151 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 152 if (!client.retrieveFile(file.getName(), baos)) 153 { 154 throw new IOException ("Ftp error: " + client.getReplyCode()); 155 } 156 157 UMOMessage message = new MuleMessage(connector.getMessageAdapter(baos.toByteArray())); 158 message.setProperty(FileConnector.PROPERTY_ORIGINAL_FILENAME, file.getName()); 159 routeMessage(message); 160 161 if (!client.deleteFile(file.getName())) 162 { 163 throw new IOException ("Ftp error: " + client.getReplyCode()); 164 } 165 } 166 finally 167 { 168 connector.releaseFtp(uri, client); 169 } 170 } 171 172 public void doConnect() throws Exception 173 { 174 FTPClient client = connector.getFtp(getEndpointURI()); 175 connector.releaseFtp(getEndpointURI(), client); 176 } 177 178 public void doDisconnect() throws Exception 179 { 180 } 182 183 private final class FtpWork implements Work 184 { 185 private final String _name; 186 private final FTPFile _file; 187 188 private FtpWork(String name, FTPFile file) 189 { 190 _name = name; 191 _file = file; 192 } 193 194 public void run() 195 { 196 try 197 { 198 currentFiles.add(_name); 199 processFile(_file); 200 } 201 catch (Exception e) 202 { 203 connector.handleException(e); 204 } 205 finally 206 { 207 currentFiles.remove(_name); 208 scheduledFiles.remove(_name); 209 } 210 } 211 212 public void release() 213 { 214 } 216 } 217 218 } 219 | Popular Tags |