KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > components > net > FTPPoller


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.components.net;
18
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.apache.commons.net.SocketClient;
22 import org.apache.commons.net.ftp.FTPClient;
23 import org.apache.commons.net.ftp.FTPFile;
24 import org.apache.servicemix.components.util.DefaultFileMarshaler;
25 import org.apache.servicemix.components.util.FileMarshaler;
26 import org.apache.servicemix.components.util.PollingComponentSupport;
27
28 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
29
30 import javax.jbi.JBIException;
31 import javax.jbi.messaging.InOnly;
32 import javax.jbi.messaging.NormalizedMessage;
33 import javax.resource.spi.work.Work JavaDoc;
34 import java.util.Set JavaDoc;
35 import java.io.IOException JavaDoc;
36 import java.io.InputStream JavaDoc;
37
38 /**
39  * A component which polls for files to arrive on an FTP server
40  * using the <a HREF="http://jakarta.apache.org/commons/net.html">Jakarta Commons Net</a> library
41  * and then sends them into the normalized message service, using a plugable transformer
42  * and removes them.
43  *
44  * @version $Revision: 426415 $
45  */

46 public class FTPPoller extends PollingComponentSupport {
47     private static final Log log = LogFactory.getLog(FTPPoller.class);
48
49     private FTPClientPool clientPool;
50     private String JavaDoc path;
51     private FileMarshaler marshaler = new DefaultFileMarshaler();
52     private Set workingSet = new CopyOnWriteArraySet();
53
54     private String JavaDoc getWorkingPath() {
55       return path == null ? "." : path;
56     }
57
58     public void poll() throws Exception JavaDoc {
59         FTPClient ftp = (FTPClient) borrowClient();
60         try {
61             FTPFile[] files = ftp.listFiles(getWorkingPath());
62             for (int i = 0; i < files.length; i++) {
63                 final FTPFile file = files[i];
64                 workingSet.add(file);
65                 getWorkManager().scheduleWork(new Work JavaDoc() {
66                     public void run() {
67                         processFile(file);
68                     }
69
70                     public void release() {
71                         workingSet.remove(file);
72                     }
73                 });
74             }
75         }
76         finally {
77             returnClient(ftp);
78         }
79     }
80
81
82     // Properties
83
//-------------------------------------------------------------------------
84
public FTPClientPool getClientPool() {
85         return clientPool;
86     }
87
88     public void setClientPool(FTPClientPool clientPool) {
89         this.clientPool = clientPool;
90     }
91
92     public String JavaDoc getPath() {
93         return path;
94     }
95
96     public void setPath(String JavaDoc path) {
97         this.path = path;
98     }
99
100     public FileMarshaler getMarshaler() {
101         return marshaler;
102     }
103
104     public void setMarshaler(FileMarshaler marshaler) {
105         this.marshaler = marshaler;
106     }
107
108     /**
109      * The set of FTPFiles that this component is currently working on
110      */

111     public Set getWorkingSet() {
112         return workingSet;
113     }
114
115     // Implementation methods
116
//-------------------------------------------------------------------------
117

118     protected void init() throws JBIException {
119         if (clientPool == null) {
120             throw new IllegalArgumentException JavaDoc("You must initialise the clientPool property");
121         }
122         super.init();
123     }
124
125     protected void processFile(FTPFile file) {
126         if (file.getName().equals(".") || file.getName().equals("..")) { // TODO: what about other directories?
127
return;
128         }
129         FTPClient client = null;
130         try {
131             client = (FTPClient) borrowClient();
132             processFile(client, file);
133             if (!client.deleteFile(getWorkingPath() + file.getName())) {
134                 throw new IOException JavaDoc("Could not delete file " + file);
135             }
136         }
137         catch (Exception JavaDoc e) {
138             log.error("Failed to process file: " + file + ". Reason: " + e, e);
139         }
140         finally {
141             if (client != null) {
142                 returnClient(client);
143             }
144         }
145     }
146
147     protected void processFile(FTPClient client, FTPFile file) throws Exception JavaDoc {
148         String JavaDoc name = file.getName();
149         InputStream JavaDoc in = client.retrieveFileStream(getWorkingPath() + name);
150         client.completePendingCommand();
151         InOnly exchange = getExchangeFactory().createInOnlyExchange();
152         NormalizedMessage message = exchange.createMessage();
153         exchange.setInMessage(message);
154         marshaler.readMessage(exchange, message, in, name);
155         getDeliveryChannel().sendSync(exchange);
156         in.close();
157     }
158
159
160     protected SocketClient borrowClient() throws JBIException {
161         try {
162             return getClientPool().borrowClient();
163         }
164         catch (Exception JavaDoc e) {
165             throw new JBIException(e);
166         }
167     }
168
169     protected void returnClient(SocketClient client) {
170         if (client != null) {
171             try {
172                 getClientPool().returnClient(client);
173             }
174             catch (Exception JavaDoc e) {
175                 log.error("Failed to return client to pool: " + e, e);
176             }
177         }
178     }
179 }
180
Popular Tags