Author: davsclaus Date: Mon Dec 17 08:32:58 2012 New Revision: 1422792 URL: http://svn.apache.org/viewvc?rev=1422792&view=rev Log: CAMEL-5848: file/ftp consumers - When using doneFileName then avoid picking up files in middle of group if done file is written during scanning
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=1422792&r1=1422791&r2=1422792&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Mon Dec 17 08:32:58 2012 @@ -17,6 +17,7 @@ package org.apache.camel.component.file; import java.io.File; +import java.util.Arrays; import java.util.List; import org.apache.camel.Processor; @@ -51,8 +52,8 @@ public class FileConsumer extends Generi } log.trace("Polling directory: {}", directory.getPath()); - File[] files = directory.listFiles(); - if (files == null || files.length == 0) { + File[] dirFiles = directory.listFiles(); + if (dirFiles == null || dirFiles.length == 0) { // no files in this directory to poll if (log.isTraceEnabled()) { log.trace("No files found in directory: {}", directory.getPath()); @@ -61,9 +62,10 @@ public class FileConsumer extends Generi } else { // we found some files if (log.isTraceEnabled()) { - log.trace("Found {} in directory: {}", files.length, directory.getPath()); + log.trace("Found {} in directory: {}", dirFiles.length, directory.getPath()); } } + List<File> files = Arrays.asList(dirFiles); for (File file : files) { // check if we can continue polling in files @@ -81,7 +83,7 @@ public class FileConsumer extends Generi GenericFile<File> gf = asGenericFile(endpointPath, file, getEndpoint().getCharset()); if (file.isDirectory()) { - if (endpoint.isRecursive() && isValidFile(gf, true) && depth < endpoint.getMaxDepth()) { + if (endpoint.isRecursive() && isValidFile(gf, true, files) && depth < endpoint.getMaxDepth()) { // recursive scan and add the sub files and folders String subDirectory = fileName + File.separator + file.getName(); boolean canPollMore = pollDirectory(subDirectory, fileList, depth); @@ -91,7 +93,7 @@ public class FileConsumer extends Generi } } else { // Windows can report false to a file on a share so regard it always as a file (if its not a directory) - if (isValidFile(gf, false) && depth >= endpoint.minDepth) { + if (isValidFile(gf, false, files) && depth >= endpoint.minDepth) { if (isInProgress(gf)) { if (log.isTraceEnabled()) { log.trace("Skipping as file is already in progress: {}", gf.getFileName()); @@ -109,6 +111,19 @@ public class FileConsumer extends Generi return true; } + @Override + protected boolean isMatched(GenericFile<File> file, String doneFileName, List<File> files) { + String onlyName = FileUtil.stripPath(doneFileName); + // the done file name must be among the files + for (File f : files) { + if (f.getName().equals(onlyName)) { + return true; + } + } + log.trace("Done file: {} does not exist", doneFileName); + return false; + } + /** * Creates a new GenericFile<File> based on the given file. * Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1422792&r1=1422791&r2=1422792&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Dec 17 08:32:58 2012 @@ -326,7 +326,7 @@ public abstract class GenericFileConsume log.trace("Retrieved file: {} from: {}", name, endpoint); } else { - log.trace("Skiped retrieval of file: {} from: {}", name, endpoint); + log.trace("Skipped retrieval of file: {} from: {}", name, endpoint); exchange.getIn().setBody(null); } @@ -404,10 +404,11 @@ public abstract class GenericFileConsume * * @param file the file * @param isDirectory whether the file is a directory or a file + * @param files files in the directory * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it */ - protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) { - if (!isMatched(file, isDirectory)) { + protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) { + if (!isMatched(file, isDirectory, files)) { log.trace("File did not match. Will skip this file: {}", file); return false; } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) { @@ -431,9 +432,10 @@ public abstract class GenericFileConsume * * @param file the file * @param isDirectory whether the file is a directory or a file + * @param files files in the directory * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not */ - protected boolean isMatched(GenericFile<T> file, boolean isDirectory) { + protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) { String name = file.getFileNameOnly(); // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock") @@ -497,7 +499,7 @@ public abstract class GenericFileConsume return false; } - if (!isMatched(file, doneFileName)) { + if (!isMatched(file, doneFileName, files)) { return false; } } @@ -509,19 +511,11 @@ public abstract class GenericFileConsume * Strategy to perform file matching based on endpoint configuration in terms of done file name. * * @param file the file - * @param doneFileName the done file name + * @param doneFileName the done file name (without any paths) + * @param files files in the directory * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not */ - protected boolean isMatched(GenericFile<T> file, String doneFileName) { - // the file is only valid if the done file exist - if (!operations.existsFile(doneFileName)) { - log.trace("Done file: {} does not exist", doneFileName); - return false; - } - - // assume matched - return true; - } + protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files); /** * Is the given file already in progress. Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1422792&r1=1422791&r2=1422792&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Mon Dec 17 08:32:58 2012 @@ -806,7 +806,6 @@ public abstract class GenericFileEndpoin if (ObjectHelper.isNotEmpty(path) && ObjectHelper.isNotEmpty(pattern)) { // done file must always be in same directory as the real file name answer = path + getFileSeparator() + pattern; - answer = path + File.separator + pattern; } if (getConfiguration().needToNormalize()) { Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java?rev=1422792&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java Mon Dec 17 08:32:58 2012 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; + +/** + * CAMEL-5848 + */ +public class FileConsumeDoneFileIssueTest extends ContextTestSupport { + + public void testFileConsumeDoneFileIssue() throws Exception { + template.sendBodyAndHeader("file:target/done", "A", Exchange.FILE_NAME, "foo-a.txt"); + template.sendBodyAndHeader("file:target/done", "B", Exchange.FILE_NAME, "foo-b.txt"); + template.sendBodyAndHeader("file:target/done", "C", Exchange.FILE_NAME, "foo-c.txt"); + template.sendBodyAndHeader("file:target/done", "D", Exchange.FILE_NAME, "foo-d.txt"); + template.sendBodyAndHeader("file:target/done", "E", Exchange.FILE_NAME, "foo-e.txt"); + template.sendBodyAndHeader("file:target/done", "E", Exchange.FILE_NAME, "foo.done"); + + getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder("A", "B", "C", "D", "E"); + + context.startRoute("foo"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/done?doneFileName=foo.done").routeId("foo").noAutoStartup() + .convertBodyTo(String.class) + .to("mock:result"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=1422792&r1=1422791&r2=1422792&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Mon Dec 17 08:32:58 2012 @@ -108,7 +108,7 @@ public class FtpConsumer extends RemoteF if (file.isDirectory()) { RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file); - if (endpoint.isRecursive() && isValidFile(remote, true) && depth < endpoint.getMaxDepth()) { + if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) { // recursive scan and add the sub files and folders String subDirectory = file.getName(); String path = absolutePath + "/" + subDirectory; @@ -119,7 +119,7 @@ public class FtpConsumer extends RemoteF } } else if (file.isFile()) { RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file); - if (isValidFile(remote, false) && depth >= endpoint.getMinDepth()) { + if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) { if (isInProgress(remote)) { log.trace("Skipping as file is already in progress: {}", remote.getFileName()); } else { @@ -135,6 +135,20 @@ public class FtpConsumer extends RemoteF return true; } + @Override + protected boolean isMatched(GenericFile<FTPFile> file, String doneFileName, List<FTPFile> files) { + String onlyName = FileUtil.stripPath(doneFileName); + + for (FTPFile f : files) { + if (f.getName().equals(onlyName)) { + return true; + } + } + + log.trace("Done file: {} does not exist", doneFileName); + return false; + } + private RemoteFile<FTPFile> asRemoteFile(String absolutePath, FTPFile file) { RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>(); Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1422792&r1=1422791&r2=1422792&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Mon Dec 17 08:32:58 2012 @@ -20,10 +20,8 @@ import java.io.IOException; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileConsumer; import org.apache.camel.component.file.GenericFileOperationFailedException; -import org.apache.camel.util.FileUtil; /** * Base class for remote file consumers. @@ -160,14 +158,4 @@ public abstract class RemoteFileConsumer return ((RemoteFileEndpoint<?>) endpoint).remoteServerInformation(); } - @Override - protected boolean isMatched(GenericFile<T> file, String doneFileName) { - // ftp specific as we need to cater for stepwise - if (getEndpoint().getConfiguration().isStepwise()) { - // stepwise enabled, so done file should always be without path - doneFileName = FileUtil.stripPath(doneFileName); - } - - return super.isMatched(file, doneFileName); - } } Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=1422792&r1=1422791&r2=1422792&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Mon Dec 17 08:32:58 2012 @@ -105,7 +105,7 @@ public class SftpConsumer extends Remote if (file.getAttrs().isDir()) { RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file); - if (endpoint.isRecursive() && isValidFile(remote, true) && depth < endpoint.getMaxDepth()) { + if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) { // recursive scan and add the sub files and folders String subDirectory = file.getFilename(); String path = absolutePath + "/" + subDirectory; @@ -118,7 +118,7 @@ public class SftpConsumer extends Remote // just assuming its a file we should poll } else { RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file); - if (isValidFile(remote, false) && depth >= endpoint.getMinDepth()) { + if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) { if (isInProgress(remote)) { if (log.isTraceEnabled()) { log.trace("Skipping as file is already in progress: {}", remote.getFileName()); @@ -134,6 +134,20 @@ public class SftpConsumer extends Remote return true; } + @Override + protected boolean isMatched(GenericFile<ChannelSftp.LsEntry> file, String doneFileName, List<ChannelSftp.LsEntry> files) { + String onlyName = FileUtil.stripPath(doneFileName); + + for (ChannelSftp.LsEntry f : files) { + if (f.getFilename().equals(onlyName)) { + return true; + } + } + + log.trace("Done file: {} does not exist", doneFileName); + return false; + } + private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(String absolutePath, ChannelSftp.LsEntry file) { RemoteFile<ChannelSftp.LsEntry> answer = new RemoteFile<ChannelSftp.LsEntry>();