This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 20000e6fdc0 [Exec](agg) Fix agg limit result error (#37025)
20000e6fdc0 is described below

commit 20000e6fdc0815021579cbc137f43d7bb6fc2ac7
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Jul 1 09:49:04 2024 +0800

    [Exec](agg) Fix agg limit result error (#37025)
    
    This PR should be merged first, before #34853 is merged.
---
 be/src/pipeline/dependency.cpp                       | 10 ++++++----
 be/src/pipeline/dependency.h                         |  3 ++-
 be/src/pipeline/exec/aggregation_sink_operator.cpp   |  4 +++-
 be/src/pipeline/exec/aggregation_source_operator.cpp |  8 +++++++-
 4 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 68c00af409d..4938883062a 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -248,7 +248,8 @@ void AggSharedState::build_limit_heap(size_t hash_table_size) {
     limit_columns_min = limit_heap.top()._row_id;
 }
 
-bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows) {
+bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows,
+                                     const std::vector<int>* key_locs) {
     if (num_rows) {
         cmp_res.resize(num_rows);
         need_computes.resize(num_rows);
@@ -257,9 +258,10 @@ bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows)
 
         const auto key_size = null_directions.size();
         for (int i = 0; i < key_size; i++) {
-            block->get_by_position(i).column->compare_internal(
-                    limit_columns_min, *limit_columns[i], null_directions[i], order_directions[i],
-                    cmp_res, need_computes.data());
+            block->get_by_position(key_locs ? key_locs->operator[](i) : i)
+                    .column->compare_internal(limit_columns_min, *limit_columns[i],
+                                              null_directions[i], order_directions[i], cmp_res,
+                                              need_computes.data());
         }
 
         auto set_computes_arr = [](auto* __restrict res, auto* __restrict computes, int rows) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 5214022db13..8adc24d3b4e 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -311,7 +311,8 @@ public:
 
     Status reset_hash_table();
 
-    bool do_limit_filter(vectorized::Block* block, size_t num_rows);
+    bool do_limit_filter(vectorized::Block* block, size_t num_rows,
+                         const std::vector<int>* key_locs = nullptr);
     void build_limit_heap(size_t hash_table_size);
 
     // We should call this function only at 1st phase.
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index fae987394b4..1dab1669dd5 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -329,6 +329,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
         if (limit) {
             need_do_agg = _emplace_into_hash_table_limit(_places.data(), block, key_locs,
                                                          key_columns, rows);
+            rows = block->rows();
         } else {
             _emplace_into_hash_table(_places.data(), key_columns, rows);
         }
@@ -589,7 +590,8 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
                         bool need_filter = false;
                         {
                             SCOPED_TIMER(_hash_table_limit_compute_timer);
-                            need_filter = _shared_state->do_limit_filter(block, num_rows);
+                            need_filter =
+                                    _shared_state->do_limit_filter(block, num_rows, &key_locs);
                         }
 
                         auto& need_computes = _shared_state->need_computes;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 5b371877f36..1b7a151e2af 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -452,8 +452,14 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) {
     if (_shared_state->reach_limit) {
         if (_shared_state->do_sort_limit && _shared_state->do_limit_filter(block, block->rows())) {
             vectorized::Block::filter_block_internal(block, _shared_state->need_computes);
+            if (auto rows = block->rows()) {
+                _num_rows_returned += rows;
+                COUNTER_UPDATE(_blocks_returned_counter, 1);
+                COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+            }
+        } else {
+            reached_limit(block, eos);
         }
-        reached_limit(block, eos);
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to