This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 62fd4017653 fix(minio): fix polling working half the time because of continuationToken which restart previous polling (#7720) 62fd4017653 is described below commit 62fd4017653de963446a67b34e6784858987bc8d Author: Joan Bordeau <corwin...@gmail.com> AuthorDate: Wed Jun 8 06:24:32 2022 +0200 fix(minio): fix polling working half the time because of continuationToken which restart previous polling (#7720) Co-authored-by: jbordeau <joan.bord...@cleyrop.com> --- .../apache/camel/component/minio/MinioConsumer.java | 18 +++++++++++------- .../apache/camel/component/minio/MinioTestUtils.java | 13 ------------- .../component/minio/integration/MinioConsumerIT.java | 4 ++-- .../camel-minio/src/test/resources/log4j2.properties | 2 +- 4 files changed, 14 insertions(+), 23 deletions(-) diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java index 562d499a574..f693a53a79f 100644 --- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java +++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; +import java.util.Deque; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; @@ -121,7 +122,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { String bucketName = getConfiguration().getBucketName(); String objectName = getConfiguration().getObjectName(); - Queue<Exchange> exchanges; + Deque<Exchange> exchanges; if (isNotEmpty(objectName)) { LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName); @@ -168,6 +169,11 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { if (listObjects.hasNext()) { exchanges = createExchanges(listObjects); + if (maxMessagesPerPoll <= 0 || exchanges.size() < maxMessagesPerPoll) { + continuationToken = null; + } else { + continuationToken = exchanges.getLast().getIn().getHeader(MinioConstants.OBJECT_NAME, String.class); + } if (LOG.isTraceEnabled()) { LOG.trace("Found {} objects in bucket {}...", totalCounter, bucketName); } @@ -181,16 +187,16 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { } } - protected Queue<Exchange> createExchanges(String objectName) throws Exception { - Queue<Exchange> answer = new LinkedList<>(); + protected Deque<Exchange> createExchanges(String objectName) throws Exception { + Deque<Exchange> answer = new LinkedList<>(); Exchange exchange = createExchange(objectName); answer.add(exchange); return answer; } - protected Queue<Exchange> createExchanges(Iterator<Result<Item>> minioObjectSummaries) throws Exception { + protected Deque<Exchange> createExchanges(Iterator<Result<Item>> minioObjectSummaries) throws Exception { int messageCounter = 0; - Queue<Exchange> answer = new LinkedList<>(); + Deque<Exchange> answer = new LinkedList<>(); try { if (getConfiguration().isIncludeFolders()) { do { @@ -198,7 +204,6 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { Item minioObjectSummary = minioObjectSummaries.next().get(); Exchange exchange = createExchange(minioObjectSummary.objectName()); answer.add(exchange); - continuationToken = minioObjectSummary.objectName(); } while (minioObjectSummaries.hasNext()); } else { do { @@ -208,7 +213,6 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { if (!minioObjectSummary.isDir()) { Exchange exchange = createExchange(minioObjectSummary.objectName()); answer.add(exchange); - continuationToken = minioObjectSummary.objectName(); } } while (minioObjectSummaries.hasNext()); } diff --git a/components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioTestUtils.java b/components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioTestUtils.java index c2cae2e74ce..ea2153ece03 100644 --- a/components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioTestUtils.java +++ b/components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioTestUtils.java @@ -37,17 +37,4 @@ public final class MinioTestUtils { return properties; } - - static Properties loadMinioAccessFromJvmEnv() throws Exception { - final Properties properties = new Properties(); - if (System.getProperty("endpoint") == null || System.getProperty("accessKey") == null - || System.getProperty("secretKey") == null) { - throw new Exception("Make sure to supply minio endpoint and credentials"); - } - properties.setProperty("endpoint", System.getProperty("endpoint")); - properties.setProperty("access_key", System.getProperty("accessKey")); - properties.setProperty("secret_key", System.getProperty("secretKey")); - properties.setProperty("region", System.getProperty("region")); - return properties; - } } diff --git a/components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIT.java b/components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIT.java index 9c7f7bb6f5b..703369b3465 100644 --- a/components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIT.java +++ b/components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIT.java @@ -41,7 +41,7 @@ class MinioConsumerIT extends MinioIntegrationTestSupport { @Test void sendIn() throws Exception { - result.expectedMessageCount(3); + result.expectedMessageCount(6); template.send("direct:putObject", exchange -> { exchange.getIn().setHeader(MinioConstants.OBJECT_NAME, "test1.txt"); @@ -69,7 +69,7 @@ class MinioConsumerIT extends MinioIntegrationTestSupport { String minioEndpoint = "minio://mycamel?autoCreateBucket=true"; from("direct:putObject").startupOrder(1).to(minioEndpoint); - from("minio://mycamel?moveAfterRead=true&destinationBucketName=camel-kafka-connector&autoCreateBucket=true") + from(minioEndpoint + "&deleteAfterRead=false&destinationBucketName=camel-kafka-connector&repeatCount=2") .startupOrder(2).to("mock:result"); } diff --git a/components/camel-minio/src/test/resources/log4j2.properties b/components/camel-minio/src/test/resources/log4j2.properties index 7674a89fb88..da1de1dcddd 100644 --- a/components/camel-minio/src/test/resources/log4j2.properties +++ b/components/camel-minio/src/test/resources/log4j2.properties @@ -24,4 +24,4 @@ appender.out.name=out appender.out.layout.type=PatternLayout appender.out.layout.pattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n rootLogger.level=INFO -rootLogger.appenderRef.file.ref=file +rootLogger.appenderRef.file.ref=file \ No newline at end of file