This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 11559d4de8f [CAMEL-20728] add S3 multipart upload support for stream producer in camel-aws2-s3 (#14062) 11559d4de8f is described below commit 11559d4de8fcd114900fabe1e790c1a6dd1c3d74 Author: Benjamin BONNET <benjamin.bon...@m4x.org> AuthorDate: Tue May 7 10:36:46 2024 +0200 [CAMEL-20728] add S3 multipart upload support for stream producer in camel-aws2-s3 (#14062) * add S3 multipart upload support for stream producer * clean useless condition * review feedback on logging --- .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 72 +++++++++++---- .../integration/S3StreamUploadS3MultipartIT.java | 102 +++++++++++++++++++++ 2 files changed, 154 insertions(+), 20 deletions(-) diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java index 9fe8bb11042..8d23db6f1a5 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java @@ -127,21 +127,36 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { UploadState state = null; int totalSize = 0; byte[] b; - while ((b = AWS2S3Utils.toByteArray(is, getConfiguration().getBufferSize())).length > 0) { + int maxRead = (getConfiguration().isMultiPartUpload() + ? 
Math.toIntExact(getConfiguration().getPartSize()) : getConfiguration().getBufferSize()); + if (uploadAggregate != null) { + uploadAggregate.index++; + maxRead -= uploadAggregate.buffer.size(); + } + + while ((b = AWS2S3Utils.toByteArray(is, maxRead)).length + > 0) { totalSize += b.length; + if (getConfiguration().isMultiPartUpload()) + maxRead -= b.length; synchronized (lock) { // aggregate with previously received exchanges if (ObjectHelper.isNotEmpty(uploadAggregate)) { uploadAggregate.buffer.write(b); - uploadAggregate.index++; - + if (getConfiguration().isMultiPartUpload() && + uploadAggregate.buffer.size() >= getConfiguration().getPartSize()) { + uploadPart(uploadAggregate); + maxRead = Math.toIntExact(getConfiguration().getPartSize()); + continue; + } if (uploadAggregate.buffer.size() >= getConfiguration().getBatchSize() - || uploadAggregate.index == getConfiguration().getBatchMessageNumber()) { + || (uploadAggregate.index >= getConfiguration().getBatchMessageNumber() + && uploadAggregate.buffer.size() < getConfiguration().getPartSize())) { - uploadPart(uploadAggregate); + if (uploadAggregate.buffer.size() > 0) + uploadPart(uploadAggregate); CompleteMultipartUploadResponse uploadResult = completeUpload(uploadAggregate); this.uploadAggregate = null; - Message message = getMessageForResponse(exchange); message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag()); if (uploadResult.versionId() != null) { @@ -151,11 +166,10 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { continue; } } - if (state == null) { state = new UploadState(); } else { - state.index++; + state.index = 1; } state.buffer.write(b); @@ -201,17 +215,21 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { try { if (totalSize >= getConfiguration().getBatchSize() || state.buffer.size() >= getConfiguration().getBatchSize() - || state.index == getConfiguration().getBatchMessageNumber()) { + || state.index >= getConfiguration().getBatchMessageNumber()) { 
uploadPart(state); CompleteMultipartUploadResponse uploadResult = completeUpload(state); - Message message = getMessageForResponse(exchange); message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag()); if (uploadResult.versionId() != null) { message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId()); } state = null; + continue; + } + if (getConfiguration().isMultiPartUpload() && state.buffer.size() >= getConfiguration().getPartSize()) { + uploadPart(state); + maxRead = Math.toIntExact(getConfiguration().getPartSize()); } } catch (Exception e) { @@ -244,29 +262,41 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { .uploadId(state.initResponse.uploadId()) .build(); - CompleteMultipartUploadResponse uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest); + try { + final CompleteMultipartUploadResponse uploadResult + = getEndpoint().getS3Client().completeMultipartUpload(compRequest); - // Converting the index to String can cause extra overhead - if (LOG.isInfoEnabled()) { - LOG.info("Completed upload for the part {} with etag {} at index {}", part, uploadResult.eTag(), - state.index); + // Converting the index to String can cause extra overhead + if (LOG.isDebugEnabled()) { + LOG.debug("Completed upload for the part {}, multipart {} with etag {} at index {}", part, state.multipartIndex, + uploadResult.eTag(), + state.index); + } + part.getAndIncrement(); + return uploadResult; + } catch (Exception e) { + LOG.warn("Error completing multipart upload - Multipart upload will be aborted", e); + getEndpoint().getS3Client() + .abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()) + .key(state.dynamicKeyName).uploadId(state.initResponse.uploadId()).build()); + throw e; } - return uploadResult; } private void uploadPart(UploadState state) { UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName()) 
.key(state.dynamicKeyName).uploadId(state.initResponse.uploadId()) - .partNumber(state.index).build(); + .partNumber(state.multipartIndex).build(); - LOG.trace("Uploading part {} at index {} for {}", state.part, state.index, getConfiguration().getKeyName()); + LOG.trace("Uploading part {}, multipart {} at index {} for {}", state.part, state.multipartIndex, state.index, + getConfiguration().getKeyName()); String etag = getEndpoint().getS3Client() .uploadPart(uploadRequest, RequestBody.fromBytes(state.buffer.toByteArray())).eTag(); - CompletedPart partUpload = CompletedPart.builder().partNumber(state.index).eTag(etag).build(); + CompletedPart partUpload = CompletedPart.builder().partNumber(state.multipartIndex).eTag(etag).build(); state.completedParts.add(partUpload); state.buffer.reset(); - part.getAndIncrement(); + state.multipartIndex++; } private String fileNameToUpload( @@ -360,6 +390,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { private class UploadState { int index; + int multipartIndex; int part; List<CompletedPart> completedParts = new ArrayList<>(); ByteArrayOutputStream buffer; @@ -369,6 +400,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { UploadState() { index = 1; + multipartIndex = 1; part = AWS2S3StreamUploadProducer.this.part.get(); buffer = new ByteArrayOutputStream(); } diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadS3MultipartIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadS3MultipartIT.java new file mode 100644 index 00000000000..c78817420ba --- /dev/null +++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadS3MultipartIT.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.s3.integration; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws2.s3.AWS2S3Constants; +import org.apache.camel.component.aws2.s3.AWS2S3Operations; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.s3.model.S3Object; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class S3StreamUploadS3MultipartIT extends Aws2S3Base { + + @EndpointInject + private ProducerTemplate template; + + @EndpointInject("mock:result") + private MockEndpoint result; + + @Test + public void sendIn() throws Exception { + result.expectedMessageCount(10); + + for (int i = 0; i < 10; i++) { + template.send("direct:stream1", new Processor() { + + @Override + public void process(Exchange exchange) { + exchange.getIn().setHeader(AWS2S3Constants.KEY, "empty.bin"); + exchange.getIn().setBody(new File("src/test/resources/empty.bin")); + } + }); + } + + Exchange ex = 
template.request("direct:listObjects", new Processor() { + + @Override + public void process(Exchange exchange) { + exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects); + } + }); + + List<S3Object> resp = ex.getMessage().getBody(List.class); + // expect 1 file + assertEquals(1, resp.size()); + // file size: 5,242,880 bytes + assertEquals(10 * Files.size(Paths.get("src/test/resources/empty.bin")), + resp.stream().mapToLong(S3Object::size).sum()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + String awsEndpoint1 + = "aws2-s3://mycamel-1?autoCreateBucket=true" + + "&streamingUploadMode=true" + + "&keyName=fileTest.txt" + + "&batchMessageNumber=10" + + "&batchSize=1000000000" + + "&namingStrategy=random" + + "&multiPartUpload=true" + + "&bufferSize=0" + + "&partSize=10000000"; + + from("direct:stream1").process(exchange -> { + }).to(awsEndpoint1).process(exchange -> { + }).to("mock:result"); + + String awsEndpoint = "aws2-s3://mycamel-1?autoCreateBucket=true"; + + from("direct:listObjects").to(awsEndpoint); + } + }; + } +}