This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d80e2450cff67187e83a82c3b4ba8ef841d3ac36
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Apr 8 07:45:07 2021 +0200

    CAMEL-16469 - Camel-AWS2-S3 - Streaming upload: restart from the last index when using the progressive naming strategy
---
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 38 +++++++++++++++-------
 1 file changed, 27 insertions(+), 11 deletions(-)

diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index aa981eb..e83719b 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws2.s3.stream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
@@ -38,17 +39,7 @@ import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.sync.RequestBody;
-import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.BucketCannedACL;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
-import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
-import software.amazon.awssdk.services.s3.model.CompletedPart;
-import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
-import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
-import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
-import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.*;
 import software.amazon.awssdk.utils.IoUtils;
 
 /**
@@ -85,6 +76,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                     getConfiguration().getStreamingUploadTimeout(), 
getConfiguration().getStreamingUploadTimeout(),
                     TimeUnit.MILLISECONDS);
         }
+        setStartingPart();
     }
 
     @Override
@@ -278,6 +270,30 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
         return dynamicKeyName;
     }
 
+    /**
+     * Initialises the progressive part counter from the objects already present in the bucket, so that a
+     * restarted producer continues after the most recently modified object instead of starting over.
+     * <p>
+     * Does nothing unless the configured naming strategy is {@code progressive}. Otherwise the bucket is listed
+     * using {@code AWS2S3Utils.determineFileName(keyName)} as prefix and the most recently modified match is
+     * inspected:
+     * <ul>
+     * <li>no match at all: the counter is left untouched;</li>
+     * <li>name without a {@code -}, or with a non-numeric text after the last {@code -}: the counter is set to
+     * 1;</li>
+     * <li>name ending in {@code -<n>}: the counter is set to {@code n + 1}.</li>
+     * </ul>
+     */
+    private void setStartingPart() {
+        if (!AWSS3NamingStrategyEnum.progressive.equals(getConfiguration().getNamingStrategy())) {
+            return;
+        }
+
+        ListObjectsRequest request = ListObjectsRequest.builder()
+                .bucket(getConfiguration().getBucketName())
+                .prefix(AWS2S3Utils.determineFileName(getConfiguration().getKeyName()))
+                .build();
+        // NOTE(review): only the first page returned by listObjects is examined; if more keys share the prefix
+        // than fit in one page, the newest object can be missed. Consider paginating - TODO confirm.
+        ListObjectsResponse response = getEndpoint().getS3Client().listObjects(request);
+        if (response.contents().isEmpty()) {
+            return;
+        }
+
+        // Work on a copy: the list is sorted in place, oldest first, so the newest object ends up last.
+        List<S3Object> objects = new ArrayList<>(response.contents());
+        objects.sort(Comparator.comparing(S3Object::lastModified));
+        S3Object latest = objects.get(objects.size() - 1);
+
+        String fileName = AWS2S3Utils.determineFileName(latest.key());
+        int position = fileName.lastIndexOf('-');
+        if (position == -1) {
+            // No separator in the name: nothing to continue from, restart at 1.
+            part.set(1);
+            return;
+        }
+
+        String partString = fileName.substring(position + 1);
+        if (!ObjectHelper.isNotEmpty(partString)) {
+            // Name ends with the separator: keep the current counter value.
+            return;
+        }
+
+        try {
+            part.set(Integer.parseInt(partString) + 1);
+        } catch (NumberFormatException e) {
+            // The dash belongs to the key name itself (e.g. "my-file") rather than introducing an index, so
+            // handle it like a name without separator instead of failing producer start-up.
+            part.set(1);
+        }
+    }
+
     /**
      * Shortcut to the configuration object exposed by this producer's endpoint.
      *
      * @return the result of {@code getEndpoint().getConfiguration()}
      */
     protected AWS2S3Configuration getConfiguration() {
         final AWS2S3Configuration configuration = getEndpoint().getConfiguration();
         return configuration;
     }

Reply via email to