This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 20000e6fdc0 [Exec](agg) Fix agg limit result error (#37025) 20000e6fdc0 is described below commit 20000e6fdc0815021579cbc137f43d7bb6fc2ac7 Author: HappenLee <happen...@hotmail.com> AuthorDate: Mon Jul 1 09:49:04 2024 +0800 [Exec](agg) Fix agg limit result error (#37025) This PR must be merged before #34853. --- be/src/pipeline/dependency.cpp | 10 ++++++---- be/src/pipeline/dependency.h | 3 ++- be/src/pipeline/exec/aggregation_sink_operator.cpp | 4 +++- be/src/pipeline/exec/aggregation_source_operator.cpp | 8 +++++++- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 68c00af409d..4938883062a 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -248,7 +248,8 @@ void AggSharedState::build_limit_heap(size_t hash_table_size) { limit_columns_min = limit_heap.top()._row_id; } -bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows) { +bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows, + const std::vector<int>* key_locs) { if (num_rows) { cmp_res.resize(num_rows); need_computes.resize(num_rows); @@ -257,9 +258,10 @@ bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows) const auto key_size = null_directions.size(); for (int i = 0; i < key_size; i++) { - block->get_by_position(i).column->compare_internal( - limit_columns_min, *limit_columns[i], null_directions[i], order_directions[i], - cmp_res, need_computes.data()); + block->get_by_position(key_locs ? 
key_locs->operator[](i) : i) + .column->compare_internal(limit_columns_min, *limit_columns[i], + null_directions[i], order_directions[i], cmp_res, + need_computes.data()); } auto set_computes_arr = [](auto* __restrict res, auto* __restrict computes, int rows) { diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 5214022db13..8adc24d3b4e 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -311,7 +311,8 @@ public: Status reset_hash_table(); - bool do_limit_filter(vectorized::Block* block, size_t num_rows); + bool do_limit_filter(vectorized::Block* block, size_t num_rows, + const std::vector<int>* key_locs = nullptr); void build_limit_heap(size_t hash_table_size); // We should call this function only at 1st phase. diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index fae987394b4..1dab1669dd5 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -329,6 +329,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b if (limit) { need_do_agg = _emplace_into_hash_table_limit(_places.data(), block, key_locs, key_columns, rows); + rows = block->rows(); } else { _emplace_into_hash_table(_places.data(), key_columns, rows); } @@ -589,7 +590,8 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData bool need_filter = false; { SCOPED_TIMER(_hash_table_limit_compute_timer); - need_filter = _shared_state->do_limit_filter(block, num_rows); + need_filter = + _shared_state->do_limit_filter(block, num_rows, &key_locs); } auto& need_computes = _shared_state->need_computes; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 5b371877f36..1b7a151e2af 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ 
b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -452,8 +452,14 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) { if (_shared_state->reach_limit) { if (_shared_state->do_sort_limit && _shared_state->do_limit_filter(block, block->rows())) { vectorized::Block::filter_block_internal(block, _shared_state->need_computes); + if (auto rows = block->rows()) { + _num_rows_returned += rows; + COUNTER_UPDATE(_blocks_returned_counter, 1); + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + } + } else { + reached_limit(block, eos); } - reached_limit(block, eos); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org