This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.7.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push: new 0c124ff Fix KafkaIdempotentRepository flagging cache as ready incorrectly (#5075) 0c124ff is described below commit 0c124ff0881377311333fb25cc4a8d21d7f83a1d Author: Javier Holguera <javier.holgu...@gmail.com> AuthorDate: Thu Feb 11 16:32:51 2021 +0000 Fix KafkaIdempotentRepository flagging cache as ready incorrectly (#5075) --- .../services/org/apache/camel/component.properties | 2 +- .../org/apache/camel/component/kafka/kafka.json | 2 +- .../kafka/KafkaIdempotentRepository.java | 27 ++++++++-- .../kafka/KafkaIdempotentRepositoryEagerTest.java | 6 ++- .../KafkaIdempotentRepositoryNonEagerTest.java | 7 ++- ... KafkaIdempotentRepositoryPersistenceTest.java} | 58 +++++++++++----------- .../src/generated/resources/metadata.json | 2 +- 7 files changed, 68 insertions(+), 36 deletions(-) diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties index 95e9845..40484618 100644 --- a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties +++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties @@ -2,6 +2,6 @@ components=kafka groupId=org.apache.camel artifactId=camel-kafka -version=3.7.2-SNAPSHOT +version=3.7.3-SNAPSHOT projectName=Camel :: Kafka projectDescription=Camel Kafka support diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index bf991c6..8eec038 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -11,7 +11,7 @@ "supportLevel": "Stable", "groupId": "org.apache.camel", "artifactId": "camel-kafka", - "version": "3.7.2-SNAPSHOT", + "version": "3.7.3-SNAPSHOT", "scheme": "kafka", "extendsScheme": "", "syntax": "kafka:topic", diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java index 123a956..7675aeb 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java @@ -16,6 +16,7 @@ */ package org.apache.camel.processor.idempotent.kafka; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Properties; @@ -39,6 +40,7 @@ import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StringHelper; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -46,6 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; @@ -426,9 +429,27 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot @Override public void run() { log.debug("Subscribing consumer to {}", topic); - consumer.subscribe(Collections.singleton(topic)); - log.debug("Seeking to beginning"); - consumer.seekToBeginning(consumer.assignment()); + consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> collection) { + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> collection) { + // Whenever a partition is assigned, we want to consume from the beginning to guarantee all the + // existing entries in the topic/partition are added to the cache + log.debug("Seeking to beginning"); + consumer.seekToBeginning(collection); + } + }); + + // According to the Kafka documentation: "Rebalances will only occur during an active call to poll, so + // callbacks will also only be invoked during that time". + // We can safely trigger a poll(0) because the consumer doesn't have any record pre-fetched. + log.debug("Forcing rebalance to get partitions assigned"); + if (!consumer.poll(0).isEmpty()) { + throw new IllegalStateException("Firts call to Kafka consumer.poll(0) should never return any record"); + } POLL_LOOP: while (running.get()) { log.trace("Polling"); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java index 183b89d..480a428 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.processor.idempotent.kafka; +import java.util.UUID; + import org.apache.camel.BindToRegistry; import org.apache.camel.CamelExecutionException; import org.apache.camel.EndpointInject; @@ -31,9 +33,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; * Test for eager idempotentRepository usage. */ public class KafkaIdempotentRepositoryEagerTest extends BaseEmbeddedKafkaTest { + + // Every instance of the repository must use a different topic to guarantee isolation between tests @BindToRegistry("kafkaIdempotentRepository") private KafkaIdempotentRepository kafkaIdempotentRepository - = new KafkaIdempotentRepository("TEST_IDEM", getBootstrapServers()); + = new KafkaIdempotentRepository("TEST_EAGER_" + UUID.randomUUID().toString(), getBootstrapServers()); @EndpointInject("mock:out") private MockEndpoint mockOut; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java index 9761214..f86b40e 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.processor.idempotent.kafka; +import java.util.UUID; + import org.apache.camel.BindToRegistry; import org.apache.camel.CamelExecutionException; import org.apache.camel.EndpointInject; @@ -31,9 +33,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; * Test for non-eager idempotentRepository usage. */ public class KafkaIdempotentRepositoryNonEagerTest extends BaseEmbeddedKafkaTest { + + // Every instance of the repository must use a different topic to guarantee isolation between tests @BindToRegistry("kafkaIdempotentRepository") private KafkaIdempotentRepository kafkaIdempotentRepository - = new KafkaIdempotentRepository("TEST_IDEM", getBootstrapServers()); + = new KafkaIdempotentRepository( + "TEST_NON_EAGER_" + UUID.randomUUID().toString(), getBootstrapServers()); @EndpointInject("mock:out") private MockEndpoint mockOut; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceTest.java similarity index 60% copy from components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java copy to components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceTest.java index 183b89d..6577011 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceTest.java @@ -17,23 +17,32 @@ package org.apache.camel.processor.idempotent.kafka; import org.apache.camel.BindToRegistry; -import org.apache.camel.CamelExecutionException; import org.apache.camel.EndpointInject; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.BaseEmbeddedKafkaTest; import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; import static org.junit.jupiter.api.Assertions.assertEquals; /** - * Test for eager idempotentRepository usage. + * Test whether the KafkaIdempotentRepository successfully recreates its cache from pre-existing topics. This guarantees + * that the de-duplication state survives application instance restarts. + * + * This test requires running in a certain order (which isn't great for unit testing), hence the ordering-related + * annotations. */ -public class KafkaIdempotentRepositoryEagerTest extends BaseEmbeddedKafkaTest { +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class KafkaIdempotentRepositoryPersistenceTest extends BaseEmbeddedKafkaTest { + + // Every instance of the repository must use a different topic to guarantee isolation between tests @BindToRegistry("kafkaIdempotentRepository") private KafkaIdempotentRepository kafkaIdempotentRepository - = new KafkaIdempotentRepository("TEST_IDEM", getBootstrapServers()); + = new KafkaIdempotentRepository("TEST_PERSISTENCE", getBootstrapServers()); @EndpointInject("mock:out") private MockEndpoint mockOut; @@ -52,44 +61,37 @@ public class KafkaIdempotentRepositoryEagerTest extends BaseEmbeddedKafkaTest { }; } + @Order(1) @Test - public void testRemovesDuplicates() throws InterruptedException { + public void testFirstPassFiltersAsExpected() throws InterruptedException { for (int i = 0; i < 10; i++) { template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5); } + // all records sent initially + assertEquals(10, mockBefore.getReceivedCounter()); + + // filters second attempt with same value assertEquals(5, kafkaIdempotentRepository.getDuplicateCount()); + // only first 1-4 records are received, the rest are filtered assertEquals(5, mockOut.getReceivedCounter()); - assertEquals(10, mockBefore.getReceivedCounter()); } + @Order(2) @Test - public void testRollsBackOnException() throws InterruptedException { - mockOut.whenAnyExchangeReceived(exchange -> { - int id = exchange.getIn().getHeader("id", Integer.class); - if (id == 0) { - throw new IllegalArgumentException("Boom!"); - } - }); - + public void testSecondPassFiltersEverything() throws InterruptedException { for (int i = 0; i < 10; i++) { - try { - template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5); - } catch (CamelExecutionException cex) { - // no-op; expected - } + template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5); } - assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // id{0} - // is - // not a - // duplicate - - assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through the - // idempotency check - // twice + // all records sent initially assertEquals(10, mockBefore.getReceivedCounter()); - } + // the state from the previous test guarantees that all attempts now are blocked + assertEquals(10, kafkaIdempotentRepository.getDuplicateCount()); + + // nothing gets passed the idempotent consumer this time + assertEquals(0, mockOut.getReceivedCounter()); + } } diff --git a/core/camel-componentdsl/src/generated/resources/metadata.json b/core/camel-componentdsl/src/generated/resources/metadata.json index 832254b..05ac7f9 100644 --- a/core/camel-componentdsl/src/generated/resources/metadata.json +++ b/core/camel-componentdsl/src/generated/resources/metadata.json @@ -4491,7 +4491,7 @@ "supportLevel": "Stable", "groupId": "org.apache.camel", "artifactId": "camel-kafka", - "version": "3.7.2-SNAPSHOT", + "version": "3.7.3-SNAPSHOT", "scheme": "kafka", "extendsScheme": "", "syntax": "kafka:topic",