KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > file > FileMessageReceiver


1 /*
2  * $Id: FileMessageReceiver.java 4219 2006-12-09 10:15:14Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.file;
12
13 import java.io.File JavaDoc;
14 import java.io.FileInputStream JavaDoc;
15 import java.io.FileOutputStream JavaDoc;
16 import java.io.FilenameFilter JavaDoc;
17 import java.io.IOException JavaDoc;
18 import java.io.RandomAccessFile JavaDoc;
19 import java.nio.channels.FileChannel JavaDoc;
20 import java.nio.channels.FileLock JavaDoc;
21
22 import org.apache.commons.io.IOUtils;
23 import org.mule.MuleException;
24 import org.mule.config.i18n.Message;
25 import org.mule.config.i18n.Messages;
26 import org.mule.impl.MuleMessage;
27 import org.mule.providers.ConnectException;
28 import org.mule.providers.PollingMessageReceiver;
29 import org.mule.umo.UMOComponent;
30 import org.mule.umo.UMOException;
31 import org.mule.umo.endpoint.UMOEndpoint;
32 import org.mule.umo.lifecycle.InitialisationException;
33 import org.mule.umo.provider.UMOConnector;
34 import org.mule.umo.provider.UMOMessageAdapter;
35 import org.mule.umo.routing.RoutingException;
36 import org.mule.util.FileUtils;
37
38 /**
39  * <code>FileMessageReceiver</code> is a polling listener that reads files from a
40  * directory.
41  */

