vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473759897


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the stream partition is in a valid state.
+   *
+   * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to
+   * retention configuration of the stream which may lead to missed data.
+   *
+   * @param startOffset The offset of the first message desired, inclusive
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset) {
+    if (_partitionMetadataProvider == null) {
+      createPartitionMetadataProvider("validateStartOffset");
+    }
+
+    try {
+      StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
+          /*maxWaitTimeMs=*/5000
+      );
+      if (streamSmallestOffset.compareTo(startOffset) > 0) {
+        _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+        String message = "startOffset(" + startOffset
+            + ") is older than topic's beginning offset(" + streamSmallestOffset + ")";
+        _segmentLogger.error(message);
+        _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+            new SegmentErrorInfo(String.valueOf(now()), message, "")

Review Comment:
   Made this change locally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

