This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 772bea79be [kafka] StreamMetadataProvider.getTopics() to use admin client instead of consumer (#14678)
772bea79be is described below
commit 772bea79beb33a0e035caa7bb215156f990e0523
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Wed Dec 18 12:12:57 2024 -0800
[kafka] StreamMetadataProvider.getTopics() to use admin client instead of consumer (#14678)
---
.../KafkaPartitionLevelConnectionHandler.java | 31 +++++++++++++++------
.../kafka20/KafkaStreamMetadataProvider.java | 22 ++++++++++-----
.../kafka20/KafkaPartitionLevelConsumerTest.java | 28 +++++++++++++++++++
.../KafkaPartitionLevelConnectionHandler.java | 32 ++++++++++++++++------
.../kafka30/KafkaStreamMetadataProvider.java | 22 ++++++++++-----
5 files changed, 104 insertions(+), 31 deletions(-)
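In essence, getTopics() previously piggybacked on the partition-assigned KafkaConsumer's listTopics() call; it now builds a short-lived AdminClient from the same connection properties and lists topic names through it. A minimal, self-contained sketch of the AdminClient path (the class name and bootstrap address are illustrative placeholders, not part of this commit):

    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class ListTopicsSketch {
      public static void main(String[] args) throws Exception {
        // Stand-in for the bootstrap/SSL properties the connection handler
        // builds for its consumer and now reuses for the admin client.
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // try-with-resources closes the AdminClient, as the new getTopics() does.
        try (AdminClient adminClient = AdminClient.create(props)) {
          // names() returns a KafkaFuture<Set<String>>; get() blocks for the broker response.
          Set<String> topicNames = adminClient.listTopics().names().get();
          topicNames.forEach(System.out::println);
        }
      }
    }

Unlike Consumer#listTopics(), the admin call does not require assigning the client to any topic, so it works even when the configured topic does not exist, as exercised by the new test below.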
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index 7eab17c0e4..ea0a5093e8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -53,12 +55,21 @@ public abstract class KafkaPartitionLevelConnectionHandler {
protected final String _topic;
protected final Consumer<String, Bytes> _consumer;
protected final TopicPartition _topicPartition;
+ protected final Properties _consumerProp;
public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
_config = new KafkaPartitionLevelStreamConfig(streamConfig);
_clientId = clientId;
_partition = partition;
_topic = _config.getKafkaTopicName();
+ _consumerProp = buildProperties(streamConfig);
+ KafkaSSLUtils.initSSL(_consumerProp);
+ _consumer = createConsumer(_consumerProp);
+ _topicPartition = new TopicPartition(_topic, _partition);
+ _consumer.assign(Collections.singletonList(_topicPartition));
+ }
+
+ private Properties buildProperties(StreamConfig streamConfig) {
Properties consumerProp = new Properties();
consumerProp.putAll(streamConfig.getStreamConfigsMap());
consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
@@ -68,28 +79,32 @@ public abstract class KafkaPartitionLevelConnectionHandler {
consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
}
consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
- KafkaSSLUtils.initSSL(consumerProp);
- _consumer = createConsumer(consumerProp);
- _topicPartition = new TopicPartition(_topic, _partition);
- _consumer.assign(Collections.singletonList(_topicPartition));
+ return consumerProp;
}
private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+ return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+ }
+
+ protected AdminClient createAdminClient() {
+ return retry(() -> AdminClient.create(_consumerProp), 5);
+ }
+
+ private static <T> T retry(Supplier<T> s, int nRetries) {
// Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
// We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
// like a good balance of not waiting too long for a retry, but also not retrying too many times.
- int maxTries = 5;
int tries = 0;
while (true) {
try {
- return new KafkaConsumer<>(consumerProp);
+ return s.get();
} catch (KafkaException e) {
tries++;
- if (tries >= maxTries) {
+ if (tries >= nRetries) {
LOGGER.error("Caught exception while creating Kafka consumer, giving
up", e);
throw e;
}
- LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+ LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, nRetries, e);
// We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
// as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
// stuck in ERROR state.
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index bf837b54e5..a04cca66d2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -28,8 +28,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
@@ -169,14 +172,19 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
@Override
public List<TopicMetadata> getTopics() {
- Map<String, List<PartitionInfo>> namePartitionsMap =
_consumer.listTopics();
- if (namePartitionsMap == null) {
- return Collections.emptyList();
+ try (AdminClient adminClient = createAdminClient()) {
+ ListTopicsResult result = adminClient.listTopics();
+ if (result == null) {
+ return Collections.emptyList();
+ }
+ return result.names()
+ .get()
+ .stream()
+ .map(topic -> new KafkaTopicMetadata().setName(topic))
+ .collect(Collectors.toList());
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
}
- return namePartitionsMap.keySet()
- .stream()
- .map(topic -> new KafkaTopicMetadata().setName(topic))
- .collect(Collectors.toList());
}
public static class KafkaTopicMetadata implements TopicMetadata {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 6719a722c7..e879f868f0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -20,9 +20,11 @@ package org.apache.pinot.plugin.stream.kafka20;
import java.time.Instant;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -39,6 +41,7 @@ import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -399,4 +402,29 @@ public class KafkaPartitionLevelConsumerTest {
}
assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "700");
}
+
+ @Test
+ public void testGetTopics() {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String streamKafkaConsumerType = "simple";
+ String clientId = "clientId";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", "NON_EXISTING_TOPIC");
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+
KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig);
List<StreamMetadataProvider.TopicMetadata> topics = streamMetadataProvider.getTopics();
+ List<String> topicNames = topics.stream()
+ .map(StreamMetadataProvider.TopicMetadata::getName)
+ .collect(Collectors.toList());
assertTrue(topicNames.containsAll(List.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3)));
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
index 6ca665b569..92ee657a5a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -38,7 +40,6 @@ import org.apache.pinot.spi.stream.StreamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* KafkaPartitionLevelConnectionHandler provides low level APIs to access Kafka partition level information.
* E.g. partition counts, offsets per partition.
@@ -53,12 +54,21 @@ public abstract class KafkaPartitionLevelConnectionHandler {
protected final String _topic;
protected final Consumer<String, Bytes> _consumer;
protected final TopicPartition _topicPartition;
+ protected final Properties _consumerProp;
public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
_config = new KafkaPartitionLevelStreamConfig(streamConfig);
_clientId = clientId;
_partition = partition;
_topic = _config.getKafkaTopicName();
+ _consumerProp = buildProperties(streamConfig);
+ KafkaSSLUtils.initSSL(_consumerProp);
+ _consumer = createConsumer(_consumerProp);
+ _topicPartition = new TopicPartition(_topic, _partition);
+ _consumer.assign(Collections.singletonList(_topicPartition));
+ }
+
+ private Properties buildProperties(StreamConfig streamConfig) {
Properties consumerProp = new Properties();
consumerProp.putAll(streamConfig.getStreamConfigsMap());
consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
@@ -68,28 +78,32 @@ public abstract class KafkaPartitionLevelConnectionHandler {
consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
}
consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
- KafkaSSLUtils.initSSL(consumerProp);
- _consumer = createConsumer(consumerProp);
- _topicPartition = new TopicPartition(_topic, _partition);
- _consumer.assign(Collections.singletonList(_topicPartition));
+ return consumerProp;
}
private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+ return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+ }
+
+ protected AdminClient createAdminClient() {
+ return retry(() -> AdminClient.create(_consumerProp), 5);
+ }
+
+ private static <T> T retry(Supplier<T> s, int nRetries) {
// Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
// We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
// like a good balance of not waiting too long for a retry, but also not retrying too many times.
- int maxTries = 5;
int tries = 0;
while (true) {
try {
- return new KafkaConsumer<>(consumerProp);
+ return s.get();
} catch (KafkaException e) {
tries++;
- if (tries >= maxTries) {
+ if (tries >= nRetries) {
LOGGER.error("Caught exception while creating Kafka consumer, giving
up", e);
throw e;
}
- LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+ LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, nRetries, e);
// We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
// as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
// stuck in ERROR state.
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 5fec5ddec2..96775641ca 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -28,8 +28,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
@@ -169,14 +172,19 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
@Override
public List<TopicMetadata> getTopics() {
- Map<String, List<PartitionInfo>> namePartitionsMap =
_consumer.listTopics();
- if (namePartitionsMap == null) {
- return Collections.emptyList();
+ try (AdminClient adminClient = createAdminClient()) {
+ ListTopicsResult result = adminClient.listTopics();
+ if (result == null) {
+ return Collections.emptyList();
+ }
+ return result.names()
+ .get()
+ .stream()
+ .map(topic -> new KafkaTopicMetadata().setName(topic))
+ .collect(Collectors.toList());
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
}
- return namePartitionsMap.keySet()
- .stream()
- .map(topic -> new KafkaTopicMetadata().setName(topic))
- .collect(Collectors.toList());
}
public static class KafkaTopicMetadata implements TopicMetadata {
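Both plugins share the client-creation retry loop through the new generic helper, so AdminClient creation gets the same resilience to transient failures (e.g. DNS issues) as KafkaConsumer creation. A standalone sketch of that pattern under the commit's stated assumptions (5 attempts, 2-second pause); names are illustrative, and it substitutes plain Thread.sleep for the uninterruptible sleep used in the committed code:

    import java.util.function.Supplier;
    import org.apache.kafka.common.KafkaException;

    final class RetrySketch {
      // Invokes the supplier, retrying on KafkaException up to nRetries attempts
      // with a 2-second pause between attempts, then rethrows the last failure.
      static <T> T retry(Supplier<T> s, int nRetries) {
        int tries = 0;
        while (true) {
          try {
            return s.get();
          } catch (KafkaException e) {
            if (++tries >= nRetries) {
              throw e;
            }
            try {
              Thread.sleep(2000L);
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt(); // restore interrupt status and keep retrying
            }
          }
        }
      }
    }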
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]