sajjad-moradi commented on code in PR #13668:
URL: https://github.com/apache/pinot/pull/13668#discussion_r1710442034
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -214,7 +230,7 @@ private long getPartitionOffsetLag(IngestionOffsets offset) {
    *
    * @param partitionGroupId partition ID which we should stop tracking.
    */
-  private void removePartitionId(int partitionGroupId) {
+  private synchronized void removePartitionId(int partitionGroupId) {

Review Comment:
Declaring these methods as `synchronized` definitely addresses the edge case you described. Keep in mind, though, that this scenario is very rare. A consuming segment moving to another server is itself rare, and for the described scenario to happen, the consuming segment must move to another server *and* two threads must interleave in just the wrong way: (1) the thread handling consumption and (2) the thread handling the Helix message for metric removal. Making these methods `synchronized` means every consumption thread that calls into the tracker contends for the same monitor on every update, which adds overhead and delays consumption slightly, especially when there are many partitions on the same server. I suggest we block only where the edge case can occur, and not on the regular happy path of consuming and updating metrics.
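The interleaving can be replayed deterministically on one thread. This sketch assumes the edge case is a partition gauge being re-registered after the Helix-message thread has already removed it (the comment does not spell this out). Plain `HashMap`s stand in for `_partitionToIngestionTimestampsMap` and the server's gauge registry; `StaleGaugeReplay` and its members are hypothetical, not Pinot code.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleGaugeReplay {
  /** Replays one bad interleaving on a single thread; returns true if a stale gauge is left behind. */
  public static boolean replayLeaksGauge() {
    // Hypothetical stand-ins: partition -> last ingestion time, partition -> registered gauge name.
    Map<Integer, Long> timestamps = new HashMap<>();
    Map<Integer, String> gauges = new HashMap<>();
    int partition = 3;

    // Consumption thread: first update for this partition, so put() returns null.
    Long previous = timestamps.put(partition, 1_000L);

    // Helix-message thread runs in between: it stops tracking the partition and drops its gauge.
    timestamps.remove(partition);
    gauges.remove(partition);

    // Consumption thread resumes; it saw previous == null, so it registers the gauge.
    if (previous == null) {
      gauges.put(partition, "REALTIME_INGESTION_DELAY_MS");
    }

    // A gauge now exists for a partition that is no longer tracked.
    return gauges.containsKey(partition) && !timestamps.containsKey(partition);
  }

  public static void main(String[] args) {
    System.out.println(replayLeaksGauge()); // true
  }
}
```

This is the ordering that a check of the ignore set, made under the same lock as the removal, is meant to rule out.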
We can limit blocking to that edge case with an object-level lock, acquired inside the `stopTrackingPartitionIngestionDelay` method and also when the partition is first added to the gauge metrics inside the `updateIngestionDelay` method. In the sketch below, `isSegmentIgnoredForPartition` is a hypothetical placeholder for the check "`_segmentsToIgnore` contains a segment of this partition":

```java
// Assumes an object-level lock field, e.g.
// private final ReentrantLock _lock = new ReentrantLock(); // java.util.concurrent.locks.ReentrantLock

public void stopTrackingPartitionIngestionDelay(String segmentName) {
  _lock.lock();
  try {
    _segmentsToIgnore.put(segmentName, true);
    removePartitionId(new LLCSegmentName(segmentName).getPartitionGroupId());
  } finally {
    // Release in finally so an exception during removal cannot leave the lock held
    _lock.unlock();
  }
}

void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestionTimeMs, int partitionGroupId) {
  if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {
    // If the stream does not return valid ingestion timestamps, don't publish a metric
    return;
  }
  IngestionTimestamps previousMeasure = _partitionToIngestionTimestampsMap.put(partitionGroupId,
      new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
  if (previousMeasure == null) {
    // Happy path (partition already tracked) skips this block entirely and never takes the lock
    _lock.lock(); // <<---- HERE
    try {
      // First time we start tracking a partition we should start tracking it via metric.
      // isSegmentIgnoredForPartition is a hypothetical helper: true if _segmentsToIgnore
      // contains a segment belonging to partitionGroupId.
      boolean ignored = isSegmentIgnoredForPartition(partitionGroupId); // <<---- HERE
      // Only publish the metric if supported by the underlying stream. If not supported the stream
      // returns Long.MIN_VALUE
      if (ingestionTimeMs >= 0 && !ignored) {
        _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
            ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionGroupId));
      }
      if (firstStreamIngestionTimeMs >= 0 && !ignored) {
        // Only publish this metric when creation time is supported by the underlying stream.
        // When this timestamp is not supported it always returns the value Long.MIN_VALUE
        _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
            ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
            () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId));
      }
    } finally {
      _lock.unlock(); // <<---- HERE
    }
  }
}
```

WDYT?
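A self-contained model of the proposed locking can check both properties argued for above: the lock is taken only when a partition is first seen (or removed), and a partition stopped under the lock does not get its gauge back. It is a sketch under simplifying assumptions: `ConcurrentHashMap`s stand in for `_partitionToIngestionTimestampsMap` and the `ServerMetrics` gauges, only one gauge is modeled, and the ignore set is keyed by partition rather than by segment name. `LockedGaugeTracker` and its methods are hypothetical, not Pinot APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class LockedGaugeTracker {
  private final Map<Integer, Long> _timestamps = new ConcurrentHashMap<>();
  // Hypothetical stand-in for _segmentsToIgnore, simplified to be keyed by partition.
  private final Map<Integer, Boolean> _ignoredPartitions = new ConcurrentHashMap<>();
  // Hypothetical stand-in for per-partition gauges in ServerMetrics.
  private final Map<Integer, String> _gauges = new ConcurrentHashMap<>();
  private final ReentrantLock _lock = new ReentrantLock();
  private int _lockAcquisitions = 0; // guarded by _lock

  /** Models stopTrackingPartitionIngestionDelay: always takes the lock. */
  public void stopTracking(int partition) {
    _lock.lock();
    try {
      _lockAcquisitions++;
      _ignoredPartitions.put(partition, true);
      _timestamps.remove(partition);
      _gauges.remove(partition);
    } finally {
      _lock.unlock();
    }
  }

  /** Models updateIngestionDelay: takes the lock only on the first update of a partition. */
  public void update(int partition, long ingestionTimeMs) {
    if (ingestionTimeMs < 0) {
      return; // stream did not supply a timestamp
    }
    Long previous = _timestamps.put(partition, ingestionTimeMs);
    if (previous != null) {
      return; // happy path: partition already tracked, no locking
    }
    _lock.lock();
    try {
      _lockAcquisitions++;
      if (!_ignoredPartitions.containsKey(partition)) {
        _gauges.put(partition, "REALTIME_INGESTION_DELAY_MS");
      }
    } finally {
      _lock.unlock();
    }
  }

  public boolean hasGauge(int partition) {
    return _gauges.containsKey(partition);
  }

  public int lockAcquisitions() {
    _lock.lock();
    try {
      return _lockAcquisitions;
    } finally {
      _lock.unlock();
    }
  }

  public static void main(String[] args) {
    LockedGaugeTracker tracker = new LockedGaugeTracker();
    for (int i = 0; i < 1000; i++) {
      tracker.update(i % 4, 1_000L + i); // 1000 updates spread over 4 partitions
    }
    System.out.println(tracker.lockAcquisitions()); // 4
    tracker.stopTracking(3);
    tracker.update(3, 5_000L); // a late update after removal
    System.out.println(tracker.hasGauge(3)); // false
    System.out.println(tracker.hasGauge(0)); // true
  }
}
```

Keying the ignore set by partition is a simplification: once partition 3 is ignored here it is never tracked again, whereas keying `_segmentsToIgnore` by segment name, as proposed, presumably lets a later consuming segment of the same partition be tracked normally.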