noob-se7en commented on code in PR #15831:
URL: https://github.com/apache/pinot/pull/15831#discussion_r2100103527

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -112,6 +135,14 @@ private static class IngestionInfo {
   // Cache expire time for ignored segment if there is no update from the segment.
   private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
+  public static final String OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY = "offset.lag.tracking.enable";
+  public static final String OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY = "offset.lag.tracking.update.interval";
+
+  // Since offset lag metric fetches metadata from upstream, we want to make sure we don't do it too frequently.
+  public static final boolean DEFAULT_ENABLE_OFFSET_LAG_METRIC = false;
+  public static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = 60000; // 1 minute
+  public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL = 1000L;

   Review Comment:
   nit:
   ```suggestion
     public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
   ```
   We can do the same for the other time-unit values as well, e.g. `DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS`.

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -301,7 +301,7 @@ public void updateIngestionMetrics(String segmentName, int partitionId, long ing
       long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset,
       @Nullable StreamPartitionMsgOffset latestOffset) {
     _ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, firstStreamIngestionTimeMs,
-        currentOffset, latestOffset);
+        currentOffset);

   Review Comment:
   I feel we should not remove this for now. We can keep both delay metrics at the same time until we gain confidence in the new one.
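To make the suggestion about keeping both metrics concrete, here is a minimal sketch that maintains the two lag computations side by side: the lag derived from the `latestOffset` the consumer passes to `updateIngestionMetrics` (the path the PR removes) and the lag refreshed by the periodic upstream fetch (the path the PR adds). It assumes numeric offsets modeled as plain `long`s, as with Kafka, whereas Pinot uses the `StreamPartitionMsgOffset` abstraction; `DualOffsetLagTracker` and its methods are hypothetical names, not Pinot APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: offsets are modeled as plain longs (as with Kafka), whereas Pinot
// uses StreamPartitionMsgOffset. Class and method names are illustrative only.
final class DualOffsetLagTracker {
  // Lag derived from the offsets the consumer thread passes in (the existing approach).
  private final Map<Integer, Long> _consumerReportedLag = new ConcurrentHashMap<>();
  // Lag refreshed by a periodic upstream metadata fetch (the approach added in the PR).
  private final Map<Integer, Long> _periodicLag = new ConcurrentHashMap<>();

  // Consumer path: keeps the old computation instead of dropping latestOffset.
  void updateIngestionMetrics(int partitionId, long currentOffset, Long latestOffset) {
    if (latestOffset != null) {
      _consumerReportedLag.put(partitionId, Math.max(0L, latestOffset - currentOffset));
    }
  }

  // Periodic path: called after the latest offset has been fetched from upstream.
  void updatePeriodicLag(int partitionId, long currentOffset, long fetchedLatestOffset) {
    _periodicLag.put(partitionId, Math.max(0L, fetchedLatestOffset - currentOffset));
  }

  long getConsumerReportedLag(int partitionId) {
    return _consumerReportedLag.getOrDefault(partitionId, 0L);
  }

  long getPeriodicLag(int partitionId) {
    return _periodicLag.getOrDefault(partitionId, 0L);
  }

  public static void main(String[] args) {
    DualOffsetLagTracker tracker = new DualOffsetLagTracker();
    tracker.updateIngestionMetrics(0, 100L, 150L);
    tracker.updatePeriodicLag(0, 100L, 160L);
    // Both values are exposed, so they can be compared before one is retired.
    System.out.println(tracker.getConsumerReportedLag(0) + " vs " + tracker.getPeriodicLag(0)); // 50 vs 60
  }
}
```

Emitting both values for a while would let the two computations be compared on the same partitions before the consumer-reported one is dropped.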

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -416,16 +502,74 @@ public long getPartitionIngestionOffsetLag(int partitionId) {
     if (ingestionInfo == null) {
       return 0;
     }
-    StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
-    StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+    return ingestionInfo._offsetLag;
+  }
+
+  private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long maxWaitTimeMs,
+      StreamMetadataProvider streamMetadataProvider) {
+    try {
+      return streamMetadataProvider.fetchStreamPartitionOffset(offsetCriteria, maxWaitTimeMs);
+    } catch (Exception e) {
+      LOGGER.debug("Caught exception while fetching stream offset", e);
+    }
+    return null;
+  }
+
+  /**
+   * Creates a new stream metadata provider
+   */
+  private StreamMetadataProvider createPartitionMetadataProvider(String reason, String clientId, int partitionGroupId) {
+    LOGGER.info("Creating new partition metadata provider, reason: {}", reason);
+    return _streamConsumerFactory.createPartitionMetadataProvider(clientId, partitionGroupId);
+  }
+
+  private void updateOffsetLagForAllPartitions() {
+    List<Map.Entry<Integer, IngestionInfo>> entries = new ArrayList<>(_ingestionInfoMap.entrySet());

   Review Comment:
   I believe this tracker only works if the consumer thread adds entries to `_ingestionInfoMap`. I don't think it will work if the consumer dies before doing that; this has happened in the past when the OFFLINE -> CONSUMING transition failed.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
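The coverage gap raised in the last comment can be illustrated with a minimal sketch: if the periodic updater only iterates over `_ingestionInfoMap.entrySet()`, a partition whose consumer never registered is invisible to it. The sketch assumes the updater can also obtain the set of partitions this server is expected to consume (for example, derived from the segment assignment); `PartitionCoverageCheck`, `register`, and `findUntrackedPartitions` are hypothetical names, and the map value is simplified to the current offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: an updater that only walks entries registered by the consumer
// thread never sees a partition whose consumer failed to start. Here the expected
// partition set is an assumed extra input, and missing partitions are reported.
final class PartitionCoverageCheck {
  // Simplified stand-in for _ingestionInfoMap: partitionId -> current offset.
  private final Map<Integer, Long> _ingestionInfoMap = new ConcurrentHashMap<>();

  // Stand-in for the consumer thread registering a partition once it starts consuming.
  void register(int partitionId, long currentOffset) {
    _ingestionInfoMap.put(partitionId, currentOffset);
  }

  // Returns, in ascending order, partitions that are expected but never registered.
  List<Integer> findUntrackedPartitions(Set<Integer> expectedPartitions) {
    List<Integer> untracked = new ArrayList<>();
    for (int partitionId : new TreeSet<>(expectedPartitions)) {
      if (!_ingestionInfoMap.containsKey(partitionId)) {
        untracked.add(partitionId);
      }
    }
    return untracked;
  }

  public static void main(String[] args) {
    PartitionCoverageCheck check = new PartitionCoverageCheck();
    check.register(0, 100L);
    check.register(2, 300L);
    // Partition 1 never registered, so an entrySet()-only loop would never visit it.
    System.out.println(check.findUntrackedPartitions(Set.of(0, 1, 2))); // [1]
  }
}
```

Partitions returned by such a check could then be surfaced through a metric or log line instead of being silently skipped; where the expected partition set should come from in Pinot is left open here.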