noob-se7en commented on code in PR #17089:
URL: https://github.com/apache/pinot/pull/17089#discussion_r2468946398
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java:
##########
@@ -73,29 +78,40 @@ protected boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManag
// message is too old to pass the freshness check. We check this condition separately to avoid hitting
// the stream consumer to check partition count if we're already caught up.
StreamPartitionMsgOffset currentOffset = rtSegmentDataManager.getCurrentOffset();
- StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(5000);
+
+ StreamMetadataProvider streamMetadataProvider =
+ realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);
+ StreamPartitionMsgOffset latestStreamOffset;
+ try {
+ int partitionId = rtSegmentDataManager.getPartitionGroupId();
+ Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+ streamMetadataProvider.fetchLatestStreamOffset(Collections.singleton(partitionId),
+ STREAM_METADATA_FETCH_TIMEOUT_MS);
+ latestStreamOffset = partitionMsgOffsetMap.get(partitionId);
+ } catch (Exception e) {
+ _logger.error("Failed to fetch latest stream partition offset for segment: {}", segmentName, e);
+ throw new RuntimeException(e);
+ }
+
if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) {
_logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}. "
+ "But the current ingested offset is equal to the latest available offset {}.", segmentName, freshnessMs,
_minFreshnessMs, currentOffset);
return true;
}
- StreamPartitionMsgOffset earliestStreamOffset = rtSegmentDataManager.fetchEarliestStreamOffset(5000);
Review Comment:
I am removing this because earliestStreamOffset is only used in log messages.
Fetching it costs an extra I/O call that adds little value, so the call is removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]