This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 56ccbc3d5f Revert "Make ingestion offset delay metric configurable (#14074)" (#14127)
56ccbc3d5f is described below

commit 56ccbc3d5f04f0d62041c32d314b814c1b7cab4f
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Tue Oct 1 16:39:47 2024 +0530

    Revert "Make ingestion offset delay metric configurable (#14074)" (#14127)
    
    This reverts commit bba61eef14a49e7ed7a5c4e73c640c12b916a5d6.
    
    Co-authored-by: Kartik Khare <kharekar...@kartiks-macbook-pro.wyvern-sun.ts.net>
---
 .../manager/realtime/IngestionDelayTracker.java    | 153 ++++-----------------
 .../realtime/RealtimeSegmentDataManager.java       |   5 +-
 .../manager/realtime/RealtimeTableDataManager.java |   4 +-
 .../realtime/IngestionDelayTrackerTest.java        |  13 +-
 4 files changed, 39 insertions(+), 136 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 658b54c1b3..fd31d8f72b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.data.manager.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.time.Clock;
@@ -38,14 +37,15 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * A Class to track realtime ingestion delay for table partitions on a given 
server.
  * Highlights:
@@ -83,36 +83,22 @@ import org.slf4j.LoggerFactory;
  *
  * TODO: handle bug situations like the one where a partition is not allocated 
to a given server due to a bug.
  */
+
 public class IngestionDelayTracker {
 
   private static class IngestionInfo {
-    volatile Long _ingestionTimeMs;
-    volatile Long _firstStreamIngestionTimeMs;
-    volatile StreamPartitionMsgOffset _currentOffset;
-    volatile StreamPartitionMsgOffset _latestOffset;
-    final Supplier<StreamPartitionMsgOffset> _latestOffsetFetcher;
-
-    IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long 
firstStreamIngestionTimeMs,
-        @Nullable StreamPartitionMsgOffset currentOffset,
-        @Nullable Supplier<StreamPartitionMsgOffset> latestOffsetFetcher) {
+    final long _ingestionTimeMs;
+    final long _firstStreamIngestionTimeMs;
+    final StreamPartitionMsgOffset _currentOffset;
+    final StreamPartitionMsgOffset _latestOffset;
+
+    IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs,
+        @Nullable StreamPartitionMsgOffset currentOffset, @Nullable 
StreamPartitionMsgOffset latestOffset) {
       _ingestionTimeMs = ingestionTimeMs;
       _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
       _currentOffset = currentOffset;
-      _latestOffsetFetcher = latestOffsetFetcher;
-    }
-
-    void updateCurrentOffset(StreamPartitionMsgOffset currentOffset) {
-      _currentOffset = currentOffset;
-    }
-
-    void updateLatestOffset(StreamPartitionMsgOffset latestOffset) {
       _latestOffset = latestOffset;
     }
-
-    void updateIngestionTimes(long ingestionTimeMs, long 
firstStreamIngestionTimeMs) {
-      _ingestionTimeMs = ingestionTimeMs;
-      _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
-    }
   }
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IngestionDelayTracker.class);
@@ -126,13 +112,6 @@ public class IngestionDelayTracker {
 
   // Cache expire time for ignored segment if there is no update from the 
segment.
   private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
-  public static final String OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY = 
"offset.lag.tracking.enable";
-  public static final String OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY = 
"offset.lag.tracking.update.interval";
-
-  // Since offset lag metric does a call to Kafka, we want to make sure we 
don't do it too frequently.
-  public static final boolean DEFAULT_ENABLE_OFFSET_LAG_METRIC = true;
-  public static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = 60000; // 1 
minute
-  public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL = 1000L;
 
   // Per partition info for all partitions active for the current table.
   private final Map<Integer, IngestionInfo> _ingestionInfoMap = new 
ConcurrentHashMap<>();
@@ -140,11 +119,14 @@ public class IngestionDelayTracker {
   // We mark partitions that go from CONSUMING to ONLINE in 
_partitionsMarkedForVerification: if they do not
   // go back to CONSUMING in some period of time, we verify whether they are 
still hosted in this server by reading
   // ideal state. This is done with the goal of minimizing reading ideal state 
for efficiency reasons.
+  // TODO: Consider removing this mechanism after releasing 1.2.0, and use 
{@link #stopTrackingPartitionIngestionDelay}
+  //       instead.
   private final Map<Integer, Long> _partitionsMarkedForVerification = new 
ConcurrentHashMap<>();
 
   private final Cache<String, Boolean> _segmentsToIgnore =
       
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, 
TimeUnit.MINUTES).build();
 
+  // TODO: Make thread pool a server/cluster level config
   // ScheduledExecutorService to check partitions that are inactive against 
ideal state.
   private final ScheduledExecutorService _scheduledExecutor = 
Executors.newScheduledThreadPool(2);
 
@@ -157,10 +139,6 @@ public class IngestionDelayTracker {
 
   private Clock _clock;
 
-  // Configuration parameters
-  private final boolean _enableOffsetLagMetric;
-  private final long _offsetLagUpdateIntervalMs;
-
   @VisibleForTesting
   public IngestionDelayTracker(ServerMetrics serverMetrics, String 
tableNameWithType,
       RealtimeTableDataManager realtimeTableDataManager, int 
scheduledExecutorThreadTickIntervalMs,
@@ -172,23 +150,6 @@ public class IngestionDelayTracker {
     _realTimeTableDataManager = realtimeTableDataManager;
     _clock = Clock.systemUTC();
     _isServerReadyToServeQueries = isServerReadyToServeQueries;
-
-    if (realtimeTableDataManager.getInstanceDataManagerConfig() != null
-        && realtimeTableDataManager.getInstanceDataManagerConfig().getConfig() 
!= null) {
-      PinotConfiguration pinotConfiguration = 
realtimeTableDataManager.getInstanceDataManagerConfig().getConfig();
-      _enableOffsetLagMetric =
-          
pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY, 
DEFAULT_ENABLE_OFFSET_LAG_METRIC);
-      _offsetLagUpdateIntervalMs = 
pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY,
-          DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS);
-
-      Preconditions.checkArgument(_offsetLagUpdateIntervalMs > 
MIN_OFFSET_LAG_UPDATE_INTERVAL,
-          String.format("Value of Offset lag update interval config: %s must 
be greater than %d",
-              OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, 
MIN_OFFSET_LAG_UPDATE_INTERVAL));
-    } else {
-      _enableOffsetLagMetric = DEFAULT_ENABLE_OFFSET_LAG_METRIC;
-      _offsetLagUpdateIntervalMs = DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS;
-    }
-
     // Handle negative timer values
     if (scheduledExecutorThreadTickIntervalMs <= 0) {
       throw new RuntimeException(String.format("Illegal timer timeout 
argument, expected > 0, got=%d for table=%s",
@@ -210,11 +171,6 @@ public class IngestionDelayTracker {
 
     _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
         INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, 
scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
-
-    if (_enableOffsetLagMetric) {
-      _scheduledExecutor.scheduleWithFixedDelay(this::updateLatestOffsets,
-          0, _offsetLagUpdateIntervalMs, TimeUnit.MILLISECONDS);
-    }
   }
 
   public IngestionDelayTracker(ServerMetrics serverMetrics, String 
tableNameWithType,
@@ -297,18 +253,18 @@ public class IngestionDelayTracker {
    * @param firstStreamIngestionTimeMs ingestion time of the last consumed 
message in the first stream (from
    *                                   {@link RowMetadata})
    * @param currentOffset offset of the last consumed message (from {@link 
RowMetadata})
-   * @param latestOffsetFetcher a lambda function to fetch the latest offset
+   * @param latestOffset offset of the latest message in the partition (from 
{@link StreamMetadataProvider})
    */
   public void updateIngestionMetrics(String segmentName, int partitionId, long 
ingestionTimeMs,
       long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset 
currentOffset,
-      Supplier<StreamPartitionMsgOffset> latestOffsetFetcher) {
+      @Nullable StreamPartitionMsgOffset latestOffset) {
     if (!_isServerReadyToServeQueries.get() || 
_realTimeTableDataManager.isShutDown()) {
       // Do not update the ingestion delay metrics during server startup period
       // or once the table data manager has been shutdown.
       return;
     }
 
-    if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && currentOffset 
== null) {
+    if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && 
(currentOffset == null || latestOffset == null)) {
       // Do not publish metrics if stream does not return valid ingestion time 
or offset.
       return;
     }
@@ -329,24 +285,12 @@ public class IngestionDelayTracker {
               ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
               () -> getPartitionEndToEndIngestionDelayMs(partitionId));
         }
-        if (_enableOffsetLagMetric) {
+        if (currentOffset != null && latestOffset != null) {
           _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
               () -> getPartitionIngestionOffsetLag(partitionId));
         }
-        IngestionInfo ingestionInfo =
-            new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, 
currentOffset, latestOffsetFetcher);
-
-        if (latestOffsetFetcher != null) {
-          StreamPartitionMsgOffset latestOffset = latestOffsetFetcher.get();
-          ingestionInfo.updateLatestOffset(latestOffset);
-        }
-
-        return ingestionInfo;
-      } else {
-        v.updateIngestionTimes(ingestionTimeMs, firstStreamIngestionTimeMs);
-        v.updateCurrentOffset(currentOffset);
-        return v;
       }
+      return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, 
currentOffset, latestOffset);
     });
 
     // If we are consuming we do not need to track this partition for removal.
@@ -407,28 +351,6 @@ public class IngestionDelayTracker {
     }
   }
 
-  /**
-   * Updates the latest offsets for each partition at a configurable frequency 
to reduce load.
-   */
-  private void updateLatestOffsets() {
-    if (!_isServerReadyToServeQueries.get() || 
_realTimeTableDataManager.isShutDown()) {
-      return;
-    }
-    for (Map.Entry<Integer, IngestionInfo> entry : 
_ingestionInfoMap.entrySet()) {
-      int partitionId = entry.getKey();
-      IngestionInfo ingestionInfo = entry.getValue();
-      Supplier<StreamPartitionMsgOffset> latestOffsetFetcher = 
ingestionInfo._latestOffsetFetcher;
-      if (latestOffsetFetcher != null) {
-        try {
-          StreamPartitionMsgOffset latestOffset = latestOffsetFetcher.get();
-          ingestionInfo.updateLatestOffset(latestOffset);
-        } catch (Exception e) {
-          LOGGER.debug("Failed to fetch latest offset for partition {}", 
partitionId, e);
-        }
-      }
-    }
-  }
-
   /**
    * This function is invoked when a segment goes from CONSUMING to ONLINE, so 
we can assert whether the partition of
    * the segment is still hosted by this server after some interval of time.
@@ -478,35 +400,20 @@ public class IngestionDelayTracker {
   }
 
   public long getPartitionIngestionOffsetLag(int partitionId) {
-    try {
-      IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
-      if (ingestionInfo == null) {
-        return 0;
-      }
-      StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
-      StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
-      if (currentOffset == null || latestOffset == null) {
-        return 0;
-      }
-      // TODO: Support other types of offsets
-      if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof 
LongMsgOffset)) {
-        return 0;
-      }
-      long offsetLag = ((LongMsgOffset) latestOffset).getOffset() - 
((LongMsgOffset) currentOffset).getOffset();
-
-      if (offsetLag < 0) {
-        LOGGER.debug(
-            "Offset lag for partition {} is negative: currentOffset={}, 
latestOffset={}. This is most likely due to "
-                + "latestOffset not being updated",
-            partitionId, currentOffset, latestOffset);
-        return 0;
-      }
-
-      return offsetLag;
-    } catch (Exception e) {
-      LOGGER.warn("Failed to compute ingestion offset lag for partition {}", 
partitionId, e);
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    if (ingestionInfo == null) {
+      return 0;
+    }
+    StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
+    StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+    if (currentOffset == null || latestOffset == null) {
+      return 0;
+    }
+    // TODO: Support other types of offsets
+    if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof 
LongMsgOffset)) {
       return 0;
     }
+    return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) 
currentOffset).getOffset();
   }
 
   /*
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index f95cf2918e..ecf5cb12cd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -1843,10 +1842,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   private void updateIngestionMetrics(RowMetadata metadata) {
     if (metadata != null) {
       try {
-        Supplier<StreamPartitionMsgOffset> latestOffsetFetcher = () -> 
fetchLatestStreamOffset(5000, true);
+        StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000, 
true);
         _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, 
_partitionGroupId,
             metadata.getRecordIngestionTimeMs(), 
metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(),
-            latestOffsetFetcher);
+            latestOffset);
       } catch (Exception e) {
         _segmentLogger.warn("Failed to fetch latest offset for updating 
ingestion delay", e);
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 2051010acd..afc1e44529 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -141,7 +141,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   @Override
   protected void doInit() {
     _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, 
_serverMetrics, _tableNameWithType);
-
+    // Tracks ingestion delay of all partitions being served for this table
     _ingestionDelayTracker =
         new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, 
_isServerReadyToServeQueries);
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
@@ -288,7 +288,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
    */
   public void updateIngestionMetrics(String segmentName, int partitionId, long 
ingestionTimeMs,
       long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset 
currentOffset,
-      @Nullable Supplier<StreamPartitionMsgOffset> latestOffset) {
+      @Nullable StreamPartitionMsgOffset latestOffset) {
     _ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, 
ingestionTimeMs, firstStreamIngestionTimeMs,
         currentOffset, latestOffset);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index 85109fe258..9cb527b121 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -22,7 +22,6 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.function.Supplier;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -309,25 +308,23 @@ public class IngestionDelayTrackerTest {
 
     // Test tracking offset lag for a single partition
     StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
-    Supplier<StreamPartitionMsgOffset> latestOffsetFetcher = () -> new 
LongMsgOffset(200);
+    StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
     ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, 
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
-        latestOffsetFetcher);
+        latestOffset0);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0),
 100);
 
     // Test tracking offset lag for another partition
     StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50);
     StreamPartitionMsgOffset latestOffset1 = new LongMsgOffset(150);
-    latestOffsetFetcher = () -> new LongMsgOffset(150);
     ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, 
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1,
-        latestOffsetFetcher);
+        latestOffset1);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1),
 100);
 
     // Update offset lag for partition0
     msgOffset0 = new LongMsgOffset(150);
-    latestOffsetFetcher = () -> new LongMsgOffset(200);
-
+    latestOffset0 = new LongMsgOffset(200);
     ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, 
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
-        latestOffsetFetcher);
+        latestOffset0);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0),
 50);
 
     ingestionDelayTracker.shutdown();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to