This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 11559d4de8f [CAMEL-20728] add S3 multipart upload support for stream producer in camel-aws2-s3 (#14062)
11559d4de8f is described below

commit 11559d4de8fcd114900fabe1e790c1a6dd1c3d74
Author: Benjamin BONNET <benjamin.bon...@m4x.org>
AuthorDate: Tue May 7 10:36:46 2024 +0200

    [CAMEL-20728] add S3 multipart upload support for stream producer in camel-aws2-s3 (#14062)
    
    * add S3 multipart upload support for stream producer
    
    * clean useless condition
    
    * review feedback on logging
---
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java |  72 +++++++++++----
 .../integration/S3StreamUploadS3MultipartIT.java   | 102 +++++++++++++++++++++
 2 files changed, 154 insertions(+), 20 deletions(-)

diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index 9fe8bb11042..8d23db6f1a5 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -127,21 +127,36 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
         UploadState state = null;
         int totalSize = 0;
         byte[] b;
-        while ((b = AWS2S3Utils.toByteArray(is, 
getConfiguration().getBufferSize())).length > 0) {
+        int maxRead = (getConfiguration().isMultiPartUpload()
+                ? Math.toIntExact(getConfiguration().getPartSize()) : 
getConfiguration().getBufferSize());
+        if (uploadAggregate != null) {
+            uploadAggregate.index++;
+            maxRead -= uploadAggregate.buffer.size();
+        }
+
+        while ((b = AWS2S3Utils.toByteArray(is, maxRead)).length
+               > 0) {
             totalSize += b.length;
+            if (getConfiguration().isMultiPartUpload())
+                maxRead -= b.length;
             synchronized (lock) {
                 // aggregate with previously received exchanges
                 if (ObjectHelper.isNotEmpty(uploadAggregate)) {
                     uploadAggregate.buffer.write(b);
-                    uploadAggregate.index++;
-
+                    if (getConfiguration().isMultiPartUpload() &&
+                            uploadAggregate.buffer.size() >= 
getConfiguration().getPartSize()) {
+                        uploadPart(uploadAggregate);
+                        maxRead = 
Math.toIntExact(getConfiguration().getPartSize());
+                        continue;
+                    }
                     if (uploadAggregate.buffer.size() >= 
getConfiguration().getBatchSize()
-                            || uploadAggregate.index == 
getConfiguration().getBatchMessageNumber()) {
+                            || (uploadAggregate.index >= 
getConfiguration().getBatchMessageNumber()
+                                    && uploadAggregate.buffer.size() < 
getConfiguration().getPartSize())) {
 
-                        uploadPart(uploadAggregate);
+                        if (uploadAggregate.buffer.size() > 0)
+                            uploadPart(uploadAggregate);
                         CompleteMultipartUploadResponse uploadResult = 
completeUpload(uploadAggregate);
                         this.uploadAggregate = null;
-
                         Message message = getMessageForResponse(exchange);
                         message.setHeader(AWS2S3Constants.E_TAG, 
uploadResult.eTag());
                         if (uploadResult.versionId() != null) {
@@ -151,11 +166,10 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                     continue;
                 }
             }
-
             if (state == null) {
                 state = new UploadState();
             } else {
-                state.index++;
+                state.index = 1;
             }
             state.buffer.write(b);
 
@@ -201,17 +215,21 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
             try {
                 if (totalSize >= getConfiguration().getBatchSize()
                         || state.buffer.size() >= 
getConfiguration().getBatchSize()
-                        || state.index == 
getConfiguration().getBatchMessageNumber()) {
+                        || state.index >= 
getConfiguration().getBatchMessageNumber()) {
 
                     uploadPart(state);
                     CompleteMultipartUploadResponse uploadResult = 
completeUpload(state);
-
                     Message message = getMessageForResponse(exchange);
                     message.setHeader(AWS2S3Constants.E_TAG, 
uploadResult.eTag());
                     if (uploadResult.versionId() != null) {
                         message.setHeader(AWS2S3Constants.VERSION_ID, 
uploadResult.versionId());
                     }
                     state = null;
+                    continue;
+                }
+                if (getConfiguration().isMultiPartUpload() && 
state.buffer.size() >= getConfiguration().getPartSize()) {
+                    uploadPart(state);
+                    maxRead = 
Math.toIntExact(getConfiguration().getPartSize());
                 }
 
             } catch (Exception e) {
@@ -244,29 +262,41 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                         .uploadId(state.initResponse.uploadId())
                         .build();
 
-        CompleteMultipartUploadResponse uploadResult = 
getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+        try {
+            final CompleteMultipartUploadResponse uploadResult
+                    = 
getEndpoint().getS3Client().completeMultipartUpload(compRequest);
 
-        // Converting the index to String can cause extra overhead
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Completed upload for the part {} with etag {} at index 
{}", part, uploadResult.eTag(),
-                    state.index);
+            // Converting the index to String can cause extra overhead
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Completed upload for the part {}, multipart {} with 
etag {} at index {}", part, state.multipartIndex,
+                        uploadResult.eTag(),
+                        state.index);
+            }
+            part.getAndIncrement();
+            return uploadResult;
+        } catch (Exception e) {
+            LOG.warn("Error completing multipart upload - Multipart upload 
will be aborted", e);
+            getEndpoint().getS3Client()
+                    
.abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
+                            
.key(state.dynamicKeyName).uploadId(state.initResponse.uploadId()).build());
+            throw e;
         }
-        return uploadResult;
     }
 
     private void uploadPart(UploadState state) {
         UploadPartRequest uploadRequest = 
UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
                 
.key(state.dynamicKeyName).uploadId(state.initResponse.uploadId())
-                .partNumber(state.index).build();
+                .partNumber(state.multipartIndex).build();
 
-        LOG.trace("Uploading part {} at index {} for {}", state.part, 
state.index, getConfiguration().getKeyName());
+        LOG.trace("Uploading part {}, multipart {} at index {} for {}", 
state.part, state.multipartIndex, state.index,
+                getConfiguration().getKeyName());
 
         String etag = getEndpoint().getS3Client()
                 .uploadPart(uploadRequest, 
RequestBody.fromBytes(state.buffer.toByteArray())).eTag();
-        CompletedPart partUpload = 
CompletedPart.builder().partNumber(state.index).eTag(etag).build();
+        CompletedPart partUpload = 
CompletedPart.builder().partNumber(state.multipartIndex).eTag(etag).build();
         state.completedParts.add(partUpload);
         state.buffer.reset();
-        part.getAndIncrement();
+        state.multipartIndex++;
     }
 
     private String fileNameToUpload(
@@ -360,6 +390,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
 
     private class UploadState {
         int index;
+        int multipartIndex;
         int part;
         List<CompletedPart> completedParts = new ArrayList<>();
         ByteArrayOutputStream buffer;
@@ -369,6 +400,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
 
         UploadState() {
             index = 1;
+            multipartIndex = 1;
             part = AWS2S3StreamUploadProducer.this.part.get();
             buffer = new ByteArrayOutputStream();
         }
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadS3MultipartIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadS3MultipartIT.java
new file mode 100644
index 00000000000..c78817420ba
--- /dev/null
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadS3MultipartIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.s3.integration;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.component.aws2.s3.AWS2S3Operations;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class S3StreamUploadS3MultipartIT extends Aws2S3Base {
+
+    @EndpointInject
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void sendIn() throws Exception {
+        result.expectedMessageCount(10);
+
+        for (int i = 0; i < 10; i++) {
+            template.send("direct:stream1", new Processor() {
+
+                @Override
+                public void process(Exchange exchange) {
+                    exchange.getIn().setHeader(AWS2S3Constants.KEY, 
"empty.bin");
+                    exchange.getIn().setBody(new 
File("src/test/resources/empty.bin"));
+                }
+            });
+        }
+
+        Exchange ex = template.request("direct:listObjects", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, 
AWS2S3Operations.listObjects);
+            }
+        });
+
+        List<S3Object> resp = ex.getMessage().getBody(List.class);
+        // expect 1 file
+        assertEquals(1, resp.size());
+        // file size: 5,242,880 bytes
+        assertEquals(10 * 
Files.size(Paths.get("src/test/resources/empty.bin")),
+                resp.stream().mapToLong(S3Object::size).sum());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                String awsEndpoint1
+                        = "aws2-s3://mycamel-1?autoCreateBucket=true" +
+                          "&streamingUploadMode=true" +
+                          "&keyName=fileTest.txt" +
+                          "&batchMessageNumber=10" +
+                          "&batchSize=1000000000" +
+                          "&namingStrategy=random" +
+                          "&multiPartUpload=true" +
+                          "&bufferSize=0" +
+                          "&partSize=10000000";
+
+                from("direct:stream1").process(exchange -> {
+                }).to(awsEndpoint1).process(exchange -> {
+                }).to("mock:result");
+
+                String awsEndpoint = 
"aws2-s3://mycamel-1?autoCreateBucket=true";
+
+                from("direct:listObjects").to(awsEndpoint);
+            }
+        };
+    }
+}

Reply via email to