This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new 0c124ff  Fix KafkaIdempotentRepository flagging cache as ready 
incorrectly (#5075)
0c124ff is described below

commit 0c124ff0881377311333fb25cc4a8d21d7f83a1d
Author: Javier Holguera <javier.holgu...@gmail.com>
AuthorDate: Thu Feb 11 16:32:51 2021 +0000

    Fix KafkaIdempotentRepository flagging cache as ready incorrectly (#5075)
---
 .../services/org/apache/camel/component.properties |  2 +-
 .../org/apache/camel/component/kafka/kafka.json    |  2 +-
 .../kafka/KafkaIdempotentRepository.java           | 27 ++++++++--
 .../kafka/KafkaIdempotentRepositoryEagerTest.java  |  6 ++-
 .../KafkaIdempotentRepositoryNonEagerTest.java     |  7 ++-
 ... KafkaIdempotentRepositoryPersistenceTest.java} | 58 +++++++++++-----------
 .../src/generated/resources/metadata.json          |  2 +-
 7 files changed, 68 insertions(+), 36 deletions(-)

diff --git 
a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties
 
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties
index 95e9845..40484618 100644
--- 
a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties
+++ 
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties
@@ -2,6 +2,6 @@
 components=kafka
 groupId=org.apache.camel
 artifactId=camel-kafka
-version=3.7.2-SNAPSHOT
+version=3.7.3-SNAPSHOT
 projectName=Camel :: Kafka
 projectDescription=Camel Kafka support
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index bf991c6..8eec038 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -11,7 +11,7 @@
     "supportLevel": "Stable",
     "groupId": "org.apache.camel",
     "artifactId": "camel-kafka",
-    "version": "3.7.2-SNAPSHOT",
+    "version": "3.7.3-SNAPSHOT",
     "scheme": "kafka",
     "extendsScheme": "",
     "syntax": "kafka:topic",
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 123a956..7675aeb 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor.idempotent.kafka;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
@@ -39,6 +40,7 @@ import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StringHelper;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,6 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
@@ -426,9 +429,27 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         @Override
         public void run() {
             log.debug("Subscribing consumer to {}", topic);
-            consumer.subscribe(Collections.singleton(topic));
-            log.debug("Seeking to beginning");
-            consumer.seekToBeginning(consumer.assignment());
+            consumer.subscribe(Collections.singleton(topic), new 
ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
collection) {
+                }
+
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
collection) {
+                    // Whenever a partition is assigned, we want to consume 
from the beginning to guarantee all the
+                    // existing entries in the topic/partition are added to 
the cache
+                    log.debug("Seeking to beginning");
+                    consumer.seekToBeginning(collection);
+                }
+            });
+
+            // According to the Kafka documentation: "Rebalances will only 
occur during an active call to poll, so
+            // callbacks will also only be invoked during that time".
+            // We can safely trigger a poll(0) because the consumer doesn't 
have any record pre-fetched.
+            log.debug("Forcing rebalance to get partitions assigned");
+            if (!consumer.poll(0).isEmpty()) {
+                throw new IllegalStateException("Firts call to Kafka 
consumer.poll(0) should never return any record");
+            }
 
             POLL_LOOP: while (running.get()) {
                 log.trace("Polling");
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
index 183b89d..480a428 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor.idempotent.kafka;
 
+import java.util.UUID;
+
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.EndpointInject;
@@ -31,9 +33,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * Test for eager idempotentRepository usage.
  */
 public class KafkaIdempotentRepositoryEagerTest extends BaseEmbeddedKafkaTest {
+
+    // Every instance of the repository must use a different topic to 
guarantee isolation between tests
     @BindToRegistry("kafkaIdempotentRepository")
     private KafkaIdempotentRepository kafkaIdempotentRepository
-            = new KafkaIdempotentRepository("TEST_IDEM", 
getBootstrapServers());
+            = new KafkaIdempotentRepository("TEST_EAGER_" + 
UUID.randomUUID().toString(), getBootstrapServers());
 
     @EndpointInject("mock:out")
     private MockEndpoint mockOut;
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java
index 9761214..f86b40e 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor.idempotent.kafka;
 
+import java.util.UUID;
+
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.EndpointInject;
@@ -31,9 +33,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * Test for non-eager idempotentRepository usage.
  */
 public class KafkaIdempotentRepositoryNonEagerTest extends 
BaseEmbeddedKafkaTest {
+
+    // Every instance of the repository must use a different topic to 
guarantee isolation between tests
     @BindToRegistry("kafkaIdempotentRepository")
     private KafkaIdempotentRepository kafkaIdempotentRepository
-            = new KafkaIdempotentRepository("TEST_IDEM", 
getBootstrapServers());
+            = new KafkaIdempotentRepository(
+                    "TEST_NON_EAGER_" + UUID.randomUUID().toString(), 
getBootstrapServers());
 
     @EndpointInject("mock:out")
     private MockEndpoint mockOut;
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceTest.java
similarity index 60%
copy from 
components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
copy to 
components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceTest.java
index 183b89d..6577011 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceTest.java
@@ -17,23 +17,32 @@
 package org.apache.camel.processor.idempotent.kafka;
 
 import org.apache.camel.BindToRegistry;
-import org.apache.camel.CamelExecutionException;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.BaseEmbeddedKafkaTest;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
- * Test for eager idempotentRepository usage.
+ * Test whether the KafkaIdempotentRepository successfully recreates its cache 
from pre-existing topics. This guarantees
+ * that the de-duplication state survives application instance restarts.
+ *
+ * This test requires running in a certain order (which isn't great for unit 
testing), hence the ordering-related
+ * annotations.
  */
-public class KafkaIdempotentRepositoryEagerTest extends BaseEmbeddedKafkaTest {
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class KafkaIdempotentRepositoryPersistenceTest extends 
BaseEmbeddedKafkaTest {
+
+    // Deliberately a fixed topic: the ordered tests below share state through 
it, so no other test class may reuse this topic name
     @BindToRegistry("kafkaIdempotentRepository")
     private KafkaIdempotentRepository kafkaIdempotentRepository
-            = new KafkaIdempotentRepository("TEST_IDEM", 
getBootstrapServers());
+            = new KafkaIdempotentRepository("TEST_PERSISTENCE", 
getBootstrapServers());
 
     @EndpointInject("mock:out")
     private MockEndpoint mockOut;
@@ -52,44 +61,37 @@ public class KafkaIdempotentRepositoryEagerTest extends 
BaseEmbeddedKafkaTest {
         };
     }
 
+    @Order(1)
     @Test
-    public void testRemovesDuplicates() throws InterruptedException {
+    public void testFirstPassFiltersAsExpected() throws InterruptedException {
         for (int i = 0; i < 10; i++) {
             template.sendBodyAndHeader("direct:in", "Test message", "id", i % 
5);
         }
 
+        // all records sent initially
+        assertEquals(10, mockBefore.getReceivedCounter());
+
+        // filters second attempt with same value
         assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
 
+        // only the first 5 records (ids 0-4) pass, the rest are filtered
         assertEquals(5, mockOut.getReceivedCounter());
-        assertEquals(10, mockBefore.getReceivedCounter());
     }
 
+    @Order(2)
     @Test
-    public void testRollsBackOnException() throws InterruptedException {
-        mockOut.whenAnyExchangeReceived(exchange -> {
-            int id = exchange.getIn().getHeader("id", Integer.class);
-            if (id == 0) {
-                throw new IllegalArgumentException("Boom!");
-            }
-        });
-
+    public void testSecondPassFiltersEverything() throws InterruptedException {
         for (int i = 0; i < 10; i++) {
-            try {
-                template.sendBodyAndHeader("direct:in", "Test message", "id", 
i % 5);
-            } catch (CamelExecutionException cex) {
-                // no-op; expected
-            }
+            template.sendBodyAndHeader("direct:in", "Test message", "id", i % 
5);
         }
 
-        assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // 
id{0}
-                                                                       // is
-                                                                       // not a
-                                                                       // 
duplicate
-
-        assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through 
the
-                                                      // idempotency check
-                                                      // twice
+        // all records sent initially
         assertEquals(10, mockBefore.getReceivedCounter());
-    }
 
+        // the state from the previous test guarantees that all attempts now 
are blocked
+        assertEquals(10, kafkaIdempotentRepository.getDuplicateCount());
+
+        // nothing gets past the idempotent consumer this time
+        assertEquals(0, mockOut.getReceivedCounter());
+    }
 }
diff --git a/core/camel-componentdsl/src/generated/resources/metadata.json 
b/core/camel-componentdsl/src/generated/resources/metadata.json
index 832254b..05ac7f9 100644
--- a/core/camel-componentdsl/src/generated/resources/metadata.json
+++ b/core/camel-componentdsl/src/generated/resources/metadata.json
@@ -4491,7 +4491,7 @@
     "supportLevel": "Stable",
     "groupId": "org.apache.camel",
     "artifactId": "camel-kafka",
-    "version": "3.7.2-SNAPSHOT",
+    "version": "3.7.3-SNAPSHOT",
     "scheme": "kafka",
     "extendsScheme": "",
     "syntax": "kafka:topic",

Reply via email to