This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 265ddd468c Use Kafka Admin client to Fetch offsets instead of Kafka Consumer (#15641)
265ddd468c is described below

commit 265ddd468c5964e1df3c0894a559ccba2b083211
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Wed Jun 4 13:18:53 2025 +0530

    Use Kafka Admin client to Fetch offsets instead of Kafka Consumer (#15641)
    
    * Use Kafka admin client to retrieve offsets instead of Kafka consumer
    
    * Set partition metadata provider as null on close
    
    * Fix tests
    
    * Address review comments
    
    * throw better exception in case offsets are null
    
    * Remove handling of illegal state exception
    
    * Remove setting _partitionMetadataProvider to null
    
    * Create admin client on request
    
    * Create admin client on request
    
    ---------
    
    Co-authored-by: KKCorps <kar...@startee.ai>
---
 .../kafka20/KafkaStreamMetadataProvider.java       | 73 ++++++++++++----------
 .../kafka30/KafkaStreamMetadataProvider.java       | 73 ++++++++++++----------
 2 files changed, 78 insertions(+), 68 deletions(-)
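
For context, a minimal, self-contained sketch of the AdminClient offset-lookup pattern this commit switches to. It is illustrative only, not the committed code: the bootstrap address, topic name, and partition number below are placeholder values.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class ListOffsetsSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        TopicPartition tp = new TopicPartition("myTopic", 0);
        try (AdminClient adminClient = AdminClient.create(props)) {
          // One OffsetSpec per partition; latest(), earliest() and
          // forTimestamp(ts) replace the consumer's endOffsets(),
          // beginningOffsets() and offsetsForTimes() calls.
          Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
              adminClient.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                  .all().get(10_000, TimeUnit.MILLISECONDS);
          System.out.println("Latest offset: " + offsets.get(tp).offset());
        }
      }
    }

Unlike KafkaConsumer, AdminClient is safe to use across threads; the commit nevertheless creates one per request (see the "Create admin client on request" item above) and closes it via try-with-resources.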

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index a04cca66d2..a584a0ee59 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -29,12 +29,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -97,49 +100,51 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    long offset;
-    try {
+    try (AdminClient adminClient = createAdminClient()) {
+      // Build the offset spec request for this partition
+      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
       if (offsetCriteria.isLargest()) {
-        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-            .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.latest());
       } else if (offsetCriteria.isSmallest()) {
-        offset =
-            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-                .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.earliest());
       } else if (offsetCriteria.isPeriod()) {
-        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-                Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
-            .get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else if (offsetCriteria.isTimestamp()) {
-        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-            TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else {
-        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
       }
-      return new LongMsgOffset(offset);
-    } catch (TimeoutException e) {
+      // Query via AdminClient (thread-safe)
+      ListOffsetsResult result = adminClient.listOffsets(request);
+      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+          result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
+        // fetch endOffsets as fallback
+        request.put(_topicPartition, OffsetSpec.latest());
+        result = adminClient.listOffsets(request);
+        offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+        LOGGER.warn(
+            "initial offset type is {} and its value evaluates to null hence proceeding with offset {} " + "for "
+                + "topic {} partition {}", offsetCriteria, offsets.get(_topicPartition).offset(),
+            _topicPartition.topic(), _topicPartition.partition());
+      }
+      ListOffsetsResult.ListOffsetsResultInfo info = offsets.get(_topicPartition);
+      if (info == null) {
+        throw new TransientConsumerException(new RuntimeException(
+            String.format("Failed to fetch offset for topic: %s partition: %d", _topic, _topicPartition.partition())));
+      }
+      return new LongMsgOffset(info.offset());
+    } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
       throw new TransientConsumerException(e);
     }
   }
 
+  private boolean isValidOffsetInfo(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
+    return offsets != null && offsets.containsKey(_topicPartition) && offsets.get(_topicPartition).offset() >= 0;
+  }
+
   @Override
   public Map<String, PartitionLagState> getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 96775641ca..65c803804b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -29,12 +29,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -97,49 +100,51 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    long offset;
-    try {
+    try (AdminClient adminClient = createAdminClient()) {
+      // Build the offset spec request for this partition
+      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
       if (offsetCriteria.isLargest()) {
-        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-            .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.latest());
       } else if (offsetCriteria.isSmallest()) {
-        offset =
-            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-                .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.earliest());
       } else if (offsetCriteria.isPeriod()) {
-        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-                Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
-            .get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else if (offsetCriteria.isTimestamp()) {
-        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-            TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else {
-        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
       }
-      return new LongMsgOffset(offset);
-    } catch (TimeoutException e) {
+      // Query via AdminClient (thread-safe)
+      ListOffsetsResult result = adminClient.listOffsets(request);
+      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+          result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
+        // fetch endOffsets as fallback
+        request.put(_topicPartition, OffsetSpec.latest());
+        result = adminClient.listOffsets(request);
+        offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+        LOGGER.warn(
+            "initial offset type is {} and its value evaluates to null hence proceeding with offset {} " + "for "
+                + "topic {} partition {}", offsetCriteria, offsets.get(_topicPartition).offset(),
+            _topicPartition.topic(), _topicPartition.partition());
+      }
+      ListOffsetsResult.ListOffsetsResultInfo info = offsets.get(_topicPartition);
+      if (info == null) {
+        throw new TransientConsumerException(new RuntimeException(
+            String.format("Failed to fetch offset for topic: %s partition: %d", _topic, _topicPartition.partition())));
+      }
+      return new LongMsgOffset(info.offset());
+    } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
       throw new TransientConsumerException(e);
     }
   }
 
+  private boolean isValidOffsetInfo(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
+    return offsets != null && offsets.containsKey(_topicPartition) && offsets.get(_topicPartition).offset() >= 0;
+  }
+
   @Override
   public Map<String, PartitionLagState> getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {
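
A note on the isValidOffsetInfo() guard added in both files above: for an OffsetSpec.forTimestamp(ts) lookup the broker reports offset -1 when no record has a timestamp at or after ts, which is what the offset() >= 0 check detects before falling back to OffsetSpec.latest() (the same fallback the old consumer code performed via endOffsets()). A hedged sketch of that flow, again with placeholder broker address, topic, and partition:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class TimestampOffsetSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        TopicPartition tp = new TopicPartition("myTopic", 0);
        long ts = System.currentTimeMillis() - 3_600_000L; // one hour ago
        try (AdminClient adminClient = AdminClient.create(props)) {
          // Timestamp-based lookup: resolves the earliest offset whose record
          // timestamp is >= ts, or reports offset -1 if none exists.
          Map<TopicPartition, ListOffsetsResultInfo> offsets = adminClient
              .listOffsets(Collections.singletonMap(tp, OffsetSpec.forTimestamp(ts)))
              .all().get(10_000, TimeUnit.MILLISECONDS);
          ListOffsetsResultInfo info = offsets.get(tp);
          if (info == null || info.offset() < 0) {
            // No record at/after ts: fall back to the latest offset, as the diff does.
            offsets = adminClient.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                .all().get(10_000, TimeUnit.MILLISECONDS);
            info = offsets.get(tp);
          }
          System.out.println("Resolved offset: " + info.offset());
        }
      }
    }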

