This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch revert-15641-remove_noisy_logs
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit a2a565d6220dcb410461912e74ae614aa1c677ac
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Wed Jun 18 09:45:31 2025 +0530

    Revert "Use Kafka Admin client to Fetch offsets instead of Kafka Consumer 
(#1…"
    
    This reverts commit 265ddd468c5964e1df3c0894a559ccba2b083211.
---
 .../kafka20/KafkaStreamMetadataProvider.java       | 73 ++++++++++------------
 .../kafka30/KafkaStreamMetadataProvider.java       | 73 ++++++++++------------
 2 files changed, 68 insertions(+), 78 deletions(-)
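
For readers skimming the patch: the revert restores the KafkaConsumer-based offset lookup used by both plugins. Below is a minimal standalone sketch, not code from the commit; the bootstrap server, topic name, and class name are illustrative assumptions, whereas the production code in the diff operates on the provider's existing _consumer and _topicPartition fields.

// Hedged sketch (assumed names: broker address, topic, class) of the
// consumer-based offset lookup that this revert restores.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerOffsetLookupSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    TopicPartition tp = new TopicPartition("myTopic", 0); // assumed topic/partition
    Duration timeout = Duration.ofMillis(5000);

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // "largest" criteria: the latest end offset of the partition
      long latest = consumer.endOffsets(Collections.singletonList(tp), timeout).get(tp);
      // "smallest" criteria: the earliest offset still retained
      long earliest = consumer.beginningOffsets(Collections.singletonList(tp), timeout).get(tp);
      // timestamp/period criteria: first offset at or after the timestamp; may be null
      OffsetAndTimestamp byTime = consumer
          .offsetsForTimes(Collections.singletonMap(tp, System.currentTimeMillis() - 3_600_000L), timeout)
          .get(tp);
      long resolved = (byTime == null) ? latest : byTime.offset();
      System.out.printf("latest=%d earliest=%d resolved=%d%n", latest, earliest, resolved);
    }
  }
}

Note that offsetsForTimes() returns null for a partition when no record exists at or after the requested timestamp, which is why the reverted code falls back to endOffsets() and logs a warning in that case.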

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index a584a0ee597..a04cca66d2a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -29,15 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -100,51 +97,49 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    try (AdminClient adminClient = createAdminClient()) {
-      // Build the offset spec request for this partition
-      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
+    long offset;
+    try {
       if (offsetCriteria.isLargest()) {
-        request.put(_topicPartition, OffsetSpec.latest());
+        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+            .get(_topicPartition);
       } else if (offsetCriteria.isSmallest()) {
-        request.put(_topicPartition, OffsetSpec.earliest());
+        offset =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+                .get(_topicPartition);
       } else if (offsetCriteria.isPeriod()) {
-        long ts = Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
-        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+                Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
+            .get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
       } else if (offsetCriteria.isTimestamp()) {
-        long ts = TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
-        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+            TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
       } else {
-        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
-      }
-      // Query via AdminClient (thread-safe)
-      ListOffsetsResult result = adminClient.listOffsets(request);
-      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
-          result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
-      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
-        // fetch endOffsets as fallback
-        request.put(_topicPartition, OffsetSpec.latest());
-        result = adminClient.listOffsets(request);
-        offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
-        LOGGER.warn(
-            "initial offset type is {} and its value evaluates to null hence proceeding with offset {} " + "for "
-                + "topic {} partition {}", offsetCriteria, offsets.get(_topicPartition).offset(),
-            _topicPartition.topic(), _topicPartition.partition());
-      }
-      ListOffsetsResult.ListOffsetsResultInfo info = offsets.get(_topicPartition);
-      if (info == null) {
-        throw new TransientConsumerException(new RuntimeException(
-            String.format("Failed to fetch offset for topic: %s partition: %d", _topic, _topicPartition.partition())));
+        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
       }
-      return new LongMsgOffset(info.offset());
-    } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+      return new LongMsgOffset(offset);
+    } catch (TimeoutException e) {
       throw new TransientConsumerException(e);
     }
   }
 
-  private boolean isValidOffsetInfo(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
-    return offsets != null && offsets.containsKey(_topicPartition) && offsets.get(_topicPartition).offset() >= 0;
-  }
-
   @Override
   public Map<String, PartitionLagState> getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 65c803804b5..96775641ca3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -29,15 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -100,51 +97,49 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    try (AdminClient adminClient = createAdminClient()) {
-      // Build the offset spec request for this partition
-      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
+    long offset;
+    try {
       if (offsetCriteria.isLargest()) {
-        request.put(_topicPartition, OffsetSpec.latest());
+        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+            .get(_topicPartition);
       } else if (offsetCriteria.isSmallest()) {
-        request.put(_topicPartition, OffsetSpec.earliest());
+        offset =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+                .get(_topicPartition);
       } else if (offsetCriteria.isPeriod()) {
-        long ts = Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
-        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+                Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
+            .get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
       } else if (offsetCriteria.isTimestamp()) {
-        long ts = TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
-        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+            TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
       } else {
-        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
-      }
-      // Query via AdminClient (thread-safe)
-      ListOffsetsResult result = adminClient.listOffsets(request);
-      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
-          result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
-      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
-        // fetch endOffsets as fallback
-        request.put(_topicPartition, OffsetSpec.latest());
-        result = adminClient.listOffsets(request);
-        offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
-        LOGGER.warn(
-            "initial offset type is {} and its value evaluates to null hence proceeding with offset {} " + "for "
-                + "topic {} partition {}", offsetCriteria, offsets.get(_topicPartition).offset(),
-            _topicPartition.topic(), _topicPartition.partition());
-      }
-      ListOffsetsResult.ListOffsetsResultInfo info = offsets.get(_topicPartition);
-      if (info == null) {
-        throw new TransientConsumerException(new RuntimeException(
-            String.format("Failed to fetch offset for topic: %s partition: %d", _topic, _topicPartition.partition())));
+        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
       }
-      return new LongMsgOffset(info.offset());
-    } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+      return new LongMsgOffset(offset);
+    } catch (TimeoutException e) {
       throw new TransientConsumerException(e);
     }
   }
 
-  private boolean isValidOffsetInfo(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
-    return offsets != null && offsets.containsKey(_topicPartition) && offsets.get(_topicPartition).offset() >= 0;
-  }
-
   @Override
   public Map<String, PartitionLagState> getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
