npawar commented on code in PR #8321:
URL: https://github.com/apache/pinot/pull/8321#discussion_r905579881


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -55,7 +58,12 @@ public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int
       LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
           endOffset, timeoutMillis);
     }
-    _consumer.seek(_topicPartition, startOffset);
+    Map<TopicPartition, Long> beginningOffsets = _consumer.beginningOffsets(Lists.newArrayList(_topicPartition));
+    Long beginningOffset = beginningOffsets.values().iterator().next();
+    // explicitly check for OutOfRange, where startOffset < beginningOffset
+    // without this, _consumer.poll will auto offset reset to latest, resulting in data loss
+    _consumer.seek(_topicPartition, Math.max(startOffset, beginningOffset));

Review Comment:
   Agreed on the concern about one more call to Kafka in all cases. The
approach of checking the fetched messages won't work well either: after the
auto reset and before the seek, a new message could appear in the topic. Plus,
for a consumer that sees idle time, we'd still be making the extra calls.
   Can you look at this PR, which takes an alternate approach:
https://github.com/apache/pinot/pull/8309 ? It is opt-in, so users who see
the issue or anticipate it can turn it on.
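
   For reference, the clamping logic in the diff above can be isolated as a small sketch. The class and method names here (`OffsetClampSketch`, `effectiveStartOffset`, `skippedMessages`) are hypothetical and not part of Pinot or the Kafka client; the sketch only models the `Math.max(startOffset, beginningOffset)` step and leaves out the `beginningOffsets` call that supplies the second argument.

```java
public class OffsetClampSketch {

  // Offset to seek to: never earlier than the earliest offset still retained
  // in the partition. Mirrors Math.max(startOffset, beginningOffset) in the diff.
  static long effectiveStartOffset(long startOffset, long beginningOffset) {
    return Math.max(startOffset, beginningOffset);
  }

  // Number of offsets between the requested start and the earliest retained
  // offset. A non-zero value means the requested range has already aged out.
  static long skippedMessages(long startOffset, long beginningOffset) {
    return Math.max(0L, beginningOffset - startOffset);
  }

  public static void main(String[] args) {
    long startOffset = 100L;      // offset the consumer wants to resume from
    long beginningOffset = 250L;  // assumed earliest retained offset

    System.out.println("seek to: " + effectiveStartOffset(startOffset, beginningOffset));
    System.out.println("skipped: " + skippedMessages(startOffset, beginningOffset));
  }
}
```

   Note that in the diff the beginning offset is looked up on every `fetchMessages` call, including when `startOffset` is already in range, which is the extra per-fetch cost discussed in this comment.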



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

