This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2846fc08880c1316acc20e8b3df9f2e1019bd54d
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Wed Mar 18 12:13:41 2020 +0100

    CAMEL-14663 - Camel-AWS2 S3: Add support for multipart upload
---
 .../camel/component/aws2/s3/AWS2S3Producer.java    | 114 ++++++++++++++++++++-
 .../src/test/resources/log4j2.properties           |   2 +-
 2 files changed, 114 insertions(+), 2 deletions(-)

diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
index a0d1fa2..71d2640 100644
--- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
+++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
@@ -22,7 +22,9 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Endpoint;
@@ -36,14 +38,22 @@ import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.BucketCannedACL;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
 import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
 import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -55,6 +65,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
 import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 /**
  * A Producer which sends messages to the Amazon Web Service Simple Storage
@@ -74,7 +85,11 @@ public class AWS2S3Producer extends DefaultProducer {
     public void process(final Exchange exchange) throws Exception {
         AWS2S3Operations operation = determineOperation(exchange);
         if (ObjectHelper.isEmpty(operation)) {
-            processSingleOp(exchange);
+            if (getConfiguration().isMultiPartUpload()) {
+                processMultiPart(exchange);
+            } else {
+                processSingleOp(exchange);
+            }
         } else {
             switch (operation) {
                 case copyObject:
@@ -103,6 +118,103 @@ public class AWS2S3Producer extends DefaultProducer {
             }
         }
     }
+
+    public void processMultiPart(final Exchange exchange) throws Exception {
+        File filePayload = null;
+        Object obj = exchange.getIn().getMandatoryBody();
+        // Need to check if the message body is WrappedFile
+        if (obj instanceof WrappedFile) {
+            obj = ((WrappedFile<?>)obj).getFile();
+        }
+        if (obj instanceof File) {
+            filePayload = (File)obj;
+        } else {
+            throw new IllegalArgumentException("aws-s3: MultiPart upload requires a File input.");
+        }
+
+        Map<String, String> objectMetadata = determineMetadata(exchange);
+        if (objectMetadata.containsKey("Content-Length")) {
+            if (objectMetadata.get("Content-Length").equalsIgnoreCase("0")) {
+                objectMetadata.put("Content-Length", String.valueOf(filePayload.length()));
+            }
+        } else {
+            objectMetadata.put("Content-Length", String.valueOf(filePayload.length()));
+        }
+
+        final String keyName = determineKey(exchange);
+        CreateMultipartUploadRequest.Builder createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
+            .bucket(getConfiguration().getBucketName()).key(keyName);
+
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            createMultipartUploadRequest.storageClass(storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(AWS2S3Constants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            ObjectCannedACL objectAcl = ObjectCannedACL.valueOf(cannedAcl);
+            createMultipartUploadRequest.acl(objectAcl);
+        }
+
+        BucketCannedACL acl = exchange.getIn().getHeader(AWS2S3Constants.ACL, BucketCannedACL.class);
+        if (acl != null) {
+            // note: if cannedacl and acl are both specified the last one will
+            // be used. refer to
+            // PutObjectRequest#setAccessControlList for more details
+            createMultipartUploadRequest.acl(acl.toString());
+        }
+
+        if (getConfiguration().isUseAwsKMS()) {
+            createMultipartUploadRequest.ssekmsKeyId(getConfiguration().getAwsKMSKeyId());
+        }
+
+        LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange);
+
+        CreateMultipartUploadResponse initResponse = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build());
+        final long contentLength = Long.valueOf(objectMetadata.get("Content-Length"));
+        final List<String> partETags = new ArrayList<>();
+        List<CompletedPart> completedParts = new ArrayList<CompletedPart>();
+        long partSize = getConfiguration().getPartSize();
+        CompleteMultipartUploadResponse uploadResult = null;
+
+        long filePosition = 0;
+
+        try {
+            for (int part = 1; filePosition < contentLength; part++) {
+                System.err.println("PART! " + part);
+                partSize = Math.min(partSize, contentLength - filePosition);
+
+                UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName()).key(keyName)
+                    .uploadId(initResponse.uploadId()).partNumber(part).build();
+
+                LOG.trace("Uploading part [{}] for {}", part, keyName);
+                String etag = getEndpoint().getS3Client().uploadPart(uploadRequest, RequestBody.fromFile(filePayload)).eTag();
+                partETags.add(etag);
+                CompletedPart partUpload = CompletedPart.builder().partNumber(part).eTag(etag).build();
+                completedParts.add(partUpload);
+                filePosition += partSize;
+                System.err.println(filePosition);
+            }
+            CompletedMultipartUpload completeMultipartUpload = CompletedMultipartUpload.builder().parts(completedParts).build();
+            CompleteMultipartUploadRequest compRequest = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload).bucket(getConfiguration().getBucketName()).key(keyName).uploadId(initResponse.uploadId()).build();
+
+            uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+
+        } catch (Exception e) {
+            getEndpoint().getS3Client().abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(keyName).uploadId(initResponse.uploadId()).build());
+            throw e;
+        }
+
+        Message message = getMessageForResponse(exchange);
+        message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
+        if (uploadResult.versionId() != null) {
+            message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
+        }
+
+        if (getConfiguration().isDeleteAfterWrite()) {
+            FileUtil.deleteFile(filePayload);
+        }
+    }
 
     public void processSingleOp(final Exchange exchange) throws Exception {
 
diff --git a/components/camel-aws2-s3/src/test/resources/log4j2.properties b/components/camel-aws2-s3/src/test/resources/log4j2.properties
index a287c66..e33abee 100644
--- a/components/camel-aws2-s3/src/test/resources/log4j2.properties
+++ b/components/camel-aws2-s3/src/test/resources/log4j2.properties
@@ -24,5 +24,5 @@ appender.out.type = Console
 appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
-rootLogger.level = INFO
+rootLogger.level = DEBUG
 rootLogger.appenderRef.file.ref = file