This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch topicMustExists in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/topicMustExists by this push: new ed825f84d36 CAMEL-21774: camel-kafka - Add topicMustExists option so consumer can fail if broker has no topic ed825f84d36 is described below commit ed825f84d36b3404909721a96d6ff81eab6b3c86 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Feb 21 16:29:50 2025 +0100 CAMEL-21774: camel-kafka - Add topicMustExists option so consumer can fail if broker has no topic --- .../support/subcription/DefaultSubscribeAdapter.java | 20 +++++++++++--------- .../KafkaConsumerCustomSubscribeAdapterIT.java | 6 +----- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java index 1fe6c1f01b5..6e9336a5db6 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java @@ -30,6 +30,10 @@ public class DefaultSubscribeAdapter implements SubscribeAdapter { private final String topic; private final boolean topicMustExists; + public DefaultSubscribeAdapter() { + this(null, false); + } + public DefaultSubscribeAdapter(String topic, boolean topicMustExists) { this.topic = topic; this.topicMustExists = topicMustExists; @@ -49,15 +53,13 @@ public class DefaultSubscribeAdapter implements SubscribeAdapter { if (topicMustExists) { boolean found = false; - // check if a topic exists - var topics = consumer.listTopics(); - for (var id : topics.keySet()) { - if (!found) { - if (topicInfo.isPattern()) { - found = topicInfo.getPattern().matcher(id).matches(); - } else { - found = topicInfo.getTopics().contains(id); - } + var it = consumer.listTopics().keySet().iterator(); + while (!found && it.hasNext()) { + String id = it.next(); + if (topicInfo.isPattern()) { + found = topicInfo.getPattern().matcher(id).matches(); + } else { + found = topicInfo.getTopics().contains(id); } } if (!found) { diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java index 281f7ccb930..421a92b590a 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java @@ -44,10 +44,6 @@ public class KafkaConsumerCustomSubscribeAdapterIT extends BaseKafkaTestSupport private static class TestSubscribeAdapter extends DefaultSubscribeAdapter { private volatile boolean subscribeCalled = false; - public TestSubscribeAdapter(String topic, boolean topicMustExists) { - super(topic, topicMustExists); - } - @Override public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) { try { @@ -63,7 +59,7 @@ public class KafkaConsumerCustomSubscribeAdapterIT extends BaseKafkaTestSupport } @BindToRegistry(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER) - private TestSubscribeAdapter testSubscribeAdapter = new TestSubscribeAdapter(null, false); + private TestSubscribeAdapter testSubscribeAdapter = new TestSubscribeAdapter(); @BeforeEach public void before() {