42
43 public class FileMessageReceiver extends PollingMessageReceiver
44 {
45     private String JavaDoc readDir = null;
46     private String JavaDoc moveDir = null;
47     private File JavaDoc readDirectory = null;
48     private File JavaDoc moveDirectory = null;
49     private String JavaDoc moveToPattern = null;
50     private FilenameFilter JavaDoc filenameFilter = null;
51
52     public FileMessageReceiver(UMOConnector connector,
53                                UMOComponent component,
54                                UMOEndpoint endpoint,
55                                String JavaDoc readDir,
56                                String JavaDoc moveDir,
57                                String JavaDoc moveToPattern,
58                                Long JavaDoc frequency) throws InitialisationException
59     {
60         super(connector, component, endpoint, frequency);
61         this.readDir = readDir;
62         this.moveDir = moveDir;
63         this.moveToPattern = moveToPattern;
64         if (endpoint.getFilter() instanceof FilenameFilter JavaDoc)
65         {
66             filenameFilter = (FilenameFilter JavaDoc)endpoint.getFilter();
67         }
68     }
69
70     public void doConnect() throws Exception JavaDoc
71     {
72         if (readDir != null)
73         {
74             readDirectory = FileUtils.openDirectory(readDir);
75             if (!(readDirectory.canRead()))
76             {
77                 throw new ConnectException(new Message(Messages.FILE_X_DOES_NOT_EXIST,
78                     readDirectory.getAbsolutePath()), this);
79             }
80             else
81             {
82                 logger.debug("Listening on endpointUri: " + readDirectory.getAbsolutePath());
83             }
84         }
85
86         if (moveDir != null)
87         {
88             moveDirectory = FileUtils.openDirectory((moveDir));
89             if (!(moveDirectory.canRead()) || !moveDirectory.canWrite())
90             {
91                 throw new ConnectException(new Message("file", 5), this);
92             }
93         }
94     }
95
96     public void doDisconnect() throws Exception JavaDoc
97     {
98         // template method
99
}
100
101     public void poll()
102     {
103         try
104         {
105             File JavaDoc[] files = this.listFiles();
106             for (int i = 0; i < files.length; i++)
107             {
108                 this.processFile(files[i]);
109             }
110         }
111         catch (Exception JavaDoc e)
112         {
113             this.handleException(e);
114         }
115     }
116
117     public synchronized void processFile(final File JavaDoc sourceFile) throws UMOException
118     {
119         boolean checkFileAge = ((FileConnector)connector).getCheckFileAge();
120         if (checkFileAge)
121         {
122             long fileAge = ((FileConnector)connector).getFileAge();
123             long lastMod = sourceFile.lastModified();
124             long now = (new java.util.Date JavaDoc()).getTime();
125             if ((now - lastMod) < fileAge)
126             {
127                 return;
128             }
129         }
130
131         // don't process a file that is locked by another process (probably still being written)
132
if (!attemptFileLock(sourceFile))
133         {
134             return;
135         }
136
137         File JavaDoc destinationFile = null;
138         String JavaDoc sourceFileOriginalName = sourceFile.getName();
139         UMOMessageAdapter msgAdapter = connector.getMessageAdapter(sourceFile);
140         msgAdapter.setProperty(FileConnector.PROPERTY_ORIGINAL_FILENAME, sourceFileOriginalName);
141
142         // set up destination file
143
if (moveDir != null)
144         {
145             String JavaDoc destinationFileName = sourceFileOriginalName;
146
147             if (moveToPattern != null)
148             {
149                 destinationFileName = ((FileConnector) connector).getFilenameParser().getFilename(msgAdapter,
150                     moveToPattern);
151             }
152
153             // don't use new File() directly, see MULE-1112
154
destinationFile = FileUtils.newFile(moveDir, destinationFileName);
155         }
156
157         boolean fileWasMoved = false;
158
159         try
160         {
161             // Perform some quick checks to make sure file can be processed
162
if (!(sourceFile.canRead() && sourceFile.exists() && sourceFile.isFile()))
163             {
164                 throw new MuleException(new Message(Messages.FILE_X_DOES_NOT_EXIST, sourceFileOriginalName));
165             }
166
167             if (destinationFile != null)
168             {
169                 // move sourceFile to new destination
170
fileWasMoved = this.moveFile(sourceFile, destinationFile);
171
172                 // move didn't work - bail out (will attempt rollback)
173
if (!fileWasMoved)
174                 {
175                     throw new MuleException(new Message("file", 4, sourceFile.getAbsolutePath(),
176                         destinationFile.getAbsolutePath()));
177                 }
178
179                 // create new MessageAdapter for destinationFile
180
msgAdapter = connector.getMessageAdapter(destinationFile);
181                 msgAdapter.setProperty(FileConnector.PROPERTY_FILENAME, destinationFile.getName());
182                 msgAdapter.setProperty(FileConnector.PROPERTY_ORIGINAL_FILENAME, sourceFileOriginalName);
183             }
184
185             // at this point msgAdapter either points to the old sourceFile
186
// or the new destinationFile.
187
if (((FileConnector) connector).isAutoDelete())
188             {
189                 // no moveTo directory
190
if (destinationFile == null)
191                 {
192                     // delete source
193
if (!sourceFile.delete())
194                     {
195                         // TODO better message
196
throw new MuleException(new Message("file", 3, sourceFile.getAbsolutePath()));
197                     }
198                 }
199                 else
200                 {
201                     // nothing to do here since moveFile() should have deleted
202
// the source file for us
203
}
204             }
205
206             // finally deliver the file message
207
this.routeMessage(new MuleMessage(msgAdapter), endpoint.isSynchronous());
208         }
209         catch (Exception JavaDoc e)
210         {
211             boolean fileWasRolledBack = false;
212
213             // only attempt rollback if file move was successful
214
if (fileWasMoved)
215             {
216                 fileWasRolledBack = this.rollbackFileMove(destinationFile, sourceFile.getAbsolutePath());
217             }
218
219             // wrap exception & handle it
220
Exception JavaDoc ex = new RoutingException(new Message("file", 2, sourceFile.getName(),
221                 (fileWasRolledBack ? "successful" : "unsuccessful")), new MuleMessage(msgAdapter), endpoint,
222                 e);
223             this.handleException(ex);
224         }
225     }
226
227     /**
228      * Try to acquire a lock on a file and release it immediately. Usually used as a quick check to
229      * see if another process is still holding onto the file, e.g. a large file (more than 100MB) is
230      * still being written to.
231      * @param sourceFile file to check
232      * @return <code>true</code> if the file can be locked
233      */

234     protected boolean attemptFileLock(final File JavaDoc sourceFile)
235     {
236         // check if the file can be processed, be sure that it's not still being written
237
// if the file can't be locked don't process it yet, since creating
238
// a new FileInputStream() will throw an exception
239
FileLock JavaDoc lock = null;
240         FileChannel JavaDoc channel = null;
241         boolean fileCanBeLocked = false;
242         try
243         {
244             channel = new RandomAccessFile JavaDoc(sourceFile, "rw").getChannel();
245
246             // Try acquiring the lock without blocking. This method returns
247
// null or throws an exception if the file is already locked.
248
lock = channel.tryLock();
249         }
250         catch (IOException JavaDoc e)
251         {
252             // unable to create a lock
253
}
254         finally {
255             if (lock != null)
256             {
257                 // if lock is null the file is locked by another process
258
fileCanBeLocked = true;
259                 try
260                 {
261                     // Release the lock
262
lock.release();
263                 }
264                 catch (IOException JavaDoc e)
265                 {
266                     // ignore
267
}
268             }
269
270             if (channel != null)
271             {
272                 try
273                 {
274                     // Close the file
275
channel.close();
276                 }
277                 catch (IOException JavaDoc e)
278                 {
279                     // ignore
280
}
281             }
282         }
283
284         return fileCanBeLocked;
285     }
286
287     /**
288      * Try to move a file by renaming with backup attempt by copying/deleting via NIO
289      */

290     protected boolean moveFile(File JavaDoc sourceFile, File JavaDoc destinationFile)
291     {
292         // try fast file-system-level move/rename first
293
boolean success = sourceFile.renameTo(destinationFile);
294
295         if (!success)
296         {
297             // try again using NIO copy
298
FileInputStream JavaDoc fis = null;
299             FileOutputStream JavaDoc fos = null;
300             try
301             {
302                 fis = new FileInputStream JavaDoc(sourceFile);
303                 fos = new FileOutputStream JavaDoc(destinationFile);
304                 FileChannel JavaDoc srcChannel = fis.getChannel();
305                 FileChannel JavaDoc dstChannel = fos.getChannel();
306                 dstChannel.transferFrom(srcChannel, 0, srcChannel.size());
307                 srcChannel.close();
308                 dstChannel.close();
309                 success = sourceFile.delete();
310             }
311             catch (IOException JavaDoc ioex)
312             {
313                 // grr!
314
success = false;
315             }
316             finally
317             {
318                 IOUtils.closeQuietly(fis);
319                 IOUtils.closeQuietly(fos);
320             }
321         }
322
323         return success;
324     }
325
326     /**
327      * Exception tolerant roll back method
328      */

329     protected boolean rollbackFileMove(File JavaDoc sourceFile, String JavaDoc destinationFilePath)
330     {
331         boolean result = false;
332         try
333         {
334             result = this.moveFile(sourceFile, FileUtils.newFile(destinationFilePath));
335         }
336         catch (Throwable JavaDoc t)
337         {
338             logger.debug("rollback of file move failed: " + t.getMessage());
339         }
340         return result;
341     }
342
343     /**
344      * Get a list of files to be processed.
345      *
346      * @return an array of files to be processed.
347      * @throws org.mule.MuleException which will wrap any other exceptions or errors.
348      */

349     File JavaDoc[] listFiles() throws MuleException
350     {
351         try
352         {
353             File JavaDoc[] todoFiles = readDirectory.listFiles(filenameFilter);
354             // logger.trace("Reading directory " + readDirectory.getAbsolutePath() +
355
// " -> " + todoFiles.length + " file(s)");
356
return (todoFiles == null ? new File JavaDoc[0] : todoFiles);
357         }
358         catch (Exception JavaDoc e)
359         {
360             throw new MuleException(new Message("file", 1), e);
361         }
362     }
363
364 }
365
Popular Tags