This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 99e08d0 CAMEL-17021: camel-minio - Determine content-length in a smarter way (#6184) 99e08d0 is described below commit 99e08d0237368053b32935bfdb3106b67e7222d4 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Sep 30 12:32:06 2021 +0200 CAMEL-17021: camel-minio - Determine content-length in a smarter way (#6184) * CAMEL-17021: camel-minio - Determine content-length in a smarter way. Use content-length as the actual size when putting to minio. * Polished --- .../camel/component/minio/MinioProducer.java | 101 ++++++++++----------- 1 file changed, 48 insertions(+), 53 deletions(-) diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java index a0a1c29..b2d11ac 100644 --- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java +++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java @@ -55,7 +55,6 @@ import org.apache.camel.WrappedFile; import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.FileUtil; import org.apache.camel.util.IOHelper; -import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,8 +68,6 @@ public class MinioProducer extends DefaultProducer { private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class); - private transient String minioProducerToString; - public MinioProducer(final Endpoint endpoint) { super(endpoint); } @@ -118,7 +115,6 @@ public class MinioProducer extends DefaultProducer { } public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception { - if (getConfiguration().isPojoRequest()) { PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class); if (isNotEmpty(payload)) { @@ -135,25 +131,41 @@ public class MinioProducer extends DefaultProducer { Map<String, String> objectMetadata = determineMetadata(exchange); Map<String, String> extraHeaders = determineExtraHeaders(exchange); - File filePayload = null; + // the content-length may already be known + long contentLength = Long.parseLong(objectMetadata.getOrDefault(Exchange.CONTENT_LENGTH, "-1")); Object object = exchange.getIn().getMandatoryBody(); - - // Need to check if the message body is WrappedFile - if (object instanceof WrappedFile) { - object = ((WrappedFile<?>) object).getFile(); - } - InputStream inputStream = null; + File filePayload = null; try { + // Need to check if the message body is WrappedFile + if (object instanceof WrappedFile) { + object = ((WrappedFile<?>) object).getFile(); + } if (object instanceof File) { filePayload = (File) object; inputStream = new FileInputStream(filePayload); + contentLength = filePayload.length(); } else { - inputStream = getInputStreamFromExchange(exchange, objectMetadata); + inputStream = exchange.getMessage().getMandatoryBody(InputStream.class); + if (contentLength <= 0) { + contentLength = determineLengthInputStream(inputStream); + if (contentLength == -1) { + // fallback to read into memory to calculate length + LOG.debug( + "The content length is not defined. It needs to be determined by reading the data into memory"); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + IOHelper.copyAndCloseInput(inputStream, baos); + byte[] arr = baos.toByteArray(); + contentLength = arr.length; + inputStream = new ByteArrayInputStream(arr); + } + } } - - doPutObject(exchange, bucketName, objectName, objectMetadata, extraHeaders, inputStream); + if (contentLength > 0) { + objectMetadata.put(Exchange.CONTENT_LENGTH, String.valueOf(contentLength)); + } + doPutObject(exchange, bucketName, objectName, objectMetadata, extraHeaders, inputStream, contentLength); } finally { IOHelper.close(inputStream); } @@ -166,11 +178,11 @@ public class MinioProducer extends DefaultProducer { private void doPutObject( Exchange exchange, String bucketName, String objectName, Map<String, String> objectMetadata, - Map<String, String> extraHeaders, InputStream inputStream) + Map<String, String> extraHeaders, InputStream inputStream, long contentLength) throws IOException, ErrorResponseException, InsufficientDataException, InternalException, InvalidKeyException, InvalidResponseException, NoSuchAlgorithmException, ServerException, XmlParserException { PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder() - .stream(inputStream, inputStream.available(), -1) + .stream(inputStream, contentLength, -1) .bucket(bucketName) .object(objectName) .userMetadata(objectMetadata); @@ -192,27 +204,6 @@ public class MinioProducer extends DefaultProducer { } } - private InputStream getInputStreamFromExchange(Exchange exchange, Map<String, String> objectMetadata) - throws InvalidPayloadException, IOException { - InputStream inputStream = exchange.getIn().getMandatoryBody(InputStream.class); - - if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) { - if (objectMetadata.get("Content-Length").equals("0") - && isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) { - LOG.debug( - "The content length is not defined. It needs to be determined by reading the data into memory"); - ByteArrayOutputStream baos = determineLengthInputStream(inputStream); - objectMetadata.put("Content-Length", String.valueOf(baos.size())); - inputStream = new ByteArrayInputStream(baos.toByteArray()); - } else { - if (isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) { - objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class)); - } - } - } - return inputStream; - } - private Map<String, String> determineExtraHeaders(Exchange exchange) { Map<String, String> extraHeaders = new HashMap<>(); String storageClass = determineStorageClass(exchange); @@ -417,7 +408,7 @@ public class MinioProducer extends DefaultProducer { Long contentLength = exchange.getIn().getHeader(MinioConstants.CONTENT_LENGTH, Long.class); if (isNotEmpty(contentLength)) { - objectMetadata.put("Content-Length", String.valueOf(contentLength)); + objectMetadata.put(Exchange.CONTENT_LENGTH, String.valueOf(contentLength)); } String contentType = exchange.getIn().getHeader(MinioConstants.CONTENT_TYPE, String.class); @@ -490,14 +481,26 @@ public class MinioProducer extends DefaultProducer { return storageClass; } - private ByteArrayOutputStream determineLengthInputStream(InputStream inputStream) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] bytes = new byte[MinioConstants.BYTE_ARRAY_LENGTH]; - int count; - while ((count = inputStream.read(bytes)) > 0) { - out.write(bytes, 0, count); + private long determineLengthInputStream(InputStream is) throws IOException { + if (!is.markSupported()) { + return -1; + } + if (is instanceof ByteArrayInputStream) { + return is.available(); } - return out; + long size = 0; + try { + is.mark(MinioConstants.BYTE_ARRAY_LENGTH); + int i = is.available(); + while (i > 0) { + long skip = is.skip(i); + size += skip; + i = is.available(); + } + } finally { + is.reset(); + } + return size; } protected MinioConfiguration getConfiguration() { @@ -505,14 +508,6 @@ public class MinioProducer extends DefaultProducer { } @Override - public String toString() { - if (isEmpty(minioProducerToString)) { - minioProducerToString = "MinioProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; - } - return minioProducerToString; - } - - @Override public MinioEndpoint getEndpoint() { return (MinioEndpoint) super.getEndpoint(); }