This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b95038ed42d [fix](be) Preserve shuffle for serial merge aggregation
(#63529)
b95038ed42d is described below
commit b95038ed42ddf4a1b2a992f180b92a6eff9c6142
Author: Pxl <[email protected]>
AuthorDate: Tue May 26 14:27:59 2026 +0800
[fix](be) Preserve shuffle for serial merge aggregation (#63529)
Problem Summary: PR #62438 added `enable_local_exchange_before_agg` to
allow skipping the local exchange before non-finalizing aggregation. That
optimization used `!_needs_finalize` to identify the aggregations that
may skip it, but non-finalizing aggregation covers both first-phase
update/serialize aggregation and merge/serialize aggregation.
When `experimental_use_serial_exchange` is enabled and
`enable_local_exchange_before_agg` is disabled, a serial exchange source
can be followed by the default `PASSTHROUGH` local exchange before a
non-finalizing merge aggregation. For DISTINCT aggregation, the merge
aggregation is the stage that deduplicates distinct keys after the hash
exchange. `PASSTHROUGH` only restores local parallelism and does not
preserve key distribution, so rows with the same distinct key can be
routed to different local tasks. Each task then keeps its own copy of
the key, the key is not fully deduplicated, and the later partial sums
count it more than once, producing incorrect results.
This PR keeps the knob's behavior for aggregation stages that can safely
skip the local hash exchange, but excludes merge aggregation whose child
is a serial operator from the skip path. That case falls through to the
existing `HASH_SHUFFLE` / `BUCKET_HASH_SHUFFLE` distribution requirement.
The PR also computes the merge flag (`_is_merge`) during
`AggSinkOperatorX::init()` instead of `_calc_aggregate_evaluators()`,
because local exchange planning runs before `prepare()` and would
otherwise see the flag before it is computed.
### Release note
Fix occasional incorrect DISTINCT aggregate results when
`experimental_use_serial_exchange` is enabled and
`enable_local_exchange_before_agg` is disabled.
---
be/src/exec/operator/aggregation_sink_operator.cpp | 5 +----
be/src/exec/operator/aggregation_sink_operator.h | 3 ++-
.../agg_operator_group_by_limit_opt_test.cpp | 8 ++++++-
be/test/exec/operator/agg_operator_test.cpp | 8 ++++++-
.../data/nereids_syntax_p0/agg_4_phase.out | 2 ++
.../suites/nereids_syntax_p0/agg_4_phase.groovy | 25 +++++++++++++++++++++-
6 files changed, 43 insertions(+), 8 deletions(-)
diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp
b/be/src/exec/operator/aggregation_sink_operator.cpp
index 0808361ad74..8e40a53d3d4 100644
--- a/be/src/exec/operator/aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/aggregation_sink_operator.cpp
@@ -880,6 +880,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
tnode.agg_node.__isset.agg_sort_infos ?
tnode.agg_node.agg_sort_infos[i] : dummy,
tnode.agg_node.grouping_exprs.empty(), false, &evaluator));
_aggregate_evaluators.push_back(evaluator);
+ _is_merge |= evaluator->is_merge();
}
if (tnode.agg_node.__isset.agg_sort_info_by_group_key) {
@@ -952,7 +953,6 @@ Status
AggSinkOperatorX::_init_aggregate_evaluators(RuntimeState* state) {
Status AggSinkOperatorX::_calc_aggregate_evaluators() {
_offsets_of_aggregate_states.resize(_aggregate_evaluators.size());
- _is_merge = false;
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_offsets_of_aggregate_states[i] = _total_size_of_aggregate_states;
@@ -975,9 +975,6 @@ Status AggSinkOperatorX::_calc_aggregate_evaluators() {
(_total_size_of_aggregate_states + alignment_of_next_state
- 1) /
alignment_of_next_state * alignment_of_next_state;
}
- if (_aggregate_evaluators[i]->is_merge()) {
- _is_merge = true;
- }
}
return Status::OK();
}
diff --git a/be/src/exec/operator/aggregation_sink_operator.h
b/be/src/exec/operator/aggregation_sink_operator.h
index f0cc6b70d4e..605bb62a1dd 100644
--- a/be/src/exec/operator/aggregation_sink_operator.h
+++ b/be/src/exec/operator/aggregation_sink_operator.h
@@ -163,7 +163,8 @@ public:
:
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(
state);
}
- if (!_needs_finalize && !state->enable_local_exchange_before_agg()) {
+ if (!_needs_finalize && !state->enable_local_exchange_before_agg() &&
+ !(_is_merge && _child && _child->is_serial_operator())) {
return
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(state);
}
return _is_colocate && _require_bucket_distribution
diff --git a/be/test/exec/operator/agg_operator_group_by_limit_opt_test.cpp
b/be/test/exec/operator/agg_operator_group_by_limit_opt_test.cpp
index 26f312ea9c9..0e5d2f83248 100644
--- a/be/test/exec/operator/agg_operator_group_by_limit_opt_test.cpp
+++ b/be/test/exec/operator/agg_operator_group_by_limit_opt_test.cpp
@@ -80,7 +80,13 @@ struct MockAggsinkOperator : public AggSinkOperatorX {
Status _init_probe_expr_ctx(RuntimeState* state) override { return
Status::OK(); }
- Status _init_aggregate_evaluators(RuntimeState* state) override { return
Status::OK(); }
+ Status _init_aggregate_evaluators(RuntimeState* state) override {
+ _is_merge = false;
+ for (auto* evaluator : _aggregate_evaluators) {
+ _is_merge |= evaluator->is_merge();
+ }
+ return Status::OK();
+ }
Status _check_agg_fn_output() override { return Status::OK(); }
};
diff --git a/be/test/exec/operator/agg_operator_test.cpp
b/be/test/exec/operator/agg_operator_test.cpp
index ae750013c84..75592bfa097 100644
--- a/be/test/exec/operator/agg_operator_test.cpp
+++ b/be/test/exec/operator/agg_operator_test.cpp
@@ -81,7 +81,13 @@ struct MockAggsinkOperator : public AggSinkOperatorX {
Status _init_probe_expr_ctx(RuntimeState* state) override { return
Status::OK(); }
- Status _init_aggregate_evaluators(RuntimeState* state) override { return
Status::OK(); }
+ Status _init_aggregate_evaluators(RuntimeState* state) override {
+ _is_merge = false;
+ for (auto* evaluator : _aggregate_evaluators) {
+ _is_merge |= evaluator->is_merge();
+ }
+ return Status::OK();
+ }
Status _check_agg_fn_output() override { return Status::OK(); }
};
diff --git a/regression-test/data/nereids_syntax_p0/agg_4_phase.out
b/regression-test/data/nereids_syntax_p0/agg_4_phase.out
index 97a97b8816d..0d22b7039e9 100644
--- a/regression-test/data/nereids_syntax_p0/agg_4_phase.out
+++ b/regression-test/data/nereids_syntax_p0/agg_4_phase.out
@@ -17,3 +17,5 @@
2 -4,-4 1 b 1
3 -4 1 f 1
+-- !serial_exchange_distinct_sum --
+45 44850
diff --git a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
index e38c92018ff..772ba119b16 100644
--- a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
+++ b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
@@ -85,11 +85,27 @@ suite("agg_4_phase") {
from agg_4_phase_tbl2
group by id
order by id"""
+
+ sql "drop table if exists agg_4_phase_serial_exchange_tbl"
+ sql """
+ create table agg_4_phase_serial_exchange_tbl (
+ pk int,
+ x int,
+ y int
+ ) engine=olap
+ duplicate key(pk)
+ distributed by hash(pk) buckets 10
+ properties("replication_num"="1");
+ """
+ sql """
+ insert into agg_4_phase_serial_exchange_tbl
+ select number, number % 10, number from numbers("number" = "300");
+ """
multi_sql """
set runtime_filter_type= "BLOOM_FILTER,MIN_MAX";
set enable_runtime_filter_prune= "false";
set exchange_multi_blocks_byte_size= "4722978";
-set parallel_pipeline_task_num= "3";
+set parallel_pipeline_task_num= "4";
set experimental_parallel_scan_min_rows_per_scanner= "256";
set enable_strong_consistency_read= "true";
set runtime_filter_wait_infinitely= "true";
@@ -97,6 +113,7 @@ set enable_share_hash_table_for_broadcast_join= "false";
set experimental_parallel_scan_max_scanners_count= "8";
set disable_streaming_preaggregations= "true";
set experimental_use_serial_exchange= "true";
+set enable_local_exchange_before_agg= "false";
"""
qt_phase4_multi_distinct """
select
@@ -108,4 +125,10 @@ set experimental_use_serial_exchange= "true";
from agg_4_phase_tbl2
group by id
order by id"""
+
+ qt_serial_exchange_distinct_sum """
+ select
+ sum(distinct x),
+ sum(y)
+ from agg_4_phase_serial_exchange_tbl"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]