This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3724ba27cf Adding a consumer lag as metric via a periodic task in 
controller (#9800)
3724ba27cf is described below

commit 3724ba27cf5ceeebdd9805327b7f3d29abdea47d
Author: Navina Ramesh <nav...@apache.org>
AuthorDate: Fri Nov 18 14:14:01 2022 -0800

    Adding a consumer lag as metric via a periodic task in controller (#9800)
    
    * emit lag as a part of segment status checker
    
    * continue
    
    * verified realtime consumer monitor emits metrics
    
    * added unit test; method in abstract metrics to get partition level gauge 
value
    
    * cleanup
    
    * fix unit test
    
    * Disable monitor by default; addressed feedback
---
 .../pinot/common/metrics/AbstractMetrics.java      |  19 +++
 .../pinot/common/metrics/ControllerGauge.java      |   7 +-
 .../pinot/controller/BaseControllerStarter.java    |   5 +
 .../apache/pinot/controller/ControllerConf.java    |  19 +++
 .../controller/helix/RealtimeConsumerMonitor.java  | 121 ++++++++++++++
 .../util/ConsumingSegmentInfoReader.java           |   2 +
 ...ControllerPeriodicTaskStarterStatelessTest.java |   2 +-
 .../helix/RealtimeConsumerMonitorTest.java         | 179 +++++++++++++++++++++
 .../apache/pinot/spi/stream/PartitionLagState.java |   2 +-
 9 files changed, 353 insertions(+), 3 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 24656a3b70..740ae7c4b9 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -411,6 +411,25 @@ public abstract class AbstractMetrics<QP extends 
AbstractMetrics.QueryPhase, M e
     }
   }
 
+  /**
+   * Gets the value of a table partition gauge, or -1 if it has not been set.
+   *
+   * @param tableName The table name
+   * @param partitionId The partition id
+   * @param gauge The gauge to use
+   */
+  public long getValueOfPartitionGauge(final String tableName, final int 
partitionId, final G gauge) {
+    final String fullGaugeName;
+    String gaugeName = gauge.getGaugeName();
+    fullGaugeName = gaugeName + "." + getTableName(tableName) + "." + 
partitionId;
+
+    if (!_gaugeValues.containsKey(fullGaugeName)) {
+      return -1;
+    } else {
+      return _gaugeValues.get(fullGaugeName).get();
+    }
+  }
+
   /**
    * Initializes all global meters (such as exceptions count) to zero.
    */
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 4a0e2dfe26..fdf7c918ac 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -117,8 +117,13 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   SEGMENT_DOWNLOADS_IN_PROGRESS("segmentDownloadsInProgress", true),
 
   // Number of in progress segment uploads
-  SEGMENT_UPLOADS_IN_PROGRESS("segmentUploadsInProgress", true);
+  SEGMENT_UPLOADS_IN_PROGRESS("segmentUploadsInProgress", true),
 
+  // Records lag at a partition level
+  MAX_RECORDS_LAG("maxRecordsLag", false),
+
+  // Consumption availability lag in ms at a partition level
+  MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false);
 
   private final String _gaugeName;
   private final String _unit;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 27afdefcc6..b3334617c0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -73,6 +73,7 @@ import 
org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.api.resources.ControllerFilePathProvider;
 import 
org.apache.pinot.controller.api.resources.InvalidControllerConfigException;
+import org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
 import org.apache.pinot.controller.helix.SegmentStatusChecker;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.MinionInstancesCleanupTask;
@@ -153,6 +154,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected SegmentRelocator _segmentRelocator;
   protected RetentionManager _retentionManager;
   protected SegmentStatusChecker _segmentStatusChecker;
+  protected RealtimeConsumerMonitor _realtimeConsumerMonitor;
   protected PinotTaskManager _taskManager;
   protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> 
_taskManagerStatusCache;
   protected PeriodicTaskScheduler _periodicTaskScheduler;
@@ -678,6 +680,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
             _executorService);
     periodicTasks.add(_segmentStatusChecker);
