1 10 11 package org.mule.providers.ftp; 12 13 import java.io.FilenameFilter ; 14 import java.io.IOException ; 15 import java.io.OutputStream ; 16 import java.util.ArrayList ; 17 import java.util.List ; 18 19 import org.apache.commons.io.IOUtils; 20 import org.apache.commons.io.output.ByteArrayOutputStream; 21 import org.apache.commons.net.ftp.FTPClient; 22 import org.apache.commons.net.ftp.FTPFile; 23 import org.apache.commons.net.ftp.FTPReply; 24 import org.mule.config.i18n.Message; 25 import org.mule.config.i18n.Messages; 26 import org.mule.impl.MuleMessage; 27 import org.mule.providers.AbstractMessageDispatcher; 28 import org.mule.umo.UMOEvent; 29 import org.mule.umo.UMOException; 30 import org.mule.umo.UMOMessage; 31 import org.mule.umo.endpoint.UMOEndpointURI; 32 import org.mule.umo.endpoint.UMOImmutableEndpoint; 33 import org.mule.umo.provider.DispatchException; 34 35 public class FtpMessageDispatcher extends AbstractMessageDispatcher 36 { 37 protected final FtpConnector connector; 38 39 public FtpMessageDispatcher(UMOImmutableEndpoint endpoint) 40 { 41 super(endpoint); 42 this.connector = (FtpConnector)endpoint.getConnector(); 43 } 44 45 protected void doDispose() 46 { 47 } 49 50 protected void doDispatch(UMOEvent event) throws Exception 51 { 52 FTPClient client = null; 53 UMOEndpointURI uri = event.getEndpoint().getEndpointURI(); 54 55 try 56 { 57 Object data = event.getTransformedMessage(); 58 byte[] dataBytes; 59 60 if (data instanceof byte[]) 61 { 62 dataBytes = (byte[])data; 63 } 64 else 65 { 66 dataBytes = data.toString().getBytes(); 67 } 68 69 FtpOutputStreamWrapper out = (FtpOutputStreamWrapper)getOutputStream(event.getEndpoint(), 70 event.getMessage()); 71 client = out.getFtpClient(); 72 IOUtils.write(dataBytes, out); 73 out.close(); 75 } 76 finally 77 { 78 connector.releaseFtp(uri, client); 79 } 80 } 81 82 92 public OutputStream getOutputStream(UMOImmutableEndpoint endpoint, UMOMessage message) 93 throws UMOException 94 { 95 FTPClient client = null; 96 97 UMOEndpointURI uri = endpoint.getEndpointURI(); 98 String filename = (String )message.getProperty(FtpConnector.PROPERTY_FILENAME); 99 100 try 101 { 102 if (filename == null) 103 { 104 String outPattern = (String )endpoint.getProperty(FtpConnector.PROPERTY_OUTPUT_PATTERN); 105 if (outPattern == null){ 106 outPattern = message.getStringProperty(FtpConnector.PROPERTY_OUTPUT_PATTERN, 107 connector.getOutputPattern()); 108 } 109 filename = generateFilename(message, outPattern); 110 } 111 112 if (filename == null) 113 { 114 throw new IOException ("Filename is null"); 115 } 116 117 client = connector.getFtp(uri); 118 connector.enterActiveOrPassiveMode(client, endpoint); 119 connector.setupFileType(client, endpoint); 120 if (!client.changeWorkingDirectory(uri.getPath())) 121 { 122 throw new IOException ("Ftp error: " + client.getReplyCode()); 123 } 124 OutputStream out = client.storeFileStream(filename); 125 return new FtpOutputStreamWrapper(client, out); 128 } 129 catch (Exception e) 130 { 131 throw new DispatchException(new Message(Messages.STREAMING_FAILED_NO_STREAM), message, endpoint, 132 e); 133 } 134 } 135 136 protected UMOMessage doSend(UMOEvent event) throws Exception 137 { 138 doDispatch(event); 139 return event.getMessage(); 140 } 141 142 protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception 143 { 144 FTPClient client = connector.getFtp(endpoint.getEndpointURI()); 145 connector.releaseFtp(endpoint.getEndpointURI(), client); 146 } 147 148 protected void doDisconnect() throws Exception 149 { 150 FTPClient client = connector.getFtp(endpoint.getEndpointURI()); 151 connector.destroyFtp(endpoint.getEndpointURI(), client); 152 } 153 154 166 protected UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception 167 { 168 169 FTPClient client = null; 170 try 171 { 172 173 client = connector.getFtp(endpoint.getEndpointURI()); 174 connector.enterActiveOrPassiveMode(client, endpoint); 176 connector.setupFileType(client, endpoint); 177 if (!client.changeWorkingDirectory(endpoint.getEndpointURI().getPath())) 178 { 179 throw new IOException ("Ftp error: " + client.getReplyCode()); 180 } 181 182 FilenameFilter filenameFilter = null; 183 if (endpoint.getFilter() instanceof FilenameFilter ) 184 { 185 filenameFilter = (FilenameFilter )endpoint.getFilter(); 186 } 187 188 FTPFile[] files = client.listFiles(); 189 if (!FTPReply.isPositiveCompletion(client.getReplyCode())) 190 { 191 throw new IOException ("Ftp error: " + client.getReplyCode()); 192 } 193 if (files == null || files.length == 0) 194 { 195 return null; 196 } 197 List fileList = new ArrayList (); 198 for (int i = 0; i < files.length; i++) 199 { 200 if (files[i].isFile()) 201 { 202 if (filenameFilter == null || filenameFilter.accept(null, files[i].getName())) 203 { 204 fileList.add(files[i]); 205 break; 207 } 208 } 209 } 210 if (fileList.size() == 0) 211 { 212 return null; 213 } 214 215 FTPFile file = (FTPFile)fileList.get(0); 216 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 217 if (!client.retrieveFile(file.getName(), baos)) 218 { 219 throw new IOException ("Ftp error: " + client.getReplyCode()); 220 } 221 return new MuleMessage(connector.getMessageAdapter(baos.toByteArray())); 222 223 } 224 finally 225 { 226 connector.releaseFtp(endpoint.getEndpointURI(), client); 227 } 228 } 229 230 public Object getDelegateSession() throws UMOException 231 { 232 return null; 233 } 234 235 private String generateFilename(UMOMessage message, String pattern) 236 { 237 if (pattern == null) 238 { 239 pattern = connector.getOutputPattern(); 240 } 241 return connector.getFilenameParser().getFilename(message, pattern); 242 } 243 244 class FtpOutputStreamWrapper extends OutputStream 245 { 246 private final FTPClient client; 247 private final OutputStream out; 248 249 public FtpOutputStreamWrapper(FTPClient client, OutputStream out) 250 { 251 this.client = client; 252 this.out = out; 253 } 254 255 public void write(int b) throws IOException 256 { 257 out.write(b); 258 } 259 260 public void write(byte b[]) throws IOException 261 { 262 out.write(b); 263 } 264 265 public void write(byte b[], int off, int len) throws IOException 266 { 267 out.write(b, off, len); 268 } 269 270 public void flush() throws IOException 271 { 272 out.flush(); 273 } 274 275 public void close() throws IOException 276 { 277 try 278 { 279 out.close(); 281 282 if (!client.completePendingCommand()) 283 { 284 client.logout(); 285 client.disconnect(); 286 throw new IOException ("FTP Stream failed to complete pending request"); 287 } 288 } 289 finally 290 { 291 out.close(); 292 super.close(); 293 } 294 } 295 296 FTPClient getFtpClient() 297 { 298 return client; 299 } 300 } 301 302 } 303 | Popular Tags |