ege-st commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1450846307
########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java: ########## @@ -46,6 +50,20 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i super(clientId, streamConfig, partition); } + @Override + public void validateStreamState(StreamPartitionMsgOffset startMsgOffset) throws PermanentConsumerException { + final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset(); + Map<TopicPartition, Long> beginningOffsets = + _consumer.beginningOffsets(Collections.singletonList(_topicPartition)); + + final long beginningOffset = beginningOffsets.getOrDefault(_topicPartition, 0L); + if (startOffset < beginningOffset) { Review Comment: Coming back to this discussion, I realized my confusion: in this case Beginning Offset is the earliest offset in the Kafka topic and Start Offset is where the new Consuming Segment will start reading from. I had been thinking in terms of the last offset in the immediately preceding segment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org