vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473759897
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
+  /**
+   * Checks if the stream partition is in a valid state.
+   *
+   * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to
+   * retention configuration of the stream which may lead to missed data.
+   *
+   * @param startOffset The offset of the first message desired, inclusive
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset) {
+    if (_partitionMetadataProvider == null) {
+      createPartitionMetadataProvider("validateStartOffset");
+    }
+
+    try {
+      StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
+          /*maxWaitTimeMs=*/5000
+      );
+      if (streamSmallestOffset.compareTo(startOffset) > 0) {
+        _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+        String message = "startOffset(" + startOffset
+            + ") is older than topic's beginning offset(" + streamSmallestOffset + ")";
+        _segmentLogger.error(message);
+        _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+            new SegmentErrorInfo(String.valueOf(now()), message, "")

Review Comment:
   Made this change locally.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org