This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new bc27ad99ec  Emit metrics if there's no consuming segment for a partition (#8877)
bc27ad99ec is described below

commit bc27ad99ecd68fd4ae8522055f8a6a84e27fa3c1
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Thu Jun 30 13:39:25 2022 -0700

    Emit metrics if there's no consuming segment for a partition (#8877)
---
 .../pinot/common/metrics/ControllerGauge.java      |  11 +-
 .../controller/helix/SegmentStatusChecker.java     |  29 +-
 .../realtime/MissingConsumingSegmentFinder.java    | 229 +++++++++++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   3 -
 .../controller/helix/SegmentStatusCheckerTest.java |  24 +-
 .../MissingConsumingSegmentFinderTest.java         | 306 +++++++++++++++++++++
 6 files changed, 585 insertions(+), 17 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 2951cfc7fb..2cd8c79dd5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -101,7 +101,16 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   DROPPED_MINION_INSTANCES("droppedMinionInstances", true),
 
   // Number of online minion instances
-  ONLINE_MINION_INSTANCES("onlineMinionInstances", true);
+  ONLINE_MINION_INSTANCES("onlineMinionInstances", true),
+
+  // Number of partitions with missing consuming segments in ideal state
+  MISSING_CONSUMING_SEGMENT_TOTAL_COUNT("missingConsumingSegmentTotalCount", false),
+
+  // Number of new partitions with missing consuming segments in ideal state
+  MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT("missingConsumingSegmentNewPartitionCount", false),
+
+  // Maximum duration of a missing consuming segment in ideal state (in minutes)
+  MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES("missingSegmentsMaxDurationInMinutes", false);
 
   private final String _gaugeName;
   private final String _unit;
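[Editor's note, not part of the commit: the three new gauges are table-level and are set through ControllerMetrics.setValueOfTableGauge, the same call the checker below uses. A minimal sketch, assuming a `controllerMetrics` instance and a table name taken from the surrounding controller context; only the gauge constants and the setValueOfTableGauge call come from the diff itself:

    // Illustrative only: emit the three new gauges for one table.
    String tableNameWithType = "myTable_REALTIME"; // hypothetical table
    controllerMetrics.setValueOfTableGauge(tableNameWithType,
        ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT, 2L);
    controllerMetrics.setValueOfTableGauge(tableNameWithType,
        ControllerGauge.MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT, 1L);
    controllerMetrics.setValueOfTableGauge(tableNameWithType,
        ControllerGauge.MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES, 360L);
]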
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index bf9f179925..6470fab0fa 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -27,9 +27,11 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
@@ -41,9 +43,12 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,8 +115,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
   @Override
   protected void processTable(String tableNameWithType, Context context) {
     try {
-      updateTableConfigMetrics(tableNameWithType);
-      updateSegmentMetrics(tableNameWithType, context);
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      updateTableConfigMetrics(tableNameWithType, tableConfig);
+      updateSegmentMetrics(tableNameWithType, tableConfig, context);
       updateTableSizeMetrics(tableNameWithType);
     } catch (Exception e) {
       LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
@@ -131,8 +137,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
    * Updates metrics related to the table config.
    * If table config not found, resets the metrics
    */
-  private void updateTableConfigMetrics(String tableNameWithType) {
-    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+  private void updateTableConfigMetrics(String tableNameWithType, TableConfig tableConfig) {
     if (tableConfig == null) {
       LOGGER.warn("Found null table config for table: {}. Resetting table config metrics.", tableNameWithType);
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, 0);
@@ -156,8 +161,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
    * Runs a segment status pass over the given table.
    * TODO: revisit the logic and reduce the ZK access
    */
-  private void updateSegmentMetrics(String tableNameWithType, Context context) {
-    if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE) {
+  private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType == TableType.OFFLINE) {
       context._offlineTableCount++;
     } else {
       context._realTimeTableCount++;
@@ -197,8 +203,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     // Get the segments excluding the replaced segments which are specified in the segment lineage entries and cannot
     // be queried from the table.
     Set<String> segmentsExcludeReplaced = new HashSet<>(idealState.getPartitionSet());
-    SegmentLineage segmentLineage =
-        SegmentLineageAccessHelper.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+    SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
     SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segmentsExcludeReplaced, segmentLineage);
     _controllerMetrics
         .setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE, idealState.toString().length());
@@ -299,6 +305,13 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
       LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableNameWithType, nReplicasExternal,
           nReplicasIdealMax);
     }
+
+    if (tableType == TableType.REALTIME && tableConfig != null) {
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics, streamConfig)
+          .findAndEmitMetrics(idealState);
+    }
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
new file mode 100644
index 0000000000..e726b77e5f
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * For a given table, this class finds out if there is any partition group for which there's no consuming segment in
+ * ideal state. If so, it emits three metrics:
+ *  - Total number of partitions with missing consuming segments, including
+ *    - Number of newly added partitions for which there's no consuming segment (there's no completed segment either)
+ *  - Maximum duration (in minutes) that a partition hasn't had a consuming segment
+ */
+public class MissingConsumingSegmentFinder {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MissingConsumingSegmentFinder.class);
+
+  private final String _realtimeTableName;
+  private final SegmentMetadataFetcher _segmentMetadataFetcher;
+  private final Map<Integer, StreamPartitionMsgOffset> _partitionGroupIdToLargestStreamOffsetMap;
+  private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
+
+  private ControllerMetrics _controllerMetrics;
+
+  public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      ControllerMetrics controllerMetrics, PartitionLevelStreamConfig streamConfig) {
+    _realtimeTableName = realtimeTableName;
+    _controllerMetrics = controllerMetrics;
+    _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
+    _streamPartitionMsgOffsetFactory =
+        StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+
+    // create partition group id to largest stream offset map
+    _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
+    streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
+    try {
+      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, Collections.emptyList())
+          .forEach(metadata -> {
+            _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+          });
+    } catch (Exception e) {
+      LOGGER.warn("Problem encountered in fetching stream metadata for topic: {} of table: {}. "
" + + "Continue finding missing consuming segment only with ideal state information.", + streamConfig.getTopicName(), streamConfig.getTableNameWithType()); + } + } + + @VisibleForTesting + MissingConsumingSegmentFinder(String realtimeTableName, SegmentMetadataFetcher segmentMetadataFetcher, + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap, + StreamPartitionMsgOffsetFactory streamPartitionMsgOffsetFactory) { + _realtimeTableName = realtimeTableName; + _segmentMetadataFetcher = segmentMetadataFetcher; + _partitionGroupIdToLargestStreamOffsetMap = partitionGroupIdToLargestStreamOffsetMap; + _streamPartitionMsgOffsetFactory = streamPartitionMsgOffsetFactory; + } + + public void findAndEmitMetrics(IdealState idealState) { + MissingSegmentInfo info = findMissingSegments(idealState.getRecord().getMapFields(), Instant.now()); + _controllerMetrics.setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT, + info._totalCount); + _controllerMetrics + .setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT, + info._newPartitionGroupCount); + _controllerMetrics + .setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES, + info._maxDurationInMinutes); + } + + @VisibleForTesting + MissingSegmentInfo findMissingSegments(Map<String, Map<String, String>> idealStateMap, Instant now) { + // create the maps + Map<Integer, LLCSegmentName> partitionGroupIdToLatestConsumingSegmentMap = new HashMap<>(); + Map<Integer, LLCSegmentName> partitionGroupIdToLatestCompletedSegmentMap = new HashMap<>(); + idealStateMap.forEach((segmentName, instanceToStatusMap) -> { + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + if (llcSegmentName != null) { // Skip the uploaded realtime segments that don't conform to llc naming + if (instanceToStatusMap.containsValue(SegmentStateModel.CONSUMING)) { + updateMap(partitionGroupIdToLatestConsumingSegmentMap, llcSegmentName); + } else if (instanceToStatusMap.containsValue(SegmentStateModel.ONLINE)) { + updateMap(partitionGroupIdToLatestCompletedSegmentMap, llcSegmentName); + } + } + }); + + MissingSegmentInfo missingSegmentInfo = new MissingSegmentInfo(); + if (!_partitionGroupIdToLargestStreamOffsetMap.isEmpty()) { + _partitionGroupIdToLargestStreamOffsetMap.forEach((partitionGroupId, largestStreamOffset) -> { + if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) { + LLCSegmentName latestCompletedSegment = partitionGroupIdToLatestCompletedSegmentMap.get(partitionGroupId); + if (latestCompletedSegment == null) { + // There's no consuming or completed segment for this partition group. Possibilities: + // 1) it's a new partition group that has not yet been detected + // 2) the first consuming segment has been deleted from ideal state manually + missingSegmentInfo._newPartitionGroupCount++; + missingSegmentInfo._totalCount++; + } else { + // Completed segment is available, but there's no consuming segment. + // Note that there is no problem in case the partition group has reached its end of life. 
+            SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher
+                .fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName());
+            StreamPartitionMsgOffset completedSegmentEndOffset =
+                _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
+            if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
+              // there are unconsumed messages available on the stream
+              missingSegmentInfo._totalCount++;
+              updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentZKMetadata.getCreationTime(), now);
+            }
+          }
+        }
+      });
+    } else {
+      partitionGroupIdToLatestCompletedSegmentMap.forEach((partitionGroupId, latestCompletedSegment) -> {
+        if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) {
+          missingSegmentInfo._totalCount++;
+          long segmentCompletionTimeMillis = _segmentMetadataFetcher
+              .fetchSegmentCompletionTime(_realtimeTableName, latestCompletedSegment.getSegmentName());
+          updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentCompletionTimeMillis, now);
+        }
+      });
+    }
+    return missingSegmentInfo;
+  }
+
+  private void updateMaxDurationInfo(MissingSegmentInfo missingSegmentInfo, Integer partitionGroupId,
+      long segmentCompletionTimeMillis, Instant now) {
+    long duration = Duration.between(Instant.ofEpochMilli(segmentCompletionTimeMillis), now).toMinutes();
+    if (duration > missingSegmentInfo._maxDurationInMinutes) {
+      missingSegmentInfo._maxDurationInMinutes = duration;
+    }
+    LOGGER.warn("PartitionGroupId {} hasn't had a consuming segment for {} minutes!", partitionGroupId, duration);
+  }
+
+  private void updateMap(Map<Integer, LLCSegmentName> partitionGroupIdToLatestSegmentMap,
+      LLCSegmentName llcSegmentName) {
+    int partitionGroupId = llcSegmentName.getPartitionGroupId();
+    partitionGroupIdToLatestSegmentMap.compute(partitionGroupId, (pid, existingSegment) -> {
+      if (existingSegment == null) {
+        return llcSegmentName;
+      } else {
+        return existingSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber() ? existingSegment
existingSegment + : llcSegmentName; + } + }); + } + + @VisibleForTesting + static class MissingSegmentInfo { + long _totalCount; + long _newPartitionGroupCount; + long _maxDurationInMinutes; + } + + static class SegmentMetadataFetcher { + private ZkHelixPropertyStore<ZNRecord> _propertyStore; + private ControllerMetrics _controllerMetrics; + + public SegmentMetadataFetcher(ZkHelixPropertyStore<ZNRecord> propertyStore, ControllerMetrics controllerMetrics) { + _propertyStore = propertyStore; + _controllerMetrics = controllerMetrics; + } + + public SegmentZKMetadata fetchSegmentZkMetadata(String tableName, String segmentName) { + return fetchSegmentZkMetadata(tableName, segmentName, null); + } + + public long fetchSegmentCompletionTime(String tableName, String segmentName) { + Stat stat = new Stat(); + fetchSegmentZkMetadata(tableName, segmentName, stat); + return stat.getMtime(); + } + + private SegmentZKMetadata fetchSegmentZkMetadata(String tableName, String segmentName, Stat stat) { + try { + ZNRecord znRecord = _propertyStore + .get(ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentName), stat, + AccessOption.PERSISTENT); + Preconditions.checkState(znRecord != null, "Failed to find segment ZK metadata for segment: %s of table: %s", + segmentName, tableName); + return new SegmentZKMetadata(znRecord); + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L); + throw e; + } + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 75a3b84f93..7c1e5872f0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -862,9 +862,6 @@ public class PinotLLCRealtimeSegmentManager { * If so, it should create a new CONSUMING segment for the partition. 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 75a3b84f93..7c1e5872f0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -862,9 +862,6 @@ public class PinotLLCRealtimeSegmentManager {
    * If so, it should create a new CONSUMING segment for the partition.
    * (this operation is done only if @param recreateDeletedConsumingSegment is set to true,
    * which means it's manually triggered by admin not by automatic periodic task)
-   *
-   * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and
-   * reset it to 0 at the end of validateLLC
    */
  public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
      boolean recreateDeletedConsumingSegment) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 7d788fc072..00ad93bbe0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.controller.helix;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -52,10 +54,7 @@
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -181,7 +180,7 @@ public class SegmentStatusCheckerTest {
     allTableNames.add(tableName);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn").setLLC(true)
-            .setNumReplicas(3).build();
+            .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()).build();
     final LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis());
     final LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis());
     final LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, System.currentTimeMillis());
@@ -217,6 +216,9 @@
     when(_helixResourceManager.getAllTables()).thenReturn(allTableNames);
     when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
     when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
+    ZNRecord znRecord = new ZNRecord("0");
+    znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000");
+    when(_helixPropertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord);
   }
   {
     _config = mock(ControllerConf.class);
@@ -251,6 +253,18 @@
         100);
     Assert.assertEquals(
         _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    Assert.assertEquals(_controllerMetrics
+        .getValueOfTableGauge(externalView.getId(), ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
+  }
+
+  Map<String, String> getStreamConfigMap() {
+    return ImmutableMap.of(
+        "streamType", "kafka",
+        "stream.kafka.consumer.type", "simple",
+        "stream.kafka.topic.name", "test",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", + "stream.kafka.consumer.factory.class.name", + "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory"); } @Test diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinderTest.java new file mode 100644 index 0000000000..1b14592916 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinderTest.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.realtime; + +import com.google.common.collect.ImmutableMap; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.LongMsgOffsetFactory; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.*; + + +public class MissingConsumingSegmentFinderTest { + + private StreamPartitionMsgOffsetFactory _offsetFactory = new LongMsgOffsetFactory(); + + @Test + public void noMissingConsumingSegmentsScenario1() { + // scenario 1: no missing segments, but connecting to stream throws exception + // only ideal state info is used + + Map<String, Map<String, String>> idealStateMap = new HashMap<>(); + // partition 0 + idealStateMap.put("tableA__0__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__0__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__0__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 1 + idealStateMap.put("tableA__1__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__1__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__1__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 2 + idealStateMap.put("tableA__2__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__2__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__2__2__20220601T1500Z", ImmutableMap.of("ServerX", 
"CONSUMING", "ServerY", "CONSUMING")); + // partition 3 + idealStateMap.put("tableA__3__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + + Instant now = Instant.parse("2022-06-01T18:00:00.00Z"); + MissingConsumingSegmentFinder finder = new MissingConsumingSegmentFinder("tableA", null, new HashMap<>(), null); + MissingConsumingSegmentFinder.MissingSegmentInfo info = finder.findMissingSegments(idealStateMap, now); + assertEquals(info._totalCount, 0); + assertEquals(info._newPartitionGroupCount, 0); + assertEquals(info._maxDurationInMinutes, 0); + } + + @Test + public void noMissingConsumingSegmentsScenario2() { + // scenario 2: no missing segments and there's no exception in connecting to stream + + Map<String, Map<String, String>> idealStateMap = new HashMap<>(); + // partition 0 + idealStateMap.put("tableA__0__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__0__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__0__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 1 + idealStateMap.put("tableA__1__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__1__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__1__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 2 + idealStateMap.put("tableA__2__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__2__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__2__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 3 + idealStateMap.put("tableA__3__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap = ImmutableMap.of( + 0, new LongMsgOffset(1000), + 1, new LongMsgOffset(1001), + 2, new LongMsgOffset(1002), + 3, new LongMsgOffset(1003) + ); + + Instant now = Instant.parse("2022-06-01T18:00:00.00Z"); + MissingConsumingSegmentFinder finder = + new MissingConsumingSegmentFinder("tableA", null, partitionGroupIdToLargestStreamOffsetMap, null); + MissingConsumingSegmentFinder.MissingSegmentInfo info = finder.findMissingSegments(idealStateMap, now); + assertEquals(info._totalCount, 0); + assertEquals(info._newPartitionGroupCount, 0); + assertEquals(info._maxDurationInMinutes, 0); + } + + @Test + public void noMissingConsumingSegmentsScenario3() { + // scenario 3: no missing segments and there's no exception in connecting to stream + // two partitions have reached end of life + + Map<String, Map<String, String>> idealStateMap = new HashMap<>(); + // partition 0 + idealStateMap.put("tableA__0__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + 
+    idealStateMap.put("tableA__0__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 1 (has reached end of life)
+    idealStateMap.put("tableA__1__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    // partition 2
+    idealStateMap.put("tableA__2__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__2__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 3 (has reached end of life)
+    idealStateMap.put("tableA__3__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__3__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap = ImmutableMap.of(
+        0, new LongMsgOffset(1000),
+        1, new LongMsgOffset(701),
+        2, new LongMsgOffset(1002),
+        3, new LongMsgOffset(703)
+    );
+
+    // setup segment metadata fetcher
+    SegmentZKMetadata m1 = mock(SegmentZKMetadata.class);
+    when(m1.getEndOffset()).thenReturn("701");
+    SegmentZKMetadata m3 = mock(SegmentZKMetadata.class);
+    when(m3.getEndOffset()).thenReturn("703");
+    MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher =
+        mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class);
+    when(metadataFetcher.fetchSegmentZkMetadata("tableA", "tableA__1__1__20220601T1200Z")).thenReturn(m1);
+    when(metadataFetcher.fetchSegmentZkMetadata("tableA", "tableA__3__1__20220601T1200Z")).thenReturn(m3);
+
+    Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+    MissingConsumingSegmentFinder finder =
+        new MissingConsumingSegmentFinder("tableA", metadataFetcher, partitionGroupIdToLargestStreamOffsetMap,
+            _offsetFactory);
+    MissingConsumingSegmentFinder.MissingSegmentInfo info = finder.findMissingSegments(idealStateMap, now);
+    assertEquals(info._totalCount, 0);
+    assertEquals(info._newPartitionGroupCount, 0);
+    assertEquals(info._maxDurationInMinutes, 0);
+  }
+
+  @Test
+  public void noMissingConsumingSegmentsScenario4() {
+    // scenario 4: no missing segments, but connecting to stream throws exception
+    //             two partitions have reached end of life
+    //             since there's no way to detect if the partitions have reached end of life, those partitions are
+    //             reported as missing consuming segments
+
+    Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+    // partition 0
+    idealStateMap.put("tableA__0__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__0__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+    // partition 1 (has reached end of life)
+    idealStateMap.put("tableA__1__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    idealStateMap.put("tableA__1__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+    // partition 2
"ONLINE")); + idealStateMap.put("tableA__2__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__2__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 3 + idealStateMap.put("tableA__3__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 4 (has reached end of life) + idealStateMap.put("tableA__4__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + // partition 5 + idealStateMap.put("tableA__5__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__5__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__5__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + + // setup segment metadata fetcher + MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher = + mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class); + when(metadataFetcher.fetchSegmentCompletionTime("tableA", "tableA__1__1__20220601T1200Z")) + .thenReturn(Instant.parse("2022-06-01T15:00:00.00Z").toEpochMilli()); + when(metadataFetcher.fetchSegmentCompletionTime("tableA", "tableA__4__0__20220601T0900Z")) + .thenReturn(Instant.parse("2022-06-01T12:00:00.00Z").toEpochMilli()); + + Instant now = Instant.parse("2022-06-01T18:00:00.00Z"); + MissingConsumingSegmentFinder finder = + new MissingConsumingSegmentFinder("tableA", metadataFetcher, new HashMap<>(), null); + MissingConsumingSegmentFinder.MissingSegmentInfo info = finder.findMissingSegments(idealStateMap, now); + assertEquals(info._totalCount, 2); + assertEquals(info._newPartitionGroupCount, 0); + assertEquals(info._maxDurationInMinutes, 6 * 60); // (18:00:00 - 12:00:00) in minutes + } + + @Test + public void missingConsumingSegments() { + + Map<String, Map<String, String>> idealStateMap = new HashMap<>(); + // partition 0 + idealStateMap.put("tableA__0__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__0__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__0__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 1 (missing consuming segment) + idealStateMap.put("tableA__1__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__1__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + // partition 2 + idealStateMap.put("tableA__2__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__2__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__2__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 3 + idealStateMap.put("tableA__3__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", 
"CONSUMING")); + // partition 4 (missing consuming segment) + idealStateMap.put("tableA__4__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + // partition 5 + idealStateMap.put("tableA__5__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__5__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__5__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 6 is a new partition and there's no consuming segment in ideal states for it + + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap = new HashMap<>(); + partitionGroupIdToLargestStreamOffsetMap.put(0, new LongMsgOffset(1000)); + partitionGroupIdToLargestStreamOffsetMap.put(1, new LongMsgOffset(1001)); + partitionGroupIdToLargestStreamOffsetMap.put(2, new LongMsgOffset(1002)); + partitionGroupIdToLargestStreamOffsetMap.put(3, new LongMsgOffset(1003)); + partitionGroupIdToLargestStreamOffsetMap.put(4, new LongMsgOffset(1004)); + partitionGroupIdToLargestStreamOffsetMap.put(5, new LongMsgOffset(1005)); + partitionGroupIdToLargestStreamOffsetMap.put(6, new LongMsgOffset(16)); + + // setup segment metadata fetcher + SegmentZKMetadata m1 = mock(SegmentZKMetadata.class); + when(m1.getEndOffset()).thenReturn("701"); + when(m1.getCreationTime()).thenReturn(Instant.parse("2022-06-01T15:00:00.00Z").toEpochMilli()); + SegmentZKMetadata m4 = mock(SegmentZKMetadata.class); + when(m4.getEndOffset()).thenReturn("704"); + when(m4.getCreationTime()).thenReturn(Instant.parse("2022-06-01T12:00:00.00Z").toEpochMilli()); + MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher = + mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class); + when(metadataFetcher.fetchSegmentZkMetadata("tableA", "tableA__1__1__20220601T1200Z")).thenReturn(m1); + when(metadataFetcher.fetchSegmentZkMetadata("tableA", "tableA__4__0__20220601T0900Z")).thenReturn(m4); + + Instant now = Instant.parse("2022-06-01T18:00:00.00Z"); + MissingConsumingSegmentFinder finder = + new MissingConsumingSegmentFinder("tableA", metadataFetcher, partitionGroupIdToLargestStreamOffsetMap, + _offsetFactory); + MissingConsumingSegmentFinder.MissingSegmentInfo info = finder.findMissingSegments(idealStateMap, now); + assertEquals(info._totalCount, 3); + assertEquals(info._newPartitionGroupCount, 1); + assertEquals(info._maxDurationInMinutes, 6 * 60); // (18:00:00 - 12:00:00) in minutes + } + + @Test + public void missingConsumingSegmentsWithStreamConnectionIssue() { + + Map<String, Map<String, String>> idealStateMap = new HashMap<>(); + // partition 0 + idealStateMap.put("tableA__0__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__0__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__0__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 1 (missing consuming segment) + idealStateMap.put("tableA__1__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__1__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + // partition 2 + idealStateMap.put("tableA__2__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__2__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", 
"ServerY", "ONLINE")); + idealStateMap.put("tableA__2__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 3 + idealStateMap.put("tableA__3__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__3__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 4 (missing consuming segment) + idealStateMap.put("tableA__4__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + // partition 5 + idealStateMap.put("tableA__5__0__20220601T0900Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__5__1__20220601T1200Z", ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE")); + idealStateMap.put("tableA__5__2__20220601T1500Z", ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING")); + // partition 6 is a new partition and there's no consuming segment in ideal states for it + + // setup segment metadata fetcher + MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher = + mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class); + when(metadataFetcher.fetchSegmentCompletionTime("tableA", "tableA__1__1__20220601T1200Z")) + .thenReturn(Instant.parse("2022-06-01T15:00:00.00Z").toEpochMilli()); + when(metadataFetcher.fetchSegmentCompletionTime("tableA", "tableA__4__0__20220601T0900Z")) + .thenReturn(Instant.parse("2022-06-01T12:00:00.00Z").toEpochMilli()); + + Instant now = Instant.parse("2022-06-01T18:00:00.00Z"); + MissingConsumingSegmentFinder finder = + new MissingConsumingSegmentFinder("tableA", metadataFetcher, new HashMap<>(), _offsetFactory); + MissingConsumingSegmentFinder.MissingSegmentInfo info = finder.findMissingSegments(idealStateMap, now); + assertEquals(info._totalCount, 2); + assertEquals(info._newPartitionGroupCount, 0); + assertEquals(info._maxDurationInMinutes, 6 * 60); // (18:00:00 - 12:00:00) in minutes + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org