This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5e2c5a8b6664015adb95800c3ab62d9c27a7f06a Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Apr 8 07:51:39 2021 +0200 CAMEL-16469 - Camel-AWS2-S3 - Streaming upload: restart from the last index when using the progressive naming strategy --- .../camel/component/aws2/s3/AWS2S3Configuration.java | 20 +++++++++++++++++--- .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 6 +++++- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java index 0f48a66..2d2782e 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java @@ -18,6 +18,7 @@ package org.apache.camel.component.aws2.s3; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.aws2.s3.stream.AWSS3NamingStrategyEnum; +import org.apache.camel.component.aws2.s3.stream.AWSS3RestartingPolicyEnum; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; @@ -121,6 +122,8 @@ public class AWS2S3Configuration implements Cloneable { private AWSS3NamingStrategyEnum namingStrategy = AWSS3NamingStrategyEnum.progressive; @UriParam(label = "producer") private long streamingUploadTimeout; + @UriParam(defaultValue = "override", label = "producer") + private AWSS3RestartingPolicyEnum restartingPolicy = AWSS3RestartingPolicyEnum.override; public long getPartSize() { return partSize; @@ -602,7 +605,7 @@ public class AWS2S3Configuration implements Cloneable { } /** - * The number of messages composing a batch in stream mode + * The number of messages composing a batch in streaming upload mode */ public void setBatchMessageNumber(int batchMessageNumber) { this.batchMessageNumber = batchMessageNumber; @@ -613,7 +616,7 @@ public class AWS2S3Configuration implements Cloneable { } /** - * The batch size (in bytes) in stream mode + * The batch size (in bytes) in streaming upload mode */ public void setBatchSize(int batchSize) { this.batchSize = batchSize; @@ -624,7 +627,7 @@ public class AWS2S3Configuration implements Cloneable { } /** - * The naming strategy to use in stream mode + * The naming strategy to use in streaming upload mode */ public void setNamingStrategy(AWSS3NamingStrategyEnum namingStrategy) { this.namingStrategy = namingStrategy; @@ -641,6 +644,17 @@ public class AWS2S3Configuration implements Cloneable { this.streamingUploadTimeout = streamingUploadTimeout; } + public AWSS3RestartingPolicyEnum getRestartingPolicy() { + return restartingPolicy; + } + + /** + * The restarting policy to use in streaming upload mode + */ + public void setRestartingPolicy(AWSS3RestartingPolicyEnum restartingPolicy) { + this.restartingPolicy = restartingPolicy; + } + public AWS2S3Configuration copy() { try { return (AWS2S3Configuration) super.clone(); diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java index e83719b..c9ce9d9 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java @@ -76,7 +76,9 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { getConfiguration().getStreamingUploadTimeout(), getConfiguration().getStreamingUploadTimeout(), TimeUnit.MILLISECONDS); } - setStartingPart(); + if (getConfiguration().getRestartingPolicy().equals(AWSS3RestartingPolicyEnum.lastPart)) { + setStartingPart(); + } } @Override @@ -291,6 +293,8 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { part.getAndSet(1); } } + } else { + LOG.info("lastPart restarting policy can be used only with progressive naming strategy"); } }