Copilot commented on code in PR #61495:
URL: https://github.com/apache/doris/pull/61495#discussion_r2954627571


##########
fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.ExprToThriftVisitor;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.thrift.TBucketedAggregationNode;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Bucketed hash aggregation node.
+ *
+ * Fuses two-phase aggregation (local + global) into a single BE operator for single-BE deployments.
+ * Produces a BUCKETED_AGGREGATION_NODE in the Thrift plan, which the BE maps to
+ * BucketedAggSinkOperatorX / BucketedAggSourceOperatorX.
+ */
+public class BucketedAggregationNode extends PlanNode {
+    private final AggregateInfo aggInfo;
+    private final boolean needsFinalize;
+
+    public BucketedAggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo,
+            boolean needsFinalize) {
+        super(id, aggInfo.getOutputTupleId().asList(), "BUCKETED AGGREGATE");
+        this.aggInfo = aggInfo;
+        this.needsFinalize = needsFinalize;
+        this.children.add(input);
+    }
+
+    @Override
+    protected void toThrift(TPlanNode msg) {
+        msg.node_type = TPlanNodeType.BUCKETED_AGGREGATION_NODE;
+
+        List<TExpr> aggregateFunctions = Lists.newArrayList();
+        for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
+            aggregateFunctions.add(ExprToThriftVisitor.treeToThrift(e));
+        }
+
+        List<TExpr> groupingExprs = Lists.newArrayList();
+        if (aggInfo.getGroupingExprs() != null) {
+            groupingExprs = ExprToThriftVisitor.treesToThrift(aggInfo.getGroupingExprs());
+        }
+
+        TBucketedAggregationNode bucketedAggNode = new TBucketedAggregationNode();
+        bucketedAggNode.setGroupingExprs(groupingExprs);
+        bucketedAggNode.setAggregateFunctions(aggregateFunctions);
+        bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt());

Review Comment:
   `intermediate_tuple_id` is set to the *output* tuple id. For aggregate 
functions whose intermediate state differs from the final output type (e.g. 
AVG, NDV sketches, etc.), using the output tuple descriptor as the intermediate 
tuple can break `AggFnEvaluator::prepare()` expectations and/or produce 
incorrect results. Set `intermediate_tuple_id` to the actual intermediate tuple 
id from `AggregateInfo` (and keep `output_tuple_id` as the final tuple).
   
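   A minimal sketch of the suggested wiring. It assumes `AggregateInfo` exposes `getIntermediateTupleId()` the way the regular `AggregationNode` path does; the setter names follow the `TBucketedAggregationNode` fields in this PR:

   ```java
   // Sketch of the suggested fix (not the PR's code): use the intermediate
   // tuple for intermediate state, and the output tuple for the final result.
   // getIntermediateTupleId() is assumed to exist on AggregateInfo.
   bucketedAggNode.setIntermediateTupleId(aggInfo.getIntermediateTupleId().asInt());
   bucketedAggNode.setOutputTupleId(aggInfo.getOutputTupleId().asInt());
   ```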



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -1074,6 +1075,14 @@ struct TAggregationNode {
   10: optional TSortInfo agg_sort_info_by_group_key
 }
 
+struct TBucketedAggregationNode {
+  1: optional list<Exprs.TExpr> grouping_exprs
+  2: required list<Exprs.TExpr> aggregate_functions
+  3: required Types.TTupleId intermediate_tuple_id
+  4: required Types.TTupleId output_tuple_id
+  5: required bool need_finalize
+}

Review Comment:
   `grouping_exprs` is declared `optional`, but the BE treats “empty 
grouping_exprs” as an internal error (see `pipeline_fragment_context.cpp`). If 
this node is never valid without GROUP BY keys, consider making 
`grouping_exprs` a `required list<Exprs.TExpr>` (and documenting “must be 
non-empty”) to encode the contract in the IDL and reduce accidental generation 
of invalid plans.
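   
   A possible tightened IDL, mirroring the existing fields; since the struct is introduced in this PR, strengthening requiredness carries no rolling-upgrade risk:
   
   ```thrift
   struct TBucketedAggregationNode {
     // Must be non-empty: the BE rejects plans without GROUP BY keys.
     1: required list<Exprs.TExpr> grouping_exprs
     2: required list<Exprs.TExpr> aggregate_functions
     3: required Types.TTupleId intermediate_tuple_id
     4: required Types.TTupleId output_tuple_id
     5: required bool need_finalize
   }
   ```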



##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -2925,6 +2928,32 @@ public void setSkewRewriteAggBucketNum(int num) {
             checker = "checkAggPhase")
     public int aggPhase = 0;
 
+    @VariableMgr.VarAttr(name = ENABLE_BUCKETED_HASH_AGG, needForward = true, description = {
+            "是否启用 bucketed hash aggregation 优化。该优化在单 BE 场景下将两阶段聚合融合为单个算子,"
+                    + "消除 Exchange 开销和序列化/反序列化成本。默认关闭。",
+            "Whether to enable bucketed hash aggregation optimization. This optimization fuses two-phase "
+                    + "aggregation into a single operator on single-BE deployments, eliminating exchange overhead "
+                    + "and serialization/deserialization costs. Disabled by default."})

Review Comment:
   The variable description states “默认关闭 / Disabled by default” but the actual 
default is `true`, which makes this optimization enabled by default and can 
change query planning behavior unexpectedly. Either change the default to 
`false` or update both CN/EN descriptions to match the intended default (and 
ensure release notes / behavior-change checklist reflects it if enabling by 
default is intended).
   
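   If "disabled by default" is the intended behavior, the field initializer is the one-line fix (field name taken from this PR's session variable):

   ```java
   // Matches the "默认关闭 / Disabled by default" wording in both descriptions.
   public boolean enableBucketedHashAgg = false;
   ```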



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -1423,6 +1425,53 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         }
         break;
     }
+    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
+        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
+            return Status::InternalError(
+                    "Bucketed aggregation node {} should not be used without group by keys",
+                    tnode.node_id);
+        }
+
+        // Create source operator (goes on the current / downstream pipeline).
+        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
+        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));

Review Comment:
   This uses `_parallel_instances` when adding the operator to the pipeline, 
but uses `_num_instances` when creating sink/source dependencies and shared 
state bookkeeping. If these differ, dependency counts can mismatch the actual 
number of running source/sink tasks, risking hangs (waiting on deps that will 
never be signaled) or premature completion. Use the same instance count 
consistently for operator parallelism and dependency creation (whichever is 
correct for this fragment/pipeline), and keep it aligned with 
`RuntimeState::task_num()` used later in 
`BucketedAggSharedState::init_instances(...)`.



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -1423,6 +1425,53 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         }
         break;
     }
+    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
+        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
+            return Status::InternalError(
+                    "Bucketed aggregation node {} should not be used without group by keys",
+                    tnode.node_id);
+        }
+
+        // Create source operator (goes on the current / downstream pipeline).
+        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
+        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+
+        // Create a new pipeline for the sink side.
+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (!_dag.contains(downstream_pipeline_id)) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
+        cur_pipe = add_pipeline(cur_pipe);
+        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+
+        // Create sink operator.
+        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
+                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
+        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
+
+        // Pre-register a single shared state for ALL instances so that every
+        // sink instance writes its per-instance hash table into the same
+        // BucketedAggSharedState and every source instance can merge across
+        // all of them.
+        {
+            auto shared_state = BucketedAggSharedState::create_shared();
+            shared_state->id = op->operator_id();
+            shared_state->related_op_ids.insert(op->operator_id());
+
+            for (int i = 0; i < _num_instances; i++) {
+                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
+                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
+                sink_dep->set_shared_state(shared_state.get());
+                shared_state->sink_deps.push_back(sink_dep);
+            }
+            shared_state->create_source_dependencies(_num_instances, op->operator_id(),

Review Comment:
   This uses `_parallel_instances` when adding the operator to the pipeline, 
but uses `_num_instances` when creating sink/source dependencies and shared 
state bookkeeping. If these differ, dependency counts can mismatch the actual 
number of running source/sink tasks, risking hangs (waiting on deps that will 
never be signaled) or premature completion. Use the same instance count 
consistently for operator parallelism and dependency creation (whichever is 
correct for this fragment/pipeline), and keep it aligned with 
`RuntimeState::task_num()` used later in 
`BucketedAggSharedState::init_instances(...)`.
   



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +180,221 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr expr, Map<String, Strin
                 aggregate.getLogicalProperties(), localAgg));
     }
 
+    /**
+     * Implements bucketed hash aggregation for single-BE deployments.
+     * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate operator,
+     * eliminating exchange overhead and serialization/deserialization costs.
+     *
+     * Only generated when:
+     * 1. enable_bucketed_hash_agg session variable is true
+     * 2. Cluster has exactly one alive BE
+     * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+     * 4. Aggregate functions support two-phase execution
+     * 5. Data volume checks pass (min input rows, max group keys)
+     */
+    private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) {
+        if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+            return ImmutableList.of();
+        }
+        // Only for single-BE deployments
+        int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));
+        if (beNumber != 1) {
+            return ImmutableList.of();
+        }
+        // Without-key aggregation not supported in initial version
+        if (aggregate.getGroupByExpressions().isEmpty()) {
+            return ImmutableList.of();
+        }
+        // Must support two-phase execution (same check as splitTwoPhase)
+        if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+            return ImmutableList.of();
+        }
+        // Skip aggregates with no aggregate functions (pure GROUP BY dedup).
+        // These are produced by DistinctAggregateRewriter as the bottom dedup phase.
+        if (aggregate.getAggregateFunctions().isEmpty()) {
+            return ImmutableList.of();
+        }
+        // Skip aggregates whose child group contains a LogicalAggregate.
+        // This detects the top aggregate in a DISTINCT decomposition (e.g.,
+        // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b
+        // on top of GROUP BY a,b dedup). Bucketed agg does not support
+        // DISTINCT aggregation in the initial version.
+        if (childGroupContainsAggregate(aggregate)) {
+            return ImmutableList.of();
+        }
+        // Skip when data is already distributed by the GROUP BY keys
+        // (e.g., table bucketed by UserID, query GROUP BY UserID).
+        // In this case the two-phase plan needs no exchange and is strictly
+        // better than bucketed agg (no 256-bucket overhead, no merge phase).
+        if (groupByKeysSatisfyDistribution(aggregate)) {
+            return ImmutableList.of();
+        }
+        // Data-volume-based checks: control bucketed agg eligibility based on
+        // estimated data scale, similar to ClickHouse's group_by_two_level_threshold
+        // and group_by_two_level_threshold_bytes. This reduces reliance on
+        // column-level statistics which may be inaccurate or missing.
+        //
+        // When statistics are unavailable (groupExpression absent or childStats null),
+        // conservatively skip bucketed agg — without data volume information we cannot
+        // make an informed decision, and the risk of choosing bucketed agg in a
+        // high-cardinality scenario outweighs the potential benefit.
+        if (!aggregate.getGroupExpression().isPresent()) {
+            return ImmutableList.of();
+        }
+        GroupExpression ge = aggregate.getGroupExpression().get();
+        Statistics childStats = ge.childStatistics(0);
+        if (childStats == null) {
+            return ImmutableList.of();
+        }
+        double rows = childStats.getRowCount();
+        long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows;
+        long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys;
+
+        // Gate: minimum input rows.
+        // When input data is too small, the overhead of initializing 256
+        // per-bucket hash tables and the pipelined merge phase outweighs
+        // the benefit of eliminating exchange. Skip bucketed agg.
+        if (minInputRows > 0 && rows < minInputRows) {
+            return ImmutableList.of();
+        }
+
+        // Gate: maximum estimated group keys (similar to ClickHouse's
+        // group_by_two_level_threshold). When the number of distinct groups
+        // is too large, the source-side merge must combine too many keys
+        // across instances, and the merge cost dominates. Skip bucketed agg.
+        Statistics aggStats = ge.getOwnerGroup().getStatistics();
+        if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() > maxGroupKeys) {
+            return ImmutableList.of();
+        }
+
+        // High-cardinality ratio checks (existing logic).
+        // These complement the absolute thresholds above with relative checks:
+        // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows * threshold,
+        //    the combined NDV is at least that high.
+        // 2. Aggregation ratio check: if estimated output rows > rows * threshold,
+        //    merge cost dominates.
+        double highCardThreshold = 0.3;
+        for (Expression groupByKey : aggregate.getGroupByExpressions()) {
+            ColumnStatistic colStat = childStats.findColumnStatistics(groupByKey);
+            if (colStat != null && !colStat.isUnKnown() && colStat.ndv > rows * highCardThreshold) {
+                return ImmutableList.of();
+            }
+        }
+        if (aggStats != null && aggStats.getRowCount() > rows * highCardThreshold) {
+            return ImmutableList.of();
+        }
+        // Build output expressions: rewrite AggregateFunction -> AggregateExpression with GLOBAL_RESULT param
+        // (same as one-phase aggregation — raw input directly produces final result).
+        List<NamedExpression> aggOutput = ExpressionUtils.rewriteDownShortCircuit(
+                aggregate.getOutputExpressions(), expr -> {
+                    if (!(expr instanceof AggregateFunction)) {
+                        return expr;
+                    }
+                    return new AggregateExpression((AggregateFunction) expr, AggregateParam.GLOBAL_RESULT);
+                }
+        );
+        return ImmutableList.of(new PhysicalBucketedHashAggregate<>(
+                aggregate.getGroupByExpressions(), aggOutput,
+                aggregate.getLogicalProperties(), aggregate.child()));
+    }
+
+    /**
+     * Check if the child group of this aggregate contains a LogicalAggregate.
+     * This is used to detect aggregates produced by DISTINCT decomposition rewrites
+     * (e.g., DistinctAggregateRewriter, SplitMultiDistinctStrategy), where the original
+     * DISTINCT aggregate is split into a top non-distinct aggregate over a bottom dedup aggregate.
+     */
+    private boolean childGroupContainsAggregate(LogicalAggregate<? extends Plan> aggregate) {
+        if (!aggregate.getGroupExpression().isPresent()) {
+            return false;
+        }
+        GroupExpression groupExpr = aggregate.getGroupExpression().get();
+        if (groupExpr.arity() == 0) {
+            return false;
+        }
+        Group childGroup = groupExpr.child(0);
+        for (GroupExpression childGroupExpr : childGroup.getLogicalExpressions()) {
+            if (childGroupExpr.getPlan() instanceof LogicalAggregate) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Check if the GROUP BY keys of this aggregate are a superset of (or equal to)
+     * the underlying OlapTable's hash distribution columns. When this is true,
+     * the data is already correctly partitioned for the aggregation, so the
+     * two-phase plan (local + global) requires no exchange and is strictly better
+     * than bucketed agg (no 256-bucket overhead, no merge phase).
+     *
+     * Traverses the child group in the Memo to find a LogicalOlapScan,
+     * walking through LogicalProject and LogicalFilter transparently.
+     */
+    private boolean groupByKeysSatisfyDistribution(LogicalAggregate<? extends Plan> aggregate) {
+        if (!aggregate.getGroupExpression().isPresent()) {
+            return false;
+        }
+        GroupExpression groupExpr = aggregate.getGroupExpression().get();
+        if (groupExpr.arity() == 0) {
+            return false;
+        }
+        OlapTable table = findOlapTableInGroup(groupExpr.child(0), 5);
+        if (table == null) {
+            return false;
+        }
+        DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
+        if (!(distributionInfo instanceof HashDistributionInfo)) {
+            return false;
+        }
+        List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
+        if (distributionColumns.isEmpty()) {
+            return false;
+        }
+        // Collect GROUP BY column names (only direct SlotReference with original column info)
+        Set<String> groupByColumnNames = new HashSet<>();
+        for (Expression expr : aggregate.getGroupByExpressions()) {
+            if (expr instanceof SlotReference) {
+                SlotReference slot = (SlotReference) expr;
+                if (slot.getOriginalColumn().isPresent()) {
+                    groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
+                }
+            }
+        }
+        // All distribution columns must appear in the GROUP BY keys
+        for (Column column : distributionColumns) {
+            if (!groupByColumnNames.contains(column.getName().toLowerCase())) {

Review Comment:
   `String#toLowerCase()` without an explicit locale can produce incorrect 
results under locale-sensitive environments (e.g. Turkish locale). Use 
`toLowerCase(Locale.ROOT)` (and import `java.util.Locale`) for deterministic 
case-folding of identifiers.
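
   A small self-contained demonstration of the pitfall the comment describes. The column name `"UserID"` is illustrative; under a Turkish default locale, `toLowerCase()` maps `'I'` to dotless `'ı'`, so the folded name no longer matches a name folded elsewhere:

   ```java
   import java.util.Locale;

   // Shows why identifier case-folding should pin Locale.ROOT: the Turkish
   // locale lowercases 'I' to dotless 'ı', breaking name comparisons.
   public class LocaleFolding {
       public static void main(String[] args) {
           String col = "UserID";
           String turkish = col.toLowerCase(new Locale("tr", "TR")); // "userıd"
           String root = col.toLowerCase(Locale.ROOT);               // "userid"
           System.out.println(turkish.equals("userid")); // false
           System.out.println(root.equals("userid"));    // true
       }
   }
   ```
   
   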



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +180,221 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr expr, Map<String, Strin
                 aggregate.getLogicalProperties(), localAgg));
     }
 
+    /**
+     * Implements bucketed hash aggregation for single-BE deployments.
+     * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate operator,
+     * eliminating exchange overhead and serialization/deserialization costs.
+     *
+     * Only generated when:
+     * 1. enable_bucketed_hash_agg session variable is true
+     * 2. Cluster has exactly one alive BE
+     * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+     * 4. Aggregate functions support two-phase execution
+     * 5. Data volume checks pass (min input rows, max group keys)
+     */
+    private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) {
+        if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+            return ImmutableList.of();
+        }
+        // Only for single-BE deployments
+        int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));

Review Comment:
   `Math.max(1, ...)` masks the “0 alive BE” case as `1`, which can incorrectly 
allow `implementBucketedPhase()` to generate a bucketed-agg candidate when the 
cluster has no available backends. Use the real backend count directly and 
require it to be exactly 1.
   



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +180,221 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr expr, Map<String, Strin
                 aggregate.getLogicalProperties(), localAgg));
     }
 
+    /**
+     * Implements bucketed hash aggregation for single-BE deployments.
+     * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate operator,
+     * eliminating exchange overhead and serialization/deserialization costs.
+     *
+     * Only generated when:
+     * 1. enable_bucketed_hash_agg session variable is true
+     * 2. Cluster has exactly one alive BE
+     * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+     * 4. Aggregate functions support two-phase execution
+     * 5. Data volume checks pass (min input rows, max group keys)
+     */
+    private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) {
+        if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+            return ImmutableList.of();
+        }
+        // Only for single-BE deployments
+        int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));
+        if (beNumber != 1) {
+            return ImmutableList.of();
+        }
+        // Without-key aggregation not supported in initial version
+        if (aggregate.getGroupByExpressions().isEmpty()) {
+            return ImmutableList.of();
+        }
+        // Must support two-phase execution (same check as splitTwoPhase)
+        if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+            return ImmutableList.of();
+        }
+        // Skip aggregates with no aggregate functions (pure GROUP BY dedup).
+        // These are produced by DistinctAggregateRewriter as the bottom dedup phase.
+        if (aggregate.getAggregateFunctions().isEmpty()) {
+            return ImmutableList.of();
+        }
+        // Skip aggregates whose child group contains a LogicalAggregate.
+        // This detects the top aggregate in a DISTINCT decomposition (e.g.,
+        // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b
+        // on top of GROUP BY a,b dedup). Bucketed agg does not support
+        // DISTINCT aggregation in the initial version.
+        if (childGroupContainsAggregate(aggregate)) {
+            return ImmutableList.of();
+        }
+        // Skip when data is already distributed by the GROUP BY keys
+        // (e.g., table bucketed by UserID, query GROUP BY UserID).
+        // In this case the two-phase plan needs no exchange and is strictly
+        // better than bucketed agg (no 256-bucket overhead, no merge phase).
+        if (groupByKeysSatisfyDistribution(aggregate)) {
+            return ImmutableList.of();
+        }
+        // Data-volume-based checks: control bucketed agg eligibility based on
+        // estimated data scale, similar to ClickHouse's group_by_two_level_threshold
+        // and group_by_two_level_threshold_bytes. This reduces reliance on
+        // column-level statistics which may be inaccurate or missing.
+        //
+        // When statistics are unavailable (groupExpression absent or childStats null),
+        // conservatively skip bucketed agg — without data volume information we cannot
+        // make an informed decision, and the risk of choosing bucketed agg in a
+        // high-cardinality scenario outweighs the potential benefit.
+        if (!aggregate.getGroupExpression().isPresent()) {
+            return ImmutableList.of();
+        }
+        GroupExpression ge = aggregate.getGroupExpression().get();
+        Statistics childStats = ge.childStatistics(0);
+        if (childStats == null) {
+            return ImmutableList.of();
+        }
+        double rows = childStats.getRowCount();
+        long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows;
+        long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys;
+
+        // Gate: minimum input rows.
+        // When input data is too small, the overhead of initializing 256
+        // per-bucket hash tables and the pipelined merge phase outweighs
+        // the benefit of eliminating exchange. Skip bucketed agg.
+        if (minInputRows > 0 && rows < minInputRows) {
+            return ImmutableList.of();
+        }
+
+        // Gate: maximum estimated group keys (similar to ClickHouse's
+        // group_by_two_level_threshold). When the number of distinct groups
+        // is too large, the source-side merge must combine too many keys
+        // across instances, and the merge cost dominates. Skip bucketed agg.
+        Statistics aggStats = ge.getOwnerGroup().getStatistics();
+        if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() > maxGroupKeys) {
+            return ImmutableList.of();
+        }
+
+        // High-cardinality ratio checks (existing logic).
+        // These complement the absolute thresholds above with relative checks:
+        // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows * threshold,
+        //    the combined NDV is at least that high.
+        // 2. Aggregation ratio check: if estimated output rows > rows * threshold,
+        //    merge cost dominates.
+        double highCardThreshold = 0.3;
+        for (Expression groupByKey : aggregate.getGroupByExpressions()) {
+            ColumnStatistic colStat = childStats.findColumnStatistics(groupByKey);
+            if (colStat != null && !colStat.isUnKnown() && colStat.ndv > rows * highCardThreshold) {
+                return ImmutableList.of();
+            }
+        }
+        if (aggStats != null && aggStats.getRowCount() > rows * highCardThreshold) {
+            return ImmutableList.of();
+        }
+        // Build output expressions: rewrite AggregateFunction -> AggregateExpression with GLOBAL_RESULT param
+        // (same as one-phase aggregation — raw input directly produces final result).
+        List<NamedExpression> aggOutput = ExpressionUtils.rewriteDownShortCircuit(
+                aggregate.getOutputExpressions(), expr -> {
+                    if (!(expr instanceof AggregateFunction)) {
+                        return expr;
+                    }
+                    return new AggregateExpression((AggregateFunction) expr, AggregateParam.GLOBAL_RESULT);
+                }
+        );
+        return ImmutableList.of(new PhysicalBucketedHashAggregate<>(
+                aggregate.getGroupByExpressions(), aggOutput,
+                aggregate.getLogicalProperties(), aggregate.child()));
+    }
+
+    /**
+     * Check if the child group of this aggregate contains a LogicalAggregate.
+     * This is used to detect aggregates produced by DISTINCT decomposition rewrites
+     * (e.g., DistinctAggregateRewriter, SplitMultiDistinctStrategy), where the original
+     * DISTINCT aggregate is split into a top non-distinct aggregate over a bottom dedup aggregate.
+     */
+    private boolean childGroupContainsAggregate(LogicalAggregate<? extends Plan> aggregate) {
+        if (!aggregate.getGroupExpression().isPresent()) {
+            return false;
+        }
+        GroupExpression groupExpr = aggregate.getGroupExpression().get();
+        if (groupExpr.arity() == 0) {
+            return false;
+        }
+        Group childGroup = groupExpr.child(0);
+        for (GroupExpression childGroupExpr : childGroup.getLogicalExpressions()) {
+            if (childGroupExpr.getPlan() instanceof LogicalAggregate) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Check if the GROUP BY keys of this aggregate are a superset of (or equal to)
+     * the underlying OlapTable's hash distribution columns. When this is true,
+     * the data is already correctly partitioned for the aggregation, so the
+     * two-phase plan (local + global) requires no exchange and is strictly better
+     * than bucketed agg (no 256-bucket overhead, no merge phase).
+     *
+     * Traverses the child group in the Memo to find a LogicalOlapScan,
+     * walking through LogicalProject and LogicalFilter transparently.
+     */
+    private boolean groupByKeysSatisfyDistribution(LogicalAggregate<? extends Plan> aggregate) {
+        if (!aggregate.getGroupExpression().isPresent()) {
+            return false;
+        }
+        GroupExpression groupExpr = aggregate.getGroupExpression().get();
+        if (groupExpr.arity() == 0) {
+            return false;
+        }
+        OlapTable table = findOlapTableInGroup(groupExpr.child(0), 5);
+        if (table == null) {
+            return false;
+        }
+        DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
+        if (!(distributionInfo instanceof HashDistributionInfo)) {
+            return false;
+        }
+        List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
+        if (distributionColumns.isEmpty()) {
+            return false;
+        }
+        // Collect GROUP BY column names (only direct SlotReference with original column info)
+        Set<String> groupByColumnNames = new HashSet<>();
+        for (Expression expr : aggregate.getGroupByExpressions()) {
+            if (expr instanceof SlotReference) {
+                SlotReference slot = (SlotReference) expr;
+                if (slot.getOriginalColumn().isPresent()) {
+                    groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());

Review Comment:
   `String#toLowerCase()` without an explicit locale can produce incorrect 
results under locale-sensitive environments (e.g. Turkish locale). Use 
`toLowerCase(Locale.ROOT)` (and import `java.util.Locale`) for deterministic 
case-folding of identifiers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