+    _realtimeConsumerMonitor = new RealtimeConsumerMonitor(_config, 
_helixResourceManager, _leadControllerManager,
+        _controllerMetrics, _executorService);
+    periodicTasks.add(_realtimeConsumerMonitor);
     _segmentRelocator = new SegmentRelocator(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
         _executorService, _connectionManager);
     periodicTasks.add(_segmentRelocator);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index f0a4584478..656c4dedde 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -217,6 +217,14 @@ public class ControllerConf extends PinotConfiguration {
     private static final int 
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
     private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 
60 * 60;
     private static final int 
DEFAULT_SEGMENT_TIER_ASSIGNER_FREQUENCY_IN_SECONDS = -1; // Disabled
+
+    // Realtime Consumer Monitor
+    private static final String RT_CONSUMER_MONITOR_FREQUENCY_PERIOD =
+        "controller.realtimeConsumerMonitor.frequencyPeriod";
+    private static final String RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS =
+        "controller.realtimeConsumerMonitor.initialDelayInSeconds";
+
+    private static final int DEFAULT_RT_CONSUMER_MONITOR_FREQUENCY_IN_SECONDS 
= -1; // Disabled by default
   }
 
   private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = 
"server.request.timeoutSeconds";
@@ -588,6 +596,17 @@ public class ControllerConf extends PinotConfiguration {
         Integer.toString(statusCheckerFrequencyInSeconds));
   }
 
+  public int getRealtimeConsumerMonitorRunFrequency() {
+    return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_FREQUENCY_PERIOD))
+        .map(period -> (int) convertPeriodToSeconds(period)).orElse(
+            
ControllerPeriodicTasksConf.DEFAULT_RT_CONSUMER_MONITOR_FREQUENCY_IN_SECONDS);
+  }
+
+  public long getRealtimeConsumerMonitorInitialDelayInSeconds() {
+    return 
getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS,
+        ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+  }
+
   public int getTaskMetricsEmitterFrequencyInSeconds() {
     return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_PERIOD))
         .map(period -> (int) convertPeriodToSeconds(period)).orElseGet(
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
new file mode 100644
index 0000000000..eae591f69c
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.httpclient.SimpleHttpConnectionManager;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeConsumerMonitor extends 
ControllerPeriodicTask<RealtimeConsumerMonitor.Context> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeConsumerMonitor.class);
+  private static final int DEFAULT_TIMEOUT_MS = 10000;
+  private final ConsumingSegmentInfoReader _consumingSegmentInfoReader;
+
+  @VisibleForTesting
+  public RealtimeConsumerMonitor(ControllerConf controllerConf, 
PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics,
+      ConsumingSegmentInfoReader consumingSegmentInfoReader) {
+    super("RealtimeConsumerMonitor", 
controllerConf.getRealtimeConsumerMonitorRunFrequency(),
+        controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _consumingSegmentInfoReader = consumingSegmentInfoReader;
+  }
+
+  public RealtimeConsumerMonitor(ControllerConf controllerConf, 
PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics,
+      ExecutorService executorService) {
+    this(controllerConf, pinotHelixResourceManager, leadControllerManager, 
controllerMetrics,
+        new ConsumingSegmentInfoReader(executorService, new 
SimpleHttpConnectionManager(), pinotHelixResourceManager));
+  }
+
+  @Override
+  protected void setUpTask() {
+    LOGGER.info("Setting up RealtimeConsumerMonitor task");
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    if 
(!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType)))
 {
+      return;
+    }
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap =
+          
_consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 
DEFAULT_TIMEOUT_MS);
+      Map<String, List<Long>> partitionToLagSet = new HashMap<>();
+      Map<String, List<Long>> partitionToAvailabilityLagSet = new HashMap<>();
+
+      for (List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> info
+          : segmentsInfoMap._segmentToConsumingInfoMap.values()) {
+        info.forEach(segment -> {
+          segment._partitionOffsetInfo._recordsLagMap.forEach((k, v) -> {
+            if (!PartitionLagState.NOT_CALCULATED.equals(v)) {
+              try {
+                long recordsLag = Long.parseLong(v);
+                partitionToLagSet.computeIfAbsent(k, k1 -> new 
ArrayList<>()).add(recordsLag);
+              } catch (NumberFormatException nfe) {
+                // skip this as we are unable to parse the lag string
+              }
+            }
+          });
+          segment._partitionOffsetInfo._availabilityLagMap.forEach((k, v) -> {
+            if (!PartitionLagState.NOT_CALCULATED.equals(v)) {
+              try {
+                long availabilityLagMs = Long.parseLong(v);
+                partitionToAvailabilityLagSet.computeIfAbsent(k, k1 -> new 
ArrayList<>()).add(availabilityLagMs);
+              } catch (NumberFormatException nfe) {
+                // skip this as we are unable to parse the lag string
+              }
+            }
+          });
+        });
+      }
+      partitionToLagSet.forEach((partition, lagSet) -> {
+        _controllerMetrics.setValueOfPartitionGauge(tableNameWithType, 
Integer.parseInt(partition),
+            ControllerGauge.MAX_RECORDS_LAG, Collections.max(lagSet));
+      });
+
+      partitionToAvailabilityLagSet.forEach((partition, lagSet) -> {
+        _controllerMetrics.setValueOfPartitionGauge(tableNameWithType, 
Integer.parseInt(partition),
+            ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS, 
Collections.max(lagSet));
+      });
+    } catch (Exception e) {
+      LOGGER.error("Failed to fetch consuming segments info. Unable to update 
table consumption status metrics");
+    }
+  }
+
+  public static final class Context { }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index 6caaea3087..2108eed344 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -227,6 +227,8 @@ public class ConsumingSegmentInfoReader {
     }
   }
 
