This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 12fdd32  camel-kafka: fix integration test failing on CI
12fdd32 is described below

commit 12fdd32fc6030a50f8cc75634f6510e8ea7ced1e
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Fri Sep 17 14:31:19 2021 +0200

    camel-kafka: fix integration test failing on CI
    
    The test kafkaMessageIsConsumedByCamelSeekedToBeginning was failing on CI
    because the code tried to run a seek call before any partition had been
    assigned to the consumer.
    
    This change ensures the seek is only run once a partition assignment
    happens, by performing the resume from the partition assignment listener.
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 13 ------------
 .../support/PartitionAssignmentListener.java       | 24 +++++++---------------
 .../consumer/support/SeekPolicyResumeStrategy.java |  8 --------
 .../kafka/integration/KafkaConsumerFullIT.java     | 10 +++++++++
 4 files changed, 17 insertions(+), 38 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 086432e..720ee8d 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -33,8 +33,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import 
org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
 import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory;
-import org.apache.camel.spi.StateRepository;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -80,10 +78,6 @@ class KafkaFetchRecords implements Runnable {
 
     void preInit() {
         createConsumer();
-
-        StateRepository<String, String> offsetRepository = 
kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository();
-        String seekPolicy = 
kafkaConsumer.getEndpoint().getConfiguration().getSeekTo();
-        resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer, 
offsetRepository, seekPolicy);
     }
 
     @Override
@@ -134,9 +128,6 @@ class KafkaFetchRecords implements Runnable {
         if (isReconnecting()) {
             subscribe();
 
-            // on first run or reconnecting
-            resume();
-
             // set reconnect to false as the connection and resume is done at 
this point
             setReconnect(false);
 
@@ -148,10 +139,6 @@ class KafkaFetchRecords implements Runnable {
         startPolling();
     }
 
-    protected void resume() {
-        resumeStrategy.resume();
-    }
-
     private void subscribe() {
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
                 threadId, topicName,
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index b67ab0c..ce02e37 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue;
 import static 
org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
 
 public class PartitionAssignmentListener implements ConsumerRebalanceListener {
@@ -40,6 +39,7 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
     private final KafkaConfiguration configuration;
     private final KafkaConsumer consumer;
     private final Map<String, Long> lastProcessedOffset;
+    private final ResumeStrategy resumeStrategy;
     private Supplier<Boolean> stopStateSupplier;
 
     public PartitionAssignmentListener(String threadId, String topicName, 
KafkaConfiguration configuration,
@@ -51,13 +51,12 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
         this.consumer = consumer;
         this.lastProcessedOffset = lastProcessedOffset;
         this.stopStateSupplier = stopStateSupplier;
-    }
 
-    private void resumeFromOffset(TopicPartition topicPartition, String 
offsetState) {
-        // The state contains the last read offset, so you need to seek from 
the next one
-        long offset = deserializeOffsetValue(offsetState) + 1;
-        LOG.debug("Resuming partition {} from offset {} from state", 
topicPartition.partition(), offset);
-        consumer.seek(topicPartition, offset);
+        StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
+        String seekPolicy = configuration.getSeekTo();
+
+        LOG.info("Performing resume as {} ", seekPolicy);
+        resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer, 
offsetRepository, seekPolicy);
     }
 
     @Override
@@ -92,15 +91,6 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, 
topicName);
 
-        StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
-        if (offsetRepository != null) {
-            for (TopicPartition partition : partitions) {
-                String offsetState = 
offsetRepository.getState(serializeOffsetKey(partition));
-                if (offsetState != null && !offsetState.isEmpty()) {
-                    // The state contains the last read offset, so you need to 
seek from the next one
-                    resumeFromOffset(partition, offsetState);
-                }
-            }
-        }
+        resumeStrategy.resume();
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
index 8e5361d..d7606df 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
@@ -17,8 +17,6 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import java.time.Duration;
-
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,15 +39,9 @@ public class SeekPolicyResumeStrategy implements 
ResumeStrategy {
     public void resume() {
         if (seekPolicy.equals("beginning")) {
             LOG.debug("Seeking from the beginning of topic");
-            // This poll to ensure we have an assigned partition
-            // otherwise seek won't work
-            consumer.poll(Duration.ofMillis(100));
             consumer.seekToBeginning(consumer.assignment());
         } else if (seekPolicy.equals("end")) {
             LOG.debug("Seeking from the end off the topic");
-            // This poll to ensure we have an assigned partition
-            // otherwise seek won't work
-            consumer.poll(Duration.ofMillis(100));
             consumer.seekToEnd(consumer.assignment());
         }
     }
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index f143bd2..f4a9dd4 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -35,7 +35,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
 import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
@@ -45,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", 
matches = "true",
                           disabledReason = "Runtime conflicts with the 
idempotency tests")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
 
     public static final String TOPIC = "test";
@@ -89,6 +93,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         };
     }
 
+    @Order(3)
     @Test
     public void kafkaMessageIsConsumedByCamel() throws InterruptedException, 
IOException {
         String propagatedHeaderKey = "PropagatedCustomHeader";
@@ -119,6 +124,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive 
propagated header");
     }
 
+    @Order(2)
     @Test
     public void kafkaRecordSpecificHeadersAreNotOverwritten() throws 
InterruptedException, IOException {
         String propagatedHeaderKey = KafkaConstants.TOPIC;
@@ -137,6 +143,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
     }
 
     @Test
+    @Order(1)
     public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws 
Exception {
         to.expectedMessageCount(5);
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
@@ -150,6 +157,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         to.reset();
 
         to.expectedMessageCount(5);
+
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
 
         // Restart endpoint,
@@ -164,6 +172,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         to.assertIsSatisfied(3000);
     }
 
+    @Order(4)
     @Test
     public void kafkaMessageIsConsumedByCamelSeekedToEnd() throws Exception {
         to.expectedMessageCount(5);
@@ -194,6 +203,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         to.assertIsSatisfied(3000);
     }
 
+    @Order(5)
     @Test
     public void headerDeserializerCouldBeOverridden() {
         KafkaEndpoint kafkaEndpoint

Reply via email to