1 10 11 package org.mule.providers.file; 12 13 import java.io.File ; 14 import java.io.FileOutputStream ; 15 import java.io.FilenameFilter ; 16 import java.io.IOException ; 17 import java.io.OutputStream ; 18 import java.net.URLDecoder ; 19 20 import org.mule.MuleException; 21 import org.mule.MuleManager; 22 import org.mule.config.i18n.Message; 23 import org.mule.config.i18n.Messages; 24 import org.mule.impl.MuleMessage; 25 import org.mule.providers.AbstractMessageDispatcher; 26 import org.mule.providers.file.filters.FilenameWildcardFilter; 27 import org.mule.umo.UMOEvent; 28 import org.mule.umo.UMOException; 29 import org.mule.umo.UMOMessage; 30 import org.mule.umo.endpoint.UMOImmutableEndpoint; 31 import org.mule.umo.provider.DispatchException; 32 import org.mule.umo.provider.UMOConnector; 33 import org.mule.util.FileUtils; 34 import org.mule.util.MapUtils; 35 36 39 public class FileMessageDispatcher extends AbstractMessageDispatcher 40 { 41 private final FileConnector connector; 42 43 public FileMessageDispatcher(UMOImmutableEndpoint endpoint) 44 { 45 super(endpoint); 46 this.connector = (FileConnector)endpoint.getConnector(); 47 } 48 49 54 protected void doDispatch(UMOEvent event) throws Exception 55 { 56 Object data = event.getTransformedMessage(); 57 UMOMessage message = new MuleMessage(data, event.getMessage()); 59 60 byte[] buf; 61 if (data instanceof byte[]) 62 { 63 buf = (byte[])data; 64 } 65 else 66 { 67 buf = data.toString().getBytes(event.getEncoding()); 68 } 69 FileOutputStream fos = (FileOutputStream )getOutputStream(event.getEndpoint(), message); 70 try 71 { 72 fos.write(buf); 73 } 74 finally 75 { 76 fos.close(); 77 } 78 } 79 80 90 public OutputStream getOutputStream(UMOImmutableEndpoint endpoint, UMOMessage message) 91 throws UMOException 92 { 93 String address = endpoint.getEndpointURI().getAddress(); 94 String writeToDirectory = message.getStringProperty( 95 FileConnector.PROPERTY_WRITE_TO_DIRECTORY, null); 96 if (writeToDirectory == null) 97 { 98 writeToDirectory = connector.getWriteToDirectory(); 99 } 100 if (writeToDirectory != null) 101 { 102 address = connector.getFilenameParser().getFilename(message, writeToDirectory); 103 } 104 105 String filename; 106 String outPattern = (String )endpoint.getProperty(FileConnector.PROPERTY_OUTPUT_PATTERN); 107 if (outPattern == null) 108 { 109 outPattern = message.getStringProperty(FileConnector.PROPERTY_OUTPUT_PATTERN, null); 110 } 111 if (outPattern == null) 112 { 113 outPattern = connector.getOutputPattern(); 114 } 115 try 116 { 117 if (outPattern != null) 118 { 119 filename = generateFilename(message, outPattern); 120 } 121 else 122 { 123 filename = message.getStringProperty(FileConnector.PROPERTY_FILENAME, null); 124 if (filename == null) 125 { 126 filename = generateFilename(message, null); 127 } 128 } 129 130 if (filename == null) 131 { 132 throw new IOException ("Filename is null"); 133 } 134 File file = FileUtils.createFile(address + "/" + filename); 135 if (logger.isInfoEnabled()) 136 { 137 logger.info("Writing file to: " + file.getAbsolutePath()); 138 } 139 140 return new FileOutputStream (file, MapUtils.getBooleanValue(endpoint.getProperties(), 141 "outputAppend", connector.isOutputAppend())); 142 } 143 catch (IOException e) 144 { 145 throw new DispatchException(new Message(Messages.STREAMING_FAILED_NO_STREAM), message, 146 endpoint, e); 147 } 148 } 149 150 155 public Object getDelegateSession() throws UMOException 156 { 157 return null; 158 } 159 160 171 172 protected UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception 173 { 174 175 File file = FileUtils.newFile(endpoint.getEndpointURI().getAddress()); 176 File result = null; 177 FilenameFilter filenameFilter = null; 178 String filter = (String )endpoint.getProperty("filter"); 179 if (filter != null) 180 { 181 filter = URLDecoder.decode(filter, MuleManager.getConfiguration().getEncoding()); 182 filenameFilter = new FilenameWildcardFilter(filter); 183 } 184 if (file.exists()) 185 { 186 if (file.isFile()) 187 { 188 result = file; 189 } 190 else if (file.isDirectory()) 191 { 192 result = getNextFile(endpoint.getEndpointURI().getAddress(), filenameFilter); 193 } 194 if (result != null) 195 { 196 boolean checkFileAge = connector.getCheckFileAge(); 197 if (checkFileAge) 198 { 199 long fileAge = connector.getFileAge(); 200 long lastMod = result.lastModified(); 201 long now = (new java.util.Date ()).getTime(); 202 if ((now - lastMod) < fileAge) 203 { 204 return null; 205 } 206 } 207 208 MuleMessage message = new MuleMessage(connector.getMessageAdapter(result)); 209 if (connector.getMoveToDirectory() != null) 210 { 211 { 212 File destinationFile = new File (connector.getMoveToDirectory(), result 213 .getName()); 214 if (!result.renameTo(destinationFile)) 215 { 216 logger.error("Failed to move file: " + result.getAbsolutePath() 217 + " to " + destinationFile.getAbsolutePath()); 218 } 219 } 220 } 221 result.delete(); 222 return message; 223 } 224 } 225 return null; 226 } 227 228 private File getNextFile(String dir, FilenameFilter filter) throws UMOException 229 { 230 File [] files; 231 File file = FileUtils.newFile(dir); 232 File result = null; 233 try 234 { 235 if (file.exists()) 236 { 237 if (file.isFile()) 238 { 239 result = file; 240 } 241 else if (file.isDirectory()) 242 { 243 if (filter != null) 244 { 245 files = file.listFiles(filter); 246 } 247 else 248 { 249 files = file.listFiles(); 250 } 251 if (files.length > 0) 252 { 253 result = files[0]; 254 } 255 } 256 } 257 return result; 258 } 259 catch (Exception e) 260 { 261 throw new MuleException(new Message("file", 1), e); 262 } 263 } 264 265 270 protected UMOMessage doSend(UMOEvent event) throws Exception 271 { 272 doDispatch(event); 273 return event.getMessage(); 274 } 275 276 281 public UMOConnector getConnector() 282 { 283 return connector; 284 } 285 286 private String generateFilename(UMOMessage message, String pattern) 287 { 288 if (pattern == null) 289 { 290 pattern = connector.getOutputPattern(); 291 } 292 return connector.getFilenameParser().getFilename(message, pattern); 293 } 294 295 protected void doDispose() 296 { 297 } 299 300 protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception 301 { 302 } 304 305 protected void doDisconnect() throws Exception 306 { 307 } 309 310 } 311 | Popular Tags |