This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 772bea79be [kafka] StreamMetadataProvider.getTopics() to use admin client instead of consumer (#14678)
772bea79be is described below

commit 772bea79beb33a0e035caa7bb215156f990e0523
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Wed Dec 18 12:12:57 2024 -0800

    [kafka] StreamMetadataProvider.getTopics() to use admin client instead of consumer (#14678)
---
 .../KafkaPartitionLevelConnectionHandler.java      | 31 +++++++++++++++------
 .../kafka20/KafkaStreamMetadataProvider.java       | 22 ++++++++++-----
 .../kafka20/KafkaPartitionLevelConsumerTest.java   | 28 +++++++++++++++++++
 .../KafkaPartitionLevelConnectionHandler.java      | 32 ++++++++++++++++------
 .../kafka30/KafkaStreamMetadataProvider.java       | 22 ++++++++++-----
 5 files changed, 104 insertions(+), 31 deletions(-)
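
For context: getTopics() previously enumerated topics through Consumer.listTopics(); this patch switches to the Kafka AdminClient, created from the same properties the consumer already uses. A minimal standalone sketch of the pattern the patch adopts (the broker address is a placeholder; Pinot builds these properties from the stream config):

    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class ListTopicsSketch {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // AdminClient is Closeable; try-with-resources tears down the broker connections.
        try (AdminClient adminClient = AdminClient.create(props)) {
          // listTopics().names() returns a KafkaFuture<Set<String>>; get() blocks
          // until the broker responds.
          Set<String> topicNames = adminClient.listTopics().names().get();
          topicNames.forEach(System.out::println);
        }
      }
    }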

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index 7eab17c0e4..ea0a5093e8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -53,12 +55,21 @@ public abstract class KafkaPartitionLevelConnectionHandler {
   protected final String _topic;
   protected final Consumer<String, Bytes> _consumer;
   protected final TopicPartition _topicPartition;
+  protected final Properties _consumerProp;
 
   public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
     _config = new KafkaPartitionLevelStreamConfig(streamConfig);
     _clientId = clientId;
     _partition = partition;
     _topic = _config.getKafkaTopicName();
+    _consumerProp = buildProperties(streamConfig);
+    KafkaSSLUtils.initSSL(_consumerProp);
+    _consumer = createConsumer(_consumerProp);
+    _topicPartition = new TopicPartition(_topic, _partition);
+    _consumer.assign(Collections.singletonList(_topicPartition));
+  }
+
+  private Properties buildProperties(StreamConfig streamConfig) {
     Properties consumerProp = new Properties();
     consumerProp.putAll(streamConfig.getStreamConfigsMap());
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
@@ -68,28 +79,32 @@ public abstract class KafkaPartitionLevelConnectionHandler {
       consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
     }
     consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
-    KafkaSSLUtils.initSSL(consumerProp);
-    _consumer = createConsumer(consumerProp);
-    _topicPartition = new TopicPartition(_topic, _partition);
-    _consumer.assign(Collections.singletonList(_topicPartition));
+    return consumerProp;
   }
 
   private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+  }
+
+  protected AdminClient createAdminClient() {
+    return retry(() -> AdminClient.create(_consumerProp), 5);
+  }
+
+  private static <T> T retry(Supplier<T> s, int nRetries) {
     // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
     // We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
     // like a good balance of not waiting too long for a retry, but also not retrying too many times.
-    int maxTries = 5;
     int tries = 0;
     while (true) {
       try {
-        return new KafkaConsumer<>(consumerProp);
+        return s.get();
       } catch (KafkaException e) {
         tries++;
-        if (tries >= maxTries) {
+        if (tries >= nRetries) {
           LOGGER.error("Caught exception while creating Kafka consumer, giving 
up", e);
           throw e;
         }
-        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, nRetries, e);
         // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
         // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
         // stuck in ERROR state.
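
The retry loop above is now generic over a Supplier so that KafkaConsumer and AdminClient creation share the same bounded-retry logic. The sleep between attempts falls below the visible hunk; a self-contained sketch of the helper, assuming (per the comment in the code) Guava's sleepUninterruptibly with the stated 2-second interval:

    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;
    import com.google.common.util.concurrent.Uninterruptibles;
    import org.apache.kafka.common.KafkaException;

    final class RetrySketch {
      static <T> T retry(Supplier<T> s, int nRetries) {
        int tries = 0;
        while (true) {
          try {
            return s.get(); // e.g. () -> AdminClient.create(props)
          } catch (KafkaException e) {
            tries++;
            if (tries >= nRetries) {
              throw e; // give up after nRetries attempts
            }
            // Assumption: the elided sleep is Guava's uninterruptible 2-second pause.
            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
          }
        }
      }
    }
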
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index bf837b54e5..a04cca66d2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -28,8 +28,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -169,14 +172,19 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
 
   @Override
   public List<TopicMetadata> getTopics() {
-    Map<String, List<PartitionInfo>> namePartitionsMap = 
_consumer.listTopics();
-    if (namePartitionsMap == null) {
-      return Collections.emptyList();
+    try (AdminClient adminClient = createAdminClient()) {
+      ListTopicsResult result = adminClient.listTopics();
+      if (result == null) {
+        return Collections.emptyList();
+      }
+      return result.names()
+          .get()
+          .stream()
+          .map(topic -> new KafkaTopicMetadata().setName(topic))
+          .collect(Collectors.toList());
+    } catch (ExecutionException | InterruptedException e) {
+      throw new RuntimeException(e);
     }
-    return namePartitionsMap.keySet()
-        .stream()
-        .map(topic -> new KafkaTopicMetadata().setName(topic))
-        .collect(Collectors.toList());
   }
 
   public static class KafkaTopicMetadata implements TopicMetadata {
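
A design note on the rewritten getTopics(): result.names().get() with no arguments waits indefinitely for the broker. KafkaFuture also offers a bounded overload; a hypothetical drop-in variant for that call, not part of this patch (the 30-second timeout is illustrative):

    // KafkaFuture.get(timeout, unit) additionally throws java.util.concurrent.TimeoutException.
    Set<String> topicNames = adminClient.listTopics().names().get(30, TimeUnit.SECONDS);
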
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 6719a722c7..e879f868f0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -20,9 +20,11 @@ package org.apache.pinot.plugin.stream.kafka20;
 
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -39,6 +41,7 @@ import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMessage;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -399,4 +402,29 @@ public class KafkaPartitionLevelConsumerTest {
     }
     assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "700");
   }
+
+  @Test
+  public void testGetTopics() {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", "NON_EXISTING_TOPIC");
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+
+    KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig);
+    List<StreamMetadataProvider.TopicMetadata> topics = streamMetadataProvider.getTopics();
+    List<String> topicNames = topics.stream()
+        .map(StreamMetadataProvider.TopicMetadata::getName)
+        .collect(Collectors.toList());
+    assertTrue(topicNames.containsAll(List.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3)));
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
index 6ca665b569..92ee657a5a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -38,7 +40,6 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * KafkaPartitionLevelConnectionHandler provides low level APIs to access Kafka partition level information.
  * E.g. partition counts, offsets per partition.
@@ -53,12 +54,21 @@ public abstract class KafkaPartitionLevelConnectionHandler {
   protected final String _topic;
   protected final Consumer<String, Bytes> _consumer;
   protected final TopicPartition _topicPartition;
+  protected final Properties _consumerProp;
 
   public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
     _config = new KafkaPartitionLevelStreamConfig(streamConfig);
     _clientId = clientId;
     _partition = partition;
     _topic = _config.getKafkaTopicName();
+    _consumerProp = buildProperties(streamConfig);
+    KafkaSSLUtils.initSSL(_consumerProp);
+    _consumer = createConsumer(_consumerProp);
+    _topicPartition = new TopicPartition(_topic, _partition);
+    _consumer.assign(Collections.singletonList(_topicPartition));
+  }
+
+  private Properties buildProperties(StreamConfig streamConfig) {
     Properties consumerProp = new Properties();
     consumerProp.putAll(streamConfig.getStreamConfigsMap());
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
@@ -68,28 +78,32 @@ public abstract class KafkaPartitionLevelConnectionHandler {
       consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
     }
     consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
-    KafkaSSLUtils.initSSL(consumerProp);
-    _consumer = createConsumer(consumerProp);
-    _topicPartition = new TopicPartition(_topic, _partition);
-    _consumer.assign(Collections.singletonList(_topicPartition));
+    return consumerProp;
   }
 
   private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+  }
+
+  protected AdminClient createAdminClient() {
+    return retry(() -> AdminClient.create(_consumerProp), 5);
+  }
+
+  private static <T> T retry(Supplier<T> s, int nRetries) {
     // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
     // We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
     // like a good balance of not waiting too long for a retry, but also not retrying too many times.
-    int maxTries = 5;
     int tries = 0;
     while (true) {
       try {
-        return new KafkaConsumer<>(consumerProp);
+        return s.get();
       } catch (KafkaException e) {
         tries++;
-        if (tries >= maxTries) {
+        if (tries >= nRetries) {
           LOGGER.error("Caught exception while creating Kafka consumer, giving 
up", e);
           throw e;
         }
-        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, nRetries, e);
         // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
         // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
         // stuck in ERROR state.
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 5fec5ddec2..96775641ca 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -28,8 +28,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -169,14 +172,19 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
 
   @Override
   public List<TopicMetadata> getTopics() {
-    Map<String, List<PartitionInfo>> namePartitionsMap = 
_consumer.listTopics();
-    if (namePartitionsMap == null) {
-      return Collections.emptyList();
+    try (AdminClient adminClient = createAdminClient()) {
+      ListTopicsResult result = adminClient.listTopics();
+      if (result == null) {
+        return Collections.emptyList();
+      }
+      return result.names()
+          .get()
+          .stream()
+          .map(topic -> new KafkaTopicMetadata().setName(topic))
+          .collect(Collectors.toList());
+    } catch (ExecutionException | InterruptedException e) {
+      throw new RuntimeException(e);
     }
-    return namePartitionsMap.keySet()
-        .stream()
-        .map(topic -> new KafkaTopicMetadata().setName(topic))
-        .collect(Collectors.toList());
   }
 
   public static class KafkaTopicMetadata implements TopicMetadata {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
