sajjad-moradi commented on code in PR #13668:
URL: https://github.com/apache/pinot/pull/13668#discussion_r1710442034


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -214,7 +230,7 @@ private long getPartitionOffsetLag(IngestionOffsets offset) 
{
    *
    * @param partitionGroupId partition ID which we should stop tracking.
    */
-  private void removePartitionId(int partitionGroupId) {
+  private synchronized void removePartitionId(int partitionGroupId) {

Review Comment:
   Declaring these methods as synchronized definitely addresses the edge case 
that you described. Keep in mind that this scenario is very rare. Consuming 
segment moving to another servers is itself rare. For the described scenario to 
happen, consuming segment needs to move to another server as well as the 
execution of threads 1) handling consumption and 2) handling metric removal 
helix message should align.
   
   Using synchronized for threads handling consumption adds overhead and delays 
consumption a bit especially if there are many partitions on the same server. I 
suggest we only block detecting the edge case and not block for the regular 
happy path of consumption / updating metrics.
   
   We can do that by using an object level lock, and acquire it inside 
`stopTrackingPartitionIngestionDelay` method and also when the partition is 
added to the gauge metric inside `updateIngestionDelay` method:
   ```java
     public void stopTrackingPartitionIngestionDelay(String segmentName) {
       _lock.lock();
       _segmentsToIgnore.put(segmentName, true);
       removePartitionId(new LLCSegmentName(segmentName).getPartitionGroupId());
       _lock.unlock();
     }
     
     void updateIngestionDelay(long ingestionTimeMs, long 
firstStreamIngestionTimeMs, int partitionGroupId) {
       if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {
         // If stream does not return a valid ingestion timestamps don't 
publish a metric
         return;
       }
       IngestionTimestamps previousMeasure = 
_partitionToIngestionTimestampsMap.put(partitionGroupId,
           new IngestionTimestamps(ingestionTimeMs, 
firstStreamIngestionTimeMs));
       if (previousMeasure == null) {
         _lock.lock();  //<<---- HERE
         // First time we start tracking a partition we should start tracking 
it via metric
         // Only publish the metric if supported by the underlying stream. If 
not supported the stream
         // returns Long.MIN_VALUE
         if (ingestionTimeMs >= 0) {
           if (_segmentsToIgnore does not contain segment for partitionId) { 
//<<---- HERE
             _serverMetrics.setOrUpdatePartitionGauge(_metricName, 
partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
                 () -> getPartitionIngestionDelayMs(partitionGroupId));
           }
         }
         if (firstStreamIngestionTimeMs >= 0) {
           if (_segmentsToIgnore does not contain segment for partitionId) { 
//<<---- HERE
             // Only publish this metric when creation time is supported by the 
underlying stream
             // When this timestamp is not supported it always returns the 
value Long.MIN_VALUE
             _serverMetrics.setOrUpdatePartitionGauge(_metricName, 
partitionGroupId,
                 ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
                 () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId));
           }
         }
         _lock.unlock(); //<<---- HERE
       }
     }
   ```
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to