This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5e2c5a8b6664015adb95800c3ab62d9c27a7f06a
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Apr 8 07:51:39 2021 +0200

    CAMEL-16469 - Camel-AWS2-S3 - Streaming upload: restart from the last index when using the progressive naming strategy
---
 .../camel/component/aws2/s3/AWS2S3Configuration.java | 20 +++++++++++++++++---
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java   |  6 +++++-
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
index 0f48a66..2d2782e 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.aws2.s3;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.aws2.s3.stream.AWSS3NamingStrategyEnum;
+import org.apache.camel.component.aws2.s3.stream.AWSS3RestartingPolicyEnum;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
@@ -121,6 +122,8 @@ public class AWS2S3Configuration implements Cloneable {
     private AWSS3NamingStrategyEnum namingStrategy = AWSS3NamingStrategyEnum.progressive;
     @UriParam(label = "producer")
     private long streamingUploadTimeout;
+    @UriParam(defaultValue = "override", label = "producer")
+    private AWSS3RestartingPolicyEnum restartingPolicy = AWSS3RestartingPolicyEnum.override;
 
     public long getPartSize() {
         return partSize;
@@ -602,7 +605,7 @@ public class AWS2S3Configuration implements Cloneable {
     }
 
     /**
-     * The number of messages composing a batch in stream mode
+     * The number of messages composing a batch in streaming upload mode
      */
     public void setBatchMessageNumber(int batchMessageNumber) {
         this.batchMessageNumber = batchMessageNumber;
@@ -613,7 +616,7 @@ public class AWS2S3Configuration implements Cloneable {
     }
 
     /**
-     * The batch size (in bytes) in stream mode
+     * The batch size (in bytes) in streaming upload mode
      */
     public void setBatchSize(int batchSize) {
         this.batchSize = batchSize;
@@ -624,7 +627,7 @@ public class AWS2S3Configuration implements Cloneable {
     }
 
     /**
-     * The naming strategy to use in stream mode
+     * The naming strategy to use in streaming upload mode
      */
     public void setNamingStrategy(AWSS3NamingStrategyEnum namingStrategy) {
         this.namingStrategy = namingStrategy;
@@ -641,6 +644,17 @@ public class AWS2S3Configuration implements Cloneable {
         this.streamingUploadTimeout = streamingUploadTimeout;
     }
 
+    public AWSS3RestartingPolicyEnum getRestartingPolicy() {
+        return restartingPolicy;
+    }
+
+    /**
+     * The restarting policy to use in streaming upload mode
+     */
+    public void setRestartingPolicy(AWSS3RestartingPolicyEnum restartingPolicy) {
+        this.restartingPolicy = restartingPolicy;
+    }
+
     public AWS2S3Configuration copy() {
         try {
             return (AWS2S3Configuration) super.clone();
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index e83719b..c9ce9d9 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -76,7 +76,9 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                     getConfiguration().getStreamingUploadTimeout(), getConfiguration().getStreamingUploadTimeout(),
                     TimeUnit.MILLISECONDS);
         }
-        setStartingPart();
+        if (getConfiguration().getRestartingPolicy().equals(AWSS3RestartingPolicyEnum.lastPart)) {
+            setStartingPart();
+        }
     }
 
     @Override
@@ -291,6 +293,8 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                     part.getAndSet(1);
                 }
             }
+        } else {
+            LOG.info("lastPart restarting policy can be used only with progressive naming strategy");
         }
     }
 

Reply via email to