This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f76e37196 Reuse Kafka admin client for better performance (#16129)
2f76e37196 is described below

commit 2f76e371968152fd428e08b2e66b196328920786
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Fri Jun 20 10:25:39 2025 +0530

    Reuse Kafka admin client for better performance (#16129)
    
    Co-authored-by: KKCorps <kar...@startee.ai>
---
 .../plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java | 3 +++
 .../pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java    | 6 +++---
 .../plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java | 2 ++
 .../pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java    | 6 +++---
 4 files changed, 11 insertions(+), 6 deletions(-)
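
For context, the change moves AdminClient creation from once-per-offset-lookup to once-per-connection-handler. The snippet below is a minimal, hypothetical sketch of that lifecycle, not the actual Pinot code (the class and method names are placeholders): the client is created in the constructor and released together with the handler.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;

    public class ReusableAdminClientHandler implements Closeable {
      private final AdminClient _adminClient;

      public ReusableAdminClientHandler(Properties clientProps) {
        // Create the AdminClient once per handler instead of once per offset lookup.
        _adminClient = AdminClient.create(clientProps);
      }

      public AdminClient getAdminClient() {
        return _adminClient;
      }

      @Override
      public void close() throws IOException {
        // Release the shared client when the handler shuts down.
        _adminClient.close();
      }
    }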

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index ea0a5093e8..fb7052f6ac 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -56,6 +56,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
   protected final Consumer<String, Bytes> _consumer;
   protected final TopicPartition _topicPartition;
   protected final Properties _consumerProp;
+  protected final AdminClient _adminClient;
 
  public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
     _config = new KafkaPartitionLevelStreamConfig(streamConfig);
@@ -67,6 +68,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
     _consumer = createConsumer(_consumerProp);
     _topicPartition = new TopicPartition(_topic, _partition);
     _consumer.assign(Collections.singletonList(_topicPartition));
+    _adminClient = createAdminClient();
   }
 
   private Properties buildProperties(StreamConfig streamConfig) {
@@ -116,6 +118,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
   public void close()
       throws IOException {
     _consumer.close();
+    _adminClient.close();
   }
 
   @VisibleForTesting
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index a584a0ee59..0d963ea65b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -100,7 +100,7 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
  public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    try (AdminClient adminClient = createAdminClient()) {
+    try {
       // Build the offset spec request for this partition
       Map<TopicPartition, OffsetSpec> request = new HashMap<>();
       if (offsetCriteria.isLargest()) {
@@ -117,13 +117,13 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
       }
       // Query via AdminClient (thread-safe)
-      ListOffsetsResult result = adminClient.listOffsets(request);
+      ListOffsetsResult result = _adminClient.listOffsets(request);
       Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
           result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
         // fetch endOffsets as fallback
         request.put(_topicPartition, OffsetSpec.latest());
-        result = adminClient.listOffsets(request);
+        result = _adminClient.listOffsets(request);
         offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
         LOGGER.warn(
             "initial offset type is {} and its value evaluates to null hence 
proceeding with offset {} " + "for "
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
index 92ee657a5a..81690b5380 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
@@ -55,6 +55,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
   protected final Consumer<String, Bytes> _consumer;
   protected final TopicPartition _topicPartition;
   protected final Properties _consumerProp;
+  protected final AdminClient _adminClient;
 
  public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
     _config = new KafkaPartitionLevelStreamConfig(streamConfig);
@@ -66,6 +67,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
     _consumer = createConsumer(_consumerProp);
     _topicPartition = new TopicPartition(_topic, _partition);
     _consumer.assign(Collections.singletonList(_topicPartition));
+    _adminClient = createAdminClient();
   }
 
   private Properties buildProperties(StreamConfig streamConfig) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 65c803804b..c3e9aadbbc 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -100,7 +100,7 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
  public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    try (AdminClient adminClient = createAdminClient()) {
+    try {
       // Build the offset spec request for this partition
       Map<TopicPartition, OffsetSpec> request = new HashMap<>();
       if (offsetCriteria.isLargest()) {
@@ -117,13 +117,13 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
       }
       // Query via AdminClient (thread-safe)
-      ListOffsetsResult result = adminClient.listOffsets(request);
+      ListOffsetsResult result = _adminClient.listOffsets(request);
       Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
           result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
         // fetch endOffsets as fallback
         request.put(_topicPartition, OffsetSpec.latest());
-        result = adminClient.listOffsets(request);
+        result = _adminClient.listOffsets(request);
         offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
         LOGGER.warn(
             "initial offset type is {} and its value evaluates to null hence 
proceeding with offset {} " + "for "
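
As a usage note, both metadata providers keep querying offsets through AdminClient.listOffsets with an OffsetSpec per partition; only the client's lifetime changes. Below is a minimal sketch of that query pattern, assuming a shared, thread-safe AdminClient (the class, method name, and parameters are illustrative, not Pinot's API):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public final class OffsetLookupSketch {
      // Returns the latest offset of one partition using a shared AdminClient.
      static long fetchLatestOffset(AdminClient adminClient, TopicPartition topicPartition, long timeoutMillis)
          throws Exception {
        Map<TopicPartition, OffsetSpec> request = new HashMap<>();
        request.put(topicPartition, OffsetSpec.latest());
        // AdminClient is thread-safe, so repeated lookups can reuse the same instance.
        ListOffsetsResult result = adminClient.listOffsets(request);
        return result.all().get(timeoutMillis, TimeUnit.MILLISECONDS).get(topicPartition).offset();
      }
    }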


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org