This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b78b76fc4f Warn about numGroups above threshold (#15278)
b78b76fc4f is described below

commit b78b76fc4fced65da1f06a81a6da807983efe864
Author: Alberto Bastos <alberto.var...@startree.ai>
AuthorDate: Thu Mar 27 10:42:22 2025 +0100

    Warn about numGroups above threshold (#15278)
---
 .../pinot/common/utils/config/QueryOptionsUtils.java      |  4 ++++
 .../core/operator/query/FilteredGroupByOperator.java      |  8 ++++++++
 .../apache/pinot/core/operator/query/GroupByOperator.java |  8 ++++++++
 .../pinot/core/plan/maker/InstancePlanMakerImplV2.java    |  6 ++++++
 .../pinot/core/query/request/context/QueryContext.java    | 10 ++++++++++
 .../java/org/apache/pinot/query/runtime/QueryRunner.java  | 11 ++++++++++-
 .../pinot/query/runtime/operator/AggregateOperator.java   |  4 ++++
 .../query/runtime/operator/MultistageGroupByExecutor.java | 15 +++++++++++++++
 .../query/runtime/operator/AggregateOperatorTest.java     | 12 ++++++++----
 .../java/org/apache/pinot/spi/utils/CommonConstants.java  |  6 ++++++
 10 files changed, 79 insertions(+), 5 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index e37c980f4e..846bbf5d0c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -298,6 +298,10 @@ public class QueryOptionsUtils {
     return checkedParseIntPositive(QueryOptionKey.NUM_GROUPS_LIMIT, 
maxNumGroupLimit);
   }
 
+  public static Integer getNumGroupsWarningLimit(Map<String, String> 
queryOptions) {
+    String numGroupsWarningLimit = 
queryOptions.get(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT);
+    return checkedParseIntPositive(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT, 
numGroupsWarningLimit);
+  }
   @Nullable
   public static Integer getMaxInitialResultHolderCapacity(Map<String, String> 
queryOptions) {
     String maxInitialResultHolderCapacity = 
queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index 5c82827cb4..6272a40422 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -48,6 +48,8 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
 import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.spi.trace.Tracing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -56,6 +58,7 @@ import org.apache.pinot.spi.trace.Tracing;
  */
 @SuppressWarnings("rawtypes")
 public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> 
{
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FilteredGroupByOperator.class);
   private static final String EXPLAIN_NAME = "GROUP_BY_FILTERED";
 
   private final QueryContext _queryContext;
@@ -167,6 +170,11 @@ public class FilteredGroupByOperator extends 
BaseOperator<GroupByResultsBlock> {
     boolean numGroupsLimitReached = groupKeyGenerator.getNumKeys() >= 
_queryContext.getNumGroupsLimit();
     Tracing.activeRecording().setNumGroups(_queryContext.getNumGroupsLimit(), 
groupKeyGenerator.getNumKeys());
 
+    if (groupKeyGenerator.getNumKeys() >= 
_queryContext.getNumGroupsWarningLimit()) {
+      LOGGER.warn("numGroups reached warning limit: {} (actual: {})",
+          _queryContext.getNumGroupsWarningLimit(), 
groupKeyGenerator.getNumKeys());
+    }
+
     // Trim the groups when iff:
     // - Query has ORDER BY clause
     // - Segment group trim is enabled
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
index 6e27c6b365..33d5e4157c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
@@ -43,6 +43,8 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
 import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.spi.trace.Tracing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -50,6 +52,7 @@ import org.apache.pinot.spi.trace.Tracing;
  */
 @SuppressWarnings("rawtypes")
 public class GroupByOperator extends BaseOperator<GroupByResultsBlock> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GroupByOperator.class);
   private static final String EXPLAIN_NAME = "GROUP_BY";
 
   private final QueryContext _queryContext;
@@ -116,6 +119,11 @@ public class GroupByOperator extends 
BaseOperator<GroupByResultsBlock> {
     boolean numGroupsLimitReached = groupByExecutor.getNumGroups() >= 
_queryContext.getNumGroupsLimit();
     Tracing.activeRecording().setNumGroups(_queryContext.getNumGroupsLimit(), 
groupByExecutor.getNumGroups());
 
+    if (groupByExecutor.getNumGroups() >= 
_queryContext.getNumGroupsWarningLimit()) {
+      LOGGER.warn("numGroups reached warning limit: {} (actual: {})",
+          _queryContext.getNumGroupsWarningLimit(), 
groupByExecutor.getNumGroups());
+    }
+
     // Trim the groups when iff:
     // - Query has ORDER BY clause
     // - Segment group trim is enabled
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 562d1936c2..511815fbb2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -110,6 +110,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   private int _minInitialIndexedTableCapacity = 
Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
   // Limit on number of groups stored for each segment, beyond which no new 
group will be created
   private int _numGroupsLimit = Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT;
+  // Warning limit on number of groups stored for each segment
+  private int _numGroupsWarningLimit = 
Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT;
   // Used for SQL GROUP BY (server combine)
   private int _minSegmentGroupTrimSize = 
Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE;
   private int _minServerGroupTrimSize = 
Server.DEFAULT_QUERY_EXECUTOR_MIN_SERVER_GROUP_TRIM_SIZE;
@@ -131,6 +133,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     Preconditions.checkState(_minInitialIndexedTableCapacity <= 
_numGroupsLimit,
         "Invalid configuration: minInitialIndexedTableCapacity: %d must be 
smaller or equal to numGroupsLimit: %d",
         _minInitialIndexedTableCapacity, _numGroupsLimit);
+    _numGroupsWarningLimit = 
queryExecutorConfig.getProperty(Server.NUM_GROUPS_WARN_LIMIT,
+        Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT);
     _minSegmentGroupTrimSize = 
queryExecutorConfig.getProperty(Server.MIN_SEGMENT_GROUP_TRIM_SIZE,
         Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE);
     _minServerGroupTrimSize = 
queryExecutorConfig.getProperty(Server.MIN_SERVER_GROUP_TRIM_SIZE,
@@ -256,6 +260,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
       } else {
         queryContext.setNumGroupsLimit(_numGroupsLimit);
       }
+      // Set numGroupsWarningThreshold
+      queryContext.setNumGroupsWarningLimit(_numGroupsWarningLimit);
       // Set minSegmentGroupTrimSize
       Integer minSegmentGroupTrimSizeFromQuery = 
QueryOptionsUtils.getMinSegmentGroupTrimSize(queryOptions);
       if (minSegmentGroupTrimSizeFromQuery != null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 9f35b1c2ef..b1144e6044 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -117,6 +117,8 @@ public class QueryContext {
   private int _minInitialIndexedTableCapacity = 
Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
   // Limit of number of groups stored in each segment
   private int _numGroupsLimit = Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT;
+  // Warning threshold of number of groups stored in each segment
+  private int _numGroupsWarningLimit = 
Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT;
   // Minimum number of groups to keep per segment when trimming groups for SQL 
GROUP BY
   private int _minSegmentGroupTrimSize = 
Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE;
   // Minimum number of groups to keep across segments when trimming groups for 
SQL GROUP BY
@@ -384,6 +386,14 @@ public class QueryContext {
     _numGroupsLimit = numGroupsLimit;
   }
 
+  public int getNumGroupsWarningLimit() {
+    return _numGroupsWarningLimit;
+  }
+
+  public void setNumGroupsWarningLimit(int numGroupsWarningLimit) {
+    _numGroupsWarningLimit = numGroupsWarningLimit;
+  }
+
   public int getMinSegmentGroupTrimSize() {
     return _minSegmentGroupTrimSize;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 7320061c5e..fb5f340961 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -117,6 +117,8 @@ public class QueryRunner {
   @Nullable
   private Integer _numGroupsLimit;
   @Nullable
+  private Integer _numGroupsWarningLimit;
+  @Nullable
   private Integer _mseMinGroupTrimSize;
 
   @Nullable
@@ -156,6 +158,9 @@ public class QueryRunner {
     String numGroupsLimitStr = 
config.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
     _numGroupsLimit = numGroupsLimitStr != null ? 
Integer.parseInt(numGroupsLimitStr) : null;
 
+    String numGroupsWarnLimitStr = 
config.getProperty(Server.CONFIG_OF_NUM_GROUPS_WARN_LIMIT);
+    _numGroupsWarningLimit = numGroupsWarnLimitStr != null ? 
Integer.parseInt(numGroupsWarnLimitStr) : null;
+
     String mseMinGroupTrimSizeStr = 
config.getProperty(Server.CONFIG_OF_MSE_MIN_GROUP_TRIM_SIZE);
     _mseMinGroupTrimSize = mseMinGroupTrimSizeStr != null ? 
Integer.parseInt(mseMinGroupTrimSizeStr) : null;
 
@@ -377,7 +382,11 @@ public class QueryRunner {
     opChainMetadata.putAll(requestMetadata);
     // 2. put all stageMetadata.customProperties.
     opChainMetadata.putAll(customProperties);
-    // 3. add all overrides from config if anything is still empty.
+    // 3. put some config not allowed through query options but propagated 
that way
+    if (_numGroupsWarningLimit != null) {
+      opChainMetadata.put(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT, 
Integer.toString(_numGroupsWarningLimit));
+    }
+    // 4. add all overrides from config if anything is still empty.
     Integer numGroupsLimit = 
QueryOptionsUtils.getNumGroupsLimit(opChainMetadata);
     if (numGroupsLimit == null) {
       numGroupsLimit = _numGroupsLimit;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 7d1c7a7e31..64d84f2720 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -236,6 +236,10 @@ public class AggregateOperator extends MultiStageOperator {
             _input.earlyTerminate();
           }
         }
+        if (_groupByExecutor.getNumGroups() >= 
_groupByExecutor.getNumGroupsWarningLimit()) {
+          LOGGER.warn("numGroups reached warning limit: {} (actual: {})",
+              _groupByExecutor.getNumGroupsWarningLimit(), 
_groupByExecutor.getNumGroups());
+        }
         return dataBlock;
       }
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
index d1d5029496..1cd26d2574 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -61,6 +61,7 @@ public class MultistageGroupByExecutor {
   private final boolean _leafReturnFinalResult;
   private final DataSchema _resultSchema;
   private final int _numGroupsLimit;
+  private final int _numGroupsWarningLimit;
   private final boolean _filteredAggregationsSkipEmptyGroups;
 
   // Group By Result holders for each mode
@@ -85,6 +86,7 @@ public class MultistageGroupByExecutor {
     int maxInitialResultHolderCapacity = 
getResolvedMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
 
     _numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint);
+    _numGroupsWarningLimit = getNumGroupsWarningLimit(opChainMetadata);
 
     // By default, we compute all groups for SQL compliant results. However, 
we allow overriding this behavior via
     // query option for improved performance.
@@ -122,6 +124,11 @@ public class MultistageGroupByExecutor {
     return numGroupsLimit != null ? numGroupsLimit : 
Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT;
   }
 
+  private int getNumGroupsWarningLimit(Map<String, String> opChainMetadata) {
+    Integer numGroupsWarningLimit = 
QueryOptionsUtils.getNumGroupsWarningLimit(opChainMetadata);
+    return numGroupsWarningLimit != null ? numGroupsWarningLimit : 
Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT;
+  }
+
   private int getResolvedMaxInitialResultHolderCapacity(Map<String, String> 
opChainMetadata,
       @Nullable PlanNode.NodeHint nodeHint) {
     Integer mseMaxInitialResultHolderCapacity = 
getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
@@ -166,6 +173,10 @@ public class MultistageGroupByExecutor {
     return _numGroupsLimit;
   }
 
+  public int getNumGroupsWarningLimit() {
+    return _numGroupsWarningLimit;
+  }
+
   /**
    * Performs group-by aggregation for the data in the block.
    */
@@ -277,6 +288,10 @@ public class MultistageGroupByExecutor {
     }
   }
 
+  public int getNumGroups() {
+    return _groupIdGenerator.getNumGroups();
+  }
+
   public boolean isNumGroupsLimitReached() {
     return _groupIdGenerator.getNumGroups() == _numGroupsLimit;
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 23ccdc2ce3..9bea990d7e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -255,7 +256,9 @@ public class AggregateOperatorTest {
         .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{3, 3.0}))
         
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
     DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new 
ColumnDataType[]{INT, DOUBLE});
-    AggregateOperator operator = getOperator(resultSchema, aggCalls, 
filterArgs, groupKeys, nodeHint);
+    Map<String, String> opChainMetadata = new HashMap<>();
+    opChainMetadata.put(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT, "1");
+    AggregateOperator operator = getOperator(resultSchema, aggCalls, 
filterArgs, groupKeys, nodeHint, opChainMetadata);
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -315,14 +318,15 @@ public class AggregateOperatorTest {
   }
 
   private AggregateOperator getOperator(DataSchema resultSchema, 
List<RexExpression.FunctionCall> aggCalls,
-      List<Integer> filterArgs, List<Integer> groupKeys, PlanNode.NodeHint 
nodeHint) {
-    return new AggregateOperator(OperatorTestUtil.getTracingContext(), _input,
+      List<Integer> filterArgs, List<Integer> groupKeys, PlanNode.NodeHint 
nodeHint,
+      Map<String, String> opChainMetadata) {
+    return new AggregateOperator(OperatorTestUtil.getContext(opChainMetadata), 
_input,
         new AggregateNode(-1, resultSchema, nodeHint, List.of(), aggCalls, 
filterArgs, groupKeys, AggType.DIRECT,
             false, null, 0));
   }
 
   private AggregateOperator getOperator(DataSchema resultSchema, 
List<RexExpression.FunctionCall> aggCalls,
       List<Integer> filterArgs, List<Integer> groupKeys) {
-    return getOperator(resultSchema, aggCalls, filterArgs, groupKeys, 
PlanNode.NodeHint.EMPTY);
+    return getOperator(resultSchema, aggCalls, filterArgs, groupKeys, 
PlanNode.NodeHint.EMPTY, Map.of());
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index bc9042738b..554dfc9037 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -542,6 +542,8 @@ public class CommonConstants {
         public static final String ERROR_ON_NUM_GROUPS_LIMIT = 
"errorOnNumGroupsLimit";
 
         public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";
+        // Not actually accepted as Query Option but faked as one during MSE
+        public static final String NUM_GROUPS_WARNING_LIMIT = 
"numGroupsWarningLimit";
         public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = 
"maxInitialResultHolderCapacity";
         public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY = 
"minInitialIndexedTableCapacity";
         public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 
"mseMaxInitialResultHolderCapacity";
@@ -843,6 +845,10 @@ public class CommonConstants {
     public static final String CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT =
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + NUM_GROUPS_LIMIT;
     public static final int DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT = 100_000;
+    public static final String NUM_GROUPS_WARN_LIMIT = "num.groups.warn.limit";
+    public static final String CONFIG_OF_NUM_GROUPS_WARN_LIMIT =
+        QUERY_EXECUTOR_CONFIG_PREFIX + "." + NUM_GROUPS_WARN_LIMIT;
+    public static final int DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT = 
150_000;
     public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = 
"max.init.group.holder.capacity";
     public static final String 
CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + 
MAX_INITIAL_RESULT_HOLDER_CAPACITY;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to