KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > ftp > FtpMessageReceiver


1 /*
2  * $Id: FtpMessageReceiver.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.ftp;
12
13 import java.io.FilenameFilter JavaDoc;
14 import java.io.IOException JavaDoc;
15 import java.util.ArrayList JavaDoc;
16 import java.util.Collections JavaDoc;
17 import java.util.HashSet JavaDoc;
18 import java.util.List JavaDoc;
19 import java.util.Set JavaDoc;
20
21 import javax.resource.spi.work.Work JavaDoc;
22
23 import org.apache.commons.io.output.ByteArrayOutputStream;
24 import org.apache.commons.net.ftp.FTPClient;
25 import org.apache.commons.net.ftp.FTPFile;
26 import org.apache.commons.net.ftp.FTPReply;
27 import org.mule.impl.MuleMessage;
28 import org.mule.providers.PollingMessageReceiver;
29 import org.mule.providers.file.FileConnector;
30 import org.mule.umo.UMOComponent;
31 import org.mule.umo.UMOMessage;
32 import org.mule.umo.endpoint.UMOEndpoint;
33 import org.mule.umo.endpoint.UMOEndpointURI;
34 import org.mule.umo.lifecycle.InitialisationException;
35 import org.mule.umo.provider.UMOConnector;
36
37 public class FtpMessageReceiver extends PollingMessageReceiver
38 {
39     protected final FtpConnector connector;
40     protected final FilenameFilter JavaDoc filenameFilter;
41
42     // there's nothing like homegrown pseudo-2PC.. :/
43
// shared state management like this should go into the connector and use
44
// something like commons-tx
45
protected final Set JavaDoc scheduledFiles = Collections.synchronizedSet(new HashSet JavaDoc());
46     protected final Set JavaDoc currentFiles = Collections.synchronizedSet(new HashSet JavaDoc());
47
48     public FtpMessageReceiver(UMOConnector connector,
49                               UMOComponent component,
50                               UMOEndpoint endpoint,
51                               Long JavaDoc frequency) throws InitialisationException
52     {
53         super(connector, component, endpoint, frequency);
54         this.connector = (FtpConnector)connector;
55
56         if (endpoint.getFilter() instanceof FilenameFilter JavaDoc)
57         {
58             this.filenameFilter = (FilenameFilter JavaDoc)endpoint.getFilter();
59         }
60         else
61         {
62             this.filenameFilter = null;
63         }
64     }
65
66     public void poll() throws Exception JavaDoc
67     {
68         FTPFile[] files = listFiles();
69
70         synchronized (scheduledFiles)
71         {
72             for (int i = 0; i < files.length; i++)
73             {
74                 final FTPFile file = files[i];
75                 final String JavaDoc fileName = file.getName();
76
77                 if (!scheduledFiles.contains(fileName) && !currentFiles.contains(fileName))
78                 {
79                     scheduledFiles.add(fileName);
80                     getWorkManager().scheduleWork(new FtpWork(fileName, file));
81                 }
82             }
83         }
84     }
85
86     protected FTPFile[] listFiles() throws Exception JavaDoc
87     {
88         final UMOEndpointURI uri = endpoint.getEndpointURI();
89         FTPClient client = null;
90
91         try
92         {
93             client = connector.getFtp(uri);
94             connector.enterActiveOrPassiveMode(client, endpoint);
95             connector.setupFileType(client, endpoint);
96
97             if (!client.changeWorkingDirectory(uri.getPath()))
98             {
99                 throw new IOException JavaDoc("Ftp error: " + client.getReplyCode());
100             }
101
102             FTPFile[] files = client.listFiles();
103
104             if (!FTPReply.isPositiveCompletion(client.getReplyCode()))
105             {
106                 throw new IOException JavaDoc("Ftp error: " + client.getReplyCode());
107             }
108
109             if (files == null || files.length == 0)
110             {
111                 return files;
112             }
113
114             List JavaDoc v = new ArrayList JavaDoc();
115
116             for (int i = 0; i < files.length; i++)
117             {
118                 if (files[i].isFile())
119                 {
120                     if (filenameFilter == null || filenameFilter.accept(null, files[i].getName()))
121                     {
122                         v.add(files[i]);
123                     }
124                 }
125             }
126
127             return (FTPFile[])v.toArray(new FTPFile[v.size()]);
128         }
129         finally
130         {
131             connector.releaseFtp(uri, client);
132         }
133     }
134
135     protected void processFile(FTPFile file) throws Exception JavaDoc
136     {
137         FTPClient client = null;
138         UMOEndpointURI uri = endpoint.getEndpointURI();
139
140         try
141         {
142             client = connector.getFtp(uri);
143             connector.enterActiveOrPassiveMode(client, endpoint);
144             connector.setupFileType(client, endpoint);
145
146             if (!client.changeWorkingDirectory(endpoint.getEndpointURI().getPath()))
147             {
148                 throw new IOException JavaDoc("Ftp error: " + client.getReplyCode());
149             }
150
151             ByteArrayOutputStream baos = new ByteArrayOutputStream();
152             if (!client.retrieveFile(file.getName(), baos))
153             {
154                 throw new IOException JavaDoc("Ftp error: " + client.getReplyCode());
155             }
156
157             UMOMessage message = new MuleMessage(connector.getMessageAdapter(baos.toByteArray()));
158             message.setProperty(FileConnector.PROPERTY_ORIGINAL_FILENAME, file.getName());
159             routeMessage(message);
160
161             if (!client.deleteFile(file.getName()))
162             {
163                 throw new IOException JavaDoc("Ftp error: " + client.getReplyCode());
164             }
165         }
166         finally
167         {
168             connector.releaseFtp(uri, client);
169         }
170     }
171
172     public void doConnect() throws Exception JavaDoc
173     {
174         FTPClient client = connector.getFtp(getEndpointURI());
175         connector.releaseFtp(getEndpointURI(), client);
176     }
177
178     public void doDisconnect() throws Exception JavaDoc
179     {
180         // no op
181
}
182
183     private final class FtpWork implements Work JavaDoc
184     {
185         private final String JavaDoc _name;
186         private final FTPFile _file;
187
188         private FtpWork(String JavaDoc name, FTPFile file)
189         {
190             _name = name;
191             _file = file;
192         }
193
194         public void run()
195         {
196             try
197             {
198                 currentFiles.add(_name);
199                 processFile(_file);
200             }
201             catch (Exception JavaDoc e)
202             {
203                 connector.handleException(e);
204             }
205             finally
206             {
207                 currentFiles.remove(_name);
208                 scheduledFiles.remove(_name);
209             }
210         }
211
212         public void release()
213         {
214             // no op
215
}
216     }
217
218 }
219
Popular Tags