npawar commented on code in PR #8321:
URL: https://github.com/apache/pinot/pull/8321#discussion_r905579881

##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -55,7 +58,12 @@ public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int
       LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset, endOffset, timeoutMillis);
     }
-    _consumer.seek(_topicPartition, startOffset);
+    Map<TopicPartition, Long> beginningOffsets = _consumer.beginningOffsets(Lists.newArrayList(_topicPartition));
+    Long beginningOffset = beginningOffsets.values().iterator().next();
+    // explicitly check for OutOfRange, where startOffset < beginningOffset
+    // without this, _consumer.poll will auto offset reset to latest, resulting in data loss
+    _consumer.seek(_topicPartition, Math.max(startOffset, beginningOffset));

Review Comment:
   Agreed that this adds one more call to Kafka in all cases. The approach of checking the fetched messages won't work well either: after the auto reset and before the seek, a new message could appear in the topic. Plus, for a consumer that sees idle time, we'd still be making the extra calls.

   Can you look at this PR with the alternate approach: https://github.com/apache/pinot/pull/8309? It is an opt-in approach that users who see the issue, or anticipate it, can turn on.
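
   To make the trade-off concrete, here is a minimal sketch of what an opt-in check could look like. It is not the code from either PR: `SeekOffsetResolver`, `checkBeginningOffset`, and `beginningOffsetLookup` are hypothetical names, and the lookup is a stand-in for the `_consumer.beginningOffsets(...)` call in the diff above. The point is only that the extra broker call happens when the flag is on, and that the seek target is clamped to the earliest available offset.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, for illustration only; not part of Pinot or Kafka.
final class SeekOffsetResolver {
    // Hypothetical opt-in flag: when false, no extra lookup is made.
    private final boolean checkBeginningOffset;
    // Stand-in for the broker call that returns the partition's earliest offset.
    private final LongSupplier beginningOffsetLookup;

    SeekOffsetResolver(boolean checkBeginningOffset, LongSupplier beginningOffsetLookup) {
        this.checkBeginningOffset = checkBeginningOffset;
        this.beginningOffsetLookup = beginningOffsetLookup;
    }

    // Returns the offset to seek to before polling.
    long resolve(long startOffset) {
        if (!checkBeginningOffset) {
            // Default path: behave as before, with no additional call.
            return startOffset;
        }
        // Opt-in path: never seek below the earliest offset still retained,
        // so an out-of-range start does not trigger an auto offset reset.
        return Math.max(startOffset, beginningOffsetLookup.getAsLong());
    }

    public static void main(String[] args) {
        SeekOffsetResolver optedIn = new SeekOffsetResolver(true, () -> 100L);
        System.out.println(optedIn.resolve(40L));   // start is below the earliest offset
        System.out.println(optedIn.resolve(250L));  // start is within range

        SeekOffsetResolver optedOut = new SeekOffsetResolver(false, () -> 100L);
        System.out.println(optedOut.resolve(40L));  // flag off: offset passed through
    }
}
```

   With the flag off, the lookup is never invoked, so users who do not hit the out-of-range case pay nothing extra.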