This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit d80e2450cff67187e83a82c3b4ba8ef841d3ac36
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Apr 8 07:45:07 2021 +0200

    CAMEL-16469 - Camel-AWS2-S3 - Streaming upload: restart from the last index when using the progressive naming strategy
---
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 38 +++++++++++++++-------
 1 file changed, 27 insertions(+), 11 deletions(-)

diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index aa981eb..e83719b 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws2.s3.stream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
@@ -38,17 +39,7 @@ import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.sync.RequestBody;
-import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.BucketCannedACL;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
-import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
-import software.amazon.awssdk.services.s3.model.CompletedPart;
-import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
-import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
-import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
-import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.*;
 import software.amazon.awssdk.utils.IoUtils;
 
 /**
@@ -85,6 +76,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                     getConfiguration().getStreamingUploadTimeout(), getConfiguration().getStreamingUploadTimeout(),
                     TimeUnit.MILLISECONDS);
         }
+        setStartingPart();
     }
 
     @Override
@@ -278,6 +270,30 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
         return dynamicKeyName;
     }
 
+    private void setStartingPart() {
+        if (getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.progressive)) {
+            ListObjectsRequest.Builder builder = ListObjectsRequest.builder().bucket(getConfiguration().getBucketName())
+                    .prefix(AWS2S3Utils.determineFileName(getConfiguration().getKeyName()));
+            ListObjectsResponse o = getEndpoint().getS3Client().listObjects(builder.build());
+            if (o.contents().size() > 0) {
+                ArrayList<S3Object> list = new ArrayList<>();
+                list.addAll(o.contents());
+                list.sort(Comparator.comparing(S3Object::lastModified));
+                int listSize = list.size();
+                String fileName = AWS2S3Utils.determineFileName(list.get(listSize - 1).key());
+                int position = fileName.lastIndexOf("-");
+                if (position != -1) {
+                    String partString = fileName.substring(position + 1);
+                    if (ObjectHelper.isNotEmpty(partString)) {
+                        part.getAndSet(Integer.valueOf(partString) + 1);
+                    }
+                } else {
+                    part.getAndSet(1);
+                }
+            }
+        }
+    }
+
     protected AWS2S3Configuration getConfiguration() {
         return getEndpoint().getConfiguration();
     }