ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1450846307


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -46,6 +50,20 @@ public KafkaPartitionLevelConsumer(String clientId, 
StreamConfig streamConfig, i
     super(clientId, streamConfig, partition);
   }
 
+  @Override
+  public void validateStreamState(StreamPartitionMsgOffset startMsgOffset) 
throws PermanentConsumerException {
+    final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
+    Map<TopicPartition, Long> beginningOffsets =
+            
_consumer.beginningOffsets(Collections.singletonList(_topicPartition));
+
+    final long beginningOffset = 
beginningOffsets.getOrDefault(_topicPartition, 0L);
+    if (startOffset < beginningOffset) {

Review Comment:
   Coming back to this discussion, I realized my confusion: in this case 
Beginning Offset is the earliest offset in the Kafka topic and Start Offset is 
where the new Consuming Segment will start reading from.  
   
   I had been thinking in terms of the last offset in the immediately preceding 
segment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to