This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 844da25b4ebf690d2625a543dd154eb9fdabc37d Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Sun Apr 4 16:37:24 2021 +0200 CAMEL-16185 - AWS S3: improve multipart support - streaming upload --- .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 89 ++++++++++++++-------- .../S3StreamUploadOperationLocalstackTest.java | 3 + ...va => S3StreamUploadTimeoutLocalstackTest.java} | 29 ++----- 3 files changed, 68 insertions(+), 53 deletions(-) diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java index 32c6f28..f4d544d 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java @@ -66,9 +66,9 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { AtomicInteger part = new AtomicInteger(); UUID id; String dynamicKeyName; + CompleteMultipartUploadResponse uploadResult; private transient String s3ProducerToString; private ScheduledExecutorService timeoutCheckerExecutorService; - private boolean timeout; @Override protected void doStart() throws Exception { @@ -76,7 +76,21 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { timeoutCheckerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "timeout_checker"); - timeoutCheckerExecutorService.scheduleAtFixedRate(new AggregationIntervalTask(), 1, 1, TimeUnit.SECONDS); + timeoutCheckerExecutorService.scheduleAtFixedRate(new AggregationIntervalTask(), 10, 10, TimeUnit.SECONDS); + } + + @Override + protected void doStop() throws Exception { + if (ObjectHelper.isNotEmpty(initResponse)) { + if (ObjectHelper.isNotEmpty(initResponse.uploadId())) { + if (index.get() > 0) { + uploadPart(); + completeUpload(); + } + } + } + super.doStop(); + } /** @@ -86,8 +100,14 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { @Override public void run() { - timeout = true; - LOG.info("timeout triggered"); + if (ObjectHelper.isNotEmpty(initResponse)) { + if (ObjectHelper.isNotEmpty(initResponse.uploadId())) { + if (index.get() > 0) { + uploadPart(); + completeUpload(); + } + } + } } } @@ -157,41 +177,15 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { try { if (buffer.size() >= getConfiguration().getBatchSize() || index.get() == getConfiguration().getBatchMessageNumber()) { - LOG.info("Timeout " + timeout); - - UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName()) - .key(dynamicKeyName).uploadId(initResponse.uploadId()) - .partNumber(index.get()).build(); - LOG.trace("Uploading part {} at index {} for {}", part, index, keyName); + uploadPart(); + completeUpload(); - String etag = getEndpoint().getS3Client() - .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag(); - CompletedPart partUpload = CompletedPart.builder().partNumber(index.get()).eTag(etag).build(); - completedParts.add(partUpload); - buffer.reset(); - part.getAndIncrement(); - } - - if (index.get() == getConfiguration().getBatchMessageNumber() || timeout) { - CompletedMultipartUpload completeMultipartUpload - = CompletedMultipartUpload.builder().parts(completedParts).build(); - CompleteMultipartUploadRequest compRequest - = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload) - .bucket(getConfiguration().getBucketName()).key(dynamicKeyName) - .uploadId(initResponse.uploadId()) - .build(); - - CompleteMultipartUploadResponse uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest); - LOG.info("Completed upload for the part {} with etag {} at index {}", part, uploadResult.eTag(), - index); Message message = getMessageForResponse(exchange); message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag()); if (uploadResult.versionId() != null) { message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId()); } - timeout = false; - index.getAndSet(0); } } catch (Exception e) { @@ -204,6 +198,37 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { index.getAndIncrement(); } + private void completeUpload() { + CompletedMultipartUpload completeMultipartUpload + = CompletedMultipartUpload.builder().parts(completedParts).build(); + CompleteMultipartUploadRequest compRequest + = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload) + .bucket(getConfiguration().getBucketName()).key(dynamicKeyName) + .uploadId(initResponse.uploadId()) + .build(); + + uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest); + LOG.info("Completed upload for the part {} with etag {} at index {}", part, uploadResult.eTag(), + index); + + index.getAndSet(0); + } + + private void uploadPart() { + UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName()) + .key(dynamicKeyName).uploadId(initResponse.uploadId()) + .partNumber(index.get()).build(); + + LOG.trace("Uploading part {} at index {} for {}", part, index, getConfiguration().getKeyName()); + + String etag = getEndpoint().getS3Client() + .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag(); + CompletedPart partUpload = CompletedPart.builder().partNumber(index.get()).eTag(etag).build(); + completedParts.add(partUpload); + buffer.reset(); + part.getAndIncrement(); + } + private String fileNameToUpload( String fileName, AWSS3NamingStrategyEnum strategy, String ext, AtomicInteger part, UUID id) { String dynamicKeyName; diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java index 6a37b9ad..0db9740 100644 --- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java +++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java @@ -60,6 +60,9 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest { Thread.sleep(30000); List<S3Object> resp = ex.getMessage().getBody(List.class); assertEquals(40, resp.size()); + for (S3Object s3Object : resp) { + System.err.println(s3Object.key()); + } } @Override diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java similarity index 65% copy from components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java copy to components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java index 6a37b9ad..f42a153 100644 --- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java +++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java @@ -1,19 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.camel.component.aws2.s3.localstack; import java.util.List; @@ -31,7 +15,7 @@ import software.amazon.awssdk.services.s3.model.S3Object; import static org.junit.jupiter.api.Assertions.assertEquals; -public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest { +public class S3StreamUploadTimeoutLocalstackTest extends Aws2S3BaseTest { @EndpointInject private ProducerTemplate template; @@ -41,12 +25,13 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest { @Test public void sendIn() throws Exception { - result.expectedMessageCount(1000); + result.expectedMessageCount(23); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 23; i++) { template.sendBody("direct:stream1", "Andrea\n"); } + Thread.sleep(11000); assertMockEndpointsSatisfied(); Exchange ex = template.request("direct:listObjects", new Processor() { @@ -57,9 +42,11 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest { } }); - Thread.sleep(30000); List<S3Object> resp = ex.getMessage().getBody(List.class); - assertEquals(40, resp.size()); + assertEquals(1, resp.size()); + for (S3Object s3Object : resp) { + System.err.println(s3Object.key()); + } } @Override