This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a17ddae5b34 CAMEL-17027 ensure all file chunks are uploaded when streaming large files (#11208)
a17ddae5b34 is described below

commit a17ddae5b3440f4ff0a6d8d0083f3ebdc0c8998a
Author: Jono Morris <jono.mor...@xtra.co.nz>
AuthorDate: Mon Aug 28 17:08:46 2023 +1200

    CAMEL-17027 ensure all file chunks are uploaded when streaming large files (#11208)
---
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 106 +++++++++++----------
 .../s3/integration/S3StreamUploadMultipartIT.java  |  94 ++++++++++++++++++
 2 files changed, 150 insertions(+), 50 deletions(-)
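
A sketch of the pattern the fix applies may help when reading the producer hunk below. Before this change, process() performed a single bounded read per exchange, so any bytes beyond the first buffer were never uploaded; the fix wraps the body in a read-until-empty loop. The following is a minimal standalone illustration of that loop shape in plain java.io — the readChunk() helper and ChunkConsumer callback are hypothetical stand-ins for this sketch, not Camel or AWS SDK API:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ChunkedDrain {

    /** Callback standing in for "buffer the chunk and upload a multipart part". */
    interface ChunkConsumer {
        void accept(byte[] chunk) throws IOException;
    }

    /**
     * Drain the stream in bufferSize-sized chunks. Keep reading until an
     * empty chunk signals end of stream, so no trailing bytes are lost --
     * the same loop shape the patch introduces around toByteArray().
     */
    static void drain(InputStream is, int bufferSize, ChunkConsumer consumer) throws IOException {
        byte[] chunk;
        while ((chunk = readChunk(is, bufferSize)).length > 0) {
            consumer.accept(chunk);
        }
    }

    /** Read up to bufferSize bytes; returns an empty array at end of stream. */
    static byte[] readChunk(InputStream is, int bufferSize) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int remaining = bufferSize;
        int n;
        while (remaining > 0 && (n = is.read(buf, 0, Math.min(buf.length, remaining))) != -1) {
            out.write(buf, 0, n);
            remaining -= n;
        }
        return out.toByteArray();
    }
}
```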

diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index e82398a5b77..e491479f961 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -132,68 +132,74 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
     public void process(final Exchange exchange) throws Exception {
         InputStream is = exchange.getIn().getMandatoryBody(InputStream.class);
 
-        buffer.write(AWS2S3Utils.toByteArray(is, getConfiguration().getBufferSize()));
-
-        final String keyName = getConfiguration().getKeyName();
-        final String fileName = AWS2S3Utils.determineFileName(keyName);
-        final String extension = AWS2S3Utils.determineFileExtension(keyName);
-        if (index.get() == 1 && getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.random)) {
-            id = UUID.randomUUID();
-        }
-        dynamicKeyName = fileNameToUpload(fileName, getConfiguration().getNamingStrategy(), extension, part, id);
-        CreateMultipartUploadRequest.Builder createMultipartUploadRequest
-                = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(dynamicKeyName);
+        int totalSize = 0;
+        byte[] b;
+        while ((b = AWS2S3Utils.toByteArray(is, getConfiguration().getBufferSize())).length > 0) {
+            totalSize += b.length;
+            buffer.write(b);
+
+            final String keyName = getConfiguration().getKeyName();
+            final String fileName = AWS2S3Utils.determineFileName(keyName);
+            final String extension = AWS2S3Utils.determineFileExtension(keyName);
+            if (index.get() == 1 && getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.random)) {
+                id = UUID.randomUUID();
+            }
+            dynamicKeyName = fileNameToUpload(fileName, getConfiguration().getNamingStrategy(), extension, part, id);
+            CreateMultipartUploadRequest.Builder createMultipartUploadRequest
+                    = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(dynamicKeyName);
 
-        String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration());
-        if (storageClass != null) {
-            createMultipartUploadRequest.storageClass(storageClass);
-        }
+            String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration());
+            if (storageClass != null) {
+                createMultipartUploadRequest.storageClass(storageClass);
+            }
 
-        String cannedAcl = exchange.getIn().getHeader(AWS2S3Constants.CANNED_ACL, String.class);
-        if (cannedAcl != null) {
-            ObjectCannedACL objectAcl = ObjectCannedACL.valueOf(cannedAcl);
-            createMultipartUploadRequest.acl(objectAcl);
-        }
+            String cannedAcl = exchange.getIn().getHeader(AWS2S3Constants.CANNED_ACL, String.class);
+            if (cannedAcl != null) {
+                ObjectCannedACL objectAcl = ObjectCannedACL.valueOf(cannedAcl);
+                createMultipartUploadRequest.acl(objectAcl);
+            }
 
-        BucketCannedACL acl = exchange.getIn().getHeader(AWS2S3Constants.ACL, BucketCannedACL.class);
-        if (acl != null) {
-            // note: if cannedacl and acl are both specified the last one will
-            // be used. refer to
-            // PutObjectRequest#setAccessControlList for more details
-            createMultipartUploadRequest.acl(acl.toString());
-        }
+            BucketCannedACL acl = exchange.getIn().getHeader(AWS2S3Constants.ACL, BucketCannedACL.class);
+            if (acl != null) {
+                // note: if cannedacl and acl are both specified the last one will
+                // be used. refer to
+                // PutObjectRequest#setAccessControlList for more details
+                createMultipartUploadRequest.acl(acl.toString());
+            }
 
-        AWS2S3Utils.setEncryption(createMultipartUploadRequest, getConfiguration());
+            AWS2S3Utils.setEncryption(createMultipartUploadRequest, getConfiguration());
 
-        LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange);
-        if (index.get() == 1) {
-            initResponse
-                    = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build());
-            completedParts = new ArrayList<>();
-        }
+            LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange);
+            if (index.get() == 1) {
+                initResponse
+                        = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build());
+                completedParts = new ArrayList<>();
+            }
 
-        try {
-            if (buffer.size() >= getConfiguration().getBatchSize()
-                    || index.get() == getConfiguration().getBatchMessageNumber()) {
+            try {
+                if (totalSize >= getConfiguration().getBatchSize()
+                        || buffer.size() >= getConfiguration().getBatchSize()
+                        || index.get() == getConfiguration().getBatchMessageNumber()) {
 
-                uploadPart();
-                completeUpload();
+                    uploadPart();
+                    completeUpload();
 
-                Message message = getMessageForResponse(exchange);
-                message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
-                if (uploadResult.versionId() != null) {
-                    message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
+                    Message message = getMessageForResponse(exchange);
+                    message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
+                    if (uploadResult.versionId() != null) {
+                        message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
+                    }
                 }
+
+            } catch (Exception e) {
+                getEndpoint().getS3Client()
+                        .abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
+                                .key(dynamicKeyName).uploadId(initResponse.uploadId()).build());
+                throw e;
             }
 
-        } catch (Exception e) {
-            getEndpoint().getS3Client()
-                    .abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
-                            .key(dynamicKeyName).uploadId(initResponse.uploadId()).build());
-            throw e;
+            index.getAndIncrement();
         }
-
-        index.getAndIncrement();
     }
 
     private void completeUpload() {
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
new file mode 100644
index 00000000000..8069c22c257
--- /dev/null
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.s3.integration;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.component.aws2.s3.AWS2S3Operations;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class S3StreamUploadMultipartIT extends Aws2S3Base {
+
+    @EndpointInject
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void sendIn() throws Exception {
+        result.expectedMessageCount(10);
+
+        for (int i = 0; i < 10; i++) {
+            template.send("direct:stream1", new Processor() {
+
+                @Override
+                public void process(Exchange exchange) {
+                    exchange.getIn().setHeader(AWS2S3Constants.KEY, "empty.bin");
+                    exchange.getIn().setBody(new File("src/test/resources/empty.bin"));
+                }
+            });
+        }
+
+        MockEndpoint.assertIsSatisfied(context);
+
+        Exchange ex = template.request("direct:listObjects", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects);
+            }
+        });
+
+        // file size: 5,242,880 bytes, 10 * (5 chunks of 1,000,000 + remainder of 242,880)
+        List<S3Object> resp = ex.getMessage().getBody(List.class);
+        assertEquals(60, resp.size());
+
+        assertEquals( 10 * Files.size(Paths.get("src/test/resources/empty.bin")),
+                resp.stream().mapToLong(S3Object::size).sum());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                String awsEndpoint1
+                        = "aws2-s3://mycamel-1?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random";
+
+                from("direct:stream1").to(awsEndpoint1).to("mock:result");
+
+                String awsEndpoint = "aws2-s3://mycamel-1?autoCreateBucket=true";
+
+                from("direct:listObjects").to(awsEndpoint);
+            }
+        };
+    }
+}
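
For reference, the counts asserted by the new IT follow directly from the chunking arithmetic in the in-diff comment. Assuming a buffer size of 1,000,000 bytes (implied by "5 chunks of 1,000,000"; the endpoint URI above does not set it explicitly), each 5,242,880-byte message splits into five full chunks plus a 242,880-byte tail, i.e. six uploaded objects per message, so ten messages yield the asserted 60 objects. A quick hypothetical check:

```java
public class ExpectedCounts {
    public static void main(String[] args) {
        long fileSize = 5_242_880L; // size of src/test/resources/empty.bin per the test comment
        int bufferSize = 1_000_000; // assumed buffer size, implied by "5 chunks of 1,000,000"
        int messages = 10;

        long fullChunks = fileSize / bufferSize;                // 5
        long tail = fileSize % bufferSize;                      // 242,880
        long partsPerMessage = fullChunks + (tail > 0 ? 1 : 0); // 6
        System.out.println(messages * partsPerMessage);         // 60, matching assertEquals(60, resp.size())
        System.out.println(messages * fileSize);                // 52,428,800, matching the size-sum assertion
    }
}
```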
