This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 38132964998 [fix](nereids)Fix decompose repeat nest rewrite need to 
derive stats after construct cteProducer (#60811)
38132964998 is described below

commit 3813296499846acd4140af700b59688874adc7d4
Author: feiniaofeiafei <[email protected]>
AuthorDate: Mon Mar 2 16:02:57 2026 +0800

    [fix](nereids)Fix decompose repeat nest rewrite need to derive stats after 
construct cteProducer (#60811)
    
    Related PR: #59116
    
    Reproduce:
    ```sql
        select a,b,c,c1 from (
        select a,b,c,d,sum(d) c1 from t1 group by grouping 
sets((a,b,c),(a,b,c,d),(a),(a,b,c,c))
        ) t group by rollup(a,b,c,c1);
    ```
    Error:
    ```shell
    java.lang.IllegalArgumentException: Stats for CTE: CTEId#0 not found
    ```
    
    Problem Summary:
    
    Root cause:
    This SQL triggers DecomposeRepeatWithPreAggregation twice: first for the
    inner LogicalAggregate, then for the outer LogicalAggregate.
    When rewriting the inner aggregate, a CTE structure is created
    (CTEAnchor, CTEProducer, CTEConsumer).
    When rewriting the outer aggregate, choosePreAggShuffleKeyPartitionExprs
    calls StatsDerive to derive statistics for the current subtree.
    At that point, the child of the outer LogicalAggregate is only the
    consumer subtree; the producer subtree is not part of the plan being
    visited.
    StatsDerive.visitLogicalCTEConsumer looks up producer statistics by
    CTEId, but the producer was never derived in this context, so the lookup
    fails with "Stats for CTE: CTEId#0 not found".
    How to fix:
    In DecomposeRepeatWithPreAggregation.constructProducer(), derive
    statistics for the CTE producer when it is created. This ensures
    producer stats are stored in the context before the consumer is later
    visited. When StatsDerive visits LogicalCTEConsumer during the outer
    rewrite, it can find the producer stats correctly.
    
    Another fix: this PR increases the NDV requirement for the key when
    selecting a shuffle key. A key is selected as the shuffle key only when
    its NDV > instance number * 512. This avoids scenarios where a low NDV
    leads to data skew.
---
 .../rewrite/DecomposeRepeatWithPreAggregation.java      |  4 +++-
 .../org/apache/doris/nereids/util/AggregateUtils.java   |  1 +
 .../main/java/org/apache/doris/qe/SessionVariable.java  |  3 ---
 .../apache/doris/statistics/util/StatisticsUtil.java    |  2 +-
 .../rewrite/DecomposeRepeatWithPreAggregationTest.java  | 16 ++++++++--------
 .../decompose_repeat/decompose_repeat.out               | 17 +++++++++++++++++
 .../decompose_repeat/decompose_repeat.groovy            | 13 ++++++-------
 7 files changed, 36 insertions(+), 20 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregation.java
index fd40b635ffd..0751f76c0ee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregation.java
@@ -95,6 +95,7 @@ public class DecomposeRepeatWithPreAggregation extends 
DefaultPlanRewriter<Disti
     public static final DecomposeRepeatWithPreAggregation INSTANCE = new 
DecomposeRepeatWithPreAggregation();
     private static final Set<Class<? extends AggregateFunction>> 
SUPPORT_AGG_FUNCTIONS =
             ImmutableSet.of(Sum.class, Sum0.class, Min.class, Max.class, 
AnyValue.class, Count.class);
+    private static final int DECOMPOSE_REPEAT_THRESHOLD = 3;
 
     @Override
     public Plan rewriteRoot(Plan plan, JobContext jobContext) {
@@ -404,7 +405,7 @@ public class DecomposeRepeatWithPreAggregation extends 
DefaultPlanRewriter<Disti
         // This is an empirical threshold: when there are too few grouping 
sets,
         // the overhead of creating CTE and union may outweigh the benefits.
         // The value 3 is chosen heuristically based on practical experience.
-        if (groupingSets.size() <= 
connectContext.getSessionVariable().decomposeRepeatThreshold) {
+        if (groupingSets.size() <= DECOMPOSE_REPEAT_THRESHOLD) {
             return -1;
         }
         return findMaxGroupingSetIndex(groupingSets);
@@ -492,6 +493,7 @@ public class DecomposeRepeatWithPreAggregation extends 
DefaultPlanRewriter<Disti
         LogicalCTEProducer<LogicalAggregate<Plan>> producer =
                 new LogicalCTEProducer<>(ctx.statementContext.getNextCTEId(), 
preAggClone);
         ctx.cteProducerList.add(producer);
+        producer.accept(new StatsDerive(false), new DeriveContext());
         return producer;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java
index d641ec3d257..a08645925fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java
@@ -49,6 +49,7 @@ public class AggregateUtils {
     public static final double MID_CARDINALITY_THRESHOLD = 0.01;
     public static final double HIGH_CARDINALITY_THRESHOLD = 0.1;
     public static final int LOW_NDV_THRESHOLD = 1024;
+    public static final int NDV_INSTANCE_BALANCE_MULTIPLIER = 512;
 
     public static AggregateFunction 
tryConvertToMultiDistinct(AggregateFunction function) {
         if (function instanceof SupportMultiDistinct && function.isDistinct()) 
{
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 4e96a70afe7..93286a8a330 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -856,7 +856,6 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String SKEW_REWRITE_AGG_BUCKET_NUM = 
"skew_rewrite_agg_bucket_num";
     public static final String AGG_SHUFFLE_USE_PARENT_KEY = 
"agg_shuffle_use_parent_key";
-    public static final String DECOMPOSE_REPEAT_THRESHOLD = 
"decompose_repeat_threshold";
     public static final String DECOMPOSE_REPEAT_SHUFFLE_INDEX_IN_MAX_GROUP
             = "decompose_repeat_shuffle_index_in_max_group";
 
@@ -3440,8 +3439,6 @@ public class SessionVariable implements Serializable, 
Writable {
     )
     public boolean useV3StorageFormat = false;
 
-    @VariableMgr.VarAttr(name = DECOMPOSE_REPEAT_THRESHOLD)
-    public int decomposeRepeatThreshold = 3;
     @VariableMgr.VarAttr(name = DECOMPOSE_REPEAT_SHUFFLE_INDEX_IN_MAX_GROUP)
     public int decomposeRepeatShuffleIndexInMaxGroup = -1;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 02d275795de..c8a52de9e35 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -1340,6 +1340,6 @@ public class StatisticsUtil {
         double balanceFactor = maxHotValueCntIncludeNull == 0
                 ? Double.MAX_VALUE : rowsPerInstance / 
maxHotValueCntIncludeNull;
         // The larger this factor is, the more balanced the data.
-        return balanceFactor > 2.0 && ndv > instanceNum * 3 && ndv > 
AggregateUtils.LOW_NDV_THRESHOLD;
+        return balanceFactor > 2.0 && ndv > instanceNum * 
AggregateUtils.NDV_INSTANCE_BALANCE_MULTIPLIER;
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregationTest.java
index 428ae79d373..725b537f1e6 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregationTest.java
@@ -572,7 +572,7 @@ public class DecomposeRepeatWithPreAggregationTest extends 
TestWithFeService imp
     }
 
     /** Helper: build Statistics with column ndv for given expressions. */
-    private static Statistics statsWithNdv(Map<Expression, Double> exprToNdv) {
+    private static Statistics statsWithNdv(Map<Expression, Double> exprToNdv, 
int rows) {
         Map<Expression, ColumnStatistic> map = new HashMap<>();
         for (Map.Entry<Expression, Double> e : exprToNdv.entrySet()) {
             ColumnStatistic col = new ColumnStatisticBuilder(1)
@@ -586,7 +586,7 @@ public class DecomposeRepeatWithPreAggregationTest extends 
TestWithFeService imp
                     .build();
             map.put(e.getKey(), col);
         }
-        return new Statistics(100, map);
+        return new Statistics(rows, map);
     }
 
     @Test
@@ -609,10 +609,10 @@ public class DecomposeRepeatWithPreAggregationTest 
extends TestWithFeService imp
         );
 
         Map<Expression, Double> exprToNdv = new HashMap<>();
-        exprToNdv.put(a, 400.0);
-        exprToNdv.put(b, 6000.0);
-        exprToNdv.put(c, 2000.0);
-        Statistics stats = statsWithNdv(exprToNdv);
+        exprToNdv.put(a, 4000.0);
+        exprToNdv.put(b, 60000.0);
+        exprToNdv.put(c, 20000.0);
+        Statistics stats = statsWithNdv(exprToNdv, 60000);
 
         @SuppressWarnings("unchecked")
         Optional<Expression> chosen = (Optional<Expression>) method.invoke(
@@ -628,14 +628,14 @@ public class DecomposeRepeatWithPreAggregationTest 
extends TestWithFeService imp
 
         @SuppressWarnings("unchecked")
         Optional<Expression> chosen2 = (Optional<Expression>) method.invoke(
-                rule, groupingSets, -1, candidates, stats, 1000);
+                rule, groupingSets, -1, candidates, stats, 50);
         Assertions.assertTrue(chosen2.isPresent());
         Assertions.assertEquals(b, chosen2.get());
 
         // inputStats null -> chooseByNdv returns empty for every group -> 
empty
         @SuppressWarnings("unchecked")
         Optional<Expression> emptyNullStats = (Optional<Expression>) 
method.invoke(
-                rule, groupingSets, -1, candidates, null, 1000);
+                rule, groupingSets, -1, candidates, null, 50);
         Assertions.assertFalse(emptyNullStats.isPresent());
     }
 }
diff --git 
a/regression-test/data/nereids_rules_p0/decompose_repeat/decompose_repeat.out 
b/regression-test/data/nereids_rules_p0/decompose_repeat/decompose_repeat.out
index 1bbee3c8213..a78403bcb84 100644
--- 
a/regression-test/data/nereids_rules_p0/decompose_repeat/decompose_repeat.out
+++ 
b/regression-test/data/nereids_rules_p0/decompose_repeat/decompose_repeat.out
@@ -34,6 +34,23 @@
 1      3       2       2       2       1
 1      3       2       2       2       1
 
+-- !nest_rewrite --
+\N     \N      \N      \N
+1      \N      \N      \N
+1      \N      \N      \N
+1      \N      \N      \N
+1      \N      \N      10
+1      2       \N      \N
+1      2       1       \N
+1      2       1       1
+1      2       3       \N
+1      2       3       3
+1      2       3       4
+1      2       3       7
+1      3       \N      \N
+1      3       2       \N
+1      3       2       2
+
 -- !upper_ref --
 11     1       2       1
 12     1       3       \N
diff --git 
a/regression-test/suites/nereids_rules_p0/decompose_repeat/decompose_repeat.groovy
 
b/regression-test/suites/nereids_rules_p0/decompose_repeat/decompose_repeat.groovy
index 7668024708e..93ae90765e3 100644
--- 
a/regression-test/suites/nereids_rules_p0/decompose_repeat/decompose_repeat.groovy
+++ 
b/regression-test/suites/nereids_rules_p0/decompose_repeat/decompose_repeat.groovy
@@ -16,19 +16,18 @@
 // under the License.
 
 suite("decompose_repeat") {
-//    sql "set disable_nereids_rules='DECOMPOSE_REPEAT';"
+    sql "set disable_nereids_rules='DECOMPOSE_REPEAT';"
     sql "drop table if exists t1;"
     sql "create table t1(a int, b int, c int, d int) distributed by hash(a) 
properties('replication_num'='1');"
     sql "insert into t1 values(1,2,3,4),(1,2,3,3),(1,2,1,1),(1,3,2,2);"
     order_qt_sum "select a,b,c,sum(d) from t1 group by rollup(a,b,c);"
     order_qt_agg_func_gby_key_same_col "select a,b,c,d,sum(d) from t1 group by 
rollup(a,b,c,d);"
     order_qt_multi_agg_func "select a,b,c,sum(d),sum(c),max(a) from t1 group 
by rollup(a,b,c,d);"
-    // maybe this problem:DORIS-24075
-//    order_qt_nest_rewrite """
-//    select a,b,c,c1 from (
-//    select a,b,c,d,sum(d) c1 from t1 group by grouping 
sets((a,b,c),(a,b,c,d),(a),(a,b,c,c))
-//    ) t group by rollup(a,b,c,c1);
-//    """
+    order_qt_nest_rewrite """
+    select a,b,c,c1 from (
+    select a,b,c,d,sum(d) c1 from t1 group by grouping 
sets((a,b,c),(a,b,c,d),(a),(a,b,c,c))
+    ) t group by rollup(a,b,c,c1);
+    """
     order_qt_upper_ref """
     select c1+10,a,b,c from (select a,b,c,sum(d) c1 from t1 group by 
rollup(a,b,c)) t group by c1+10,a,b,c;
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to