+  // TODO: Invert response to be a map of partition to a vector of 
[currentOffset, recordsLag, latestUpstreamOffset,
+  //  availabilityLagMs]
   @JsonIgnoreProperties(ignoreUnknown = true)
   static public class PartitionOffsetInfo {
     @JsonProperty("currentOffsetsMap")
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
index b82ed7f48e..6172c67def 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
@@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest 
extends ControllerTest {
   }
 
   private class MockControllerStarter extends ControllerStarter {
-    private static final int NUM_PERIODIC_TASKS = 9;
+    private static final int NUM_PERIODIC_TASKS = 10;
 
     public MockControllerStarter() {
       super();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
new file mode 100644
index 0000000000..1450c02a35
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class RealtimeConsumerMonitorTest {
+
+  @Test
+  public void realtimeBasicTest()
+      throws Exception {
+    final String tableName = "myTable_REALTIME";
+    final String rawTableName = 
TableNameBuilder.extractRawTableName(tableName);
+    List<String> allTableNames = new ArrayList<String>();
+    allTableNames.add(tableName);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn").setLLC(true)
+            .setNumReplicas(2).setStreamConfigs(getStreamConfigMap()).build();
+    LLCSegmentName segmentPartition1Seq0 = new LLCSegmentName(rawTableName, 1, 
0, System.currentTimeMillis());
+    LLCSegmentName segmentPartition1Seq1 = new LLCSegmentName(rawTableName, 1, 
1, System.currentTimeMillis());
+    LLCSegmentName segmentPartition2Seq0 = new LLCSegmentName(rawTableName, 2, 
0, System.currentTimeMillis());
+    IdealState idealState = new IdealState(tableName);
+    idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), 
"pinot1", "ONLINE");
+    idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), 
"pinot2", "ONLINE");
+    idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(), 
"pinot1", "CONSUMING");
+    idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(), 
"pinot2", "CONSUMING");
+    idealState.setPartitionState(segmentPartition2Seq0.getSegmentName(), 
"pinot1", "CONSUMING");
+    idealState.setPartitionState(segmentPartition2Seq0.getSegmentName(), 
"pinot2", "CONSUMING");
+    idealState.setReplicas("3");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
+    ExternalView externalView = new ExternalView(tableName);
+    externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot1", 
"ONLINE");
+    externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot2", 
"ONLINE");
+    externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot1", 
"CONSUMING");
+    externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot2", 
"CONSUMING");
+    externalView.setState(segmentPartition2Seq0.getSegmentName(), "pinot1", 
"CONSUMING");
+    externalView.setState(segmentPartition2Seq0.getSegmentName(), "pinot2", 
"CONSUMING");
+
+    PinotHelixResourceManager helixResourceManager;
+    {
+      helixResourceManager = mock(PinotHelixResourceManager.class);
+      ZkHelixPropertyStore<ZNRecord> helixPropertyStore = 
mock(ZkHelixPropertyStore.class);
+      
when(helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
+      
when(helixResourceManager.getPropertyStore()).thenReturn(helixPropertyStore);
+      when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
+      
when(helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
+      
when(helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
+      ZNRecord znRecord = new ZNRecord("0");
+      znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, 
"10000");
+      when(helixPropertyStore.get(anyString(), any(), 
anyInt())).thenReturn(znRecord);
+    }
+    ControllerConf config;
+    {
+      config = mock(ControllerConf.class);
+      when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
+      when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
+    }
+    LeadControllerManager leadControllerManager;
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
+    PinotMetricsRegistry metricsRegistry = 
PinotMetricUtils.getPinotMetricsRegistry();
+    ControllerMetrics controllerMetrics = new 
ControllerMetrics(metricsRegistry);
+
+    // server 1 caught up on partition-1 and partition-2
+    // server 2 lags for partition-2 and caught up on partition-1
+    // So, the consumer monitor should show: partition-1 has 0 lag; 
partition-2 has some non-zero lag.
+    // Segment 1 in replicas:
+    TreeMap<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> 
response = new TreeMap<>();
+    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> 
part1ServerConsumingSegmentInfo = new ArrayList<>(2);
+    part1ServerConsumingSegmentInfo.add(
+        getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0"));
+    part1ServerConsumingSegmentInfo.add(
+        getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0"));
+
+    response.put(segmentPartition1Seq1.getSegmentName(), 
part1ServerConsumingSegmentInfo);
+
+    // Segment 2 in replicas
+    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> 
part2ServerConsumingSegmentInfo = new ArrayList<>(2);
+    part2ServerConsumingSegmentInfo.add(
+        getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0"));
+    part2ServerConsumingSegmentInfo.add(
+        getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000"));
+
+    response.put(segmentPartition2Seq0.getSegmentName(), 
part2ServerConsumingSegmentInfo);
+
+    ConsumingSegmentInfoReader consumingSegmentReader = 
mock(ConsumingSegmentInfoReader.class);
+    when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000))
+        .thenReturn(new 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response));
+    RealtimeConsumerMonitor realtimeConsumerMonitor =
+        new RealtimeConsumerMonitor(config, helixResourceManager, 
leadControllerManager,
+            controllerMetrics, consumingSegmentReader);
+    realtimeConsumerMonitor.start();
+    realtimeConsumerMonitor.run();
+    Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 
1,
+        ControllerGauge.MAX_RECORDS_LAG), 0);
+    Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 
2,
+        ControllerGauge.MAX_RECORDS_LAG), 40);
+    Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 
1,
+        ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0);
+    Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 
2,
+        ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 60000);
+  }
+
+  ConsumingSegmentInfoReader.ConsumingSegmentInfo 
getConsumingSegmentInfoForServer(String serverName,
+      String partitionId, String currentOffset, String upstreamLatestOffset, 
String availabilityLagMs) {
+    Map<String, String> currentOffsetMap = 
Collections.singletonMap(partitionId, currentOffset);
+    Map<String, String> latestUpstreamOffsetMap = 
Collections.singletonMap(partitionId, upstreamLatestOffset);
+    Map<String, String> recordsLagMap = Collections.singletonMap(partitionId, 
String.valueOf(
+        Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset)));
+    Map<String, String> availabilityLagMsMap = 
Collections.singletonMap(partitionId, availabilityLagMs);
+
+    ConsumingSegmentInfoReader.PartitionOffsetInfo partitionOffsetInfo =
+        new ConsumingSegmentInfoReader.PartitionOffsetInfo(currentOffsetMap, 
latestUpstreamOffsetMap, recordsLagMap,
+            availabilityLagMsMap);
+    return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, 
"CONSUMING", -1,
+        currentOffsetMap, partitionOffsetInfo);
+  }
+
+  Map<String, String> getStreamConfigMap() {
+    return ImmutableMap.of(
+        "streamType", "kafka",
+        "stream.kafka.consumer.type", "simple",
+        "stream.kafka.topic.name", "test",
+        "stream.kafka.decoder.class.name", 
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
+        "stream.kafka.consumer.factory.class.name",
+        
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
index ead8246baf..224775fbc0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
@@ -24,7 +24,7 @@ package org.apache.pinot.spi.stream;
  * record offset, ingestion time etc.
  */
 public class PartitionLagState {
-  protected final static String NOT_CALCULATED = "NOT_CALCULATED";
+  public final static String NOT_CALCULATED = "NOT_CALCULATED";
 
   /**
    * Defines how far behind the current record's offset / pointer is from 
upstream latest record


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to