KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > ftp > FtpMessageDispatcher


1 /*
2  * $Id: FtpMessageDispatcher.java 4310 2006-12-19 12:34:06Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.ftp;
12
13 import java.io.FilenameFilter JavaDoc;
14 import java.io.IOException JavaDoc;
15 import java.io.OutputStream JavaDoc;
16 import java.util.ArrayList JavaDoc;
17 import java.util.List JavaDoc;
18
19 import org.apache.commons.io.IOUtils;
20 import org.apache.commons.io.output.ByteArrayOutputStream;
21 import org.apache.commons.net.ftp.FTPClient;
22 import org.apache.commons.net.ftp.FTPFile;
23 import org.apache.commons.net.ftp.FTPReply;
24 import org.mule.config.i18n.Message;
25 import org.mule.config.i18n.Messages;
26 import org.mule.impl.MuleMessage;
27 import org.mule.providers.AbstractMessageDispatcher;
28 import org.mule.umo.UMOEvent;
29 import org.mule.umo.UMOException;
30 import org.mule.umo.UMOMessage;
31 import org.mule.umo.endpoint.UMOEndpointURI;
32 import org.mule.umo.endpoint.UMOImmutableEndpoint;
33 import org.mule.umo.provider.DispatchException;
34
35 public class FtpMessageDispatcher extends AbstractMessageDispatcher
36 {
37     protected final FtpConnector connector;
38
39     public FtpMessageDispatcher(UMOImmutableEndpoint endpoint)
40     {
41         super(endpoint);
42         this.connector = (FtpConnector)endpoint.getConnector();
43     }
44
45     protected void doDispose()
46     {
47         // no op
48
}
49
50     protected void doDispatch(UMOEvent event) throws Exception JavaDoc
51     {
52         FTPClient client = null;
53         UMOEndpointURI uri = event.getEndpoint().getEndpointURI();
54
55         try
56         {
57             Object JavaDoc data = event.getTransformedMessage();
58             byte[] dataBytes;
59
60             if (data instanceof byte[])
61             {
62                 dataBytes = (byte[])data;
63             }
64             else
65             {
66                 dataBytes = data.toString().getBytes();
67             }
68
69             FtpOutputStreamWrapper out = (FtpOutputStreamWrapper)getOutputStream(event.getEndpoint(),
70                 event.getMessage());
71             client = out.getFtpClient();
72             IOUtils.write(dataBytes, out);
73             // This will ensure that the completePendingRequest is called
74
out.close();
75         }
76         finally
77         {
78             connector.releaseFtp(uri, client);
79         }
80     }
81
82     /**
83      * Well get the output stream (if any) for this type of transport. Typically this
84      * will be called only when Streaming is being used on an outbound endpoint
85      *
86      * @param endpoint the endpoint that releates to this Dispatcher
87      * @param message the current message being processed
88      * @return the output stream to use for this request or null if the transport
89      * does not support streaming
90      * @throws org.mule.umo.UMOException
91      */

92     public OutputStream JavaDoc getOutputStream(UMOImmutableEndpoint endpoint, UMOMessage message)
93         throws UMOException
94     {
95         FTPClient client = null;
96
97         UMOEndpointURI uri = endpoint.getEndpointURI();
98         String JavaDoc filename = (String JavaDoc)message.getProperty(FtpConnector.PROPERTY_FILENAME);
99
100         try
101         {
102             if (filename == null)
103             {
104                 String JavaDoc outPattern = (String JavaDoc)endpoint.getProperty(FtpConnector.PROPERTY_OUTPUT_PATTERN);
105                 if (outPattern == null){
106                     outPattern = message.getStringProperty(FtpConnector.PROPERTY_OUTPUT_PATTERN,
107                     connector.getOutputPattern());
108                 }
109                 filename = generateFilename(message, outPattern);
110             }
111
112             if (filename == null)
113             {
114                 throw new IOException JavaDoc("Filename is null");
115             }
116
117             client = connector.getFtp(uri);
118             connector.enterActiveOrPassiveMode(client, endpoint);
119             connector.setupFileType(client, endpoint);
120             if (!client.changeWorkingDirectory(uri.getPath()))
121             {
122                 throw new IOException JavaDoc("Ftp error: " + client.getReplyCode());
123             }
124             OutputStream JavaDoc out = client.storeFileStream(filename);
125             // We wrap the ftp outputstream to ensure that the
126
// completePendingRequest() method is called when the stream is closed
127
return new FtpOutputStreamWrapper(client, out);
128         }
129         catch (Exception JavaDoc e)
130         {
131             throw new DispatchException(new Message(Messages.STREAMING_FAILED_NO_STREAM), message, endpoint,
132                 e);
133         }
134     }
135
136     protected UMOMessage doSend(UMOEvent event) throws Exception JavaDoc
137     {
138         doDispatch(event);
139         return event.getMessage();
140     }
141
142     protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception JavaDoc
143     {
144         FTPClient client = connector.getFtp(endpoint.getEndpointURI());
145         connector.releaseFtp(endpoint.getEndpointURI(), client);
146     }
147
148     protected void doDisconnect() throws Exception JavaDoc
149     {
150         FTPClient client = connector.getFtp(endpoint.getEndpointURI());
151         connector.destroyFtp(endpoint.getEndpointURI(), client);
152     }
153
154     /**
155      * Make a specific request to the underlying transport
156      *
157      * @param endpoint the endpoint to use when connecting to the resource
158      * @param timeout the maximum time the operation should block before returning.
159      * The call should return immediately if there is data available. If
160      * no data becomes available before the timeout elapses, null will be
161      * returned
162      * @return the result of the request wrapped in a UMOMessage object. Null will be
163      * returned if no data was avaialable
164      * @throws Exception if the call to the underlying protocal cuases an exception
165      */

