Repository: camel
Updated Branches:
  refs/heads/master 092c7053e -> 5c47d946e
CAMEL-9970: CamelFileLength header is wrong for long write file. Implemented a generic solution. Thanks to Sergey Monichev for the test patch we are using. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5c47d946 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5c47d946 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5c47d946 Branch: refs/heads/master Commit: 5c47d946e1f469f515203b594b720b9f5a297ddc Parents: 092c705 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri May 20 08:57:27 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri May 20 08:57:27 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/component/file/FileConsumer.java | 16 ++++++++++++++++ .../camel/component/file/GenericFileConsumer.java | 15 +++++++++++++++ .../file/strategy/FileChangedReadLockTest.java | 10 ++++++++++ .../camel/component/file/remote/FtpConsumer.java | 15 +++++++++++++++ .../camel/component/file/remote/SftpConsumer.java | 15 +++++++++++++++ .../remote/RemoteFileIgnoreDoPollErrorTest.java | 6 ++++++ 6 files changed, 77 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java index 7ffc1a7..5e3a180 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.camel.Exchange; +import org.apache.camel.Message; import 
org.apache.camel.Processor; import org.apache.camel.util.FileUtil; import org.apache.camel.util.ObjectHelper; @@ -234,6 +236,20 @@ public class FileConsumer extends GenericFileConsumer<File> { } @Override + protected void updateFileHeaders(GenericFile<File> file, Message message) { + long length = file.getFile().length(); + long modified = file.getFile().lastModified(); + file.setFileLength(length); + file.setLastModified(modified); + if (length >= 0) { + message.setHeader(Exchange.FILE_LENGTH, length); + } + if (modified >= 0) { + message.setHeader(Exchange.FILE_LAST_MODIFIED, modified); + } + } + + @Override public FileEndpoint getEndpoint() { return (FileEndpoint) super.getEndpoint(); } http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 399dabd..bf7dc23 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -26,6 +26,7 @@ import java.util.regex.Pattern; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.impl.ScheduledBatchPollingConsumer; @@ -396,6 +397,11 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // must use file from exchange as it can be updated due the // preMoveNamePrefix/preMoveNamePostfix options final GenericFile<T> target = getExchangeFileProperty(exchange); + + // we can begin processing the file so update file headers on the Camel message + // in case it took some time to acquire read 
lock, and file size/timestamp has been updated since etc + updateFileHeaders(target, exchange.getIn()); + // must use full name when downloading so we have the correct path final String name = target.getAbsoluteFilePath(); try { @@ -476,6 +482,15 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum } /** + * Updates the information on {@link Message} after we have acquired read-lock and + * can begin process the file. + * + * @param file the file + * @param message the Camel message to update its headers + */ + protected abstract void updateFileHeaders(GenericFile<T> file, Message message); + + /** * Override if required. Files are retrieved / returns true by default * * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files. http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java index ede2496..5373bd7 100644 --- a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.FileOutputStream; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.slf4j.Logger; @@ -43,6 +44,7 @@ public class FileChangedReadLockTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); mock.expectedFileExists("target/changed/out/slowfile.dat"); + 
mock.expectedHeaderReceived(Exchange.FILE_LENGTH, expectedFileLength()); writeSlowFile(); @@ -71,6 +73,14 @@ public class FileChangedReadLockTest extends ContextTestSupport { LOG.debug("Writing slow file DONE..."); } + long expectedFileLength() { + long length = 0; + for (int i = 0; i < 20; i++) { + length += ("Line " + i + LS).getBytes().length; + } + return length; + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java index c0a37fe..6ece7e0 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileOperationFailedException; @@ -253,6 +254,20 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> { return answer; } + @Override + protected void updateFileHeaders(GenericFile<FTPFile> file, Message message) { + long length = file.getFile().getSize(); + long modified = file.getFile().getTimestamp() != null ? 
file.getFile().getTimestamp().getTimeInMillis() : -1; + file.setFileLength(length); + file.setLastModified(modified); + if (length >= 0) { + message.setHeader(Exchange.FILE_LENGTH, length); + } + if (modified >= 0) { + message.setHeader(Exchange.FILE_LAST_MODIFIED, modified); + } + } + private boolean isStepwise() { RemoteFileConfiguration config = (RemoteFileConfiguration) endpoint.getConfiguration(); return config.isStepwise(); http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java index 4be7394..048c5ac 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java @@ -21,6 +21,7 @@ import java.util.List; import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.SftpException; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileOperationFailedException; @@ -223,6 +224,20 @@ public class SftpConsumer extends RemoteFileConsumer<ChannelSftp.LsEntry> { return answer; } + @Override + protected void updateFileHeaders(GenericFile<ChannelSftp.LsEntry> file, Message message) { + long length = file.getFile().getAttrs().getSize(); + long modified = file.getFile().getAttrs().getMTime() * 1000L; + file.setFileLength(length); + file.setLastModified(modified); + if (length >= 0) { + message.setHeader(Exchange.FILE_LENGTH, length); + } + if (modified >= 0) { + message.setHeader(Exchange.FILE_LAST_MODIFIED, modified); + } + } + 
private boolean isStepwise() { RemoteFileConfiguration config = (RemoteFileConfiguration) endpoint.getConfiguration(); return config.isStepwise(); http://git-wip-us.apache.org/repos/asf/camel/blob/5c47d946/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java index bcd4201..b225fa9 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileOperationFailedException; @@ -110,6 +111,11 @@ public class RemoteFileIgnoreDoPollErrorTest { protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) { return ignoreCannotRetrieveFile; } + + @Override + protected void updateFileHeaders(GenericFile<Object> genericFile, Message message) { + // noop + } }; } }