This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d8eefd0be89dba2d4283dce292fca2fa3f9cf203 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Mon May 27 17:07:34 2024 +0800 [fix] fix wrong result of spill agg with limit (#35403) --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 5 ++- .../pipeline/exec/aggregation_source_operator.cpp | 5 --- be/src/pipeline/exec/aggregation_source_operator.h | 3 -- regression-test/data/spill_p0/aggregate_spill.out | 4 ++ .../suites/spill_p0/aggregate_spill.groovy | 43 ++++++++++++++++++++++ 5 files changed, 50 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 5e28af6e7e1..730337561e8 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -142,7 +142,8 @@ Status AggSinkLocalState::open(RuntimeState* state) { _should_limit_output = p._limit != -1 && // has limit (!p._have_conjuncts) && // no having conjunct - p._needs_finalize; // agg's finalize step + p._needs_finalize && // agg's finalize step + !Base::_shared_state->enable_spill; } // move _create_agg_status to open not in during prepare, // because during prepare and open thread is not the same one, @@ -459,7 +460,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block* _places.data(), _agg_arena_pool)); } - if (_should_limit_output && !Base::_shared_state->enable_spill) { + if (_should_limit_output) { _reach_limit = _get_hash_table_size() >= Base::_parent->template cast<AggSinkOperatorX>()._limit; if (_reach_limit && diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index e7648e09e0d..fadddee9034 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -525,11 +525,6 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block) 
_shared_state->agg_arena_pool.get(), rows); } } - - if (_should_limit_output) { - _reach_limit = _get_hash_table_size() >= - Base::_parent->template cast<AggSourceOperatorX>()._limit; - } } return Status::OK(); diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 1d1f564d41a..5b85ec0f238 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -114,9 +114,6 @@ protected: }; executor _executor; - - bool _should_limit_output = false; - bool _reach_limit = false; }; class AggSourceOperatorX : public OperatorX<AggLocalState> { diff --git a/regression-test/data/spill_p0/aggregate_spill.out b/regression-test/data/spill_p0/aggregate_spill.out new file mode 100644 index 00000000000..e7251ff58e9 --- /dev/null +++ b/regression-test/data/spill_p0/aggregate_spill.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !aggregate_spill -- +5 1 + diff --git a/regression-test/suites/spill_p0/aggregate_spill.groovy b/regression-test/suites/spill_p0/aggregate_spill.groovy new file mode 100644 index 00000000000..180ab37200f --- /dev/null +++ b/regression-test/suites/spill_p0/aggregate_spill.groovy @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("aggregate_spill") { + sql """ + set enable_agg_spill = true; + """ + sql """ + set enable_force_spill = true; + """ + sql """ + set min_revocable_mem = 1; + """ + sql """ + set parallel_pipeline_task_num = 4; + """ + sql """ + drop table if exists aggregate_spill_test; + """ + sql """ + CREATE TABLE `aggregate_spill_test` (k1 int, k2 int replace) distributed by hash(k1) properties("replication_num"="1"); + """ + sql """ + insert into aggregate_spill_test values(1, 1), (2, 1), (3, 1), (4, 1), (5, 1); + """ + qt_aggregate_spill """ + select count(), k2 from aggregate_spill_test group by k2 limit 1; + """ +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org