166     protected UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception JavaDoc
167     {
168
169         FTPClient client = null;
170         try
171         {
172
173             client = connector.getFtp(endpoint.getEndpointURI());
174             // not sure this getParams() will always work, there's a todo in the code
175
connector.enterActiveOrPassiveMode(client, endpoint);
176             connector.setupFileType(client, endpoint);
177             if (!client.changeWorkingDirectory(endpoint.getEndpointURI().getPath()))
178             {
179                 throw new IOException JavaDoc("Ftp error: " + client.getReplyCode());
180             }
181
182             FilenameFilter JavaDoc filenameFilter = null;
183             if (endpoint.getFilter() instanceof FilenameFilter JavaDoc)
184             {
185                 filenameFilter = (FilenameFilter JavaDoc)endpoint.getFilter();
186             }
187
188             FTPFile[] files = client.listFiles();
189             if (!FTPReply.isPositiveCompletion(client.getReplyCode()))
190             {
191                 throw new IOException JavaDoc("Ftp error: " + client.getReplyCode());
192             }
193             if (files == null || files.length == 0)
194             {
195                 return null;
196             }
197             List JavaDoc fileList = new ArrayList JavaDoc();
198             for (int i = 0; i < files.length; i++)
199             {
200                 if (files[i].isFile())
201                 {
202                     if (filenameFilter == null || filenameFilter.accept(null, files[i].getName()))
203                     {
204                         fileList.add(files[i]);
205                         // only read the first one
206
break;
207                     }
208                 }
209             }
210             if (fileList.size() == 0)
211             {
212                 return null;
213             }
214
215             FTPFile file = (FTPFile)fileList.get(0);
216             ByteArrayOutputStream baos = new ByteArrayOutputStream();
217             if (!client.retrieveFile(file.getName(), baos))
218             {
219                 throw new IOException JavaDoc("Ftp error: " + client.getReplyCode());
220             }
221             return new MuleMessage(connector.getMessageAdapter(baos.toByteArray()));
222
223         }
224         finally
225         {
226             connector.releaseFtp(endpoint.getEndpointURI(), client);
227         }
228     }
229
230     public Object JavaDoc getDelegateSession() throws UMOException
231     {
232         return null;
233     }
234
235     private String JavaDoc generateFilename(UMOMessage message, String JavaDoc pattern)
236     {
237         if (pattern == null)
238         {
239             pattern = connector.getOutputPattern();
240         }
241         return connector.getFilenameParser().getFilename(message, pattern);
242     }
243
244     class FtpOutputStreamWrapper extends OutputStream JavaDoc
245     {
246         private final FTPClient client;
247         private final OutputStream JavaDoc out;
248
249         public FtpOutputStreamWrapper(FTPClient client, OutputStream JavaDoc out)
250         {
251             this.client = client;
252             this.out = out;
253         }
254
255         public void write(int b) throws IOException JavaDoc
256         {
257             out.write(b);
258         }
259
260         public void write(byte b[]) throws IOException JavaDoc
261         {
262             out.write(b);
263         }
264
265         public void write(byte b[], int off, int len) throws IOException JavaDoc
266         {
267             out.write(b, off, len);
268         }
269
270         public void flush() throws IOException JavaDoc
271         {
272             out.flush();
273         }
274
275         public void close() throws IOException JavaDoc
276         {
277             try
278             {
279                 // close output stream
280
out.close();
281
282                 if (!client.completePendingCommand())
283                 {
284                     client.logout();
285                     client.disconnect();
286                     throw new IOException JavaDoc("FTP Stream failed to complete pending request");
287                 }
288             }
289             finally
290             {
291                 out.close();
292                 super.close();
293             }
294         }
295
296         FTPClient getFtpClient()
297         {
298             return client;
299         }
300     }
301
302 }
303
Popular Tags