This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new a17ddae5b34 CAMEL-17027 ensure all file chunks are uploaded when streaming large files (#11208) a17ddae5b34 is described below commit a17ddae5b3440f4ff0a6d8d0083f3ebdc0c8998a Author: Jono Morris <jono.mor...@xtra.co.nz> AuthorDate: Mon Aug 28 17:08:46 2023 +1200 CAMEL-17027 ensure all file chunks are uploaded when streaming large files (#11208) --- .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 106 +++++++++++---------- .../s3/integration/S3StreamUploadMultipartIT.java | 94 ++++++++++++++++++ 2 files changed, 150 insertions(+), 50 deletions(-) diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java index e82398a5b77..e491479f961 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java @@ -132,68 +132,74 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { public void process(final Exchange exchange) throws Exception { InputStream is = exchange.getIn().getMandatoryBody(InputStream.class); - buffer.write(AWS2S3Utils.toByteArray(is, getConfiguration().getBufferSize())); - - final String keyName = getConfiguration().getKeyName(); - final String fileName = AWS2S3Utils.determineFileName(keyName); - final String extension = AWS2S3Utils.determineFileExtension(keyName); - if (index.get() == 1 && getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.random)) { - id = UUID.randomUUID(); - } - dynamicKeyName = fileNameToUpload(fileName, getConfiguration().getNamingStrategy(), extension, part, id); - CreateMultipartUploadRequest.Builder createMultipartUploadRequest 
- = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(dynamicKeyName); + int totalSize = 0; + byte[] b; + while ((b = AWS2S3Utils.toByteArray(is, getConfiguration().getBufferSize())).length > 0) { + totalSize += b.length; + buffer.write(b); + + final String keyName = getConfiguration().getKeyName(); + final String fileName = AWS2S3Utils.determineFileName(keyName); + final String extension = AWS2S3Utils.determineFileExtension(keyName); + if (index.get() == 1 && getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.random)) { + id = UUID.randomUUID(); + } + dynamicKeyName = fileNameToUpload(fileName, getConfiguration().getNamingStrategy(), extension, part, id); + CreateMultipartUploadRequest.Builder createMultipartUploadRequest + = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(dynamicKeyName); - String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration()); - if (storageClass != null) { - createMultipartUploadRequest.storageClass(storageClass); - } + String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration()); + if (storageClass != null) { + createMultipartUploadRequest.storageClass(storageClass); + } - String cannedAcl = exchange.getIn().getHeader(AWS2S3Constants.CANNED_ACL, String.class); - if (cannedAcl != null) { - ObjectCannedACL objectAcl = ObjectCannedACL.valueOf(cannedAcl); - createMultipartUploadRequest.acl(objectAcl); - } + String cannedAcl = exchange.getIn().getHeader(AWS2S3Constants.CANNED_ACL, String.class); + if (cannedAcl != null) { + ObjectCannedACL objectAcl = ObjectCannedACL.valueOf(cannedAcl); + createMultipartUploadRequest.acl(objectAcl); + } - BucketCannedACL acl = exchange.getIn().getHeader(AWS2S3Constants.ACL, BucketCannedACL.class); - if (acl != null) { - // note: if cannedacl and acl are both specified the last one will - // be used. 
refer to - // PutObjectRequest#setAccessControlList for more details - createMultipartUploadRequest.acl(acl.toString()); - } + BucketCannedACL acl = exchange.getIn().getHeader(AWS2S3Constants.ACL, BucketCannedACL.class); + if (acl != null) { + // note: if cannedacl and acl are both specified the last one will + // be used. refer to + // PutObjectRequest#setAccessControlList for more details + createMultipartUploadRequest.acl(acl.toString()); + } - AWS2S3Utils.setEncryption(createMultipartUploadRequest, getConfiguration()); + AWS2S3Utils.setEncryption(createMultipartUploadRequest, getConfiguration()); - LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange); - if (index.get() == 1) { - initResponse - = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build()); - completedParts = new ArrayList<>(); - } + LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange); + if (index.get() == 1) { + initResponse + = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build()); + completedParts = new ArrayList<>(); + } - try { - if (buffer.size() >= getConfiguration().getBatchSize() - || index.get() == getConfiguration().getBatchMessageNumber()) { + try { + if (totalSize >= getConfiguration().getBatchSize() + || buffer.size() >= getConfiguration().getBatchSize() + || index.get() == getConfiguration().getBatchMessageNumber()) { - uploadPart(); - completeUpload(); + uploadPart(); + completeUpload(); - Message message = getMessageForResponse(exchange); - message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag()); - if (uploadResult.versionId() != null) { - message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId()); + Message message = getMessageForResponse(exchange); + message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag()); + if (uploadResult.versionId() != null) { + 
message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId()); + } } + + } catch (Exception e) { + getEndpoint().getS3Client() + .abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()) + .key(dynamicKeyName).uploadId(initResponse.uploadId()).build()); + throw e; } - } catch (Exception e) { - getEndpoint().getS3Client() - .abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()) - .key(dynamicKeyName).uploadId(initResponse.uploadId()).build()); - throw e; + index.getAndIncrement(); } - - index.getAndIncrement(); } private void completeUpload() { diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java new file mode 100644 index 00000000000..8069c22c257 --- /dev/null +++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.aws2.s3.integration; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws2.s3.AWS2S3Constants; +import org.apache.camel.component.aws2.s3.AWS2S3Operations; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.s3.model.S3Object; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class S3StreamUploadMultipartIT extends Aws2S3Base { + + @EndpointInject + private ProducerTemplate template; + + @EndpointInject("mock:result") + private MockEndpoint result; + + @Test + public void sendIn() throws Exception { + result.expectedMessageCount(10); + + for (int i = 0; i < 10; i++) { + template.send("direct:stream1", new Processor() { + + @Override + public void process(Exchange exchange) { + exchange.getIn().setHeader(AWS2S3Constants.KEY, "empty.bin"); + exchange.getIn().setBody(new File("src/test/resources/empty.bin")); + } + }); + } + + MockEndpoint.assertIsSatisfied(context); + + Exchange ex = template.request("direct:listObjects", new Processor() { + + @Override + public void process(Exchange exchange) { + exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects); + } + }); + + // file size: 5,242,880 bytes, 10 * (5 chunks of 1,000,000 + remainder of 242,880) + List<S3Object> resp = ex.getMessage().getBody(List.class); + assertEquals(60, resp.size()); + + assertEquals( 10 * Files.size(Paths.get("src/test/resources/empty.bin")), + resp.stream().mapToLong(S3Object::size).sum()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + 
String awsEndpoint1 + = "aws2-s3://mycamel-1?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random"; + + from("direct:stream1").to(awsEndpoint1).to("mock:result"); + + String awsEndpoint = "aws2-s3://mycamel-1?autoCreateBucket=true"; + + from("direct:listObjects").to(awsEndpoint); + } + }; + } +}