mcvsubbu commented on code in PR #9244: URL: https://github.com/apache/pinot/pull/9244#discussion_r955303504
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ########## @@ -1429,15 +1429,17 @@ private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long now) { } } - private void fetchLatestStreamOffset() { - try (StreamMetadataProvider metadataProvider = _streamConsumerFactory - .createPartitionMetadataProvider(_clientId, _partitionGroupId)) { - _latestStreamOffsetAtStartupTime = - metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000); + public StreamPartitionMsgOffset fetchLatestStreamOffset() { + long maxWaitTimeMs = 5000; Review Comment: can we move this waitTime to be an argument `maxWaitTimeMillis`, since we seem to be calling it from different places and each may want to set their own wait time ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -386,6 +386,15 @@ public static class Server { "pinot.server.starter.enableRealtimeOffsetBasedConsumptionStatusChecker"; public static final boolean DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER = false; + public static final String CONFIG_OF_ENABLE_REALTIME_FRESHNESS_BASED_CONSUMPTION_STATUS_CHECKER = Review Comment: can we ensure that exactly one of the config is set (maybe in the starter), and throw exception if not? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org