Author: davsclaus Date: Mon Dec 17 08:39:51 2012 New Revision: 1422794 URL: http://svn.apache.org/viewvc?rev=1422794&view=rev Log: CAMEL-5848: file/ftp consumers - When using doneFileName then avoid picking up files in middle of group if done file is written during scanning
Added: camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java - copied unchanged from r1422793, camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1422792 Merged /camel/branches/camel-2.10.x:r1422793 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=1422794&r1=1422793&r2=1422794&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Mon Dec 17 08:39:51 2012 @@ -17,6 +17,7 @@ package org.apache.camel.component.file; import java.io.File; +import java.util.Arrays; import java.util.List; import org.apache.camel.Processor; @@ -51,8 +52,8 @@ public class FileConsumer extends Generi } log.trace("Polling directory: {}", directory.getPath()); - File[] files = directory.listFiles(); - if (files == null || files.length == 0) { + File[] dirFiles = directory.listFiles(); + if (dirFiles == null || dirFiles.length == 0) { // no files in this directory to poll if (log.isTraceEnabled()) { log.trace("No files found in directory: {}", directory.getPath()); @@ -61,9 +62,10 @@ public class FileConsumer extends Generi } else { // we found some files if (log.isTraceEnabled()) { - log.trace("Found {} in directory: {}", files.length, directory.getPath()); + log.trace("Found {} in directory: {}", dirFiles.length, directory.getPath()); } } + List<File> files = Arrays.asList(dirFiles); for (File file : files) { // check if we can continue polling in files @@ -81,7 +83,7 @@ public class FileConsumer extends Generi GenericFile<File> gf = asGenericFile(endpointPath, file, getEndpoint().getCharset()); if (file.isDirectory()) { - if (endpoint.isRecursive() && isValidFile(gf, true) && depth < endpoint.getMaxDepth()) { + if (endpoint.isRecursive() && isValidFile(gf, true, files) && depth < endpoint.getMaxDepth()) { // recursive scan and add the sub files and folders String subDirectory = fileName + File.separator + file.getName(); boolean canPollMore = pollDirectory(subDirectory, fileList, depth); @@ -91,7 +93,7 @@ public class FileConsumer extends Generi } } else { // Windows can report false to a file on a share so regard it always as a file (if its not a directory) - if (isValidFile(gf, false) && depth >= endpoint.minDepth) { + if (isValidFile(gf, false, files) && depth >= endpoint.minDepth) { if (isInProgress(gf)) { if (log.isTraceEnabled()) { log.trace("Skipping as file is already in progress: {}", gf.getFileName()); @@ -109,6 +111,19 @@ public class FileConsumer extends Generi return true; } + @Override + protected boolean isMatched(GenericFile<File> file, String doneFileName, List<File> files) { + String onlyName = FileUtil.stripPath(doneFileName); + // the done file name must be among the files + for (File f : files) { + if (f.getName().equals(onlyName)) { + return true; + } + } + log.trace("Done file: {} does not exist", doneFileName); + return false; + } + /** * Creates a new GenericFile<File> based on the given file. * Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1422794&r1=1422793&r2=1422794&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Dec 17 08:39:51 2012 @@ -441,10 +441,11 @@ public abstract class GenericFileConsume * * @param file the file * @param isDirectory whether the file is a directory or a file + * @param files files in the directory * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it */ - protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) { - if (!isMatched(file, isDirectory)) { + protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) { + if (!isMatched(file, isDirectory, files)) { log.trace("File did not match. Will skip this file: {}", file); return false; } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) { @@ -468,9 +469,10 @@ public abstract class GenericFileConsume * * @param file the file * @param isDirectory whether the file is a directory or a file + * @param files files in the directory * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not */ - protected boolean isMatched(GenericFile<T> file, boolean isDirectory) { + protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) { String name = file.getFileNameOnly(); // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock") @@ -528,7 +530,7 @@ public abstract class GenericFileConsume return false; } - if (!isMatched(file, doneFileName)) { + if (!isMatched(file, doneFileName, files)) { return false; } } @@ -540,19 +542,11 @@ public abstract class GenericFileConsume * Strategy to perform file matching based on endpoint configuration in terms of done file name. * * @param file the file - * @param doneFileName the done file name + * @param doneFileName the done file name (without any paths) + * @param files files in the directory * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not */ - protected boolean isMatched(GenericFile<T> file, String doneFileName) { - // the file is only valid if the done file exist - if (!operations.existsFile(doneFileName)) { - log.trace("Done file: {} does not exist", doneFileName); - return false; - } - - // assume matched - return true; - } + protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files); /** * Is the given file already in progress. Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1422794&r1=1422793&r2=1422794&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Mon Dec 17 08:39:51 2012 @@ -741,7 +741,6 @@ public abstract class GenericFileEndpoin if (ObjectHelper.isNotEmpty(path) && ObjectHelper.isNotEmpty(pattern)) { // done file must always be in same directory as the real file name answer = path + getFileSeparator() + pattern; - answer = path + File.separator + pattern; } if (getConfiguration().needToNormalize()) { Modified: camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=1422794&r1=1422793&r2=1422794&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original) +++ camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Mon Dec 17 08:39:51 2012 @@ -108,7 +108,7 @@ public class FtpConsumer extends RemoteF if (file.isDirectory()) { RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file); - if (endpoint.isRecursive() && isValidFile(remote, true) && depth < endpoint.getMaxDepth()) { + if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) { // recursive scan and add the sub files and folders String subDirectory = file.getName(); String path = absolutePath + "/" + subDirectory; @@ -119,7 +119,7 @@ public class FtpConsumer extends RemoteF } } else if (file.isFile()) { RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file); - if (isValidFile(remote, false) && depth >= endpoint.getMinDepth()) { + if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) { if (isInProgress(remote)) { log.trace("Skipping as file is already in progress: {}", remote.getFileName()); } else { @@ -135,6 +135,20 @@ public class FtpConsumer extends RemoteF return true; } + @Override + protected boolean isMatched(GenericFile<FTPFile> file, String doneFileName, List<FTPFile> files) { + String onlyName = FileUtil.stripPath(doneFileName); + + for (FTPFile f : files) { + if (f.getName().equals(onlyName)) { + return true; + } + } + + log.trace("Done file: {} does not exist", doneFileName); + return false; + } + private RemoteFile<FTPFile> asRemoteFile(String absolutePath, FTPFile file) { RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>(); Modified: camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1422794&r1=1422793&r2=1422794&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original) +++ camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Mon Dec 17 08:39:51 2012 @@ -20,10 +20,8 @@ import java.io.IOException; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileConsumer; import org.apache.camel.component.file.GenericFileOperationFailedException; -import org.apache.camel.util.FileUtil; /** * Base class for remote file consumers. @@ -155,14 +153,4 @@ public abstract class RemoteFileConsumer return ((RemoteFileEndpoint) endpoint).remoteServerInformation(); } - @Override - protected boolean isMatched(GenericFile<T> file, String doneFileName) { - // ftp specific as we need to cater for stepwise - if (getEndpoint().getConfiguration().isStepwise()) { - // stepwise enabled, so done file should always be without path - doneFileName = FileUtil.stripPath(doneFileName); - } - - return super.isMatched(file, doneFileName); - } } Modified: camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=1422794&r1=1422793&r2=1422794&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original) +++ camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Mon Dec 17 08:39:51 2012 @@ -105,7 +105,7 @@ public class SftpConsumer extends Remote if (file.getAttrs().isDir()) { RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file); - if (endpoint.isRecursive() && isValidFile(remote, true) && depth < endpoint.getMaxDepth()) { + if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) { // recursive scan and add the sub files and folders String subDirectory = file.getFilename(); String path = absolutePath + "/" + subDirectory; @@ -118,7 +118,7 @@ public class SftpConsumer extends Remote // just assuming its a file we should poll } else { RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file); - if (isValidFile(remote, false) && depth >= endpoint.getMinDepth()) { + if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) { if (isInProgress(remote)) { if (log.isTraceEnabled()) { log.trace("Skipping as file is already in progress: {}", remote.getFileName()); @@ -134,6 +134,20 @@ public class SftpConsumer extends Remote return true; } + @Override + protected boolean isMatched(GenericFile<ChannelSftp.LsEntry> file, String doneFileName, List<ChannelSftp.LsEntry> files) { + String onlyName = FileUtil.stripPath(doneFileName); + + for (ChannelSftp.LsEntry f : files) { + if (f.getFilename().equals(onlyName)) { + return true; + } + } + + log.trace("Done file: {} does not exist", doneFileName); + return false; + } + private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(String absolutePath, ChannelSftp.LsEntry file) { RemoteFile<ChannelSftp.LsEntry> answer = new RemoteFile<ChannelSftp.LsEntry>();