This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 8e9ff54 CAMEL-17078: support file resume at offset 8e9ff54 is described below commit 8e9ff54a9d6f98c3233c14f473fbc8c6c083a790 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Oct 13 17:53:14 2021 +0200 CAMEL-17078: support file resume at offset This should improve support for reading very large files, since it can allow users to implement a strategy for resuming reading at a certain offset. --- .../apache/camel/component/file/FileConsumer.java | 5 ++ .../apache/camel/component/file/GenericFile.java | 14 ++++- .../camel/component/file/GenericFileConverter.java | 12 +++-- .../file/consumer/FileConsumerResumeStrategy.java | 10 ++++ .../src/main/java/org/apache/camel/Resumable.java | 43 +++++++++++++++ ... FileConsumerResumeFromOffsetStrategyTest.java} | 61 ++++++++++++---------- .../file/FileConsumerResumeStrategyTest.java | 8 ++- .../main/java/org/apache/camel/util/IOHelper.java | 15 +++++- 8 files changed, 134 insertions(+), 34 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java index 14cf40b..f38f0ea 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java @@ -102,6 +102,11 @@ public class FileConsumer extends GenericFileConsumer<File> { GenericFile<File> gf = asGenericFile(endpointPath, file, getEndpoint().getCharset(), getEndpoint().isProbeContentType()); + if (resumeStrategy != null) { + long offset = resumeStrategy.lastOffset(file); + gf.setLastOffset(offset); + } + if (file.isDirectory()) { if (endpoint.isRecursive() && depth < endpoint.getMaxDepth() && isValidFile(gf, true, files)) { boolean canPollMore = pollDirectory(file, fileList, depth); diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java index 735e66f..e851f9e 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java @@ -22,6 +22,7 @@ import java.nio.file.Path; import java.util.Map; import org.apache.camel.Exchange; +import org.apache.camel.Resumable; import org.apache.camel.WrappedFile; import org.apache.camel.util.FileUtil; import org.apache.camel.util.ObjectHelper; @@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory; /** * Generic File. Specific implementations of a file based endpoint need to provide a File for transfer. */ -public class GenericFile<T> implements WrappedFile<T> { +public class GenericFile<T> implements WrappedFile<T>, Resumable<Long> { private static final Logger LOG = LoggerFactory.getLogger(GenericFile.class); private final boolean probeContentType; @@ -45,6 +46,7 @@ public class GenericFile<T> implements WrappedFile<T> { private String absoluteFilePath; private long fileLength; private long lastModified; + private long lastOffset; private T file; private GenericFileBinding<T> binding; private boolean absolute; @@ -413,6 +415,16 @@ public class GenericFile<T> implements WrappedFile<T> { this.copyFromAbsoluteFilePath = copyFromAbsoluteFilePath; } + @Override + public void setLastOffset(Long offset) { + this.lastOffset = offset; + } + + @Override + public Long getLastOffset() { + return lastOffset; + } + /** * Fixes the path separator to be according to the protocol */ diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConverter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConverter.java index 6cf547c..b854698 100644 --- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConverter.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConverter.java @@ -141,10 +141,12 @@ public final class GenericFileConverter { // use reader first as it supports the file charset Reader reader = genericFileToReader(file, exchange); if (reader != null) { + // Note: When resuming, the offset should've been reset at this point, so it may not be + // necessary to skip the bytes again return IOHelper.toString(reader); } if (exchange != null) { - // otherwise ensure the body is loaded as we want the content of the + // otherwise, ensure the body is loaded as we want the content of the // body file.getBinding().loadContent(exchange, file); return exchange.getContext().getTypeConverter().convertTo(String.class, exchange, file.getBody()); @@ -183,13 +185,17 @@ public final class GenericFileConverter { // and use the charset if the file was explicit configured with a // charset String charset = file.getCharset(); + Reader reader; if (charset != null) { LOG.debug("Read file {} with charset {}", f, file.getCharset()); - return IOHelper.toReader(f, charset); + reader = IOHelper.toReader(f, charset); } else { LOG.debug("Read file {} (no charset)", f); - return IOHelper.toReader(f, ExchangeHelper.getCharsetName(exchange)); + reader = IOHelper.toReader(f, ExchangeHelper.getCharsetName(exchange)); } + + reader.skip(file.getLastOffset()); + return reader; } return null; } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java index fc4a991..eedd3e2 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java +++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java @@ -17,6 +17,8 @@ package org.apache.camel.component.file.consumer; +import java.io.File; + import org.apache.camel.ResumeStrategy; /** @@ -25,6 +27,14 @@ import org.apache.camel.ResumeStrategy; public interface FileConsumerResumeStrategy extends ResumeStrategy<FileResumeInfo> { /** + * Returns the last offset read for the given file. + * + * @param file the file to check for the last offset + * @return The last offset read or 0 (zero) if none has been read. + */ + long lastOffset(File file); + + /** * Perform the resume operation. This runs in the scope of the FileConsumer instance thread. * * @param resumeInfo resume information diff --git a/core/camel-api/src/main/java/org/apache/camel/Resumable.java b/core/camel-api/src/main/java/org/apache/camel/Resumable.java new file mode 100644 index 0000000..35977b3 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/Resumable.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel; + +/** + * This provides an interface for resumable objects. Such objects allow their users to address them at a specific offset.
+ * For example, when reading large files, it may be possible to inform the last offset that was read, thus allowing + * users of this interface to skip to that offset. This can potentially improve resumable operations by avoiding + * reprocessing of data. + * + * @param <T> the type of the addressable value for the resumable object (for example, a file would use a Long value) + */ +public interface Resumable<T> { + + /** + * Sets the last offset as appropriate for the user of the interface + * + * @param offset the offset value + */ + void setLastOffset(T offset); + + /** + * Gets the last offset value + * + * @return the last offset value according to the interface and type implemented + */ + T getLastOffset(); +} diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java similarity index 58% copy from core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java copy to core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java index 5a804c1..8ddc246 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java @@ -17,58 +17,60 @@ package org.apache.camel.component.file; import java.io.File; -import java.util.Arrays; -import java.util.List; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy; import org.apache.camel.component.file.consumer.FileResumeInfo; import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.DisplayName; 
import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FileConsumerResumeStrategyTest extends ContextTestSupport { +public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport { private static class TestResumeStrategy implements FileConsumerResumeStrategy { private static final Logger LOG = LoggerFactory.getLogger(TestResumeStrategy.class); @Override - public void resume(FileResumeInfo resumeInfo) { - List<String> processedFiles = Arrays.asList("0.txt", "1.txt", "2.txt"); - int count = 0; + public long lastOffset(File file) { + if (!file.getName().startsWith("resume-from-offset")) { + throw new RuntimeCamelException("Invalid file - resume strategy should not have been called!"); + } + return 3; + } + + @Override + public void resume(FileResumeInfo resumeInfo) { File[] input = resumeInfo.getInputFiles(); - File[] tmp = Arrays.copyOf(input, input.length); - - for (File file : resumeInfo.getInputFiles()) { - if (!processedFiles.contains(file.getName())) { - LOG.info("Including file {}", file); - tmp[count] = file; - count++; - } else { - LOG.info("Skipping file {}", file); - } - } - resumeInfo.setOutputFiles(Arrays.copyOf(tmp, count)); + resumeInfo.setOutputFiles(input); } } + @DisplayName("Tests whether we can resume from an offset") @Test - public void testDepth() throws Exception { + public void testResumeFromOffset() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceivedInAnyOrder("3", "4", "5", "6"); + mock.expectedBodiesReceivedInAnyOrder("34567890"); - template.sendBodyAndHeader(fileUri("resume"), "0", Exchange.FILE_NAME, "0.txt"); - template.sendBodyAndHeader(fileUri("resume"), "1", Exchange.FILE_NAME, "1.txt"); - template.sendBodyAndHeader(fileUri("resume"), "2", Exchange.FILE_NAME, "2.txt"); - template.sendBodyAndHeader(fileUri("resume"), "3", Exchange.FILE_NAME, "3.txt"); - template.sendBodyAndHeader(fileUri("resume"), "4", 
Exchange.FILE_NAME, "4.txt"); - template.sendBodyAndHeader(fileUri("resume"), "5", Exchange.FILE_NAME, "5.txt"); - template.sendBodyAndHeader(fileUri("resume"), "6", Exchange.FILE_NAME, "6.txt"); + template.sendBodyAndHeader(fileUri("resumeOff"), "01234567890", Exchange.FILE_NAME, "resume-from-offset.txt"); + + // only expect 4 of the 6 sent + assertMockEndpointsSatisfied(); + } + + @DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)") + @Test + public void testNoResume() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceivedInAnyOrder("01234567890"); + + template.sendBodyAndHeader(fileUri("resumeNone"), "01234567890", Exchange.FILE_NAME, "resume-none.txt"); // only expect 4 of the 6 sent assertMockEndpointsSatisfied(); @@ -82,7 +84,10 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { bindToRegistry("testResumeStrategy", new TestResumeStrategy()); - from(fileUri("resume?noop=true&recursive=true&resumeStrategy=#testResumeStrategy")) + from(fileUri("resumeOff?noop=true&recursive=true&resumeStrategy=#testResumeStrategy")) + .convertBodyTo(String.class).to("mock:result"); + + from(fileUri("resumeNone?noop=true&recursive=true")) .convertBodyTo(String.class).to("mock:result"); } }; diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java index 5a804c1..7ecbff5 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java @@ -26,6 +26,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy; import org.apache.camel.component.file.consumer.FileResumeInfo; import 
org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.IOHelper; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,11 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { private static final Logger LOG = LoggerFactory.getLogger(TestResumeStrategy.class); @Override + public long lastOffset(File file) { + return IOHelper.INITIAL_OFFSET; + } + + @Override public void resume(FileResumeInfo resumeInfo) { List<String> processedFiles = Arrays.asList("0.txt", "1.txt", "2.txt"); int count = 0; @@ -58,7 +64,7 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { } @Test - public void testDepth() throws Exception { + public void testResume() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceivedInAnyOrder("3", "4", "5", "6"); diff --git a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java index a54cd90..abfa1e4 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java @@ -55,6 +55,8 @@ public final class IOHelper { public static final int DEFAULT_BUFFER_SIZE = 1024 * 4; + public static final long INITIAL_OFFSET = 0; + private static final Logger LOG = LoggerFactory.getLogger(IOHelper.class); // allows to turn on backwards compatible to turn off regarding the first @@ -116,11 +118,22 @@ public final class IOHelper { } public static String toString(Reader reader) throws IOException { - return toString(buffered(reader)); + return toString(reader, INITIAL_OFFSET); + } + + public static String toString(Reader reader, long offset) throws IOException { + return toString(buffered(reader), offset); } public static String toString(BufferedReader reader) throws IOException { + return toString(reader, INITIAL_OFFSET); + } + + public static String 
toString(BufferedReader reader, long offset) throws IOException { StringBuilder sb = new StringBuilder(1024); + + reader.skip(offset); + char[] buf = new char[1024]; try { int len;