This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch topicMustExists
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/topicMustExists by this push:
     new ed825f84d36 CAMEL-21774: camel-kafka - Add topicMustExists option so consumer can fail if broker has no topic
ed825f84d36 is described below

commit ed825f84d36b3404909721a96d6ff81eab6b3c86
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Feb 21 16:29:50 2025 +0100

    CAMEL-21774: camel-kafka - Add topicMustExists option so consumer can fail if broker has no topic
---
 .../support/subcription/DefaultSubscribeAdapter.java | 20 +++++++++++---------
 .../KafkaConsumerCustomSubscribeAdapterIT.java       |  6 +-----
 2 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
index 1fe6c1f01b5..6e9336a5db6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
@@ -30,6 +30,10 @@ public class DefaultSubscribeAdapter implements SubscribeAdapter {
     private final String topic;
     private final boolean topicMustExists;
 
+    public DefaultSubscribeAdapter() {
+        this(null, false);
+    }
+
     public DefaultSubscribeAdapter(String topic, boolean topicMustExists) {
         this.topic = topic;
         this.topicMustExists = topicMustExists;
@@ -49,15 +53,13 @@ public class DefaultSubscribeAdapter implements SubscribeAdapter {
 
         if (topicMustExists) {
             boolean found = false;
-            // check if a topic exists
-            var topics = consumer.listTopics();
-            for (var id : topics.keySet()) {
-                if (!found) {
-                    if (topicInfo.isPattern()) {
-                        found = topicInfo.getPattern().matcher(id).matches();
-                    } else {
-                        found = topicInfo.getTopics().contains(id);
-                    }
+            var it = consumer.listTopics().keySet().iterator();
+            while (!found && it.hasNext()) {
+                String id = it.next();
+                if (topicInfo.isPattern()) {
+                    found = topicInfo.getPattern().matcher(id).matches();
+                } else {
+                    found = topicInfo.getTopics().contains(id);
                 }
             }
             if (!found) {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java
index 281f7ccb930..421a92b590a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java
@@ -44,10 +44,6 @@ public class KafkaConsumerCustomSubscribeAdapterIT extends BaseKafkaTestSupport
     private static class TestSubscribeAdapter extends DefaultSubscribeAdapter {
         private volatile boolean subscribeCalled = false;
 
-        public TestSubscribeAdapter(String topic, boolean topicMustExists) {
-            super(topic, topicMustExists);
-        }
-
         @Override
         public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) {
             try {
@@ -63,7 +59,7 @@ public class KafkaConsumerCustomSubscribeAdapterIT extends BaseKafkaTestSupport
     }
 
     @BindToRegistry(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER)
-    private TestSubscribeAdapter testSubscribeAdapter = new TestSubscribeAdapter(null, false);
+    private TestSubscribeAdapter testSubscribeAdapter = new TestSubscribeAdapter();
 
     @BeforeEach
     public void before() {

Reply via email to