This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bc27ad99ec Emit metrics if there's no consuming segment for a partition (#8877)
bc27ad99ec is described below

commit bc27ad99ecd68fd4ae8522055f8a6a84e27fa3c1
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Thu Jun 30 13:39:25 2022 -0700

    Emit metrics if there's no consuming segment for a partition (#8877)
---
 .../pinot/common/metrics/ControllerGauge.java      |  11 +-
 .../controller/helix/SegmentStatusChecker.java     |  29 +-
 .../realtime/MissingConsumingSegmentFinder.java    | 229 +++++++++++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   3 -
 .../controller/helix/SegmentStatusCheckerTest.java |  24 +-
 .../MissingConsumingSegmentFinderTest.java         | 306 +++++++++++++++++++++
 6 files changed, 585 insertions(+), 17 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 2951cfc7fb..2cd8c79dd5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -101,7 +101,16 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   DROPPED_MINION_INSTANCES("droppedMinionInstances", true),
 
   // Number of online minion instances
-  ONLINE_MINION_INSTANCES("onlineMinionInstances", true);
+  ONLINE_MINION_INSTANCES("onlineMinionInstances", true),
+
+  // Number of partitions with missing consuming segments in ideal state
+  MISSING_CONSUMING_SEGMENT_TOTAL_COUNT("missingConsumingSegmentTotalCount", false),
+
+  // Number of new partitions with missing consuming segments in ideal state
+  MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT("missingConsumingSegmentNewPartitionCount", false),
+
+  // Maximum duration of a missing consuming segment in ideal state (in minutes)
+  MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES("missingSegmentsMaxDurationInMinutes", false);
 
   private final String _gaugeName;
   private final String _unit;
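
For readers tracing the metrics plumbing: the boolean in each constructor call marks a gauge as global, so the three
new gauges (false) are emitted per table. A minimal sketch of how such a gauge is recorded and read back --
setValueOfTableGauge and getValueOfTableGauge are the real ControllerMetrics methods used later in this commit, while
the wrapper class and method here are illustrative only:

    import org.apache.pinot.common.metrics.ControllerGauge;
    import org.apache.pinot.common.metrics.ControllerMetrics;

    class MissingConsumingSegmentGaugeSketch {
      // Hypothetical helper: records the three new gauges for one table.
      // The ControllerMetrics instance is assumed to be supplied by the controller.
      static void emit(ControllerMetrics metrics, String tableNameWithType, long totalCount,
          long newPartitionCount, long maxDurationMinutes) {
        metrics.setValueOfTableGauge(tableNameWithType,
            ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT, totalCount);
        metrics.setValueOfTableGauge(tableNameWithType,
            ControllerGauge.MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT, newPartitionCount);
        metrics.setValueOfTableGauge(tableNameWithType,
            ControllerGauge.MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES, maxDurationMinutes);
        // The tests below read a value back with metrics.getValueOfTableGauge(tableName, gauge).
      }
    }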
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index bf9f179925..6470fab0fa 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -27,9 +27,11 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
@@ -41,9 +43,12 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,8 +115,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
   @Override
   protected void processTable(String tableNameWithType, Context context) {
     try {
-      updateTableConfigMetrics(tableNameWithType);
-      updateSegmentMetrics(tableNameWithType, context);
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      updateTableConfigMetrics(tableNameWithType, tableConfig);
+      updateSegmentMetrics(tableNameWithType, tableConfig, context);
       updateTableSizeMetrics(tableNameWithType);
     } catch (Exception e) {
       LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
@@ -131,8 +137,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
    * Updates metrics related to the table config.
    * If table config not found, resets the metrics
    */
-  private void updateTableConfigMetrics(String tableNameWithType) {
-    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+  private void updateTableConfigMetrics(String tableNameWithType, TableConfig tableConfig) {
     if (tableConfig == null) {
       LOGGER.warn("Found null table config for table: {}. Resetting table config metrics.", tableNameWithType);
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, 0);
@@ -156,8 +161,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
    * Runs a segment status pass over the given table.
    * TODO: revisit the logic and reduce the ZK access
    */
-  private void updateSegmentMetrics(String tableNameWithType, Context context) {
-    if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE) {
+  private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType == TableType.OFFLINE) {
       context._offlineTableCount++;
     } else {
       context._realTimeTableCount++;
@@ -197,8 +203,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     // Get the segments excluding the replaced segments which are specified in the segment lineage entries and cannot
     // be queried from the table.
     Set<String> segmentsExcludeReplaced = new HashSet<>(idealState.getPartitionSet());
-    SegmentLineage segmentLineage =
-        SegmentLineageAccessHelper.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+    SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
     SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segmentsExcludeReplaced, segmentLineage);
     _controllerMetrics
         .setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE, idealState.toString().length());
@@ -299,6 +305,13 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
       LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableNameWithType, nReplicasExternal,
           nReplicasIdealMax);
     }
+
+    if (tableType == TableType.REALTIME && tableConfig != null) {
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics, streamConfig)
+          .findAndEmitMetrics(idealState);
+    }
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
new file mode 100644
index 0000000000..e726b77e5f
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * For a given table, this class finds out if there is any partition group for which there's no consuming segment in
+ * ideal state. If so, it emits three metrics:
+ *   - Total number of partitions with missing consuming segments (including new partitions)
+ *   - Number of newly added partitions for which there's no consuming segment (there's no completed segment either)
+ *   - Maximum duration (in minutes) that a partition hasn't had a consuming segment
+ */
+public class MissingConsumingSegmentFinder {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MissingConsumingSegmentFinder.class);
+
+  private final String _realtimeTableName;
+  private final SegmentMetadataFetcher _segmentMetadataFetcher;
+  private final Map<Integer, StreamPartitionMsgOffset> _partitionGroupIdToLargestStreamOffsetMap;
+  private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
+
+  private ControllerMetrics _controllerMetrics;
+
+  public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      ControllerMetrics controllerMetrics, PartitionLevelStreamConfig streamConfig) {
+    _realtimeTableName = realtimeTableName;
+    _controllerMetrics = controllerMetrics;
+    _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
+    _streamPartitionMsgOffsetFactory =
+        StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+
+    // create partition group id to largest stream offset map
+    _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
+    streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
+    try {
+      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, Collections.emptyList())
+          .forEach(metadata -> {
+            _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+          });
+    } catch (Exception e) {
+      LOGGER.warn("Problem encountered in fetching stream metadata for topic: {} of table: {}. "
+              + "Continuing to find missing consuming segments with ideal state information only.",
+          streamConfig.getTopicName(), streamConfig.getTableNameWithType());
+    }
+  }
+
+  @VisibleForTesting
+  MissingConsumingSegmentFinder(String realtimeTableName, SegmentMetadataFetcher segmentMetadataFetcher,
+      Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap,
+      StreamPartitionMsgOffsetFactory streamPartitionMsgOffsetFactory) {
+    _realtimeTableName = realtimeTableName;
+    _segmentMetadataFetcher = segmentMetadataFetcher;
+    _partitionGroupIdToLargestStreamOffsetMap = partitionGroupIdToLargestStreamOffsetMap;
+    _streamPartitionMsgOffsetFactory = streamPartitionMsgOffsetFactory;
+  }
+
+  public void findAndEmitMetrics(IdealState idealState) {
+    MissingSegmentInfo info = findMissingSegments(idealState.getRecord().getMapFields(), Instant.now());
+    _controllerMetrics.setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT,
+        info._totalCount);
+    _controllerMetrics
+        .setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT,
+            info._newPartitionGroupCount);
+    _controllerMetrics
+        .setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES,
+            info._maxDurationInMinutes);
+  }
+
+  @VisibleForTesting
+  MissingSegmentInfo findMissingSegments(Map<String, Map<String, String>> idealStateMap, Instant now) {
+    // create the maps
+    Map<Integer, LLCSegmentName> partitionGroupIdToLatestConsumingSegmentMap = new HashMap<>();
+    Map<Integer, LLCSegmentName> partitionGroupIdToLatestCompletedSegmentMap = new HashMap<>();
+    idealStateMap.forEach((segmentName, instanceToStatusMap) -> {
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+      if (llcSegmentName != null) { // Skip the uploaded realtime segments that don't conform to llc naming
+        if (instanceToStatusMap.containsValue(SegmentStateModel.CONSUMING)) {
+          updateMap(partitionGroupIdToLatestConsumingSegmentMap, llcSegmentName);
+        } else if (instanceToStatusMap.containsValue(SegmentStateModel.ONLINE)) {
+          updateMap(partitionGroupIdToLatestCompletedSegmentMap, llcSegmentName);
+        }
+        }
+      }
+    });
+
+    MissingSegmentInfo missingSegmentInfo = new MissingSegmentInfo();
+    if (!_partitionGroupIdToLargestStreamOffsetMap.isEmpty()) {
+      _partitionGroupIdToLargestStreamOffsetMap.forEach((partitionGroupId, largestStreamOffset) -> {
+        if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) {
+          LLCSegmentName latestCompletedSegment = partitionGroupIdToLatestCompletedSegmentMap.get(partitionGroupId);
+          if (latestCompletedSegment == null) {
+            // There's no consuming or completed segment for this partition group. Possibilities:
+            //   1) it's a new partition group that has not yet been detected
+            //   2) the first consuming segment has been deleted from ideal state manually
+            missingSegmentInfo._newPartitionGroupCount++;
+            missingSegmentInfo._totalCount++;
+          } else {
+            // Completed segment is available, but there's no consuming segment.
+            // Note that there is no problem in case the partition group has reached its end of life.
+            SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher
+                .fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName());
+            StreamPartitionMsgOffset completedSegmentEndOffset =
+                _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
+            if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
+              // there are unconsumed messages available on the stream
+              missingSegmentInfo._totalCount++;
+              updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentZKMetadata.getCreationTime(), now);
+            }
+          }
+        }
+      });
+    } else {
+      partitionGroupIdToLatestCompletedSegmentMap.forEach((partitionGroupId, latestCompletedSegment) -> {
+        if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) {
+          missingSegmentInfo._totalCount++;
+          long segmentCompletionTimeMillis = _segmentMetadataFetcher
+              .fetchSegmentCompletionTime(_realtimeTableName, latestCompletedSegment.getSegmentName());
+          updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentCompletionTimeMillis, now);
+        }
+      });
+    }
+    return missingSegmentInfo;
+  }
+
+  private void updateMaxDurationInfo(MissingSegmentInfo missingSegmentInfo, Integer partitionGroupId,
+      long segmentCompletionTimeMillis, Instant now) {
+    long duration = Duration.between(Instant.ofEpochMilli(segmentCompletionTimeMillis), now).toMinutes();
+    if (duration > missingSegmentInfo._maxDurationInMinutes) {
+      missingSegmentInfo._maxDurationInMinutes = duration;
+    }
+    LOGGER.warn("PartitionGroupId {} hasn't had a consuming segment for {} minutes!", partitionGroupId, duration);
+  }
+
+  private void updateMap(Map<Integer, LLCSegmentName> partitionGroupIdToLatestSegmentMap,
+      LLCSegmentName llcSegmentName) {
+    int partitionGroupId = llcSegmentName.getPartitionGroupId();
+    partitionGroupIdToLatestSegmentMap.compute(partitionGroupId, (pid, existingSegment) -> {
+      if (existingSegment == null) {
+        return llcSegmentName;
+      } else {
+        return existingSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber() ? existingSegment
+            : llcSegmentName;
+      }
+    });
+  }
+
+  @VisibleForTesting
+  static class MissingSegmentInfo {
+    long _totalCount;
+    long _newPartitionGroupCount;
+    long _maxDurationInMinutes;
+  }
+
+  static class SegmentMetadataFetcher {
+    private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+    private ControllerMetrics _controllerMetrics;
+
+    public SegmentMetadataFetcher(ZkHelixPropertyStore<ZNRecord> propertyStore, ControllerMetrics controllerMetrics) {
+      _propertyStore = propertyStore;
+      _controllerMetrics = controllerMetrics;
+    }
+
+    public SegmentZKMetadata fetchSegmentZkMetadata(String tableName, String segmentName) {
+      return fetchSegmentZkMetadata(tableName, segmentName, null);
+    }
+
+    public long fetchSegmentCompletionTime(String tableName, String segmentName) {
+      Stat stat = new Stat();
+      fetchSegmentZkMetadata(tableName, segmentName, stat);
+      return stat.getMtime();
+    }
+
+    private SegmentZKMetadata fetchSegmentZkMetadata(String tableName, String segmentName, Stat stat) {
+      try {
+        ZNRecord znRecord = _propertyStore
+            .get(ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentName), stat,
+                AccessOption.PERSISTENT);
+        Preconditions.checkState(znRecord != null, "Failed to find segment ZK metadata for segment: %s of table: %s",
+            segmentName, tableName);
+        return new SegmentZKMetadata(znRecord);
+      } catch (Exception e) {
+        _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
+        throw e;
+      }
+    }
+  }
+}
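
The core check above is the offset comparison in findMissingSegments: a partition that has a completed segment but no
consuming segment is flagged only when the completed segment's end offset is behind the largest offset on the stream
(otherwise the partition may simply have reached its end of life). A self-contained sketch of that comparison,
assuming the long-based offset types the tests below use; the offset values are illustrative:

    import org.apache.pinot.spi.stream.LongMsgOffset;
    import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
    import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

    class OffsetComparisonSketch {
      public static void main(String[] args) {
        // End offset recorded in the latest completed segment's ZK metadata (stored as a String).
        StreamPartitionMsgOffset completedSegmentEndOffset = new LongMsgOffsetFactory().create("701");
        // Largest offset currently available on the stream partition.
        StreamPartitionMsgOffset largestStreamOffset = new LongMsgOffset(1001);
        // compareTo < 0 means unconsumed messages remain, so the partition counts as
        // missing a consuming segment and the total-count gauge is incremented.
        System.out.println(completedSegmentEndOffset.compareTo(largestStreamOffset) < 0); // true
      }
    }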
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 75a3b84f93..7c1e5872f0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -862,9 +862,6 @@ public class PinotLLCRealtimeSegmentManager {
    * If so, it should create a new CONSUMING segment for the partition.
   * (this operation is done only if @param recreateDeletedConsumingSegment is set to true,
   * which means it's manually triggered by admin not by automatic periodic task)
-   *
-   * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and
-   * reset it to 0 at the end of validateLLC
    */
   public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       boolean recreateDeletedConsumingSegment) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 7d788fc072..00ad93bbe0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.controller.helix;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -52,10 +54,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -181,7 +180,7 @@ public class SegmentStatusCheckerTest {
     allTableNames.add(tableName);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn").setLLC(true)
-            .setNumReplicas(3).build();
+            .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()).build();
     final LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis());
     final LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis());
     final LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, System.currentTimeMillis());
@@ -217,6 +216,9 @@ public class SegmentStatusCheckerTest {
       when(_helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
       when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
+      ZNRecord znRecord = new ZNRecord("0");
+      znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000");
+      when(_helixPropertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord);
     }
     {
       _config = mock(ControllerConf.class);
@@ -251,6 +253,18 @@ public class SegmentStatusCheckerTest {
             100);
     Assert.assertEquals(
         _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    Assert.assertEquals(_controllerMetrics
+        .getValueOfTableGauge(externalView.getId(), ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
+  }
+
+  Map<String, String> getStreamConfigMap() {
+    return ImmutableMap.of(
+        "streamType", "kafka",
+        "stream.kafka.consumer.type", "simple",
+        "stream.kafka.topic.name", "test",
+        "stream.kafka.decoder.class.name", 
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
+        "stream.kafka.consumer.factory.class.name",
+        
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
   }
 
   @Test
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinderTest.java
new file mode 100644
index 0000000000..1b14592916
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinderTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import com.google.common.collect.ImmutableMap;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class MissingConsumingSegmentFinderTest {
+
+  private StreamPartitionMsgOffsetFactory _offsetFactory = new LongMsgOffsetFactory();
+
+  @Test
+  public void noMissingConsumingSegmentsScenario1() {
+    // scenario 1: no missing segments, but connecting to stream throws exception
+    // only ideal state info is used
+
+    Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+    // partition 0
+    idealStateMap.put("tableA__0__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 1
+    idealStateMap.put("tableA__1__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 2
+    idealStateMap.put("tableA__2__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 3
+    idealStateMap.put("tableA__3__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+
+    Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+    MissingConsumingSegmentFinder finder = new MissingConsumingSegmentFinder("tableA", null, new HashMap<>(), null);
+    MissingConsumingSegmentFinder.MissingSegmentInfo info = finder.findMissingSegments(idealStateMap, now);
+    assertEquals(info._totalCount, 0);
+    assertEquals(info._newPartitionGroupCount, 0);
+    assertEquals(info._maxDurationInMinutes, 0);
+  }
+
+  @Test
+  public void noMissingConsumingSegmentsScenario2() {
+    // scenario 2: no missing segments and there's no exception in connecting to stream
+
+    Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+    // partition 0
+    idealStateMap.put("tableA__0__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 1
+    idealStateMap.put("tableA__1__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 2
+    idealStateMap.put("tableA__2__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 3
+    idealStateMap.put("tableA__3__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap = ImmutableMap.of(
+        0, new LongMsgOffset(1000),
+        1, new LongMsgOffset(1001),
+        2, new LongMsgOffset(1002),
+        3, new LongMsgOffset(1003)
+    );
+
+    Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+    MissingConsumingSegmentFinder finder =
+        new MissingConsumingSegmentFinder("tableA", null, 
partitionGroupIdToLargestStreamOffsetMap, null);
+    MissingConsumingSegmentFinder.MissingSegmentInfo info = 
finder.findMissingSegments(idealStateMap, now);
+    assertEquals(info._totalCount, 0);
+    assertEquals(info._newPartitionGroupCount, 0);
+    assertEquals(info._maxDurationInMinutes, 0);
+  }
+
+  @Test
+  public void noMissingConsumingSegmentsScenario3() {
+    // scenario 3: no missing segments and there's no exception in connecting to stream
+    // two partitions have reached end of life
+
+    Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+    // partition 0
+    idealStateMap.put("tableA__0__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 1 (has reached end of life)
+    idealStateMap.put("tableA__1__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    // partition 2
+    idealStateMap.put("tableA__2__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 3 (has reached end of life)
+    idealStateMap.put("tableA__3__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap = ImmutableMap.of(
+        0, new LongMsgOffset(1000),
+        1, new LongMsgOffset(701),
+        2, new LongMsgOffset(1002),
+        3, new LongMsgOffset(703)
+    );
+
+    // setup segment metadata fetcher
+    SegmentZKMetadata m1 = mock(SegmentZKMetadata.class);
+    when(m1.getEndOffset()).thenReturn("701");
+    SegmentZKMetadata m3 = mock(SegmentZKMetadata.class);
+    when(m3.getEndOffset()).thenReturn("703");
+    MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher =
+        mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class);
+    when(metadataFetcher.fetchSegmentZkMetadata("tableA", "tableA__1__1__20220601T1200Z")).thenReturn(m1);
+    when(metadataFetcher.fetchSegmentZkMetadata("tableA", "tableA__3__1__20220601T1200Z")).thenReturn(m3);
+
+    Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+    MissingConsumingSegmentFinder finder =
+        new MissingConsumingSegmentFinder("tableA", metadataFetcher, 
partitionGroupIdToLargestStreamOffsetMap,
+            _offsetFactory);
+    MissingConsumingSegmentFinder.MissingSegmentInfo info = 
finder.findMissingSegments(idealStateMap, now);
+    assertEquals(info._totalCount, 0);
+    assertEquals(info._newPartitionGroupCount, 0);
+    assertEquals(info._maxDurationInMinutes, 0);
+  }
+
+  @Test
+  public void noMissingConsumingSegmentsScenario4() {
+    // scenario 4: no missing segments, but connecting to stream throws exception
+    // two partitions have reached end of life
+    // since there's no way to detect if the partitions have reached end of life, those partitions are reported as
+    // missing consuming segments
+
+    Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+    // partition 0
+    idealStateMap.put("tableA__0__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 1 (has reached end of life)
+    idealStateMap.put("tableA__1__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    // partition 2
+    idealStateMap.put("tableA__2__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 3
+    idealStateMap.put("tableA__3__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 4 (has reached end of life)
+    idealStateMap.put("tableA__4__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    // partition 5
+    idealStateMap.put("tableA__5__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__5__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__5__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+
+    // setup segment metadata fetcher
+    MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher =
+        mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class);
+    when(metadataFetcher.fetchSegmentCompletionTime("tableA", "tableA__1__1__20220601T1200Z"))
+        .thenReturn(Instant.parse("2022-06-01T15:00:00.00Z").toEpochMilli());
+    when(metadataFetcher.fetchSegmentCompletionTime("tableA", "tableA__4__0__20220601T0900Z"))
+        .thenReturn(Instant.parse("2022-06-01T12:00:00.00Z").toEpochMilli());
+
+    Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+    MissingConsumingSegmentFinder finder =
+        new MissingConsumingSegmentFinder("tableA", metadataFetcher, new 
HashMap<>(), null);
+    MissingConsumingSegmentFinder.MissingSegmentInfo info = 
finder.findMissingSegments(idealStateMap, now);
+    assertEquals(info._totalCount, 2);
+    assertEquals(info._newPartitionGroupCount, 0);
+    assertEquals(info._maxDurationInMinutes, 6 * 60); // (18:00:00 - 12:00:00) in minutes
+  }
+
+  @Test
+  public void missingConsumingSegments() {
+
+    Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+    // partition 0
+    idealStateMap.put("tableA__0__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 1 (missing consuming segment)
+    idealStateMap.put("tableA__1__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    // partition 2
+    idealStateMap.put("tableA__2__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 3
+    idealStateMap.put("tableA__3__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 4 (missing consuming segment)
+    idealStateMap.put("tableA__4__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    // partition 5
+    idealStateMap.put("tableA__5__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__5__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__5__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 6 is a new partition and there's no consuming segment in 
ideal states for it
+
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
+    partitionGroupIdToLargestStreamOffsetMap.put(0, new LongMsgOffset(1000));
+    partitionGroupIdToLargestStreamOffsetMap.put(1, new LongMsgOffset(1001));
+    partitionGroupIdToLargestStreamOffsetMap.put(2, new LongMsgOffset(1002));
+    partitionGroupIdToLargestStreamOffsetMap.put(3, new LongMsgOffset(1003));
+    partitionGroupIdToLargestStreamOffsetMap.put(4, new LongMsgOffset(1004));
+    partitionGroupIdToLargestStreamOffsetMap.put(5, new LongMsgOffset(1005));
+    partitionGroupIdToLargestStreamOffsetMap.put(6, new LongMsgOffset(16));
+
+    // setup segment metadata fetcher
+    SegmentZKMetadata m1 = mock(SegmentZKMetadata.class);
+    when(m1.getEndOffset()).thenReturn("701");
+    when(m1.getCreationTime()).thenReturn(Instant.parse("2022-06-01T15:00:00.00Z").toEpochMilli());
+    SegmentZKMetadata m4 = mock(SegmentZKMetadata.class);
+    when(m4.getEndOffset()).thenReturn("704");
+    when(m4.getCreationTime()).thenReturn(Instant.parse("2022-06-01T12:00:00.00Z").toEpochMilli());
+    MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher =
+        mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class);
+    when(metadataFetcher.fetchSegmentZkMetadata("tableA", "tableA__1__1__20220601T1200Z")).thenReturn(m1);
+    when(metadataFetcher.fetchSegmentZkMetadata("tableA", "tableA__4__0__20220601T0900Z")).thenReturn(m4);
+
+    Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+    MissingConsumingSegmentFinder finder =
+        new MissingConsumingSegmentFinder("tableA", metadataFetcher, 
partitionGroupIdToLargestStreamOffsetMap,
+            _offsetFactory);
+    MissingConsumingSegmentFinder.MissingSegmentInfo info = 
finder.findMissingSegments(idealStateMap, now);
+    assertEquals(info._totalCount, 3);
+    assertEquals(info._newPartitionGroupCount, 1);
+    assertEquals(info._maxDurationInMinutes, 6 * 60); // (18:00:00 - 12:00:00) in minutes
+  }
+
+  @Test
+  public void missingConsumingSegmentsWithStreamConnectionIssue() {
+
+    Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+    // partition 0
+    idealStateMap.put("tableA__0__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 1 (missing consuming segment)
+    idealStateMap.put("tableA__1__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    // partition 2
+    idealStateMap.put("tableA__2__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 3
+    idealStateMap.put("tableA__3__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 4 (missing consuming segment)
+    idealStateMap.put("tableA__4__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    // partition 5
+    idealStateMap.put("tableA__5__0__20220601T0900Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__5__1__20220601T1200Z", 
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__5__2__20220601T1500Z", 
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 6 is a new partition and there's no consuming segment in 
ideal states for it
+
+    // setup segment metadata fetcher
+    MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher =
+        mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class);
+    when(metadataFetcher.fetchSegmentCompletionTime("tableA", "tableA__1__1__20220601T1200Z"))
+        .thenReturn(Instant.parse("2022-06-01T15:00:00.00Z").toEpochMilli());
+    when(metadataFetcher.fetchSegmentCompletionTime("tableA", "tableA__4__0__20220601T0900Z"))
+        .thenReturn(Instant.parse("2022-06-01T12:00:00.00Z").toEpochMilli());
+
+    Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+    MissingConsumingSegmentFinder finder =
+        new MissingConsumingSegmentFinder("tableA", metadataFetcher, new 
HashMap<>(), _offsetFactory);
+    MissingConsumingSegmentFinder.MissingSegmentInfo info = 
finder.findMissingSegments(idealStateMap, now);
+    assertEquals(info._totalCount, 2);
+    assertEquals(info._newPartitionGroupCount, 0);
+    assertEquals(info._maxDurationInMinutes, 6 * 60); // (18:00:00 - 12:00:00) in minutes
+  }
+}
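
The 6 * 60 in the assertions above follows directly from the fixture timestamps; the same java.time arithmetic that
updateMaxDurationInfo performs can be checked in isolation:

    import java.time.Duration;
    import java.time.Instant;

    class DurationSketch {
      public static void main(String[] args) {
        // Latest completed segment for the worst partition finished at 12:00Z; the check runs at 18:00Z.
        Instant completionTime = Instant.parse("2022-06-01T12:00:00.00Z");
        Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
        System.out.println(Duration.between(completionTime, now).toMinutes()); // 360 == 6 * 60
      }
    }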

