This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3724ba27cf Adding a consumer lag as metric via a periodic task in controller (#9800) 3724ba27cf is described below commit 3724ba27cf5ceeebdd9805327b7f3d29abdea47d Author: Navina Ramesh <nav...@apache.org> AuthorDate: Fri Nov 18 14:14:01 2022 -0800 Adding a consumer lag as metric via a periodic task in controller (#9800) * emit lag as a part of segment status checker * continue * verified realtime consumer monitor emits metrics * added unit test; method in abstract metrics to get partition level gauge value * cleanup * fix unit test * Disable monitor by default; addressed feedback --- .../pinot/common/metrics/AbstractMetrics.java | 19 +++ .../pinot/common/metrics/ControllerGauge.java | 7 +- .../pinot/controller/BaseControllerStarter.java | 5 + .../apache/pinot/controller/ControllerConf.java | 19 +++ .../controller/helix/RealtimeConsumerMonitor.java | 121 ++++++++++++++ .../util/ConsumingSegmentInfoReader.java | 2 + ...ControllerPeriodicTaskStarterStatelessTest.java | 2 +- .../helix/RealtimeConsumerMonitorTest.java | 179 +++++++++++++++++++++ .../apache/pinot/spi/stream/PartitionLagState.java | 2 +- 9 files changed, 353 insertions(+), 3 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java index 24656a3b70..740ae7c4b9 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java @@ -411,6 +411,25 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e } } + /** + * Gets the value of a table partition gauge. + * + * @param tableName The table name + * @param partitionId The partition name + * @param gauge The gauge to use + */ + public long getValueOfPartitionGauge(final String tableName, final int partitionId, final G gauge) { + final String fullGaugeName; + String gaugeName = gauge.getGaugeName(); + fullGaugeName = gaugeName + "." + getTableName(tableName) + "." + partitionId; + + if (!_gaugeValues.containsKey(fullGaugeName)) { + return -1; + } else { + return _gaugeValues.get(fullGaugeName).get(); + } + } + /** * Initializes all global meters (such as exceptions count) to zero. */ diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 4a0e2dfe26..fdf7c918ac 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -117,8 +117,13 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { SEGMENT_DOWNLOADS_IN_PROGRESS("segmentDownloadsInProgress", true), // Number of in progress segment uploads - SEGMENT_UPLOADS_IN_PROGRESS("segmentUploadsInProgress", true); + SEGMENT_UPLOADS_IN_PROGRESS("segmentUploadsInProgress", true), + // Records lag at a partition level + MAX_RECORDS_LAG("maxRecordsLag", false), + + // Consumption availability lag in ms at a partition level + MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false); private final String _gaugeName; private final String _unit; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 27afdefcc6..b3334617c0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -73,6 +73,7 @@ import org.apache.pinot.controller.api.access.AccessControlFactory; import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory; import org.apache.pinot.controller.api.resources.ControllerFilePathProvider; import org.apache.pinot.controller.api.resources.InvalidControllerConfigException; +import org.apache.pinot.controller.helix.RealtimeConsumerMonitor; import org.apache.pinot.controller.helix.SegmentStatusChecker; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.MinionInstancesCleanupTask; @@ -153,6 +154,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { protected SegmentRelocator _segmentRelocator; protected RetentionManager _retentionManager; protected SegmentStatusChecker _segmentStatusChecker; + protected RealtimeConsumerMonitor _realtimeConsumerMonitor; protected PinotTaskManager _taskManager; protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> _taskManagerStatusCache; protected PeriodicTaskScheduler _periodicTaskScheduler; @@ -678,6 +680,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); periodicTasks.add(_segmentStatusChecker); + _realtimeConsumerMonitor = new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager, + _controllerMetrics, _executorService); + periodicTasks.add(_realtimeConsumerMonitor); _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService, _connectionManager); periodicTasks.add(_segmentRelocator); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index f0a4584478..656c4dedde 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -217,6 +217,14 @@ public class ControllerConf extends PinotConfiguration { private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60; private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 60 * 60; private static final int DEFAULT_SEGMENT_TIER_ASSIGNER_FREQUENCY_IN_SECONDS = -1; // Disabled + + // Realtime Consumer Monitor + private static final String RT_CONSUMER_MONITOR_FREQUENCY_PERIOD = + "controller.realtimeConsumerMonitor.frequencyPeriod"; + private static final String RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS = + "controller.realtimeConsumerMonitor.initialDelayInSeconds"; + + private static final int DEFAULT_RT_CONSUMER_MONITOR_FREQUENCY_IN_SECONDS = -1; // Disabled by default } private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = "server.request.timeoutSeconds"; @@ -588,6 +596,17 @@ public class ControllerConf extends PinotConfiguration { Integer.toString(statusCheckerFrequencyInSeconds)); } + public int getRealtimeConsumerMonitorRunFrequency() { + return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_FREQUENCY_PERIOD)) + .map(period -> (int) convertPeriodToSeconds(period)).orElse( + ControllerPeriodicTasksConf.DEFAULT_RT_CONSUMER_MONITOR_FREQUENCY_IN_SECONDS); + } + + public long getRealtimeConsumerMonitorInitialDelayInSeconds() { + return getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS, + ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds()); + } + public int getTaskMetricsEmitterFrequencyInSeconds() { return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_PERIOD)) .map(period -> (int) convertPeriodToSeconds(period)).orElseGet( diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java new file mode 100644 index 0000000000..eae591f69c --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix; + +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import org.apache.commons.httpclient.SimpleHttpConnectionManager; +import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; +import org.apache.pinot.controller.util.ConsumingSegmentInfoReader; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.stream.PartitionLagState; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RealtimeConsumerMonitor extends ControllerPeriodicTask<RealtimeConsumerMonitor.Context> { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConsumerMonitor.class); + private static final int DEFAULT_TIMEOUT_MS = 10000; + private final ConsumingSegmentInfoReader _consumingSegmentInfoReader; + + @VisibleForTesting + public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager, + LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics, + ConsumingSegmentInfoReader consumingSegmentInfoReader) { + super("RealtimeConsumerMonitor", controllerConf.getRealtimeConsumerMonitorRunFrequency(), + controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(), pinotHelixResourceManager, + leadControllerManager, controllerMetrics); + _consumingSegmentInfoReader = consumingSegmentInfoReader; + } + + public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager, + LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics, + ExecutorService executorService) { + this(controllerConf, pinotHelixResourceManager, leadControllerManager, controllerMetrics, + new ConsumingSegmentInfoReader(executorService, new SimpleHttpConnectionManager(), pinotHelixResourceManager)); + } + + @Override + protected void setUpTask() { + LOGGER.info("Setting up RealtimeConsumerMonitor task"); + } + + @Override + protected void processTable(String tableNameWithType) { + if (!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType))) { + return; + } + try { + ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap = + _consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, DEFAULT_TIMEOUT_MS); + Map<String, List<Long>> partitionToLagSet = new HashMap<>(); + Map<String, List<Long>> partitionToAvailabilityLagSet = new HashMap<>(); + + for (List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> info + : segmentsInfoMap._segmentToConsumingInfoMap.values()) { + info.forEach(segment -> { + segment._partitionOffsetInfo._recordsLagMap.forEach((k, v) -> { + if (!PartitionLagState.NOT_CALCULATED.equals(v)) { + try { + long recordsLag = Long.parseLong(v); + partitionToLagSet.computeIfAbsent(k, k1 -> new ArrayList<>()).add(recordsLag); + } catch (NumberFormatException nfe) { + // skip this as we are unable to parse the lag string + } + } + }); + segment._partitionOffsetInfo._availabilityLagMap.forEach((k, v) -> { + if (!PartitionLagState.NOT_CALCULATED.equals(v)) { + try { + long availabilityLagMs = Long.parseLong(v); + partitionToAvailabilityLagSet.computeIfAbsent(k, k1 -> new ArrayList<>()).add(availabilityLagMs); + } catch (NumberFormatException nfe) { + // skip this as we are unable to parse the lag string + } + } + }); + }); + } + partitionToLagSet.forEach((partition, lagSet) -> { + _controllerMetrics.setValueOfPartitionGauge(tableNameWithType, Integer.parseInt(partition), + ControllerGauge.MAX_RECORDS_LAG, Collections.max(lagSet)); + }); + + partitionToAvailabilityLagSet.forEach((partition, lagSet) -> { + _controllerMetrics.setValueOfPartitionGauge(tableNameWithType, Integer.parseInt(partition), + ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS, Collections.max(lagSet)); + }); + } catch (Exception e) { + LOGGER.error("Failed to fetch consuming segments info. Unable to update table consumption status metrics"); + } + } + + public static final class Context { } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java index 6caaea3087..2108eed344 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java @@ -227,6 +227,8 @@ public class ConsumingSegmentInfoReader { } } + // TODO: Invert response to be a map of partition to a vector of [currentOffset, recordsLag, latestUpstreamOffset, + // availabilityLagMs] @JsonIgnoreProperties(ignoreUnknown = true) static public class PartitionOffsetInfo { @JsonProperty("currentOffsetsMap") diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java index b82ed7f48e..6172c67def 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java @@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest extends ControllerTest { } private class MockControllerStarter extends ControllerStarter { - private static final int NUM_PERIODIC_TASKS = 9; + private static final int NUM_PERIODIC_TASKS = 10; public MockControllerStarter() { super(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java new file mode 100644 index 0000000000..1450c02a35 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix; + +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.ConsumingSegmentInfoReader; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.metrics.PinotMetricUtils; +import org.apache.pinot.spi.metrics.PinotMetricsRegistry; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class RealtimeConsumerMonitorTest { + + @Test + public void realtimeBasicTest() + throws Exception { + final String tableName = "myTable_REALTIME"; + final String rawTableName = TableNameBuilder.extractRawTableName(tableName); + List<String> allTableNames = new ArrayList<String>(); + allTableNames.add(tableName); + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn").setLLC(true) + .setNumReplicas(2).setStreamConfigs(getStreamConfigMap()).build(); + LLCSegmentName segmentPartition1Seq0 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis()); + LLCSegmentName segmentPartition1Seq1 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis()); + LLCSegmentName segmentPartition2Seq0 = new LLCSegmentName(rawTableName, 2, 0, System.currentTimeMillis()); + IdealState idealState = new IdealState(tableName); + idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), "pinot1", "ONLINE"); + idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), "pinot2", "ONLINE"); + idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(), "pinot1", "CONSUMING"); + idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(), "pinot2", "CONSUMING"); + idealState.setPartitionState(segmentPartition2Seq0.getSegmentName(), "pinot1", "CONSUMING"); + idealState.setPartitionState(segmentPartition2Seq0.getSegmentName(), "pinot2", "CONSUMING"); + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(tableName); + externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot1", "ONLINE"); + externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot2", "ONLINE"); + externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot1", "CONSUMING"); + externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot2", "CONSUMING"); + externalView.setState(segmentPartition2Seq0.getSegmentName(), "pinot1", "CONSUMING"); + externalView.setState(segmentPartition2Seq0.getSegmentName(), "pinot2", "CONSUMING"); + + PinotHelixResourceManager helixResourceManager; + { + helixResourceManager = mock(PinotHelixResourceManager.class); + ZkHelixPropertyStore<ZNRecord> helixPropertyStore = mock(ZkHelixPropertyStore.class); + when(helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); + when(helixResourceManager.getPropertyStore()).thenReturn(helixPropertyStore); + when(helixResourceManager.getAllTables()).thenReturn(allTableNames); + when(helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); + when(helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(helixPropertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + } + ControllerConf config; + { + config = mock(ControllerConf.class); + when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); + when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); + } + LeadControllerManager leadControllerManager; + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } + PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); + ControllerMetrics controllerMetrics = new ControllerMetrics(metricsRegistry); + + // server 1 caught up on partition-1 and partition-2 + // server 2 lags for partition-2 and caught up on partition-1 + // So, the consumer monitor should show: 1. partition-1 has 0 lag; partition-2 has some non-zero lag. + // Segment 1 in replicas: + TreeMap<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> response = new TreeMap<>(); + List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part1ServerConsumingSegmentInfo = new ArrayList<>(2); + part1ServerConsumingSegmentInfo.add( + getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0")); + part1ServerConsumingSegmentInfo.add( + getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0")); + + response.put(segmentPartition1Seq1.getSegmentName(), part1ServerConsumingSegmentInfo); + + // Segment 2 in replicas + List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part2ServerConsumingSegmentInfo = new ArrayList<>(2); + part2ServerConsumingSegmentInfo.add( + getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0")); + part2ServerConsumingSegmentInfo.add( + getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000")); + + response.put(segmentPartition2Seq0.getSegmentName(), part2ServerConsumingSegmentInfo); + + ConsumingSegmentInfoReader consumingSegmentReader = mock(ConsumingSegmentInfoReader.class); + when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000)) + .thenReturn(new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response)); + RealtimeConsumerMonitor realtimeConsumerMonitor = + new RealtimeConsumerMonitor(config, helixResourceManager, leadControllerManager, + controllerMetrics, consumingSegmentReader); + realtimeConsumerMonitor.start(); + realtimeConsumerMonitor.run(); + Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 1, + ControllerGauge.MAX_RECORDS_LAG), 0); + Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 2, + ControllerGauge.MAX_RECORDS_LAG), 40); + Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 1, + ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0); + Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 2, + ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 60000); + } + + ConsumingSegmentInfoReader.ConsumingSegmentInfo getConsumingSegmentInfoForServer(String serverName, + String partitionId, String currentOffset, String upstreamLatestOffset, String availabilityLagMs) { + Map<String, String> currentOffsetMap = Collections.singletonMap(partitionId, currentOffset); + Map<String, String> latestUpstreamOffsetMap = Collections.singletonMap(partitionId, upstreamLatestOffset); + Map<String, String> recordsLagMap = Collections.singletonMap(partitionId, String.valueOf( + Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset))); + Map<String, String> availabilityLagMsMap = Collections.singletonMap(partitionId, availabilityLagMs); + + ConsumingSegmentInfoReader.PartitionOffsetInfo partitionOffsetInfo = + new ConsumingSegmentInfoReader.PartitionOffsetInfo(currentOffsetMap, latestUpstreamOffsetMap, recordsLagMap, + availabilityLagMsMap); + return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, "CONSUMING", -1, + currentOffsetMap, partitionOffsetInfo); + } + + Map<String, String> getStreamConfigMap() { + return ImmutableMap.of( + "streamType", "kafka", + "stream.kafka.consumer.type", "simple", + "stream.kafka.topic.name", "test", + "stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", + "stream.kafka.consumer.factory.class.name", + "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory"); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java index ead8246baf..224775fbc0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java @@ -24,7 +24,7 @@ package org.apache.pinot.spi.stream; * record offset, ingestion time etc. */ public class PartitionLagState { - protected final static String NOT_CALCULATED = "NOT_CALCULATED"; + public final static String NOT_CALCULATED = "NOT_CALCULATED"; /** * Defines how far behind the current record's offset / pointer is from upstream latest record --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org