This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 62fd4017653 fix(minio): fix polling working half the time because of 
continuationToken which restart previous polling (#7720)
62fd4017653 is described below

commit 62fd4017653de963446a67b34e6784858987bc8d
Author: Joan Bordeau <corwin...@gmail.com>
AuthorDate: Wed Jun 8 06:24:32 2022 +0200

    fix(minio): fix polling working half the time because of continuationToken 
which restart previous polling (#7720)
    
    Co-authored-by: jbordeau <joan.bord...@cleyrop.com>
---
 .../apache/camel/component/minio/MinioConsumer.java    | 18 +++++++++++-------
 .../apache/camel/component/minio/MinioTestUtils.java   | 13 -------------
 .../component/minio/integration/MinioConsumerIT.java   |  4 ++--
 .../camel-minio/src/test/resources/log4j2.properties   |  2 +-
 4 files changed, 14 insertions(+), 23 deletions(-)

diff --git 
a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
 
b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
index 562d499a574..f693a53a79f 100644
--- 
a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
+++ 
b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -121,7 +122,7 @@ public class MinioConsumer extends 
ScheduledBatchPollingConsumer {
 
         String bucketName = getConfiguration().getBucketName();
         String objectName = getConfiguration().getObjectName();
-        Queue<Exchange> exchanges;
+        Deque<Exchange> exchanges;
 
         if (isNotEmpty(objectName)) {
             LOG.trace("Getting object in bucket {} with object name {}...", 
bucketName, objectName);
@@ -168,6 +169,11 @@ public class MinioConsumer extends 
ScheduledBatchPollingConsumer {
 
             if (listObjects.hasNext()) {
                 exchanges = createExchanges(listObjects);
+                if (maxMessagesPerPoll <= 0 || exchanges.size() < 
maxMessagesPerPoll) {
+                    continuationToken = null;
+                } else {
+                    continuationToken = 
exchanges.getLast().getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+                }
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Found {} objects in bucket {}...", 
totalCounter, bucketName);
                 }
@@ -181,16 +187,16 @@ public class MinioConsumer extends 
ScheduledBatchPollingConsumer {
         }
     }
 
-    protected Queue<Exchange> createExchanges(String objectName) throws 
Exception {
-        Queue<Exchange> answer = new LinkedList<>();
+    protected Deque<Exchange> createExchanges(String objectName) throws 
Exception {
+        Deque<Exchange> answer = new LinkedList<>();
         Exchange exchange = createExchange(objectName);
         answer.add(exchange);
         return answer;
     }
 
-    protected Queue<Exchange> createExchanges(Iterator<Result<Item>> 
minioObjectSummaries) throws Exception {
+    protected Deque<Exchange> createExchanges(Iterator<Result<Item>> 
minioObjectSummaries) throws Exception {
         int messageCounter = 0;
-        Queue<Exchange> answer = new LinkedList<>();
+        Deque<Exchange> answer = new LinkedList<>();
         try {
             if (getConfiguration().isIncludeFolders()) {
                 do {
@@ -198,7 +204,6 @@ public class MinioConsumer extends 
ScheduledBatchPollingConsumer {
                     Item minioObjectSummary = 
minioObjectSummaries.next().get();
                     Exchange exchange = 
createExchange(minioObjectSummary.objectName());
                     answer.add(exchange);
-                    continuationToken = minioObjectSummary.objectName();
                 } while (minioObjectSummaries.hasNext());
             } else {
                 do {
@@ -208,7 +213,6 @@ public class MinioConsumer extends 
ScheduledBatchPollingConsumer {
                     if (!minioObjectSummary.isDir()) {
                         Exchange exchange = 
createExchange(minioObjectSummary.objectName());
                         answer.add(exchange);
-                        continuationToken = minioObjectSummary.objectName();
                     }
                 } while (minioObjectSummaries.hasNext());
             }
diff --git 
a/components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioTestUtils.java
 
b/components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioTestUtils.java
index c2cae2e74ce..ea2153ece03 100644
--- 
a/components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioTestUtils.java
+++ 
b/components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioTestUtils.java
@@ -37,17 +37,4 @@ public final class MinioTestUtils {
 
         return properties;
     }
-
-    static Properties loadMinioAccessFromJvmEnv() throws Exception {
-        final Properties properties = new Properties();
-        if (System.getProperty("endpoint") == null || 
System.getProperty("accessKey") == null
-                || System.getProperty("secretKey") == null) {
-            throw new Exception("Make sure to supply minio endpoint and 
credentials");
-        }
-        properties.setProperty("endpoint", System.getProperty("endpoint"));
-        properties.setProperty("access_key", System.getProperty("accessKey"));
-        properties.setProperty("secret_key", System.getProperty("secretKey"));
-        properties.setProperty("region", System.getProperty("region"));
-        return properties;
-    }
 }
diff --git 
a/components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIT.java
 
b/components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIT.java
index 9c7f7bb6f5b..703369b3465 100644
--- 
a/components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIT.java
+++ 
b/components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIT.java
@@ -41,7 +41,7 @@ class MinioConsumerIT extends MinioIntegrationTestSupport {
 
     @Test
     void sendIn() throws Exception {
-        result.expectedMessageCount(3);
+        result.expectedMessageCount(6);
 
         template.send("direct:putObject", exchange -> {
             exchange.getIn().setHeader(MinioConstants.OBJECT_NAME, 
"test1.txt");
@@ -69,7 +69,7 @@ class MinioConsumerIT extends MinioIntegrationTestSupport {
                 String minioEndpoint = "minio://mycamel?autoCreateBucket=true";
 
                 from("direct:putObject").startupOrder(1).to(minioEndpoint);
-                
from("minio://mycamel?moveAfterRead=true&destinationBucketName=camel-kafka-connector&autoCreateBucket=true")
+                from(minioEndpoint + 
"&deleteAfterRead=false&destinationBucketName=camel-kafka-connector&repeatCount=2")
                         .startupOrder(2).to("mock:result");
 
             }
diff --git a/components/camel-minio/src/test/resources/log4j2.properties 
b/components/camel-minio/src/test/resources/log4j2.properties
index 7674a89fb88..da1de1dcddd 100644
--- a/components/camel-minio/src/test/resources/log4j2.properties
+++ b/components/camel-minio/src/test/resources/log4j2.properties
@@ -24,4 +24,4 @@ appender.out.name=out
 appender.out.layout.type=PatternLayout
 appender.out.layout.pattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
 rootLogger.level=INFO
-rootLogger.appenderRef.file.ref=file
+rootLogger.appenderRef.file.ref=file
\ No newline at end of file

Reply via email to