This is an automated email from the ASF dual-hosted git repository. chrispeck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8b82a416c0 [multistage] Add Support for Inferring Invalid Segment Partition Id (#15760) 8b82a416c0 is described below commit 8b82a416c0bdc103a0b6e3e6f4c36d4d62f62d13 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Sat May 10 01:06:09 2025 -0500 [multistage] Add Support for Inferring Invalid Segment Partition Id (#15760) * [multistage] Add Support for Inferring Invalid Segment Partition Id * address feedback * address more feedback * more cleanups --- .../apache/pinot/common/metrics/BrokerMeter.java | 8 ++ .../common/utils/config/QueryOptionsUtils.java | 4 + .../org/apache/pinot/query/QueryEnvironment.java | 5 +- .../query/context/PhysicalPlannerContext.java | 9 +- .../planner/logical/PinotLogicalQueryPlanner.java | 3 + .../planner/physical/v2/PRelNodeTreeValidator.java | 76 ++++++++++++ .../physical/v2/PRelToPlanNodeConverter.java | 25 +--- .../opt/rules/LeafStageWorkerAssignmentRule.java | 97 ++++++++++++++-- .../rules/LeafStageWorkerAssignmentRuleTest.java | 128 +++++++++++++++++++-- .../apache/pinot/spi/utils/CommonConstants.java | 6 + 10 files changed, 316 insertions(+), 45 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index c27ea83f02..99603b2c77 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -204,6 +204,14 @@ public class BrokerMeter implements AbstractMetrics.Meter { */ public static final BrokerMeter WINDOW_COUNT = create("WINDOW_COUNT", "queries", true); + /** + * How many MSE queries have encountered segments with invalid partitions. + * <p> + * This is only emitted when usePhysicalOptimizer is set to true. + */ + public static final BrokerMeter INVALID_SEGMENT_PARTITION_IN_QUERY = create("INVALID_SEGMENT_PARTITION_IN_QUERY", + "queries", false); + /** * Number of queries executed with cursors. This count includes queries that use SSE and MSE */ diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 622dff1b8f..399ddfc510 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -379,6 +379,10 @@ public class QueryOptionsUtils { return useMSEToFillEmptySchema != null ?
Boolean.parseBoolean(useMSEToFillEmptySchema) : defaultValue; } + public static boolean isInferInvalidSegmentPartition(Map<String, String> queryOptions) { + return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.INFER_INVALID_SEGMENT_PARTITION, "false")); + } + @Nullable private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) { if (optionValue == null) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index ea250c9d6f..092fe60c2e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -79,6 +79,8 @@ import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter; import org.apache.pinot.query.planner.logical.TransformationTracker; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.query.planner.physical.PinotDispatchPlanner; +import org.apache.pinot.query.planner.physical.v2.PRelNode; +import org.apache.pinot.query.planner.physical.v2.PRelNodeTreeValidator; import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment; import org.apache.pinot.query.planner.physical.v2.RelToPRelConverter; import org.apache.pinot.query.planner.plannode.PlanNode; @@ -175,7 +177,7 @@ public class QueryEnvironment { workerManager = _envConfig.getWorkerManager(); physicalPlannerContext = new PhysicalPlannerContext(workerManager.getRoutingManager(), workerManager.getHostName(), workerManager.getPort(), _envConfig.getRequestId(), - workerManager.getInstanceId()); + workerManager.getInstanceId(), sqlNodeAndOptions.getOptions()); } return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram, sqlNodeAndOptions.getOptions(), _envConfig, format, physicalPlannerContext); @@ -333,6 +335,7 @@ public class QueryEnvironment { Preconditions.checkNotNull(plannerContext.getPhysicalPlannerContext(), "Physical planner context is null"); optimized = RelToPRelConverter.toPRelNode(optimized, plannerContext.getPhysicalPlannerContext(), _envConfig.getTableCache()).unwrap(); + PRelNodeTreeValidator.validate((PRelNode) optimized); } return relation.withRel(optimized); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java index 6035306be8..84ca486650 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java @@ -56,6 +56,7 @@ public class PhysicalPlannerContext { * Instance ID of the instance corresponding to this process. */ private final String _instanceId; + private final Map<String, String> _queryOptions; /** * Used by controller when it needs to extract table names from the query. @@ -67,15 +68,17 @@ public class PhysicalPlannerContext { _port = 0; _requestId = 0; _instanceId = ""; + _queryOptions = Map.of(); } public PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId, - String instanceId) { + String instanceId, Map<String, String> queryOptions) { _routingManager = routingManager; _hostName = hostName; _port = port; _requestId = requestId; _instanceId = instanceId; + _queryOptions = queryOptions == null ? 
Map.of() : queryOptions; } public Supplier<Integer> getNodeIdGenerator() { @@ -107,6 +110,10 @@ public class PhysicalPlannerContext { return _instanceId; } + public Map<String, String> getQueryOptions() { + return _queryOptions; + } + public static boolean isUsePhysicalOptimizer(@Nullable Map<String, String> queryOptions) { if (queryOptions == null) { return false; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java index 4bbdc8701a..9bca603e3f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java @@ -40,6 +40,7 @@ import org.apache.pinot.query.planner.PlanFragment; import org.apache.pinot.query.planner.SubPlan; import org.apache.pinot.query.planner.SubPlanMetadata; import org.apache.pinot.query.planner.physical.v2.PRelNode; +import org.apache.pinot.query.planner.physical.v2.PRelNodeTreeValidator; import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment; import org.apache.pinot.query.planner.plannode.BasePlanNode; import org.apache.pinot.query.planner.plannode.ExchangeNode; @@ -95,6 +96,8 @@ public class PinotLogicalQueryPlanner { public static Pair<SubPlan, PlanFragmentAndMailboxAssignment.Result> makePlanV2(RelRoot relRoot, PhysicalPlannerContext physicalPlannerContext) { PRelNode pRelNode = (PRelNode) relRoot.rel; + // TODO(mse-physical): Don't emit metrics for explain statements. + PRelNodeTreeValidator.emitMetrics(pRelNode); PlanFragmentAndMailboxAssignment planFragmentAndMailboxAssignment = new PlanFragmentAndMailboxAssignment(); PlanFragmentAndMailboxAssignment.Result result = planFragmentAndMailboxAssignment.compute(pRelNode, physicalPlannerContext); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java new file mode 100644 index 0000000000..b103696165 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.physical.v2; + +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Window; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerMetrics; + + +/** + * Centralizes validations for the optimized PRelNode tree. 
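+ * Also exposes {@link #emitMetrics} for recording plan-shape metrics (join and window counts) over the tree.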
+ */ +public class PRelNodeTreeValidator { + private static final BrokerMetrics BROKER_METRICS = BrokerMetrics.get(); + + private PRelNodeTreeValidator() { + } + + /** + * Validate the tree rooted at the given PRelNode. Ideally all issues with the plan should be caught even with an + * EXPLAIN, hence this method should be called as part of query compilation itself. + */ + public static void validate(PRelNode rootNode) { + // TODO(mse-physical): Add plan validations here. + } + + /** + * Emit metrics about the given plan tree. This should be avoided for Explain statements since metrics are not really + * helpful there and can be misleading. + */ + public static void emitMetrics(PRelNode pRelNode) { + Context context = new Context(); + walk(pRelNode, context); + if (context._joinCount > 0) { + BROKER_METRICS.addMeteredGlobalValue(BrokerMeter.JOIN_COUNT, context._joinCount); + BROKER_METRICS.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_JOINS, 1); + } + if (context._windowCount > 0) { + BROKER_METRICS.addMeteredGlobalValue(BrokerMeter.WINDOW_COUNT, context._windowCount); + BROKER_METRICS.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_WINDOW, 1); + } + } + + private static void walk(PRelNode pRelNode, Context context) { + if (pRelNode.unwrap() instanceof Join) { + context._joinCount++; + } else if (pRelNode.unwrap() instanceof Window) { + context._windowCount++; + } + for (PRelNode input : pRelNode.getPRelInputs()) { + walk(input, context); + } + } + + private static class Context { + int _joinCount = 0; + int _windowCount = 0; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java index bdbf785719..6b6ff0d838 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; import java.util.Set; -import javax.annotation.Nullable; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelNode; @@ -44,14 +43,12 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; -import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.logical.RexExpressionUtils; -import org.apache.pinot.query.planner.logical.TransformationTracker; import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate; import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange; import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin; @@ -75,14 +72,7 @@ public class PRelToPlanNodeConverter { private static final Logger LOGGER = LoggerFactory.getLogger(PRelToPlanNodeConverter.class); private static final int DEFAULT_STAGE_ID = -1; - private final BrokerMetrics _brokerMetrics = BrokerMetrics.get(); - private boolean _joinFound; - 
private boolean _windowFunctionFound; - @Nullable - private final TransformationTracker.Builder<PlanNode, RelNode> _tracker; - - public PRelToPlanNodeConverter(@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker) { - _tracker = tracker; + private PRelToPlanNodeConverter() { } /** @@ -105,18 +95,8 @@ public class PRelToPlanNodeConverter { } else if (node instanceof Exchange) { result = convertPhysicalExchange((PhysicalExchange) node); } else if (node instanceof PhysicalJoin) { - /* _brokerMetrics.addMeteredGlobalValue(BrokerMeter.JOIN_COUNT, 1); - if (!_joinFound) { - _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_JOINS, 1); - _joinFound = true; - } */ result = convertJoin((PhysicalJoin) node); } else if (node instanceof Window) { - /* _brokerMetrics.addMeteredGlobalValue(BrokerMeter.WINDOW_COUNT, 1); - if (!_windowFunctionFound) { - _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_WINDOW, 1); - _windowFunctionFound = true; - } */ result = convertWindow((Window) node); } else if (node instanceof Values) { result = convertValues((Values) node); @@ -126,9 +106,6 @@ public class PRelToPlanNodeConverter { throw new IllegalStateException("Unsupported RelNode: " + node); } result.setStageId(stageId); - /* if (_tracker != null) { - _tracker.trackCreation(node, result); - } */ return result; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java index 20c891031d..79016e7abb 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java @@ -40,7 +40,12 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.utils.DatabaseUtils; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.UploadedRealtimeSegmentName; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; import org.apache.pinot.core.routing.ServerRouteInfo; @@ -87,6 +92,8 @@ import org.slf4j.LoggerFactory; */ public class LeafStageWorkerAssignmentRule extends PRelOptRule { private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageWorkerAssignmentRule.class); + private static final int LIMIT_OF_INVALID_SEGMENTS_TO_LOG = 3; + private static final BrokerMetrics BROKER_METRICS = BrokerMetrics.get(); private final TableCache _tableCache; private final RoutingManager _routingManager; private final PhysicalPlannerContext _physicalPlannerContext; @@ -163,8 +170,10 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule { List<String> fieldNames = tableScan.getRowType().getFieldNames(); Map<String, TablePartitionInfo> tablePartitionInfoMap = calculateTablePartitionInfo(tableName, routingTableMap.keySet()); + boolean inferInvalidPartitionSegment = QueryOptionsUtils.isInferInvalidSegmentPartition( + 
_physicalPlannerContext.getQueryOptions()); TableScanWorkerAssignmentResult workerAssignmentResult = assignTableScan(tableName, fieldNames, - instanceIdToSegments, tablePartitionInfoMap); + instanceIdToSegments, tablePartitionInfoMap, inferInvalidPartitionSegment); TableScanMetadata metadata = new TableScanMetadata(Set.of(tableName), workerAssignmentResult._workerIdToSegmentsMap, tableOptions, segmentUnavailableMap, timeBoundaryInfo); return tableScan.with(workerAssignmentResult._pinotDataDistribution, metadata); @@ -176,23 +185,29 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule { */ @VisibleForTesting static TableScanWorkerAssignmentResult assignTableScan(String tableName, List<String> fieldNames, - InstanceIdToSegments instanceIdToSegments, Map<String, TablePartitionInfo> tpiMap) { + InstanceIdToSegments instanceIdToSegments, Map<String, TablePartitionInfo> tpiMap, + boolean inferInvalidPartitionSegment) { Set<String> tableTypes = instanceIdToSegments.getActiveTableTypes(); Set<String> partitionedTableTypes = tableTypes.stream().filter(tpiMap::containsKey).collect(Collectors.toSet()); Preconditions.checkState(!tableTypes.isEmpty(), "No routing entry for offline or realtime type"); if (tableTypes.equals(partitionedTableTypes)) { + // TODO(mse-physical): Support auto-partitioning inference for Hybrid tables. if (partitionedTableTypes.size() == 1) { // Attempt partitioned distribution String tableType = partitionedTableTypes.iterator().next(); String tableNameWithType = TableNameBuilder.forType(TableType.valueOf(tableType)).tableNameWithType(tableName); + TablePartitionInfo tpi = tpiMap.get(tableType); TableScanWorkerAssignmentResult assignmentResult = attemptPartitionedDistribution(tableNameWithType, - fieldNames, instanceIdToSegments.getSegmentsMap(TableType.valueOf(tableType)), tpiMap.get(tableType)); + fieldNames, instanceIdToSegments.getSegmentsMap(TableType.valueOf(tableType)), tpi, + inferInvalidPartitionSegment); + if (tpi != null && CollectionUtils.isNotEmpty(tpi.getSegmentsWithInvalidPartition())) { + // Use SegmentPartitionMetadataManager's logs to find the segments with invalid partitions. + BROKER_METRICS.addMeteredTableValue(tableNameWithType, BrokerMeter.INVALID_SEGMENT_PARTITION_IN_QUERY, + tpi.getSegmentsWithInvalidPartition().size()); + } if (assignmentResult != null) { return assignmentResult; } - } else { - // TODO(mse-physical): Support automatic partitioned dist for hybrid tables. - LOGGER.warn("Automatic Partitioned Distribution not supported for Hybrid Tables yet"); } } // For each server, we want to know the segments for each table-type. @@ -234,15 +249,10 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule { @VisibleForTesting static TableScanWorkerAssignmentResult attemptPartitionedDistribution(String tableNameWithType, List<String> fieldNames, Map<String, List<String>> instanceIdToSegmentsMap, - @Nullable TablePartitionInfo tablePartitionInfo) { + @Nullable TablePartitionInfo tablePartitionInfo, boolean inferInvalidSegmentPartition) { if (tablePartitionInfo == null) { return null; } - if (CollectionUtils.isNotEmpty(tablePartitionInfo.getSegmentsWithInvalidPartition())) { - LOGGER.warn("Table {} has {} segments with invalid partition info. 
Will assume un-partitioned distribution", - tableNameWithType, tablePartitionInfo.getSegmentsWithInvalidPartition().size()); - return null; - } String tableType = Objects.requireNonNull(TableNameBuilder.getTableTypeFromTableName(tableNameWithType), "Illegal state: expected table name with type").toString(); @@ -260,6 +270,14 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule { // ==> scan will have a single stream, so partitioned distribution doesn't matter return null; } + Map<Integer, List<String>> invalidSegmentsByInferredPartition; + try { + invalidSegmentsByInferredPartition = getInvalidSegmentsByInferredPartition( + tablePartitionInfo.getSegmentsWithInvalidPartition(), inferInvalidSegmentPartition, tableNameWithType, + numPartitions); + } catch (Throwable t) { + return null; + } // Pre-compute segmentToServer map for quick lookup later. Map<String, String> segmentToServer = new HashMap<>(); for (var entry : instanceIdToSegmentsMap.entrySet()) { @@ -292,6 +310,9 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule { } } } + if (invalidSegmentsByInferredPartition.containsKey(partitionNum)) { + selectedSegments.addAll(invalidSegmentsByInferredPartition.get(partitionNum)); + } segmentsByPartition.put(partitionNum, selectedSegments); } // Initialize workers list. Initially each element is empty. We have 1 worker for each selected server. @@ -326,6 +347,58 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule { return new TableScanWorkerAssignmentResult(dataDistribution, workerIdToSegmentsMap); } + /** + * Infers partition from invalid segments if the passed flag is set to true. Inference is done by simply: + * <ol> + * <li>Extracting the stream partition number from the segment name</li> + * <li>Doing a modulus with the numPartitions.</li> + * </ol> + */ + @VisibleForTesting + static Map<Integer, List<String>> getInvalidSegmentsByInferredPartition(@Nullable List<String> invalidSegments, + boolean inferPartitionsForInvalidSegments, String tableNameWithType, int numPartitions) { + if (CollectionUtils.isEmpty(invalidSegments)) { + return Map.of(); + } + if (!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType)) + || !inferPartitionsForInvalidSegments) { + throw new IllegalStateException(String.format("Table %s has %s segments with invalid partition info. Will " + + "assume un-partitioned distribution. Sampled: %s", tableNameWithType, invalidSegments.size(), + sampleSegmentsForLogging(invalidSegments))); + } + Map<Integer, List<String>> invalidSegmentsByInferredPartition = new HashMap<>(); + for (String invalidPartitionSegment : invalidSegments) { + int partitionId = inferPartitionId(invalidPartitionSegment, numPartitions); + if (partitionId == -1) { + throw new IllegalStateException(String.format("Could not infer partition for segment: %s. Falling back to " + + "un-partitioned distribution", invalidPartitionSegment)); + } + invalidSegmentsByInferredPartition.computeIfAbsent(partitionId, (x) -> new ArrayList<>()).add( + invalidPartitionSegment); + } + return invalidSegmentsByInferredPartition; + } + + @VisibleForTesting + static int inferPartitionId(String segmentName, int numPartitions) { + if (LLCSegmentName.isLLCSegment(segmentName)) { + LLCSegmentName llc = LLCSegmentName.of(segmentName); + return llc != null ? 
(llc.getPartitionGroupId() % numPartitions) : -1; + } else if (UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(segmentName)) { + UploadedRealtimeSegmentName uploaded = UploadedRealtimeSegmentName.of(segmentName); + return uploaded != null ? (uploaded.getPartitionId() % numPartitions) : -1; + } + return -1; + } + + @VisibleForTesting + static List<String> sampleSegmentsForLogging(List<String> segments) { + if (segments.size() > LIMIT_OF_INVALID_SEGMENTS_TO_LOG) { + return segments.subList(0, LIMIT_OF_INVALID_SEGMENTS_TO_LOG); + } + return segments; + } + private Map<String, TablePartitionInfo> calculateTablePartitionInfo(String tableName, Set<String> tableTypes) { Map<String, TablePartitionInfo> result = new HashMap<>(); if (tableTypes.contains("OFFLINE")) { diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java index eed658a96b..9dd8724add 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.planner.physical.v2.opt.rules; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ import static org.testng.Assert.*; public class LeafStageWorkerAssignmentRuleTest { private static final String TABLE_NAME = "testTable"; + private static final String INVALID_SEGMENT_PARTITION = "testTable__1__35__20250509T1444Z"; private static final List<String> FIELDS_IN_SCAN = List.of("userId", "orderId", "orderAmount", "cityId", "cityName"); private static final String PARTITION_COLUMN = "userId"; private static final String PARTITION_FUNCTION = "murmur"; @@ -47,6 +49,7 @@ public class LeafStageWorkerAssignmentRuleTest { private static final int REALTIME_NUM_PARTITIONS = 8; private static final InstanceIdToSegments OFFLINE_INSTANCE_ID_TO_SEGMENTS; private static final InstanceIdToSegments REALTIME_INSTANCE_ID_TO_SEGMENTS; + private static final InstanceIdToSegments REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS; private static final InstanceIdToSegments HYBRID_INSTANCE_ID_TO_SEGMENTS; static { @@ -56,6 +59,9 @@ public class LeafStageWorkerAssignmentRuleTest { OFFLINE_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap = offlineSegmentsMap; REALTIME_INSTANCE_ID_TO_SEGMENTS = new InstanceIdToSegments(); REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap = realtimeSegmentsMap; + REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS = new InstanceIdToSegments(); + REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS._realtimeTableSegmentsMap + = createRealtimeSegmentsMapWithInvalidPartitionSegments(); HYBRID_INSTANCE_ID_TO_SEGMENTS = new InstanceIdToSegments(); HYBRID_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap = offlineSegmentsMap; HYBRID_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap = realtimeSegmentsMap; @@ -64,7 +70,7 @@ public class LeafStageWorkerAssignmentRuleTest { @Test public void testAssignTableScanWithUnPartitionedOfflineTable() { TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN, - OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of()); + OFFLINE_INSTANCE_ID_TO_SEGMENTS, 
Map.of(), false); assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED); assertEquals(result._pinotDataDistribution.getWorkers().size(), 4); assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY); @@ -75,7 +81,7 @@ public class LeafStageWorkerAssignmentRuleTest { @Test public void testAssignTableScanWithUnPartitionedRealtimeTable() { TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN, - REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of()); + REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of(), false); assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED); assertEquals(result._pinotDataDistribution.getWorkers().size(), 4); assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY); @@ -86,7 +92,7 @@ public class LeafStageWorkerAssignmentRuleTest { @Test public void testAssignTableScanWithUnPartitionedHybridTable() { TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN, - HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of()); + HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of(), false); assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED); assertEquals(result._pinotDataDistribution.getWorkers().size(), 4); assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY); @@ -98,7 +104,7 @@ public class LeafStageWorkerAssignmentRuleTest { @Test public void testAssignTableScanPartitionedOfflineTable() { TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN, - OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE", createOfflineTablePartitionInfo())); + OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE", createOfflineTablePartitionInfo()), false); // Basic checks. assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.HASH_DISTRIBUTED); assertEquals(result._pinotDataDistribution.getWorkers().size(), 4); @@ -114,7 +120,7 @@ public class LeafStageWorkerAssignmentRuleTest { @Test public void testAssignTableScanPartitionedRealtimeTable() { TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN, - REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of("REALTIME", createRealtimeTablePartitionInfo())); + REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of("REALTIME", createRealtimeTablePartitionInfo()), false); // Basic checks. assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.HASH_DISTRIBUTED); assertEquals(result._pinotDataDistribution.getWorkers().size(), 4); assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY); @@ -127,11 +133,48 @@ public class LeafStageWorkerAssignmentRuleTest { validateTableScanAssignment(result, REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME"); } + @Test + public void testAssignTableScanPartitionedRealtimeTableWithSomeInvalidPartitionSegments() { + // Whether inference for invalid-partition segments is turned on or off, the instance-id-to-segments + // assignment is the same, because it depends only on the Routing Table selection. The difference is in the + // PinotDataDistribution: hash distributed when the feature is turned on, random otherwise. + { + // Case-1: When inferInvalidPartitionSegment is set to true.
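+ // The invalid segment's LLC name encodes stream partition 1, so it folds into partition 1 (1 % 8 == 1)
+ // and the scan keeps a hash-distributed PinotDataDistribution.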
+ TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN, + REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS, Map.of("REALTIME", + createRealtimeTablePartitionInfo(List.of(INVALID_SEGMENT_PARTITION))), true); + // Basic checks. + assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.HASH_DISTRIBUTED); + assertEquals(result._pinotDataDistribution.getWorkers().size(), 4); + assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY); + assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 1); + HashDistributionDesc desc = result._pinotDataDistribution.getHashDistributionDesc().iterator().next(); + assertEquals(desc.getNumPartitions(), REALTIME_NUM_PARTITIONS); + assertEquals(desc.getKeys(), List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN))); + assertEquals(desc.getHashFunction(), PARTITION_FUNCTION); + validateTableScanAssignment(result, + REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS._realtimeTableSegmentsMap, "REALTIME"); + } + { + // Case-2: When inferInvalidPartitionSegment is set to false. + TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN, + REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS, Map.of("REALTIME", + createRealtimeTablePartitionInfo(List.of(INVALID_SEGMENT_PARTITION))), false); + // Basic checks. + assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED); + assertEquals(result._pinotDataDistribution.getWorkers().size(), 4); + assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY); + assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 0); + validateTableScanAssignment(result, + REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS._realtimeTableSegmentsMap, "REALTIME"); + } + } + @Test public void testAssignTableScanPartitionedHybridTable() { TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN, HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE", createOfflineTablePartitionInfo(), - "REALTIME", createRealtimeTablePartitionInfo())); + "REALTIME", createRealtimeTablePartitionInfo()), false); assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED); assertEquals(result._pinotDataDistribution.getWorkers().size(), 4); assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY); @@ -140,6 +183,65 @@ public class LeafStageWorkerAssignmentRuleTest { validateTableScanAssignment(result, HYBRID_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME"); } + @Test + public void testGetInvalidSegmentsByInferredPartitionWhenSegmentNamesDontConform() { + final int numPartitions = 4; // arbitrary for this test + final boolean inferPartitions = true; + final String tableNameWithType = "foobar_REALTIME"; + assertThrows(IllegalStateException.class, () -> LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition( + List.of("foobar_123"), inferPartitions, tableNameWithType, numPartitions)); + assertThrows(IllegalStateException.class, () -> LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition( + List.of("foobar_123_123"), inferPartitions, tableNameWithType, numPartitions)); + assertThrows(IllegalStateException.class, () -> LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition( + List.of("foobar_123_123_123"), inferPartitions, 
tableNameWithType, numPartitions)); + assertThrows(IllegalStateException.class, () -> LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition( + List.of("foobar__9__35__20250509T1444Z", "foobar_123_123_123"), inferPartitions, tableNameWithType, + numPartitions)); + } + + @Test + public void testGetInvalidSegmentsByInferredPartitionWhenValidRealtimeSegmentNames() { + final boolean inferPartitions = true; + final String tableNameWithType = "foobar_REALTIME"; + // Should return segments by inferred partition when valid LLC Segment Name. + assertEquals(Map.of(9, List.of("foobar__9__35__20250509T1444Z")), + LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(List.of("foobar__9__35__20250509T1444Z"), + inferPartitions, tableNameWithType, 256)); + assertEquals(Map.of(101, List.of("foobar__101__35__20250509T1Z")), + LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(List.of("foobar__101__35__20250509T1Z"), + inferPartitions, tableNameWithType, 256)); + // Should return segments by inferred partition when valid Uploaded segment name. + assertEquals(Map.of(11, List.of("uploaded__table_name__11__20240530T0000Z__suffix")), + LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(List.of( + "uploaded__table_name__11__20240530T0000Z__suffix"), inferPartitions, tableNameWithType, 256)); + // Should handle when numPartitions is less than Kafka partition count. + assertEquals(Map.of(1, List.of("foobar__9__35__20250509T1444Z")), + LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(List.of("foobar__9__35__20250509T1444Z"), + inferPartitions, tableNameWithType, 8)); + } + + @Test + public void testInferPartitionId() { + // Valid name cases. When numPartitions is less than the stream partition number, the modulus is used. + assertEquals(9, LeafStageWorkerAssignmentRule.inferPartitionId("foobar__9__35__20250509T1444Z", 16)); + assertEquals(1, LeafStageWorkerAssignmentRule.inferPartitionId("foobar__9__35__20250509T1444Z", 8)); + assertEquals(0, LeafStageWorkerAssignmentRule.inferPartitionId("foobar__16__35__20250509T1444Z", 16)); + assertEquals(16, LeafStageWorkerAssignmentRule.inferPartitionId("foobar__16__35__20250509T1444Z", 32)); + // Invalid segment name case.
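+ // "foobar_invalid_123_123" is neither an LLC nor an uploaded realtime segment name, so -1 is expected.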
+ assertEquals(-1, LeafStageWorkerAssignmentRule.inferPartitionId("foobar_invalid_123_123", 4)); + } + + @Test + public void testSampleSegmentsForLogging() { + assertEquals(List.of(), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging(List.of())); + assertEquals(List.of("s0"), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging(List.of("s0"))); + assertEquals(List.of("s0", "s1"), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging(List.of("s0", "s1"))); + assertEquals(List.of("s0", "s1", "s2"), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging( + List.of("s0", "s1", "s2"))); + assertEquals(List.of("s0", "s1", "s2"), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging( + List.of("s0", "s1", "s2", "s3", "s4"))); + } + private static void validateTableScanAssignment(TableScanWorkerAssignmentResult assignmentResult, Map<String, List<String>> instanceIdToSegmentsMap, String tableType) { Map<String, List<String>> actualInstanceIdToSegments = new HashMap<>(); @@ -173,6 +275,14 @@ public class LeafStageWorkerAssignmentRuleTest { return result; } + private static Map<String, List<String>> createRealtimeSegmentsMapWithInvalidPartitionSegments() { + Map<String, List<String>> result = createRealtimeSegmentsMap(); + List<String> segments = new ArrayList<>(result.get("instance-1")); + segments.add(INVALID_SEGMENT_PARTITION); + result.put("instance-1", segments); + return result; + } + private static TablePartitionInfo createOfflineTablePartitionInfo() { TablePartitionInfo.PartitionInfo[] infos = new TablePartitionInfo.PartitionInfo[OFFLINE_NUM_PARTITIONS]; for (int partitionNum = 0; partitionNum < OFFLINE_NUM_PARTITIONS; partitionNum++) { @@ -190,6 +300,10 @@ public class LeafStageWorkerAssignmentRuleTest { } private static TablePartitionInfo createRealtimeTablePartitionInfo() { + return createRealtimeTablePartitionInfo(List.of()); + } + + private static TablePartitionInfo createRealtimeTablePartitionInfo(List<String> invalidSegments) { TablePartitionInfo.PartitionInfo[] infos = new TablePartitionInfo.PartitionInfo[REALTIME_NUM_PARTITIONS]; for (int partitionNum = 0; partitionNum < REALTIME_NUM_PARTITIONS; partitionNum++) { String selectedInstance = String.format("instance-%s", partitionNum % NUM_SERVERS); @@ -202,6 +316,6 @@ public class LeafStageWorkerAssignmentRuleTest { segments); } return new TablePartitionInfo(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(TABLE_NAME), - PARTITION_COLUMN, PARTITION_FUNCTION, REALTIME_NUM_PARTITIONS, infos, List.of()); + PARTITION_COLUMN, PARTITION_FUNCTION, REALTIME_NUM_PARTITIONS, infos, invalidSegments); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index cd6308a30e..034fd702b4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -614,6 +614,12 @@ public class CommonConstants { // Use MSE compiler when trying to fill a response with no schema metadata // (overrides the "pinot.broker.use.mse.to.fill.empty.response.schema" broker conf) public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA = "useMSEToFillEmptyResponseSchema"; + + // Used by the MSE Engine when auto-inferring data partitioning. Realtime streams can often incorrectly assign + // records to stream partitions, which can make a segment have multiple partitions. 
The scale of this is + // usually low, and this query option allows the MSE Optimizer to infer the partition of a segment based on its + // name, when that segment has multiple partitions in its columnPartitionMap. + public static final String INFER_INVALID_SEGMENT_PARTITION = "inferInvalidSegmentPartition"; } public static class QueryOptionValue { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org