This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b95038ed42d [fix](be) Preserve shuffle for serial merge aggregation (#63529)
b95038ed42d is described below

commit b95038ed42ddf4a1b2a992f180b92a6eff9c6142
Author: Pxl <[email protected]>
AuthorDate: Tue May 26 14:27:59 2026 +0800

    [fix](be) Preserve shuffle for serial merge aggregation (#63529)
    
    Problem Summary: PR #62438 added `enable_local_exchange_before_agg` to
    allow skipping the local exchange before non-finalizing aggregation.
    That optimization used `!_needs_finalize` to decide which aggregation
    stages may skip it. However, non-finalizing aggregation covers two
    different stages: first-phase update/serialize aggregation and
    merge/serialize aggregation.
    
    When `experimental_use_serial_exchange` is enabled and
    `enable_local_exchange_before_agg` is disabled, a serial exchange source
    can be followed by the default `PASSTHROUGH` local exchange before a
    non-finalizing merge aggregation. For DISTINCT aggregation, the merge
    aggregation is the stage that deduplicates distinct keys after the hash
    exchange. `PASSTHROUGH` only restores local parallelism and does not
    preserve key distribution. Rows with the same distinct key can therefore
    reach different local tasks, and each task keeps its own copy of that
    key. The later partial sums then count the key more than once, which
    produces incorrect results.
    
    This PR keeps the knob behavior for aggregation stages that can safely
    skip the local hash exchange, but excludes merge aggregation with a
    serial child from the skip path. That case falls through to the existing
    `HASH_SHUFFLE` / `BUCKET_HASH_SHUFFLE` distribution requirement. The PR
    also computes the merge flag (`_is_merge`) during
    `AggSinkOperatorX::init()`. Local exchange planning runs before
    `prepare()`, so the flag was previously set too late: it was computed in
    `_calc_aggregate_evaluators()` and was still unset when
    `required_data_distribution()` was evaluated.
    
    ### Release note
    
    Fix occasional incorrect DISTINCT aggregate results when
    `experimental_use_serial_exchange` is enabled and
    `enable_local_exchange_before_agg` is disabled.
---
 be/src/exec/operator/aggregation_sink_operator.cpp |  5 +----
 be/src/exec/operator/aggregation_sink_operator.h   |  3 ++-
 .../agg_operator_group_by_limit_opt_test.cpp       |  8 ++++++-
 be/test/exec/operator/agg_operator_test.cpp        |  8 ++++++-
 .../data/nereids_syntax_p0/agg_4_phase.out         |  2 ++
 .../suites/nereids_syntax_p0/agg_4_phase.groovy    | 25 +++++++++++++++++++++-
 6 files changed, 43 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp 
b/be/src/exec/operator/aggregation_sink_operator.cpp
index 0808361ad74..8e40a53d3d4 100644
--- a/be/src/exec/operator/aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/aggregation_sink_operator.cpp
@@ -880,6 +880,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
                 tnode.agg_node.__isset.agg_sort_infos ? 
tnode.agg_node.agg_sort_infos[i] : dummy,
                 tnode.agg_node.grouping_exprs.empty(), false, &evaluator));
         _aggregate_evaluators.push_back(evaluator);
+        _is_merge |= evaluator->is_merge();
     }
 
     if (tnode.agg_node.__isset.agg_sort_info_by_group_key) {
@@ -952,7 +953,6 @@ Status 
AggSinkOperatorX::_init_aggregate_evaluators(RuntimeState* state) {
 
 Status AggSinkOperatorX::_calc_aggregate_evaluators() {
     _offsets_of_aggregate_states.resize(_aggregate_evaluators.size());
-    _is_merge = false;
     for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
         _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states;
 
@@ -975,9 +975,6 @@ Status AggSinkOperatorX::_calc_aggregate_evaluators() {
                     (_total_size_of_aggregate_states + alignment_of_next_state 
- 1) /
                     alignment_of_next_state * alignment_of_next_state;
         }
-        if (_aggregate_evaluators[i]->is_merge()) {
-            _is_merge = true;
-        }
     }
     return Status::OK();
 }
diff --git a/be/src/exec/operator/aggregation_sink_operator.h 
b/be/src/exec/operator/aggregation_sink_operator.h
index f0cc6b70d4e..605bb62a1dd 100644
--- a/be/src/exec/operator/aggregation_sink_operator.h
+++ b/be/src/exec/operator/aggregation_sink_operator.h
@@ -163,7 +163,8 @@ public:
                            : 
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(
                                      state);
         }
