This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 12fdd32 camel-kafka: fix integration test failing on CI 12fdd32 is described below commit 12fdd32fc6030a50f8cc75634f6510e8ea7ced1e Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Fri Sep 17 14:31:19 2021 +0200 camel-kafka: fix integration test failing on CI The test kafkaMessageIsConsumedByCamelSeekedToBeginning is failing on CI because the code was trying to run a seek call without a partition being assigned to the consumer. This ensures the seek is only run when an assignment happens. --- .../camel/component/kafka/KafkaFetchRecords.java | 13 ------------ .../support/PartitionAssignmentListener.java | 24 +++++++--------------- .../consumer/support/SeekPolicyResumeStrategy.java | 8 -------- .../kafka/integration/KafkaConsumerFullIT.java | 10 +++++++++ 4 files changed, 17 insertions(+), 38 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 086432e..720ee8d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -33,8 +33,6 @@ import org.apache.camel.Exchange; import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor; import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener; import org.apache.camel.component.kafka.consumer.support.ResumeStrategy; -import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory; -import org.apache.camel.spi.StateRepository; import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler; import org.apache.camel.util.IOHelper; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -80,10 +78,6 @@ class KafkaFetchRecords implements Runnable { void preInit() { createConsumer(); - - StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); - String seekPolicy = kafkaConsumer.getEndpoint().getConfiguration().getSeekTo(); - resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer, offsetRepository, seekPolicy); } @Override @@ -134,9 +128,6 @@ class KafkaFetchRecords implements Runnable { if (isReconnecting()) { subscribe(); - // on first run or reconnecting - resume(); - // set reconnect to false as the connection and resume is done at this point setReconnect(false); @@ -148,10 +139,6 @@ class KafkaFetchRecords implements Runnable { startPolling(); } - protected void resume() { - resumeStrategy.resume(); - } - private void subscribe() { PartitionAssignmentListener listener = new PartitionAssignmentListener( threadId, topicName, diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java index b67ab0c..ce02e37 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue; import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey; public class PartitionAssignmentListener implements ConsumerRebalanceListener { @@ -40,6 +39,7 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { private final KafkaConfiguration configuration; private final KafkaConsumer consumer; private final Map<String, Long> lastProcessedOffset; + private final ResumeStrategy resumeStrategy; private Supplier<Boolean> stopStateSupplier; public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration, @@ -51,13 +51,12 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { this.consumer = consumer; this.lastProcessedOffset = lastProcessedOffset; this.stopStateSupplier = stopStateSupplier; - } - private void resumeFromOffset(TopicPartition topicPartition, String offsetState) { - // The state contains the last read offset, so you need to seek from the next one - long offset = deserializeOffsetValue(offsetState) + 1; - LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset); - consumer.seek(topicPartition, offset); + StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); + String seekPolicy = configuration.getSeekTo(); + + LOG.info("Performing resume as {} ", seekPolicy); + resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer, offsetRepository, seekPolicy); } @Override @@ -92,15 +91,6 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { public void onPartitionsAssigned(Collection<TopicPartition> partitions) { LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, topicName); - StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); - if (offsetRepository != null) { - for (TopicPartition partition : partitions) { - String offsetState = offsetRepository.getState(serializeOffsetKey(partition)); - if (offsetState != null && !offsetState.isEmpty()) { - // The state contains the last read offset, so you need to seek from the next one - resumeFromOffset(partition, offsetState); - } - } - } + resumeStrategy.resume(); } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java index 8e5361d..d7606df 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java @@ -17,8 +17,6 @@ package org.apache.camel.component.kafka.consumer.support; -import java.time.Duration; - import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,15 +39,9 @@ public class SeekPolicyResumeStrategy implements ResumeStrategy { public void resume() { if (seekPolicy.equals("beginning")) { LOG.debug("Seeking from the beginning of topic"); - // This poll to ensure we have an assigned partition - // otherwise seek won't work - consumer.poll(Duration.ofMillis(100)); consumer.seekToBeginning(consumer.assignment()); } else if (seekPolicy.equals("end")) { LOG.debug("Seeking from the end off the topic"); - // This poll to ensure we have an assigned partition - // otherwise seek won't work - consumer.poll(Duration.ofMillis(100)); consumer.seekToEnd(consumer.assignment()); } } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java index f143bd2..f4a9dd4 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java @@ -35,7 +35,10 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.internals.RecordHeader; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf; @@ -45,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "true", disabledReason = "Runtime conflicts with the idempotency tests") +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { public static final String TOPIC = "test"; @@ -89,6 +93,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { }; } + @Order(3) @Test public void kafkaMessageIsConsumedByCamel() throws InterruptedException, IOException { String propagatedHeaderKey = "PropagatedCustomHeader"; @@ -119,6 +124,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive propagated header"); } + @Order(2) @Test public void kafkaRecordSpecificHeadersAreNotOverwritten() throws InterruptedException, IOException { String propagatedHeaderKey = KafkaConstants.TOPIC; @@ -137,6 +143,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { } @Test + @Order(1) public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws Exception { to.expectedMessageCount(5); to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); @@ -150,6 +157,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { to.reset(); to.expectedMessageCount(5); + to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); // Restart endpoint, @@ -164,6 +172,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { to.assertIsSatisfied(3000); } + @Order(4) @Test public void kafkaMessageIsConsumedByCamelSeekedToEnd() throws Exception { to.expectedMessageCount(5); @@ -194,6 +203,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { to.assertIsSatisfied(3000); } + @Order(5) @Test public void headerDeserializerCouldBeOverridden() { KafkaEndpoint kafkaEndpoint