This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 7de327d91eb CAMEL-17100: minio consumer is slow at starting. Change the minio object loading at beginning to exchange by exchange (#7691) 7de327d91eb is described below commit 7de327d91eb6b238a4c937a31962932e9178c9dc Author: Joan Bordeau <corwin...@gmail.com> AuthorDate: Wed Jun 1 06:38:33 2022 +0200 CAMEL-17100: minio consumer is slow at starting. Change the minio object loading at beginning to exchange by exchange (#7691) Co-authored-by: jbordeau <joan.bord...@cleyrop.com> --- .../camel/component/minio/MinioConsumer.java | 65 ++++++++++------------ 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java index f82d5fd3385..562d499a574 100644 --- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java +++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java @@ -20,8 +20,6 @@ import java.io.IOException; import java.io.InputStream; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; @@ -123,14 +121,12 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { String bucketName = getConfiguration().getBucketName(); String objectName = getConfiguration().getObjectName(); - MinioClient minioClient = getMinioClient(); Queue<Exchange> exchanges; if (isNotEmpty(objectName)) { LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName); - InputStream minioObject = getObject(bucketName, minioClient, objectName); - exchanges = createExchanges(minioObject, objectName); + exchanges = createExchanges(objectName); return processBatch(CastUtils.cast(exchanges)); } else { @@ -185,27 +181,22 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { } } - protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception { + protected Queue<Exchange> createExchanges(String objectName) throws Exception { Queue<Exchange> answer = new LinkedList<>(); - Exchange exchange = createExchange(objectStream, objectName); + Exchange exchange = createExchange(objectName); answer.add(exchange); - IOHelper.close(objectStream); return answer; } protected Queue<Exchange> createExchanges(Iterator<Result<Item>> minioObjectSummaries) throws Exception { int messageCounter = 0; - String bucketName = getConfiguration().getBucketName(); - Collection<InputStream> minioObjects = new ArrayList<>(); Queue<Exchange> answer = new LinkedList<>(); try { if (getConfiguration().isIncludeFolders()) { do { messageCounter++; Item minioObjectSummary = minioObjectSummaries.next().get(); - InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName()); - minioObjects.add(minioObject); - Exchange exchange = createExchange(minioObject, minioObjectSummary.objectName()); + Exchange exchange = createExchange(minioObjectSummary.objectName()); answer.add(exchange); continuationToken = minioObjectSummary.objectName(); } while (minioObjectSummaries.hasNext()); @@ -215,9 +206,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { Item minioObjectSummary = minioObjectSummaries.next().get(); // ignore if directory if (!minioObjectSummary.isDir()) { - InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName()); - minioObjects.add(minioObject); - Exchange exchange = createExchange(minioObject, minioObjectSummary.objectName()); + Exchange exchange = createExchange(minioObjectSummary.objectName()); answer.add(exchange); continuationToken = minioObjectSummary.objectName(); } @@ -233,10 +222,6 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { LOG.warn("Error getting MinioObject due: {}", e.getMessage()); throw e; - } finally { - // ensure all previous gathered minio objects are closed - // if there was an exception creating the exchanges in this batch - minioObjects.forEach(IOHelper::close); } return answer; @@ -261,7 +246,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { } @Override - public int processBatch(Queue<Object> exchanges) { + public int processBatch(Queue<Object> exchanges) throws Exception { int total = exchanges.size(); for (int index = 0; index < total && isBatchAllowed(); index++) { @@ -275,6 +260,27 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { // update pending number of exchanges pendingExchanges = total - index - 1; + String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class); + String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class); + if (getConfiguration().isIncludeBody()) { + InputStream minioObject; + try { + minioObject = getObject(srcBucketName, getMinioClient(), srcObjectName); + exchange.getIn().setBody(IOUtils.toByteArray(minioObject)); + if (getConfiguration().isAutoCloseBody()) { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + IOHelper.close(minioObject); + } + }); + } + } catch (Exception e) { + LOG.warn("Error getting MinioObject due: {}", e.getMessage()); + throw e; + } + } + // add on completion to handle after work when the exchange is done exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { public void onComplete(Exchange exchange) { @@ -405,7 +411,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { return (MinioEndpoint) super.getEndpoint(); } - private Exchange createExchange(InputStream minioObject, String objectName) throws Exception { + private Exchange createExchange(String objectName) throws Exception { LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName()); Exchange exchange = createExchange(true); @@ -415,21 +421,6 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { getEndpoint().getObjectStat(objectName, message); - if (getConfiguration().isIncludeBody()) { - message.setBody(IOUtils.toByteArray(minioObject)); - if (getConfiguration().isAutoCloseBody()) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { - @Override - public void onDone(Exchange exchange) { - IOHelper.close(minioObject); - } - }); - } - } else { - message.setBody(null); - IOHelper.close(minioObject); - } - return exchange; }