This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch revert-15641-remove_noisy_logs
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit a2a565d6220dcb410461912e74ae614aa1c677ac
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Wed Jun 18 09:45:31 2025 +0530

    Revert "Use Kafka Admin client to Fetch offsets instead of Kafka Consumer (#1…"

    This reverts commit 265ddd468c5964e1df3c0894a559ccba2b083211.
---
 .../kafka20/KafkaStreamMetadataProvider.java       | 73 ++++++++++------------
 .../kafka30/KafkaStreamMetadataProvider.java       | 73 ++++++++++------------
 2 files changed, 68 insertions(+), 78 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index a584a0ee597..a04cca66d2a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -29,15 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -100,51 +97,49 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    try (AdminClient adminClient = createAdminClient()) {
-      // Build the offset spec request for this partition
-      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
+    long offset;
+    try {
       if (offsetCriteria.isLargest()) {
-        request.put(_topicPartition, OffsetSpec.latest());
+        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+            .get(_topicPartition);
       } else if (offsetCriteria.isSmallest()) {
-        request.put(_topicPartition, OffsetSpec.earliest());
+        offset =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+                .get(_topicPartition);
       } else if (offsetCriteria.isPeriod()) {
-        long ts = Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
-        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+                Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
+            .get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
       } else if (offsetCriteria.isTimestamp()) {
-        long ts = TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
-        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+            TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
       } else {
-        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
-      }
-      // Query via AdminClient (thread-safe)
-      ListOffsetsResult result = adminClient.listOffsets(request);
-      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
-          result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
-      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
-        // fetch endOffsets as fallback
-        request.put(_topicPartition, OffsetSpec.latest());
-        result = adminClient.listOffsets(request);
-        offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
-        LOGGER.warn(
-            "initial offset type is {} and its value evaluates to null hence proceeding with offset {} " + "for "
-                + "topic {} partition {}", offsetCriteria, offsets.get(_topicPartition).offset(),
-            _topicPartition.topic(), _topicPartition.partition());
-      }
-      ListOffsetsResult.ListOffsetsResultInfo info = offsets.get(_topicPartition);
-      if (info == null) {
-        throw new TransientConsumerException(new RuntimeException(
-            String.format("Failed to fetch offset for topic: %s partition: %d", _topic, _topicPartition.partition())));
+        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
       }
-      return new LongMsgOffset(info.offset());
-    } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+      return new LongMsgOffset(offset);
+    } catch (TimeoutException e) {
       throw new TransientConsumerException(e);
     }
   }
 
-  private boolean isValidOffsetInfo(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
-    return offsets != null && offsets.containsKey(_topicPartition) && offsets.get(_topicPartition).offset() >= 0;
-  }
-
   @Override
   public Map<String, PartitionLagState> getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {
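The '+' lines above restore the KafkaConsumer-based offset lookup. As a rough standalone sketch of that pattern (not the Pinot class itself; the bootstrap address, topic name, partition number, timeout, and class name below are hypothetical placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerOffsetSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    TopicPartition tp = new TopicPartition("myTopic", 0); // hypothetical topic/partition

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      Duration timeout = Duration.ofMillis(10_000);
      // "largest": the offset that will be assigned to the next record written
      long largest = consumer.endOffsets(Collections.singletonList(tp), timeout).get(tp);
      // "smallest": the earliest offset still retained by the broker
      long smallest = consumer.beginningOffsets(Collections.singletonList(tp), timeout).get(tp);
      // timestamp lookup: earliest offset whose record timestamp is >= the target time;
      // the entry is null when no such record exists, hence the fall-back to the end offset
      Map<TopicPartition, OffsetAndTimestamp> byTime =
          consumer.offsetsForTimes(Collections.singletonMap(tp, System.currentTimeMillis() - 3_600_000L));
      OffsetAndTimestamp oat = byTime.get(tp);
      long fromTimestamp = (oat == null) ? largest : oat.offset();
      System.out.printf("largest=%d smallest=%d fromTimestamp=%d%n", largest, smallest, fromTimestamp);
    }
  }
}

offsetsForTimes returns null for a partition when no record at or after the target timestamp exists, which is why both the period and timestamp branches in the patch fall back to endOffsets and log a warning.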
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 65c803804b5..96775641ca3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -29,15 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -100,51 +97,49 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    try (AdminClient adminClient = createAdminClient()) {
-      // Build the offset spec request for this partition
-      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
+    long offset;
+    try {
       if (offsetCriteria.isLargest()) {
-        request.put(_topicPartition, OffsetSpec.latest());
+        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+            .get(_topicPartition);
       } else if (offsetCriteria.isSmallest()) {
-        request.put(_topicPartition, OffsetSpec.earliest());
+        offset =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+                .get(_topicPartition);
       } else if (offsetCriteria.isPeriod()) {
-        long ts = Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
-        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+                Clock.systemUTC().millis() - TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
+            .get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is period and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
       } else if (offsetCriteria.isTimestamp()) {
-        long ts = TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
-        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
+        OffsetAndTimestamp offsetAndTimestamp = _consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
+            TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
+        if (offsetAndTimestamp == null) {
+          offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+              .get(_topicPartition);
+          LOGGER.warn(
+              "initial offset type is timestamp and its value evaluates to null hence proceeding with offset {} for "
+                  + "topic {} partition {}", offset, _topicPartition.topic(), _topicPartition.partition());
+        } else {
+          offset = offsetAndTimestamp.offset();
+        }
       } else {
-        throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria);
-      }
-      // Query via AdminClient (thread-safe)
-      ListOffsetsResult result = adminClient.listOffsets(request);
-      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
-          result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
-      if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) {
-        // fetch endOffsets as fallback
-        request.put(_topicPartition, OffsetSpec.latest());
-        result = adminClient.listOffsets(request);
-        offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
-        LOGGER.warn(
-            "initial offset type is {} and its value evaluates to null hence proceeding with offset {} " + "for "
-                + "topic {} partition {}", offsetCriteria, offsets.get(_topicPartition).offset(),
-            _topicPartition.topic(), _topicPartition.partition());
-      }
-      ListOffsetsResult.ListOffsetsResultInfo info = offsets.get(_topicPartition);
-      if (info == null) {
-        throw new TransientConsumerException(new RuntimeException(
-            String.format("Failed to fetch offset for topic: %s partition: %d", _topic, _topicPartition.partition())));
+        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
       }
-      return new LongMsgOffset(info.offset());
-    } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+      return new LongMsgOffset(offset);
+    } catch (TimeoutException e) {
       throw new TransientConsumerException(e);
     }
   }
 
-  private boolean isValidOffsetInfo(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
-    return offsets != null && offsets.containsKey(_topicPartition) && offsets.get(_topicPartition).offset() >= 0;
-  }
-
   @Override
   public Map<String, PartitionLagState> getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {
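For comparison, the '-' lines in both files are the AdminClient-based lookup that this commit reverts. A rough sketch of that pattern, under the same hypothetical broker, topic, and class-name assumptions:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class AdminClientOffsetSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
    TopicPartition tp = new TopicPartition("myTopic", 0); // hypothetical topic/partition

    try (AdminClient adminClient = AdminClient.create(props)) {
      // OffsetSpec selects what to resolve: latest(), earliest(), or forTimestamp(ms)
      Map<TopicPartition, OffsetSpec> request = Collections.singletonMap(tp, OffsetSpec.latest());
      ListOffsetsResult result = adminClient.listOffsets(request);
      // result.all() is a KafkaFuture; bound the wait, as the reverted code did
      long offset = result.all().get(10_000, TimeUnit.MILLISECONDS).get(tp).offset();
      System.out.println("latest offset for " + tp + " = " + offset);
    }
  }
}

The removed "// Query via AdminClient (thread-safe)" comment points at the trade-off: an AdminClient can be shared across threads, while a KafkaConsumer instance cannot.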