-        if (!_needs_finalize && !state->enable_local_exchange_before_agg()) {
+        if (!_needs_finalize && !state->enable_local_exchange_before_agg() &&
+            !(_is_merge && _child && _child->is_serial_operator())) {
             return 
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(state);
         }
         return _is_colocate && _require_bucket_distribution
diff --git a/be/test/exec/operator/agg_operator_group_by_limit_opt_test.cpp 
b/be/test/exec/operator/agg_operator_group_by_limit_opt_test.cpp
index 26f312ea9c9..0e5d2f83248 100644
--- a/be/test/exec/operator/agg_operator_group_by_limit_opt_test.cpp
+++ b/be/test/exec/operator/agg_operator_group_by_limit_opt_test.cpp
@@ -80,7 +80,13 @@ struct MockAggsinkOperator : public AggSinkOperatorX {
 
     Status _init_probe_expr_ctx(RuntimeState* state) override { return 
Status::OK(); }
 
-    Status _init_aggregate_evaluators(RuntimeState* state) override { return 
Status::OK(); }
+    Status _init_aggregate_evaluators(RuntimeState* state) override {
+        _is_merge = false;
+        for (auto* evaluator : _aggregate_evaluators) {
+            _is_merge |= evaluator->is_merge();
+        }
+        return Status::OK();
+    }
 
     Status _check_agg_fn_output() override { return Status::OK(); }
 };
diff --git a/be/test/exec/operator/agg_operator_test.cpp 
b/be/test/exec/operator/agg_operator_test.cpp
index ae750013c84..75592bfa097 100644
--- a/be/test/exec/operator/agg_operator_test.cpp
+++ b/be/test/exec/operator/agg_operator_test.cpp
@@ -81,7 +81,13 @@ struct MockAggsinkOperator : public AggSinkOperatorX {
 
     Status _init_probe_expr_ctx(RuntimeState* state) override { return 
Status::OK(); }
 
-    Status _init_aggregate_evaluators(RuntimeState* state) override { return 
Status::OK(); }
+    Status _init_aggregate_evaluators(RuntimeState* state) override {
+        _is_merge = false;
+        for (auto* evaluator : _aggregate_evaluators) {
+            _is_merge |= evaluator->is_merge();
+        }
+        return Status::OK();
+    }
 
     Status _check_agg_fn_output() override { return Status::OK(); }
 };
diff --git a/regression-test/data/nereids_syntax_p0/agg_4_phase.out 
b/regression-test/data/nereids_syntax_p0/agg_4_phase.out
index 97a97b8816d..0d22b7039e9 100644
--- a/regression-test/data/nereids_syntax_p0/agg_4_phase.out
+++ b/regression-test/data/nereids_syntax_p0/agg_4_phase.out
@@ -17,3 +17,5 @@
 2      -4,-4   1       b       1
 3      -4      1       f       1
 
+-- !serial_exchange_distinct_sum --
+45     44850
diff --git a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy 
b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
index e38c92018ff..772ba119b16 100644
--- a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
+++ b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
@@ -85,11 +85,27 @@ suite("agg_4_phase") {
         from agg_4_phase_tbl2
         group by id
         order by id"""
+
+    sql "drop table if exists agg_4_phase_serial_exchange_tbl"
+    sql """
+        create table agg_4_phase_serial_exchange_tbl (
+            pk int,
+            x int,
+            y int
+        ) engine=olap
+        duplicate key(pk)
+        distributed by hash(pk) buckets 10
+        properties("replication_num"="1");
+    """
+    sql """
+        insert into agg_4_phase_serial_exchange_tbl
+        select number, number % 10, number from numbers("number" = "300");
+    """
     multi_sql """
 set runtime_filter_type= "BLOOM_FILTER,MIN_MAX";
 set enable_runtime_filter_prune= "false";
 set exchange_multi_blocks_byte_size= "4722978";
-set parallel_pipeline_task_num= "3";
+set parallel_pipeline_task_num= "4";
 set experimental_parallel_scan_min_rows_per_scanner= "256";
 set enable_strong_consistency_read= "true";
 set runtime_filter_wait_infinitely= "true";
@@ -97,6 +113,7 @@ set enable_share_hash_table_for_broadcast_join= "false";
 set experimental_parallel_scan_max_scanners_count= "8";
 set disable_streaming_preaggregations= "true";
 set experimental_use_serial_exchange= "true";
+set enable_local_exchange_before_agg= "false";
     """
         qt_phase4_multi_distinct """
         select
@@ -108,4 +125,10 @@ set experimental_use_serial_exchange= "true";
         from agg_4_phase_tbl2
         group by id
         order by id"""
+
+    qt_serial_exchange_distinct_sum """
+        select
+            sum(distinct x),
+            sum(y)
+        from agg_4_phase_serial_exchange_tbl"""
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to