This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-2.23.x
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/camel-2.23.x by this push:
     new 5811070  CAMEL-13433 - S3: Exchange body stream is loaded into memory to calculate content length which is already set via headers
5811070 is described below

commit 58110700436e9f4f6b657cb7d4edb60418f00b10
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue Apr 23 11:06:28 2019 +0200

    CAMEL-13433 - S3: Exchange body stream is loaded into memory to calculate content length which is already set via headers
---
 .../apache/camel/component/aws/s3/S3Producer.java  |  13 +-
 .../aws/s3/S3ComponentContentLengthFileTest.java   | 187 +++++++++++++++++++++
 2 files changed, 197 insertions(+), 3 deletions(-)

diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
index 39b83bf..5b29ec1 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
@@ -227,9 +227,16 @@ public class S3Producer extends DefaultProducer {
             is = new FileInputStream(filePayload);
         } else {
             is = exchange.getIn().getMandatoryBody(InputStream.class);
-            baos = determineLengthInputStream(is);
-            objectMetadata.setContentLength(baos.size());
-            is = new ByteArrayInputStream(baos.toByteArray());
+            if (objectMetadata.getContentLength() == 0 && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                log.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                baos = determineLengthInputStream(is);
+                objectMetadata.setContentLength(baos.size());
+                is = new ByteArrayInputStream(baos.toByteArray());
+            } else {
+                if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                    objectMetadata.setContentLength(Long.valueOf(exchange.getProperty(Exchange.CONTENT_LENGTH, String.class)));
+                }
+            }
         }
 
         String bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentContentLengthFileTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentContentLengthFileTest.java
new file mode 100644
index 0000000..896a2dd
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentContentLengthFileTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.InputStream;
+import java.util.Map;
+
+import com.amazonaws.services.s3.model.PutObjectRequest;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.FileUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class S3ComponentContentLengthFileTest extends CamelTestSupport {
+
+    @EndpointInject(uri = "direct:startKeep")
+    ProducerTemplate templateKeep;
+
+    @EndpointInject(uri = "direct:startDelete")
+    ProducerTemplate templateDelete;
+
+    @EndpointInject(uri = "mock:result")
+    MockEndpoint result;
+
+    AmazonS3ClientMock client;
+
+    File testFile;
+
+    String getCamelBucket() {
+        return "mycamelbucket";
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        testFile = FileUtil.createTempFile("test", "file", new File("target/tmp"));
+
+        FileWriter writer = new FileWriter(testFile);
+        writer.write("This is my bucket content.");
+        writer.close();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        FileUtil.deleteFile(testFile);
+    }
+
+    @Test
+    public void sendFile() throws Exception {
+        result.expectedMessageCount(1);
+
+        Exchange exchange = templateKeep.send("direct:startKeep", ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+                exchange.getIn().setBody(new FileInputStream(testFile));
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+
+        assertResultExchange(result.getExchanges().get(0), true);
+
+        PutObjectRequest putObjectRequest = client.putObjectRequests.get(0);
+        assertEquals(getCamelBucket(), putObjectRequest.getBucketName());
+
+        assertResponseMessage(exchange.getIn());
+
+        assertFileExists(testFile.getAbsolutePath());
+    }
+
+    @Test
+    public void sendFileWithContentLength() throws Exception {
+        result.expectedMessageCount(1);
+
+        Exchange exchange = templateKeep.send("direct:startKeep", ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+                exchange.getIn().setHeader(S3Constants.CONTENT_LENGTH, testFile.length());
+                exchange.getIn().setBody(new FileInputStream(testFile));
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+
+        assertResultExchange(result.getExchanges().get(0), true);
+
+        PutObjectRequest putObjectRequest = client.putObjectRequests.get(0);
+        assertEquals(getCamelBucket(), putObjectRequest.getBucketName());
+
+        assertResponseMessage(exchange.getIn());
+
+        assertFileExists(testFile.getAbsolutePath());
+    }
+
+    void assertResultExchange(Exchange resultExchange, boolean delete) {
+        assertIsInstanceOf(InputStream.class, resultExchange.getIn().getBody());
+
+        if (!delete) {
+            // assert on the file content only in case the "deleteAfterWrite"
+            // option is NOT enabled
+            // in which case we would still have the file and thereby could
+            // assert on it's content
+            assertEquals("This is my bucket content.", resultExchange.getIn().getBody(String.class));
+        }
+
+        assertEquals(getCamelBucket(), resultExchange.getIn().getHeader(S3Constants.BUCKET_NAME));
+        assertEquals("CamelUnitTest", resultExchange.getIn().getHeader(S3Constants.KEY));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.VERSION_ID)); // not
+                                                                              // enabled
+                                                                              // on
+                                                                              // this
+                                                                              // bucket
+        assertNull(resultExchange.getIn().getHeader(S3Constants.LAST_MODIFIED));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.E_TAG));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_TYPE));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_ENCODING));
+        assertEquals(0L, resultExchange.getIn().getHeader(S3Constants.CONTENT_LENGTH));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_DISPOSITION));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_MD5));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CACHE_CONTROL));
+        assertNotNull(resultExchange.getIn().getHeader(S3Constants.USER_METADATA));
+        assertEquals(0, resultExchange.getIn().getHeader(S3Constants.S3_HEADERS, Map.class).size());
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+
+        client = new AmazonS3ClientMock();
+        registry.bind("amazonS3Client", client);
+
+        return registry;
+    }
+
+    void assertResponseMessage(Message message) {
+        assertEquals("3a5c8b1ad448bca04584ecb55b836264", message.getHeader(S3Constants.E_TAG));
+        assertNull(message.getHeader(S3Constants.VERSION_ID));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                String awsEndpoint = "aws-s3://" + getCamelBucket() + "?amazonS3Client=#amazonS3Client";
+
+                from("direct:startKeep").to(awsEndpoint + "&deleteAfterWrite=false");
+
+                from("direct:startDelete").to(awsEndpoint + "&deleteAfterWrite=true");
+
+                from(awsEndpoint + "&maxMessagesPerPoll=5").to("mock:result");
+            }
+        };
+    }
+}
+