This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch CAMEL-17024 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 403e2b7263e2ec08fd1c7a361f899a2ab4bd7aff Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Sep 30 15:28:50 2021 +0200 CAMEL-17024: camel-aws2-s3 - Determine content-length in a smarter way --- .../camel/component/aws2/s3/AWS2S3Producer.java | 105 ++++++++++----------- .../camel/component/aws2/s3/utils/AWS2S3Utils.java | 28 ++++-- 2 files changed, 72 insertions(+), 61 deletions(-) diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java index 95f6c6b..8427993 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java @@ -37,7 +37,6 @@ import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.FileUtil; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -78,14 +77,11 @@ import software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequ /** * A Producer which sends messages to the Amazon Web Service Simple Storage Service * <a href="http://aws.amazon.com/s3/">AWS S3</a> - * */ public class AWS2S3Producer extends DefaultProducer { private static final Logger LOG = LoggerFactory.getLogger(AWS2S3Producer.class); - private transient String s3ProducerToString; - public AWS2S3Producer(final Endpoint endpoint) { super(endpoint); } @@ -252,38 +248,59 @@ public class AWS2S3Producer extends DefaultProducer { } public void processSingleOp(final Exchange exchange) throws Exception { + PutObjectRequest.Builder putObjectRequest = PutObjectRequest.builder(); Map<String, String> objectMetadata = determineMetadata(exchange); - File filePayload = null; - InputStream is = null; - ByteArrayOutputStream baos = null; + // the content-length may already be known + long contentLength = Long.parseLong(objectMetadata.getOrDefault(Exchange.CONTENT_LENGTH, "-1")); + Object obj = exchange.getIn().getMandatoryBody(); - PutObjectRequest.Builder putObjectRequest = PutObjectRequest.builder(); - // Need to check if the message body is WrappedFile - if (obj instanceof WrappedFile) { - obj = ((WrappedFile<?>) obj).getFile(); - } - if (obj instanceof File) { - filePayload = (File) obj; - is = new FileInputStream(filePayload); - } else { - is = exchange.getIn().getMandatoryBody(InputStream.class); - if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) { - if (objectMetadata.get("Content-Length").equals("0") - && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) { - LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory"); - baos = AWS2S3Utils.determineLengthInputStream(is); - objectMetadata.put("Content-Length", String.valueOf(baos.size())); - is = new ByteArrayInputStream(baos.toByteArray()); - } else { - if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) { - objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class)); + InputStream inputStream = null; + File filePayload = null; + + try { + // Need to check if the message body is WrappedFile + if (obj instanceof WrappedFile) { + obj = ((WrappedFile<?>) obj).getFile(); + } + if (obj instanceof File) { + filePayload = (File) obj; + inputStream = new FileInputStream(filePayload); + contentLength = filePayload.length(); + } else { + inputStream = exchange.getIn().getMandatoryBody(InputStream.class); + if (contentLength <= 0) { + contentLength = AWS2S3Utils.determineLengthInputStream(inputStream); + if (contentLength == -1) { + // fallback to read into memory to calculate length + LOG.debug( + "The content length is not defined. It needs to be determined by reading the data into memory"); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + IOHelper.copyAndCloseInput(inputStream, baos); + byte[] arr = baos.toByteArray(); + contentLength = arr.length; + inputStream = new ByteArrayInputStream(arr); } } + if (contentLength > 0) { + objectMetadata.put(Exchange.CONTENT_LENGTH, String.valueOf(contentLength)); + } } + + doPutObject(exchange, putObjectRequest, objectMetadata, inputStream); + } finally { + IOHelper.close(inputStream); } + if (getConfiguration().isDeleteAfterWrite() && filePayload != null) { + FileUtil.deleteFile(filePayload); + } + } + + private void doPutObject( + Exchange exchange, PutObjectRequest.Builder putObjectRequest, Map<String, String> objectMetadata, + InputStream inputStream) { final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); final String key = AWS2S3Utils.determineKey(exchange, getConfiguration()); putObjectRequest.bucket(bucketName).key(key).metadata(objectMetadata); @@ -334,7 +351,7 @@ public class AWS2S3Producer extends DefaultProducer { LOG.trace("Put object [{}] from exchange [{}]...", putObjectRequest, exchange); PutObjectResponse putObjectResult = getEndpoint().getS3Client().putObject(putObjectRequest.build(), - RequestBody.fromBytes(SdkBytes.fromInputStream(is).asByteArray())); + RequestBody.fromBytes(SdkBytes.fromInputStream(inputStream).asByteArray())); LOG.trace("Received result [{}]", putObjectResult); @@ -343,12 +360,6 @@ public class AWS2S3Producer extends DefaultProducer { if (putObjectResult.versionId() != null) { message.setHeader(AWS2S3Constants.VERSION_ID, putObjectResult.versionId()); } - - IOHelper.close(is); - - if (getConfiguration().isDeleteAfterWrite() && filePayload != null) { - FileUtil.deleteFile(filePayload); - } } private void copyObject(S3Client s3Client, Exchange exchange) throws InvalidPayloadException { @@ -406,6 +417,7 @@ public class AWS2S3Producer extends DefaultProducer { private void deleteObject(S3Client s3Client, Exchange exchange) throws InvalidPayloadException { final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); final String sourceKey = AWS2S3Utils.determineKey(exchange, getConfiguration()); + if (getConfiguration().isPojoRequest()) { Object payload = exchange.getIn().getMandatoryBody(); if (payload instanceof DeleteObjectRequest) { @@ -414,7 +426,6 @@ public class AWS2S3Producer extends DefaultProducer { message.setBody(true); } } else { - DeleteObjectRequest.Builder deleteObjectRequest = DeleteObjectRequest.builder().bucket(bucketName).key(sourceKey); s3Client.deleteObject(deleteObjectRequest.build()); @@ -441,7 +452,6 @@ public class AWS2S3Producer extends DefaultProducer { message.setBody(resp); } } else { - DeleteBucketRequest.Builder deleteBucketRequest = DeleteBucketRequest.builder().bucket(bucketName); DeleteBucketResponse resp = s3Client.deleteBucket(deleteBucketRequest.build()); @@ -451,7 +461,6 @@ public class AWS2S3Producer extends DefaultProducer { } private void getObject(S3Client s3Client, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { Object payload = exchange.getIn().getMandatoryBody(); if (payload instanceof GetObjectRequest) { @@ -486,7 +495,6 @@ public class AWS2S3Producer extends DefaultProducer { message.setBody(res); } } else { - if (ObjectHelper.isEmpty(rangeStart) || ObjectHelper.isEmpty(rangeEnd)) { throw new IllegalArgumentException( "A Range start and range end header must be configured to perform a range get operation."); @@ -512,7 +520,6 @@ public class AWS2S3Producer extends DefaultProducer { message.setBody(objectList.contents()); } } else { - ListObjectsResponse objectList = s3Client.listObjects(ListObjectsRequest.builder().bucket(bucketName).build()); Message message = getMessageForResponse(exchange); @@ -576,32 +583,32 @@ public class AWS2S3Producer extends DefaultProducer { Long contentLength = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_LENGTH, Long.class); if (contentLength != null) { - objectMetadata.put("Content-Length", String.valueOf(contentLength)); + objectMetadata.put(Exchange.CONTENT_LENGTH, String.valueOf(contentLength)); } String contentType = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_TYPE, String.class); if (contentType != null) { - objectMetadata.put("Content-Type", String.valueOf(contentType)); + objectMetadata.put("Content-Type", contentType); } String cacheControl = exchange.getIn().getHeader(AWS2S3Constants.CACHE_CONTROL, String.class); if (cacheControl != null) { - objectMetadata.put("Cache-Control", String.valueOf(cacheControl)); + objectMetadata.put("Cache-Control", cacheControl); } String contentDisposition = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_DISPOSITION, String.class); if (contentDisposition != null) { - objectMetadata.put("Content-Disposition", String.valueOf(contentDisposition)); + objectMetadata.put("Content-Disposition", contentDisposition); } String contentEncoding = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_ENCODING, String.class); if (contentEncoding != null) { - objectMetadata.put("Content-Encoding", String.valueOf(contentEncoding)); + objectMetadata.put("Content-Encoding", contentEncoding); } String contentMD5 = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_MD5, String.class); if (contentMD5 != null) { - objectMetadata.put("Content-Md5", String.valueOf(contentMD5)); + objectMetadata.put("Content-Md5", contentMD5); } return objectMetadata; @@ -612,14 +619,6 @@ public class AWS2S3Producer extends DefaultProducer { } @Override - public String toString() { - if (s3ProducerToString == null) { - s3ProducerToString = "S3Producer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; - } - return s3ProducerToString; - } - - @Override public AWS2S3Endpoint getEndpoint() { return (AWS2S3Endpoint) super.getEndpoint(); } diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java index 3f8ab3d..d434ff9 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java @@ -16,7 +16,7 @@ */ package org.apache.camel.component.aws2.s3.utils; -import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -80,14 +80,26 @@ public final class AWS2S3Utils { } } - public static ByteArrayOutputStream determineLengthInputStream(InputStream is) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] bytes = new byte[1024]; - int count; - while ((count = is.read(bytes)) > 0) { - out.write(bytes, 0, count); + public static long determineLengthInputStream(InputStream is) throws IOException { + if (!is.markSupported()) { + return -1; } - return out; + if (is instanceof ByteArrayInputStream) { + return is.available(); + } + long size = 0; + try { + is.mark(1024); + int i = is.available(); + while (i > 0) { + long skip = is.skip(i); + size += skip; + i = is.available(); + } + } finally { + is.reset(); + } + return size; } public static String determineKey(final Exchange exchange, AWS2S3Configuration configuration) {