This is an automated email from the ASF dual-hosted git repository.

chrispeck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b82a416c0 [multistage] Add Support for Inferring Invalid Segment Partition Id (#15760)
8b82a416c0 is described below

commit 8b82a416c0bdc103a0b6e3e6f4c36d4d62f62d13
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Sat May 10 01:06:09 2025 -0500

    [multistage] Add Support for Inferring Invalid Segment Partition Id (#15760)
    
    * [multistage] Add Support for Inferring Invalid Segment Partition Id
    
    * address feedback
    
    * address more feedback
    
    * more cleanups
---
 .../apache/pinot/common/metrics/BrokerMeter.java   |   8 ++
 .../common/utils/config/QueryOptionsUtils.java     |   4 +
 .../org/apache/pinot/query/QueryEnvironment.java   |   5 +-
 .../query/context/PhysicalPlannerContext.java      |   9 +-
 .../planner/logical/PinotLogicalQueryPlanner.java  |   3 +
 .../planner/physical/v2/PRelNodeTreeValidator.java |  76 ++++++++++++
 .../physical/v2/PRelToPlanNodeConverter.java       |  25 +---
 .../opt/rules/LeafStageWorkerAssignmentRule.java   |  97 ++++++++++++++--
 .../rules/LeafStageWorkerAssignmentRuleTest.java   | 128 +++++++++++++++++++--
 .../apache/pinot/spi/utils/CommonConstants.java    |   6 +
 10 files changed, 316 insertions(+), 45 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index c27ea83f02..99603b2c77 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -204,6 +204,14 @@ public class BrokerMeter implements AbstractMetrics.Meter {
    */
   public static final BrokerMeter WINDOW_COUNT = create("WINDOW_COUNT", "queries", true);
 
+  /**
+   * How many MSE queries have encountered segments with invalid partitions.
+   * <p>
+   * This is only emitted when usePhysicalOptimizer is set to true.
+   */
+  public static final BrokerMeter INVALID_SEGMENT_PARTITION_IN_QUERY = create("INVALID_SEGMENT_PARTITION_IN_QUERY",
+      "queries", false);
+
   /**
   * Number of queries executed with cursors. This count includes queries that use SSE and MSE
    */
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 622dff1b8f..399ddfc510 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -379,6 +379,10 @@ public class QueryOptionsUtils {
     return useMSEToFillEmptySchema != null ? Boolean.parseBoolean(useMSEToFillEmptySchema) : defaultValue;
   }
 
+  public static boolean isInferInvalidSegmentPartition(Map<String, String> queryOptions) {
+    return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.INFER_INVALID_SEGMENT_PARTITION, "false"));
+  }
+
   @Nullable
   private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) {
     if (optionValue == null) {
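
For reference, a minimal self-contained sketch of the semantics of the new helper above (a re-statement for illustration, not the actual Pinot class; as in the diff, an absent or non-"true" value parses to false):

    import java.util.Map;

    public final class InferOptionSketch {
      // Mirrors QueryOptionsUtils.isInferInvalidSegmentPartition: the option
      // defaults to "false" and only the literal string "true" enables it.
      static boolean isInferInvalidSegmentPartition(Map<String, String> queryOptions) {
        return Boolean.parseBoolean(queryOptions.getOrDefault("inferInvalidSegmentPartition", "false"));
      }

      public static void main(String[] args) {
        System.out.println(isInferInvalidSegmentPartition(Map.of("inferInvalidSegmentPartition", "true"))); // true
        System.out.println(isInferInvalidSegmentPartition(Map.of()));                                       // false
      }
    }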
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index ea250c9d6f..092fe60c2e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -79,6 +79,8 @@ import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter;
 import org.apache.pinot.query.planner.logical.TransformationTracker;
 import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
 import org.apache.pinot.query.planner.physical.PinotDispatchPlanner;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.PRelNodeTreeValidator;
 import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
 import org.apache.pinot.query.planner.physical.v2.RelToPRelConverter;
 import org.apache.pinot.query.planner.plannode.PlanNode;
@@ -175,7 +177,7 @@ public class QueryEnvironment {
       workerManager = _envConfig.getWorkerManager();
       physicalPlannerContext = new PhysicalPlannerContext(workerManager.getRoutingManager(),
           workerManager.getHostName(), workerManager.getPort(), _envConfig.getRequestId(),
-          workerManager.getInstanceId());
+          workerManager.getInstanceId(), sqlNodeAndOptions.getOptions());
     }
     return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram,
         sqlNodeAndOptions.getOptions(), _envConfig, format, physicalPlannerContext);
@@ -333,6 +335,7 @@ public class QueryEnvironment {
       Preconditions.checkNotNull(plannerContext.getPhysicalPlannerContext(), "Physical planner context is null");
       optimized = RelToPRelConverter.toPRelNode(optimized, plannerContext.getPhysicalPlannerContext(),
           _envConfig.getTableCache()).unwrap();
+      PRelNodeTreeValidator.validate((PRelNode) optimized);
     }
     return relation.withRel(optimized);
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 6035306be8..84ca486650 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -56,6 +56,7 @@ public class PhysicalPlannerContext {
    * Instance ID of the instance corresponding to this process.
    */
   private final String _instanceId;
+  private final Map<String, String> _queryOptions;
 
   /**
    * Used by controller when it needs to extract table names from the query.
@@ -67,15 +68,17 @@ public class PhysicalPlannerContext {
     _port = 0;
     _requestId = 0;
     _instanceId = "";
+    _queryOptions = Map.of();
   }
 
   public PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId,
-      String instanceId) {
+      String instanceId, Map<String, String> queryOptions) {
     _routingManager = routingManager;
     _hostName = hostName;
     _port = port;
     _requestId = requestId;
     _instanceId = instanceId;
+    _queryOptions = queryOptions == null ? Map.of() : queryOptions;
   }
 
   public Supplier<Integer> getNodeIdGenerator() {
@@ -107,6 +110,10 @@ public class PhysicalPlannerContext {
     return _instanceId;
   }
 
+  public Map<String, String> getQueryOptions() {
+    return _queryOptions;
+  }
+
   public static boolean isUsePhysicalOptimizer(@Nullable Map<String, String> queryOptions) {
     if (queryOptions == null) {
       return false;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 4bbdc8701a..9bca603e3f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -40,6 +40,7 @@ import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.SubPlan;
 import org.apache.pinot.query.planner.SubPlanMetadata;
 import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.PRelNodeTreeValidator;
 import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
 import org.apache.pinot.query.planner.plannode.BasePlanNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
@@ -95,6 +96,8 @@ public class PinotLogicalQueryPlanner {
   public static Pair<SubPlan, PlanFragmentAndMailboxAssignment.Result> makePlanV2(RelRoot relRoot,
       PhysicalPlannerContext physicalPlannerContext) {
     PRelNode pRelNode = (PRelNode) relRoot.rel;
+    // TODO(mse-physical): Don't emit metrics for explain statements.
+    PRelNodeTreeValidator.emitMetrics(pRelNode);
     PlanFragmentAndMailboxAssignment planFragmentAndMailboxAssignment = new PlanFragmentAndMailboxAssignment();
     PlanFragmentAndMailboxAssignment.Result result =
         planFragmentAndMailboxAssignment.compute(pRelNode, physicalPlannerContext);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java
new file mode 100644
index 0000000000..b103696165
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2;
+
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Window;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+
+
+/**
+ * Centralizes validations for the optimized PRelNode tree.
+ */
+public class PRelNodeTreeValidator {
+  private static final BrokerMetrics BROKER_METRICS = BrokerMetrics.get();
+
+  private PRelNodeTreeValidator() {
+  }
+
+  /**
+   * Validate the tree rooted at the given PRelNode. Ideally all issues with the plan should be caught even with an
+   * EXPLAIN, hence this method should be called as part of query compilation itself.
+   */
+  public static void validate(PRelNode rootNode) {
+    // TODO(mse-physical): Add plan validations here.
+  }
+
+  /**
+   * Emit metrics about the given plan tree. This should be avoided for Explain statements since metrics are not
+   * really helpful there and can be misleading.
+   */
+  public static void emitMetrics(PRelNode pRelNode) {
+    Context context = new Context();
+    walk(pRelNode, context);
+    if (context._joinCount > 0) {
+      BROKER_METRICS.addMeteredGlobalValue(BrokerMeter.JOIN_COUNT, context._joinCount);
+      BROKER_METRICS.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_JOINS, 1);
+    }
+    if (context._windowCount > 0) {
+      BROKER_METRICS.addMeteredGlobalValue(BrokerMeter.WINDOW_COUNT, context._windowCount);
+      BROKER_METRICS.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_WINDOW, 1);
+    }
+  }
+
+  private static void walk(PRelNode pRelNode, Context context) {
+    if (pRelNode.unwrap() instanceof Join) {
+      context._joinCount++;
+    } else if (pRelNode.unwrap() instanceof Window) {
+      context._windowCount++;
+    }
+    for (PRelNode input : pRelNode.getPRelInputs()) {
+      walk(input, context);
+    }
+  }
+
+  private static class Context {
+    int _joinCount = 0;
+    int _windowCount = 0;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
index bdbf785719..6b6ff0d838 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
@@ -44,14 +43,12 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
-import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.logical.RexExpressionUtils;
-import org.apache.pinot.query.planner.logical.TransformationTracker;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin;
@@ -75,14 +72,7 @@ public class PRelToPlanNodeConverter {
   private static final Logger LOGGER = LoggerFactory.getLogger(PRelToPlanNodeConverter.class);
   private static final int DEFAULT_STAGE_ID = -1;
 
-  private final BrokerMetrics _brokerMetrics = BrokerMetrics.get();
-  private boolean _joinFound;
-  private boolean _windowFunctionFound;
-  @Nullable
-  private final TransformationTracker.Builder<PlanNode, RelNode> _tracker;
-
-  public PRelToPlanNodeConverter(@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker) {
-    _tracker = tracker;
+  private PRelToPlanNodeConverter() {
   }
 
   /**
@@ -105,18 +95,8 @@ public class PRelToPlanNodeConverter {
     } else if (node instanceof Exchange) {
       result = convertPhysicalExchange((PhysicalExchange) node);
     } else if (node instanceof PhysicalJoin) {
-      /* _brokerMetrics.addMeteredGlobalValue(BrokerMeter.JOIN_COUNT, 1);
-      if (!_joinFound) {
-        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_JOINS, 1);
-        _joinFound = true;
-      } */
       result = convertJoin((PhysicalJoin) node);
     } else if (node instanceof Window) {
-      /* _brokerMetrics.addMeteredGlobalValue(BrokerMeter.WINDOW_COUNT, 1);
-      if (!_windowFunctionFound) {
-        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_WINDOW, 1);
-        _windowFunctionFound = true;
-      } */
       result = convertWindow((Window) node);
     } else if (node instanceof Values) {
       result = convertValues((Values) node);
@@ -126,9 +106,6 @@ public class PRelToPlanNodeConverter {
       throw new IllegalStateException("Unsupported RelNode: " + node);
     }
     result.setStageId(stageId);
-    /* if (_tracker != null) {
-      _tracker.trackCreation(node, result);
-    } */
     return result;
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
index 20c891031d..79016e7abb 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -40,7 +40,12 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.ServerRouteInfo;
@@ -87,6 +92,8 @@ import org.slf4j.LoggerFactory;
  */
 public class LeafStageWorkerAssignmentRule extends PRelOptRule {
   private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageWorkerAssignmentRule.class);
+  private static final int LIMIT_OF_INVALID_SEGMENTS_TO_LOG = 3;
+  private static final BrokerMetrics BROKER_METRICS = BrokerMetrics.get();
   private final TableCache _tableCache;
   private final RoutingManager _routingManager;
   private final PhysicalPlannerContext _physicalPlannerContext;
@@ -163,8 +170,10 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
     List<String> fieldNames = tableScan.getRowType().getFieldNames();
     Map<String, TablePartitionInfo> tablePartitionInfoMap = calculateTablePartitionInfo(tableName,
         routingTableMap.keySet());
+    boolean inferInvalidPartitionSegment = QueryOptionsUtils.isInferInvalidSegmentPartition(
+        _physicalPlannerContext.getQueryOptions());
     TableScanWorkerAssignmentResult workerAssignmentResult = assignTableScan(tableName, fieldNames,
-        instanceIdToSegments, tablePartitionInfoMap);
+        instanceIdToSegments, tablePartitionInfoMap, inferInvalidPartitionSegment);
     TableScanMetadata metadata = new TableScanMetadata(Set.of(tableName), workerAssignmentResult._workerIdToSegmentsMap,
         tableOptions, segmentUnavailableMap, timeBoundaryInfo);
     return tableScan.with(workerAssignmentResult._pinotDataDistribution, metadata);
@@ -176,23 +185,29 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
    */
   @VisibleForTesting
   static TableScanWorkerAssignmentResult assignTableScan(String tableName, List<String> fieldNames,
-      InstanceIdToSegments instanceIdToSegments, Map<String, TablePartitionInfo> tpiMap) {
+      InstanceIdToSegments instanceIdToSegments, Map<String, TablePartitionInfo> tpiMap,
+      boolean inferInvalidPartitionSegment) {
     Set<String> tableTypes = instanceIdToSegments.getActiveTableTypes();
     Set<String> partitionedTableTypes = tableTypes.stream().filter(tpiMap::containsKey).collect(Collectors.toSet());
     Preconditions.checkState(!tableTypes.isEmpty(), "No routing entry for offline or realtime type");
     if (tableTypes.equals(partitionedTableTypes)) {
+      // TODO(mse-physical): Support auto-partitioning inference for Hybrid tables.
       if (partitionedTableTypes.size() == 1) {
         // Attempt partitioned distribution
         String tableType = partitionedTableTypes.iterator().next();
         String tableNameWithType = TableNameBuilder.forType(TableType.valueOf(tableType)).tableNameWithType(tableName);
+        TablePartitionInfo tpi = tpiMap.get(tableType);
         TableScanWorkerAssignmentResult assignmentResult = attemptPartitionedDistribution(tableNameWithType,
-            fieldNames, instanceIdToSegments.getSegmentsMap(TableType.valueOf(tableType)), tpiMap.get(tableType));
+            fieldNames, instanceIdToSegments.getSegmentsMap(TableType.valueOf(tableType)), tpi,
+            inferInvalidPartitionSegment);
+        if (tpi != null && CollectionUtils.isNotEmpty(tpi.getSegmentsWithInvalidPartition())) {
+          // Use SegmentPartitionMetadataManager's logs to find the segments with invalid partitions.
+          BROKER_METRICS.addMeteredTableValue(tableNameWithType, BrokerMeter.INVALID_SEGMENT_PARTITION_IN_QUERY,
+              tpi.getSegmentsWithInvalidPartition().size());
+        }
         if (assignmentResult != null) {
           return assignmentResult;
         }
-      } else {
-        // TODO(mse-physical): Support automatic partitioned dist for hybrid tables.
-        LOGGER.warn("Automatic Partitioned Distribution not supported for Hybrid Tables yet");
       }
     }
     // For each server, we want to know the segments for each table-type.
@@ -234,15 +249,10 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
   @VisibleForTesting
   static TableScanWorkerAssignmentResult attemptPartitionedDistribution(String tableNameWithType,
       List<String> fieldNames, Map<String, List<String>> instanceIdToSegmentsMap,
-      @Nullable TablePartitionInfo tablePartitionInfo) {
+      @Nullable TablePartitionInfo tablePartitionInfo, boolean inferInvalidSegmentPartition) {
     if (tablePartitionInfo == null) {
       return null;
     }
-    if (CollectionUtils.isNotEmpty(tablePartitionInfo.getSegmentsWithInvalidPartition())) {
-      LOGGER.warn("Table {} has {} segments with invalid partition info. Will assume un-partitioned distribution",
-          tableNameWithType, tablePartitionInfo.getSegmentsWithInvalidPartition().size());
-      return null;
-    }
     String tableType =
         Objects.requireNonNull(TableNameBuilder.getTableTypeFromTableName(tableNameWithType),
             "Illegal state: expected table name with type").toString();
@@ -260,6 +270,14 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
       // ==> scan will have a single stream, so partitioned distribution doesn't matter
       return null;
     }
+    Map<Integer, List<String>> invalidSegmentsByInferredPartition;
+    try {
+      invalidSegmentsByInferredPartition = getInvalidSegmentsByInferredPartition(
+          tablePartitionInfo.getSegmentsWithInvalidPartition(), inferInvalidSegmentPartition, tableNameWithType,
+          numPartitions);
+    } catch (Throwable t) {
+      return null;
+    }
     // Pre-compute segmentToServer map for quick lookup later.
     Map<String, String> segmentToServer = new HashMap<>();
     for (var entry : instanceIdToSegmentsMap.entrySet()) {
@@ -292,6 +310,9 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
           }
         }
       }
+      if (invalidSegmentsByInferredPartition.containsKey(partitionNum)) {
+        selectedSegments.addAll(invalidSegmentsByInferredPartition.get(partitionNum));
+      }
       segmentsByPartition.put(partitionNum, selectedSegments);
     }
     // Initialize workers list. Initially each element is empty. We have 1 worker for each selected server.
@@ -326,6 +347,58 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
     return new TableScanWorkerAssignmentResult(dataDistribution, workerIdToSegmentsMap);
   }
 
+  /**
+   * Infers partition from invalid segments if the passed flag is set to true. Inference is done by simply:
+   * <ol>
+   *   <li>Extracting the stream partition number from the segment name</li>
+   *   <li>Doing a modulus with the numPartitions.</li>
+   * </ol>
+   */
+  @VisibleForTesting
+  static Map<Integer, List<String>> getInvalidSegmentsByInferredPartition(@Nullable List<String> invalidSegments,
+      boolean inferPartitionsForInvalidSegments, String tableNameWithType, int numPartitions) {
+    if (CollectionUtils.isEmpty(invalidSegments)) {
+      return Map.of();
+    }
+    if (!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType))
+        || !inferPartitionsForInvalidSegments) {
+      throw new IllegalStateException(String.format("Table %s has %s segments with invalid partition info. Will "
+          + "assume un-partitioned distribution. Sampled: %s", tableNameWithType, invalidSegments.size(),
+          sampleSegmentsForLogging(invalidSegments)));
+    }
+    Map<Integer, List<String>> invalidSegmentsByInferredPartition = new HashMap<>();
+    for (String invalidPartitionSegment : invalidSegments) {
+      int partitionId = inferPartitionId(invalidPartitionSegment, numPartitions);
+      if (partitionId == -1) {
+        throw new IllegalStateException(String.format("Could not infer partition for segment: %s. Falling back to "
+            + "un-partitioned distribution", invalidPartitionSegment));
+      }
+      invalidSegmentsByInferredPartition.computeIfAbsent(partitionId, (x) -> new ArrayList<>()).add(
+          invalidPartitionSegment);
+    }
+    return invalidSegmentsByInferredPartition;
+  }
+
+  @VisibleForTesting
+  static int inferPartitionId(String segmentName, int numPartitions) {
+    if (LLCSegmentName.isLLCSegment(segmentName)) {
+      LLCSegmentName llc = LLCSegmentName.of(segmentName);
+      return llc != null ? (llc.getPartitionGroupId() % numPartitions) : -1;
+    } else if (UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(segmentName)) {
+      UploadedRealtimeSegmentName uploaded = UploadedRealtimeSegmentName.of(segmentName);
+      return uploaded != null ? (uploaded.getPartitionId() % numPartitions) : -1;
+    }
+    return -1;
+  }
+
+  @VisibleForTesting
+  static List<String> sampleSegmentsForLogging(List<String> segments) {
+    if (segments.size() > LIMIT_OF_INVALID_SEGMENTS_TO_LOG) {
+      return segments.subList(0, LIMIT_OF_INVALID_SEGMENTS_TO_LOG);
+    }
+    return segments;
+  }
+
   private Map<String, TablePartitionInfo> calculateTablePartitionInfo(String tableName, Set<String> tableTypes) {
     Map<String, TablePartitionInfo> result = new HashMap<>();
     if (tableTypes.contains("OFFLINE")) {
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
index eed658a96b..9dd8724add 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.planner.physical.v2.opt.rules;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +40,7 @@ import static org.testng.Assert.*;
 
 public class LeafStageWorkerAssignmentRuleTest {
   private static final String TABLE_NAME = "testTable";
+  private static final String INVALID_SEGMENT_PARTITION = "testTable__1__35__20250509T1444Z";
   private static final List<String> FIELDS_IN_SCAN = List.of("userId", "orderId", "orderAmount", "cityId", "cityName");
   private static final String PARTITION_COLUMN = "userId";
   private static final String PARTITION_FUNCTION = "murmur";
@@ -47,6 +49,7 @@ public class LeafStageWorkerAssignmentRuleTest {
   private static final int REALTIME_NUM_PARTITIONS = 8;
   private static final InstanceIdToSegments OFFLINE_INSTANCE_ID_TO_SEGMENTS;
   private static final InstanceIdToSegments REALTIME_INSTANCE_ID_TO_SEGMENTS;
+  private static final InstanceIdToSegments REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS;
   private static final InstanceIdToSegments HYBRID_INSTANCE_ID_TO_SEGMENTS;
 
   static {
@@ -56,6 +59,9 @@ public class LeafStageWorkerAssignmentRuleTest {
     OFFLINE_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap = offlineSegmentsMap;
     REALTIME_INSTANCE_ID_TO_SEGMENTS = new InstanceIdToSegments();
     REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap = realtimeSegmentsMap;
+    REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS = new InstanceIdToSegments();
+    REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS._realtimeTableSegmentsMap
+        = createRealtimeSegmentsMapWithInvalidPartitionSegments();
     HYBRID_INSTANCE_ID_TO_SEGMENTS = new InstanceIdToSegments();
     HYBRID_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap = offlineSegmentsMap;
     HYBRID_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap = realtimeSegmentsMap;
@@ -64,7 +70,7 @@ public class LeafStageWorkerAssignmentRuleTest {
   @Test
   public void testAssignTableScanWithUnPartitionedOfflineTable() {
     TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
-        OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of());
+        OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of(), false);
     assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
     assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
     assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
@@ -75,7 +81,7 @@ public class LeafStageWorkerAssignmentRuleTest {
   @Test
   public void testAssignTableScanWithUnPartitionedRealtimeTable() {
     TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
-        REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of());
+        REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of(), false);
     assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
     assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
     assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
@@ -86,7 +92,7 @@ public class LeafStageWorkerAssignmentRuleTest {
   @Test
   public void testAssignTableScanWithUnPartitionedHybridTable() {
     TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
-        HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of());
+        HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of(), false);
     assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
     assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
     assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
@@ -98,7 +104,7 @@ public class LeafStageWorkerAssignmentRuleTest {
   @Test
   public void testAssignTableScanPartitionedOfflineTable() {
     TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
-        OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE", createOfflineTablePartitionInfo()));
+        OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE", createOfflineTablePartitionInfo()), false);
     // Basic checks.
     assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.HASH_DISTRIBUTED);
     assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
@@ -114,7 +120,7 @@ public class LeafStageWorkerAssignmentRuleTest {
   @Test
   public void testAssignTableScanPartitionedRealtimeTable() {
     TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
-        REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of("REALTIME", createRealtimeTablePartitionInfo()));
+        REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of("REALTIME", createRealtimeTablePartitionInfo()), false);
     // Basic checks.
     assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.HASH_DISTRIBUTED);
     assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
@@ -127,11 +133,48 @@ public class LeafStageWorkerAssignmentRuleTest {
     validateTableScanAssignment(result, REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME");
   }
 
+  @Test
+  public void testAssignTableScanPartitionedRealtimeTableWithSomeInvalidPartitionSegments() {
+    // Whether inference for invalid partition segments is turned on or off, the instance-id to segments assignment
+    // is the same, because it depends only on the Routing Table selection. The only difference is in the
+    // PinotDataDistribution: hash distributed when the feature is turned on, random otherwise.
+    {
+      // Case-1: When inferInvalidPartitionSegment is set to true.
+      TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
+          REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS, Map.of("REALTIME",
+              createRealtimeTablePartitionInfo(List.of(INVALID_SEGMENT_PARTITION))), true);
+      // Basic checks.
+      assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.HASH_DISTRIBUTED);
+      assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
+      assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
+      assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 1);
+      HashDistributionDesc desc = result._pinotDataDistribution.getHashDistributionDesc().iterator().next();
+      assertEquals(desc.getNumPartitions(), REALTIME_NUM_PARTITIONS);
+      assertEquals(desc.getKeys(), List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN)));
+      assertEquals(desc.getHashFunction(), PARTITION_FUNCTION);
+      validateTableScanAssignment(result,
+          REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS._realtimeTableSegmentsMap, "REALTIME");
+    }
+    {
+      // Case-2: When inferInvalidPartitionSegment is set to false.
+      TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
+          REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS, Map.of("REALTIME",
+              createRealtimeTablePartitionInfo(List.of(INVALID_SEGMENT_PARTITION))), false);
+      // Basic checks.
+      assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
+      assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
+      assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
+      assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 0);
+      validateTableScanAssignment(result,
+          REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS._realtimeTableSegmentsMap, "REALTIME");
+    }
+  }
+
   @Test
   public void testAssignTableScanPartitionedHybridTable() {
     TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
         HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE", createOfflineTablePartitionInfo(),
-            "REALTIME", createRealtimeTablePartitionInfo()));
+            "REALTIME", createRealtimeTablePartitionInfo()), false);
     assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
     assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
     assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
@@ -140,6 +183,65 @@ public class LeafStageWorkerAssignmentRuleTest {
     validateTableScanAssignment(result, HYBRID_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME");
   }
 
+  @Test
+  public void testGetInvalidSegmentsByInferredPartitionWhenSegmentNamesDontConform() {
+    final int numPartitions = 4;  // arbitrary for this test
+    final boolean inferPartitions = true;
+    final String tableNameWithType = "foobar_REALTIME";
+    assertThrows(IllegalStateException.class, () -> LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(
+        List.of("foobar_123"), inferPartitions, tableNameWithType, numPartitions));
+    assertThrows(IllegalStateException.class, () -> LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(
+        List.of("foobar_123_123"), inferPartitions, tableNameWithType, numPartitions));
+    assertThrows(IllegalStateException.class, () -> LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(
+        List.of("foobar_123_123_123"), inferPartitions, tableNameWithType, numPartitions));
+    assertThrows(IllegalStateException.class, () -> LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(
+        List.of("foobar__9__35__20250509T1444Z", "foobar_123_123_123"), inferPartitions, tableNameWithType,
+        numPartitions));
+  }
+
+  @Test
+  public void testGetInvalidSegmentsByInferredPartitionWhenValidRealtimeSegmentNames() {
+    final boolean inferPartitions = true;
+    final String tableNameWithType = "foobar_REALTIME";
+    // Should return segments by inferred partition when given a valid LLC segment name.
+    assertEquals(Map.of(9, List.of("foobar__9__35__20250509T1444Z")),
+        LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(List.of("foobar__9__35__20250509T1444Z"),
+        inferPartitions, tableNameWithType, 256));
+    assertEquals(Map.of(101, List.of("foobar__101__35__20250509T1Z")),
+        LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(List.of("foobar__101__35__20250509T1Z"),
+        inferPartitions, tableNameWithType, 256));
+    // Should return segments by inferred partition when given a valid uploaded segment name.
+    assertEquals(Map.of(11, List.of("uploaded__table_name__11__20240530T0000Z__suffix")),
+        LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(List.of(
+            "uploaded__table_name__11__20240530T0000Z__suffix"), inferPartitions, tableNameWithType, 256));
+    // Should handle numPartitions less than the Kafka partition count.
+    assertEquals(Map.of(1, List.of("foobar__9__35__20250509T1444Z")),
+        LeafStageWorkerAssignmentRule.getInvalidSegmentsByInferredPartition(List.of("foobar__9__35__20250509T1444Z"),
+            inferPartitions, tableNameWithType, 8));
+  }
+
+  @Test
+  public void testInferPartitionId() {
+    // Valid name cases. When numPartitions is less than the stream partition number, we expect the modulus to be used.
+    assertEquals(9, LeafStageWorkerAssignmentRule.inferPartitionId("foobar__9__35__20250509T1444Z", 16));
+    assertEquals(1, LeafStageWorkerAssignmentRule.inferPartitionId("foobar__9__35__20250509T1444Z", 8));
+    assertEquals(0, LeafStageWorkerAssignmentRule.inferPartitionId("foobar__16__35__20250509T1444Z", 16));
+    assertEquals(16, LeafStageWorkerAssignmentRule.inferPartitionId("foobar__16__35__20250509T1444Z", 32));
+    // Invalid segment name case.
+    assertEquals(-1, LeafStageWorkerAssignmentRule.inferPartitionId("foobar_invalid_123_123", 4));
+  }
+
+  @Test
+  public void testSampleSegmentsForLogging() {
+    assertEquals(List.of(), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging(List.of()));
+    assertEquals(List.of("s0"), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging(List.of("s0")));
+    assertEquals(List.of("s0", "s1"), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging(List.of("s0", "s1")));
+    assertEquals(List.of("s0", "s1", "s2"), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging(
+        List.of("s0", "s1", "s2")));
+    assertEquals(List.of("s0", "s1", "s2"), LeafStageWorkerAssignmentRule.sampleSegmentsForLogging(
+        List.of("s0", "s1", "s2", "s3", "s4")));
+  }
+
   private static void validateTableScanAssignment(TableScanWorkerAssignmentResult assignmentResult,
       Map<String, List<String>> instanceIdToSegmentsMap, String tableType) {
     Map<String, List<String>> actualInstanceIdToSegments = new HashMap<>();
@@ -173,6 +275,14 @@ public class LeafStageWorkerAssignmentRuleTest {
     return result;
   }
 
+  private static Map<String, List<String>> createRealtimeSegmentsMapWithInvalidPartitionSegments() {
+    Map<String, List<String>> result = createRealtimeSegmentsMap();
+    List<String> segments = new ArrayList<>(result.get("instance-1"));
+    segments.add(INVALID_SEGMENT_PARTITION);
+    result.put("instance-1", segments);
+    return result;
+  }
+
   private static TablePartitionInfo createOfflineTablePartitionInfo() {
     TablePartitionInfo.PartitionInfo[] infos = new TablePartitionInfo.PartitionInfo[OFFLINE_NUM_PARTITIONS];
     for (int partitionNum = 0; partitionNum < OFFLINE_NUM_PARTITIONS; partitionNum++) {
@@ -190,6 +300,10 @@ public class LeafStageWorkerAssignmentRuleTest {
   }
 
   private static TablePartitionInfo createRealtimeTablePartitionInfo() {
+    return createRealtimeTablePartitionInfo(List.of());
+  }
+
+  private static TablePartitionInfo createRealtimeTablePartitionInfo(List<String> invalidSegments) {
     TablePartitionInfo.PartitionInfo[] infos = new TablePartitionInfo.PartitionInfo[REALTIME_NUM_PARTITIONS];
     for (int partitionNum = 0; partitionNum < REALTIME_NUM_PARTITIONS; partitionNum++) {
       String selectedInstance = String.format("instance-%s", partitionNum % NUM_SERVERS);
@@ -202,6 +316,6 @@ public class LeafStageWorkerAssignmentRuleTest {
           segments);
     }
     return new TablePartitionInfo(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(TABLE_NAME),
-        PARTITION_COLUMN, PARTITION_FUNCTION, REALTIME_NUM_PARTITIONS, infos, List.of());
+        PARTITION_COLUMN, PARTITION_FUNCTION, REALTIME_NUM_PARTITIONS, infos, invalidSegments);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index cd6308a30e..034fd702b4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -614,6 +614,12 @@ public class CommonConstants {
         // Use MSE compiler when trying to fill a response with no schema metadata
         // (overrides the "pinot.broker.use.mse.to.fill.empty.response.schema" broker conf)
         public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA = "useMSEToFillEmptyResponseSchema";
+
+        // Used by the MSE Engine when auto-inferring data partitioning. Realtime streams can often incorrectly assign
+        // records to stream partitions, which can make a segment have multiple partitions. The scale of this is
+        // usually low, and this query option allows the MSE Optimizer to infer the partition of a segment based on its
+        // name, when that segment has multiple partitions in its columnPartitionMap.
+        public static final String INFER_INVALID_SEGMENT_PARTITION = "inferInvalidSegmentPartition";
       }
 
       public static class QueryOptionValue {


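To exercise the feature end-to-end, a hedged, untested sketch using the Pinot Java client (the broker address and table are placeholders; the SET-statement prefix is the usual way query options such as the one added above reach the broker, and the extra options are assumptions based on standard Pinot usage):

    import org.apache.pinot.client.Connection;
    import org.apache.pinot.client.ConnectionFactory;
    import org.apache.pinot.client.ResultSetGroup;

    public final class InferInvalidPartitionExample {
      public static void main(String[] args) {
        // Placeholder broker address; adjust for your cluster.
        Connection connection = ConnectionFactory.fromHostList("localhost:8099");
        // The inference (and the INVALID_SEGMENT_PARTITION_IN_QUERY meter) only
        // applies on the MSE physical optimizer path, hence the extra options.
        ResultSetGroup results = connection.execute(
            "SET useMultistageEngine=true; "
                + "SET usePhysicalOptimizer=true; "
                + "SET inferInvalidSegmentPartition=true; "
                + "SELECT COUNT(*) FROM testTable");
        System.out.println(results.getResultSet(0).getString(0, 0));
      }
    }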
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

