Jackie-Jiang commented on code in PR #9994:
URL: https://github.com/apache/pinot/pull/9994#discussion_r1062020489
########## pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java: ##########
@@ -45,8 +45,10 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   // Dedup metrics
   DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
   CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
-  JVM_HEAP_USED_BYTES("bytes", true);
-
+  JVM_HEAP_USED_BYTES("bytes", true),
+  // Lag metrics
+  TABLE_MAX_INGESTION_DELAY_MS("milliseconds", false),
+  TABLE_PER_PARTITION_INGESTION_DELAY_MS("milliseconds", false);

Review Comment:
I think we can use the same gauge for both table level and partition level. For partition level, we will suffix it with `.partitionId`.
```suggestion
  MAX_INGESTION_DELAY_MS("milliseconds", false);
```

########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ##########
@@ -1554,6 +1557,25 @@ private void createPartitionMetadataProvider(String reason) {
     _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
   }
+  /*
+   * Updates the ingestion delay if messages were processed using the time stamp for the last consumed event.
+   *
+   * @param indexedMessagesCount
+   */
+  private void updateIngestionDelay(int indexedMessageCount) {
+    if (_catchingUpPhase) {

Review Comment:
If this is `false` in the beginning, we also need to update the ingestion delay when it becomes `true`, or the lag will already be propagated.
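The single-gauge suggestion for `ServerGauge.java` can be sketched as follows. This is a minimal illustration, assuming a hypothetical in-memory registry (`GaugeRegistry`, `IngestionGauge`) rather than Pinot's actual `AbstractMetrics`/`ServerMetrics` API: one gauge definition is published under a table-level key and, for partitions, under the same key suffixed with `.partitionId`. The `<GAUGE>.<table>` key layout is an assumption made only for this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the single ServerGauge entry suggested in the review.
enum IngestionGauge {
  MAX_INGESTION_DELAY_MS("milliseconds");

  private final String unit;

  IngestionGauge(String unit) {
    this.unit = unit;
  }

  String getUnit() {
    return unit;
  }
}

// Hypothetical in-memory registry, not Pinot's AbstractMetrics: one gauge definition
// is stored under a table-level key or under the same key with a partition suffix.
final class GaugeRegistry {
  private final Map<String, Long> values = new ConcurrentHashMap<>();

  // Assumed key layout: "<GAUGE>.<table>"
  static String tableKey(String tableName, IngestionGauge gauge) {
    return gauge.name() + "." + tableName;
  }

  // Same gauge, suffixed with ".<partitionId>" as proposed in the review.
  static String partitionKey(String tableName, int partitionId, IngestionGauge gauge) {
    return tableKey(tableName, gauge) + "." + partitionId;
  }

  void setTableGauge(String tableName, IngestionGauge gauge, long value) {
    values.put(tableKey(tableName, gauge), value);
  }

  void setPartitionGauge(String tableName, int partitionId, IngestionGauge gauge, long value) {
    values.put(partitionKey(tableName, partitionId, gauge), value);
  }

  // Returns the last value set for the key, or null if none was set.
  Long get(String key) {
    return values.get(key);
  }
}
```

In this sketch, partition 3 of `myTable_REALTIME` is published under `MAX_INGESTION_DELAY_MS.myTable_REALTIME.3`, so no second enum entry is needed; only the key differs between the two levels.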
########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java: ##########
@@ -77,6 +77,9 @@ public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
       LLRealtimeSegmentDataManager rtSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager;
       if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
         _caughtUpSegments.add(segName);
+        rtSegmentDataManager.notifyConsumptionCaughtUp(false);

Review Comment:
We want to skip reporting the delay during server startup because:
1. The server will need to catch up, which will very likely trigger the alert.
2. The server is not really serving queries yet, so it would be a false alarm.

We do want to report the delay when a new consuming segment is created, because the server is serving queries from it.

########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ##########
@@ -69,4 +69,13 @@ protected static PinotDataBufferMemoryManager getMemoryManager(String consumerDi
   public abstract Map<String, PartitionLagState> getPartitionToLagState(
       Map<String, ConsumerPartitionState> consumerPartitionStateMap);
+
+  /**
+   * The RT segment data manager can handle status change from external components like the ConsumptionStatusChecker
+   * etc. Currently, it acts as a way to signal the RT Segment data manager that the current partition has caught up.
+   *
+   * @param caughtUpWithUpstream Boolean indicating if the partition has caught up with upstream source or not based on
+   *     the strategy used in the {@literal IngestionBasedConsumptionStatusChecker}
+   */
+  public abstract void notifyConsumptionCaughtUp(boolean caughtUpWithUpstream);

Review Comment:
This is quite confusing. Per the implementation, it seems like passing `false` means the segment is caught up?
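One way to make the caught-up signal unambiguous, while also covering the startup suppression described above and the earlier comment on `updateIngestionDelay`, is to replace the boolean with an explicit phase and publish the delay as soon as a segment leaves the catching-up phase. The sketch below is hypothetical: `ConsumptionPhase`, `IngestionDelayTracker`, `setPhase` and `onMessagesIndexed` are illustrative names, not Pinot APIs, and the delay is assumed to be wall-clock time minus the timestamp of the last consumed event, clamped at zero.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical explicit phases replacing notifyConsumptionCaughtUp(boolean):
// the call site then states what it means instead of passing an inverted flag.
enum ConsumptionPhase {
  CATCHING_UP,
  CAUGHT_UP
}

// Hypothetical per-partition ingestion delay tracker (not Pinot's actual class).
final class IngestionDelayTracker {
  private final LongSupplier clockMs;       // current wall-clock time in ms
  private final LongConsumer delayGaugeMs;  // receives the reported delay in ms
  // Default: caught up, so segments created after startup report delay right away.
  private ConsumptionPhase phase = ConsumptionPhase.CAUGHT_UP;
  private long lastEventTimeMs = -1;        // -1 means no event consumed yet

  IngestionDelayTracker(LongSupplier clockMs, LongConsumer delayGaugeMs) {
    this.clockMs = clockMs;
    this.delayGaugeMs = delayGaugeMs;
  }

  synchronized void setPhase(ConsumptionPhase newPhase) {
    ConsumptionPhase oldPhase = phase;
    phase = newPhase;
    // On CATCHING_UP -> CAUGHT_UP, publish immediately; otherwise the gauge would
    // not reflect the current lag until the next batch is indexed.
    if (oldPhase == ConsumptionPhase.CATCHING_UP && newPhase == ConsumptionPhase.CAUGHT_UP) {
      report();
    }
  }

  // Called after a batch is indexed, with the timestamp of the last consumed event.
  synchronized void onMessagesIndexed(int indexedMessageCount, long lastEventTimeMs) {
    if (indexedMessageCount <= 0) {
      return;  // nothing new was processed
    }
    this.lastEventTimeMs = lastEventTimeMs;
    // While catching up (e.g. during server startup), the delay is tracked but not reported.
    if (phase == ConsumptionPhase.CAUGHT_UP) {
      report();
    }
  }

  private void report() {
    if (lastEventTimeMs >= 0) {
      delayGaugeMs.accept(Math.max(0L, clockMs.getAsLong() - lastEventTimeMs));
    }
  }
}
```

With named phases, a status checker would call `setPhase(ConsumptionPhase.CAUGHT_UP)` rather than `notifyConsumptionCaughtUp(false)`, which removes the question of what `false` means.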
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ##########
@@ -1594,4 +1616,9 @@ public String getSegmentName() {
   public void forceCommit() {
     _forceCommitMessageReceived = true;
   }
+
+  @Override
+  public void notifyConsumptionCaughtUp(boolean catchingUpPhase) {

Review Comment:
Another way to handle it is to assume it is caught up in the beginning, and let the status checker set it as not caught up yet. The status checker only monitors the initial consuming segments during a restart. New consuming segments won't be tracked, and thus will automatically be in the caught-up phase. I think this PR is already taking this approach.
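The approach in the last comment (default to caught up, and let the status checker mark only the initial consuming segments as catching up) might look like the following sketch. `ConsumingSegmentState` and `StartupCatchUpChecker` are hypothetical names; the `Predicate<String>` is a stand-in for the real ingestion-based criteria behind `isSegmentCaughtUp`, and `check()` loosely mirrors the count returned by `getNumConsumingSegmentsNotReachedIngestionCriteria()`. The checker assumes it is polled from a single thread.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical minimal consuming-segment state: every segment starts out caught up.
final class ConsumingSegmentState {
  private boolean caughtUp = true;

  synchronized void markCatchingUp() {
    caughtUp = false;
  }

  synchronized void markCaughtUp() {
    caughtUp = true;
  }

  synchronized boolean isCaughtUp() {
    return caughtUp;
  }
}

// Hypothetical startup checker: it only tracks the consuming segments present when the
// server starts, so segments created later keep the default caught-up state.
final class StartupCatchUpChecker {
  private final Map<String, ConsumingSegmentState> pending = new HashMap<>();
  private final Predicate<String> isSegmentCaughtUp;  // stand-in for the ingestion criteria

  StartupCatchUpChecker(Map<String, ConsumingSegmentState> initialSegments,
      Predicate<String> isSegmentCaughtUp) {
    this.isSegmentCaughtUp = isSegmentCaughtUp;
    for (Map.Entry<String, ConsumingSegmentState> entry : initialSegments.entrySet()) {
      entry.getValue().markCatchingUp();  // suppress delay reporting while catching up
      pending.put(entry.getKey(), entry.getValue());
    }
  }

  // Marks segments that met the criteria as caught up and returns how many have not yet.
  int check() {
    Iterator<Map.Entry<String, ConsumingSegmentState>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, ConsumingSegmentState> entry = it.next();
      if (isSegmentCaughtUp.test(entry.getKey())) {
        entry.getValue().markCaughtUp();
        it.remove();  // stop tracking it; it stays caught up from now on
      }
    }
    return pending.size();
  }
}
```

Because segments created after startup are never registered with this checker, they keep the default caught-up state, which matches the behavior the review describes for new consuming segments.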