This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 265ddd468c  Use Kafka Admin client to Fetch offsets instead of Kafka Consumer (#15641)
265ddd468c is described below

commit 265ddd468c5964e1df3c0894a559ccba2b083211
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Wed Jun 4 13:18:53 2025 +0530

    Use Kafka Admin client to Fetch offsets instead of Kafka Consumer (#15641)

    * Use Kafka admin client to retrieve offsets instead of Kafka consumer

    * Set partition metadata provider as null on close

    * Fix tests

    * Address review comments

    * throw better exception in case offsets are null

    * Remove handling of illegal state exception

    * Remove setting _partitionMetadataProvider to null

    * Create admin client on request

    * Create admin client on request

    ---------

    Co-authored-by: KKCorps <kar...@startee.ai>
---
 .../kafka20/KafkaStreamMetadataProvider.java       | 73 ++++++++++++----------
 .../kafka30/KafkaStreamMetadataProvider.java       | 73 ++++++++++++----------
 2 files changed, 78 insertions(+), 68 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index a04cca66d2..a584a0ee59 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -29,12 +29,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -97,49 +100,51 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    long offset;
-    try {
+    try (AdminClient adminClient = createAdminClient()) {
+      // Build the offset spec request for this partition
+      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
       if (offsetCriteria.isLargest()) {
-        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-            .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.latest());
       } else if (offsetCriteria.isSmallest()) {
-        offset =
-            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-                .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.earliest());
       } else if (offsetCriteria.isPeriod()) {
-        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-            Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
-            .get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else if (offsetCriteria.isTimestamp()) {
-        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-            TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else {
-        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
       }
-      return new LongMsgOffset(offset);
-    } catch (TimeoutException e) {
+      // Query via AdminClient (thread-safe)
+      ListOffsetsResult result = adminClient.listOffsets(request);
+      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+          result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
+        // fetch endOffsets as fallback
+        request.put(_topicPartition, OffsetSpec.latest());
+        result = adminClient.listOffsets(request);
+        offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+        LOGGER.warn(
+            "initial offset type is {} and its value evaluates to null hence proceeding with offset {} " + "for "
+                + "topic {} partition {}", offsetCriteria, offsets.get(_topicPartition).offset(),
+            _topicPartition.topic(), _topicPartition.partition());
+      }
+      ListOffsetsResult.ListOffsetsResultInfo info = offsets.get(_topicPartition);
+      if (info == null) {
+        throw new TransientConsumerException(new RuntimeException(
+            String.format("Failed to fetch offset for topic: %s partition: %d", _topic, _topicPartition.partition())));
+      }
+      return new LongMsgOffset(info.offset());
+    } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
       throw new TransientConsumerException(e);
     }
   }

+  private boolean isValidOffsetInfo(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
+    return offsets != null && offsets.containsKey(_topicPartition) && offsets.get(_topicPartition).offset() >= 0;
+  }
+
   @Override
   public Map<String, PartitionLagState> getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {
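Note: the kafka30 diff below applies the identical change to the Kafka 3.0 plugin.
For context, the AdminClient offset-lookup pattern both files now use looks roughly
like the following as a standalone program. This is a minimal sketch, not code from
the patch: the broker address, topic name, partition number, and timeout are
illustrative placeholders.

    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class AdminClientOffsetExample {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker address; substitute your cluster's bootstrap servers.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Placeholder topic and partition.
        TopicPartition partition = new TopicPartition("myTopic", 0);
        // Create the client per request and close it via try-with-resources,
        // mirroring the "create admin client on request" approach in the commit.
        try (AdminClient adminClient = AdminClient.create(props)) {
          // Ask for the latest offset; OffsetSpec.earliest() and
          // OffsetSpec.forTimestamp(ts) follow the same request shape.
          Map<TopicPartition, OffsetSpec> request = Map.of(partition, OffsetSpec.latest());
          ListOffsetsResult result = adminClient.listOffsets(request);
          ListOffsetsResult.ListOffsetsResultInfo info =
              result.all().get(10_000, TimeUnit.MILLISECONDS).get(partition);
          System.out.println("latest offset: " + info.offset());
        }
      }
    }

Unlike KafkaConsumer, AdminClient is thread-safe (as the in-line comment in the
patch notes), which appears to be the motivation for routing offset lookups
through it rather than through a shared consumer instance.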
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 96775641ca..65c803804b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -29,12 +29,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -97,49 +100,51 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    long offset;
-    try {
+    try (AdminClient adminClient = createAdminClient()) {
+      // Build the offset spec request for this partition
+      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
       if (offsetCriteria.isLargest()) {
-        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-            .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.latest());
       } else if (offsetCriteria.isSmallest()) {
-        offset =
-            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-                .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.earliest());
       } else if (offsetCriteria.isPeriod()) {
-        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-            Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
-            .get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else if (offsetCriteria.isTimestamp()) {
-        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-            TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else {
-        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
       }
-      return new LongMsgOffset(offset);
-    } catch (TimeoutException e) {
+      // Query via AdminClient (thread-safe)
+      ListOffsetsResult result = adminClient.listOffsets(request);
+      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+          result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
+        // fetch endOffsets as fallback
+        request.put(_topicPartition, OffsetSpec.latest());
+        result = adminClient.listOffsets(request);
+        offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+        LOGGER.warn(
+            "initial offset type is {} and its value evaluates to null hence proceeding with offset {} " + "for "
+                + "topic {} partition {}", offsetCriteria, offsets.get(_topicPartition).offset(),
+            _topicPartition.topic(), _topicPartition.partition());
+      }
+      ListOffsetsResult.ListOffsetsResultInfo info = offsets.get(_topicPartition);
+      if (info == null) {
+        throw new TransientConsumerException(new RuntimeException(
+            String.format("Failed to fetch offset for topic: %s partition: %d", _topic, _topicPartition.partition())));
+      }
+      return new LongMsgOffset(info.offset());
+    } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
       throw new TransientConsumerException(e);
     }
   }

+  private boolean isValidOffsetInfo(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
+    return offsets != null && offsets.containsKey(_topicPartition) && offsets.get(_topicPartition).offset() >= 0;
+  }
+
   @Override
   public Map<String, PartitionLagState> getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org