This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e9ff54  CAMEL-17078: support file resume at offset
8e9ff54 is described below

commit 8e9ff54a9d6f98c3233c14f473fbc8c6c083a790
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Oct 13 17:53:14 2021 +0200

    CAMEL-17078: support file resume at offset
    
    This should improve support for reading very large files, since it can
    allow users to implement a strategy for resuming reading at a certain
    offset.
---
 .../apache/camel/component/file/FileConsumer.java  |  5 ++
 .../apache/camel/component/file/GenericFile.java   | 14 ++++-
 .../camel/component/file/GenericFileConverter.java | 12 +++--
 .../file/consumer/FileConsumerResumeStrategy.java  | 10 ++++
 .../src/main/java/org/apache/camel/Resumable.java  | 43 +++++++++++++++
 ... FileConsumerResumeFromOffsetStrategyTest.java} | 61 ++++++++++++----------
 .../file/FileConsumerResumeStrategyTest.java       |  8 ++-
 .../main/java/org/apache/camel/util/IOHelper.java  | 15 +++++-
 8 files changed, 134 insertions(+), 34 deletions(-)

diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 14cf40b..f38f0ea 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -102,6 +102,11 @@ public class FileConsumer extends 
GenericFileConsumer<File> {
             GenericFile<File> gf
                     = asGenericFile(endpointPath, file, 
getEndpoint().getCharset(), getEndpoint().isProbeContentType());
 
+            if (resumeStrategy != null) {
+                long offset = resumeStrategy.lastOffset(file);
+                gf.setLastOffset(offset);
+            }
+
             if (file.isDirectory()) {
                 if (endpoint.isRecursive() && depth < endpoint.getMaxDepth() 
&& isValidFile(gf, true, files)) {
                     boolean canPollMore = pollDirectory(file, fileList, depth);
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
index 735e66f..e851f9e 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Resumable;
 import org.apache.camel.WrappedFile;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Generic File. Specific implementations of a file based endpoint need to 
provide a File for transfer.
  */
-public class GenericFile<T> implements WrappedFile<T> {
+public class GenericFile<T> implements WrappedFile<T>, Resumable<Long> {
     private static final Logger LOG = 
LoggerFactory.getLogger(GenericFile.class);
 
     private final boolean probeContentType;
@@ -45,6 +46,7 @@ public class GenericFile<T> implements WrappedFile<T> {
     private String absoluteFilePath;
     private long fileLength;
     private long lastModified;
+    private long lastOffset;
     private T file;
     private GenericFileBinding<T> binding;
     private boolean absolute;
@@ -413,6 +415,16 @@ public class GenericFile<T> implements WrappedFile<T> {
         this.copyFromAbsoluteFilePath = copyFromAbsoluteFilePath;
     }
 
+    @Override
+    public void setLastOffset(Long offset) {
+        this.lastOffset = offset;
+    }
+
+    @Override
+    public Long getLastOffset() {
+        return lastOffset;
+    }
+
     /**
      * Fixes the path separator to be according to the protocol
      */
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
index 6cf547c..b854698 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
@@ -141,10 +141,12 @@ public final class GenericFileConverter {
         // use reader first as it supports the file charset
         Reader reader = genericFileToReader(file, exchange);
         if (reader != null) {
+            // Note: When resuming, the offset should've been reset at this 
point, so it may not be
+            // necessary to skip the bytes again
             return IOHelper.toString(reader);
         }
         if (exchange != null) {
-            // otherwise ensure the body is loaded as we want the content of 
the
+            // otherwise, ensure the body is loaded as we want the content of 
the
             // body
             file.getBinding().loadContent(exchange, file);
             return 
exchange.getContext().getTypeConverter().convertTo(String.class, exchange, 
file.getBody());
@@ -183,13 +185,17 @@ public final class GenericFileConverter {
             // and use the charset if the file was explicit configured with a
             // charset
             String charset = file.getCharset();
+            Reader reader;
             if (charset != null) {
                 LOG.debug("Read file {} with charset {}", f, 
file.getCharset());
-                return IOHelper.toReader(f, charset);
+                reader = IOHelper.toReader(f, charset);
             } else {
                 LOG.debug("Read file {} (no charset)", f);
-                return IOHelper.toReader(f, 
ExchangeHelper.getCharsetName(exchange));
+                reader = IOHelper.toReader(f, 
ExchangeHelper.getCharsetName(exchange));
             }
+
+            reader.skip(file.getLastOffset());
+            return reader;
         }
         return null;
     }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
index fc4a991..eedd3e2 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.component.file.consumer;
 
+import java.io.File;
+
 import org.apache.camel.ResumeStrategy;
 
 /**
@@ -25,6 +27,14 @@ import org.apache.camel.ResumeStrategy;
 public interface FileConsumerResumeStrategy extends 
ResumeStrategy<FileResumeInfo> {
 
     /**
+     * Returns the last offset read for the given file.
+     * 
+     * @param  file the file to check for the last offset
+     * @return      The last offset read or 0 (zero) if none has been read.
+     */
+    long lastOffset(File file);
+
+    /**
      * Perform the resume operation. This runs in the scope of the 
FileConsumer instance thread.
      *
      * @param resumeInfo resume information
diff --git a/core/camel-api/src/main/java/org/apache/camel/Resumable.java 
b/core/camel-api/src/main/java/org/apache/camel/Resumable.java
new file mode 100644
index 0000000..35977b3
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/Resumable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel;
+
+/**
+ * This provides an interface for resumable objects. Such objects allow its 
users to address them at a specific offset.
+ * For example, when reading large files, it may be possible to inform the 
last offset that was read, thus allowing
+ * users of this interface to skip to that offset. This can potentially 
improve resumable operations by allowing
+ * reprocessing of data.
+ * 
+ * @param <T> the type of the addressable value for the resumable object (for 
example, a file would use a Long value)
+ */
+public interface Resumable<T> {
+
+    /**
+     * Sets the last offset as appropriate for the user of the interface
+     * 
+     * @param offset the offset value
+     */
+    void setLastOffset(T offset);
+
+    /**
+     * Gets the last offset value
+     * 
+     * @return the last offset value according to the interface and type 
implemented
+     */
+    T getLastOffset();
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
similarity index 58%
copy from 
core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
copy to 
core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index 5a804c1..8ddc246 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -17,58 +17,60 @@
 package org.apache.camel.component.file;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.List;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy;
 import org.apache.camel.component.file.consumer.FileResumeInfo;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FileConsumerResumeStrategyTest extends ContextTestSupport {
+public class FileConsumerResumeFromOffsetStrategyTest extends 
ContextTestSupport {
 
     private static class TestResumeStrategy implements 
FileConsumerResumeStrategy {
         private static final Logger LOG = 
LoggerFactory.getLogger(TestResumeStrategy.class);
 
         @Override
-        public void resume(FileResumeInfo resumeInfo) {
-            List<String> processedFiles = Arrays.asList("0.txt", "1.txt", 
"2.txt");
-            int count = 0;
+        public long lastOffset(File file) {
+            if (!file.getName().startsWith("resume-from-offset")) {
+                throw new RuntimeCamelException("Invalid file - resume 
strategy should not have been called!");
+            }
 
+            return 3;
+        }
+
+        @Override
+        public void resume(FileResumeInfo resumeInfo) {
             File[] input = resumeInfo.getInputFiles();
-            File[] tmp = Arrays.copyOf(input, input.length);
-
-            for (File file : resumeInfo.getInputFiles()) {
-                if (!processedFiles.contains(file.getName())) {
-                    LOG.info("Including file {}", file);
-                    tmp[count] = file;
-                    count++;
-                } else {
-                    LOG.info("Skipping file {}", file);
-                }
-            }
 
-            resumeInfo.setOutputFiles(Arrays.copyOf(tmp, count));
+            resumeInfo.setOutputFiles(input);
         }
     }
 
+    @DisplayName("Tests whether we can resume from an offset")
     @Test
-    public void testDepth() throws Exception {
+    public void testResumeFromOffset() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceivedInAnyOrder("3", "4", "5", "6");
+        mock.expectedBodiesReceivedInAnyOrder("34567890");
 
-        template.sendBodyAndHeader(fileUri("resume"), "0", Exchange.FILE_NAME, 
"0.txt");
-        template.sendBodyAndHeader(fileUri("resume"), "1", Exchange.FILE_NAME, 
"1.txt");
-        template.sendBodyAndHeader(fileUri("resume"), "2", Exchange.FILE_NAME, 
"2.txt");
-        template.sendBodyAndHeader(fileUri("resume"), "3", Exchange.FILE_NAME, 
"3.txt");
-        template.sendBodyAndHeader(fileUri("resume"), "4", Exchange.FILE_NAME, 
"4.txt");
-        template.sendBodyAndHeader(fileUri("resume"), "5", Exchange.FILE_NAME, 
"5.txt");
-        template.sendBodyAndHeader(fileUri("resume"), "6", Exchange.FILE_NAME, 
"6.txt");
+        template.sendBodyAndHeader(fileUri("resumeOff"), "01234567890", 
Exchange.FILE_NAME, "resume-from-offset.txt");
+
+        // the consumer should skip the first 3 characters and deliver the rest
+        assertMockEndpointsSatisfied();
+    }
+
+    @DisplayName("Tests whether we can start from the beginning (i.e.: no 
resume strategy)")
+    @Test
+    public void testNoResume() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("01234567890");
+
+        template.sendBodyAndHeader(fileUri("resumeNone"), "01234567890", 
Exchange.FILE_NAME, "resume-none.txt");
 
         // only expect 4 of the 6 sent
         assertMockEndpointsSatisfied();
@@ -82,7 +84,10 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
 
                 bindToRegistry("testResumeStrategy", new TestResumeStrategy());
 
-                
from(fileUri("resume?noop=true&recursive=true&resumeStrategy=#testResumeStrategy"))
+                
from(fileUri("resumeOff?noop=true&recursive=true&resumeStrategy=#testResumeStrategy"))
+                        .convertBodyTo(String.class).to("mock:result");
+
+                from(fileUri("resumeNone?noop=true&recursive=true"))
                         .convertBodyTo(String.class).to("mock:result");
             }
         };
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index 5a804c1..7ecbff5 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy;
 import org.apache.camel.component.file.consumer.FileResumeInfo;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.IOHelper;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +37,11 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
         private static final Logger LOG = 
LoggerFactory.getLogger(TestResumeStrategy.class);
 
         @Override
+        public long lastOffset(File file) {
+            return IOHelper.INITIAL_OFFSET;
+        }
+
+        @Override
         public void resume(FileResumeInfo resumeInfo) {
             List<String> processedFiles = Arrays.asList("0.txt", "1.txt", 
"2.txt");
             int count = 0;
@@ -58,7 +64,7 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
     }
 
     @Test
-    public void testDepth() throws Exception {
+    public void testResume() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceivedInAnyOrder("3", "4", "5", "6");
 
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java 
b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
index a54cd90..abfa1e4 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
@@ -55,6 +55,8 @@ public final class IOHelper {
 
     public static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
 
+    public static final long INITIAL_OFFSET = 0;
+
     private static final Logger LOG = LoggerFactory.getLogger(IOHelper.class);
 
     // allows to turn on backwards compatible to turn off regarding the first
@@ -116,11 +118,22 @@ public final class IOHelper {
     }
 
     public static String toString(Reader reader) throws IOException {
-        return toString(buffered(reader));
+        return toString(reader, INITIAL_OFFSET);
+    }
+
+    public static String toString(Reader reader, long offset) throws 
IOException {
+        return toString(buffered(reader), offset);
     }
 
     public static String toString(BufferedReader reader) throws IOException {
+        return toString(reader, INITIAL_OFFSET);
+    }
+
+    public static String toString(BufferedReader reader, long offset) throws 
IOException {
         StringBuilder sb = new StringBuilder(1024);
+
+        reader.skip(offset);
+
         char[] buf = new char[1024];
         try {
             int len;

Reply via email to