This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d8eefd0be89dba2d4283dce292fca2fa3f9cf203
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Mon May 27 17:07:34 2024 +0800

    [fix] fix wrong result of spill agg with limit (#35403)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  5 ++-
 .../pipeline/exec/aggregation_source_operator.cpp  |  5 ---
 be/src/pipeline/exec/aggregation_source_operator.h |  3 --
 regression-test/data/spill_p0/aggregate_spill.out  |  4 ++
 .../suites/spill_p0/aggregate_spill.groovy         | 43 ++++++++++++++++++++++
 5 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 5e28af6e7e1..730337561e8 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -142,7 +142,8 @@ Status AggSinkLocalState::open(RuntimeState* state) {
 
         _should_limit_output = p._limit != -1 &&       // has limit
                                (!p._have_conjuncts) && // no having conjunct
-                               p._needs_finalize;      // agg's finalize step
+                               p._needs_finalize &&    // agg's finalize step
+                               !Base::_shared_state->enable_spill;
     }
     // move _create_agg_status to open not in during prepare,
     // because during prepare and open thread is not the same one,
@@ -459,7 +460,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
                     _places.data(), _agg_arena_pool));
         }
 
-        if (_should_limit_output && !Base::_shared_state->enable_spill) {
+        if (_should_limit_output) {
             _reach_limit = _get_hash_table_size() >=
                            Base::_parent->template 
cast<AggSinkOperatorX>()._limit;
             if (_reach_limit &&
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index e7648e09e0d..fadddee9034 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -525,11 +525,6 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
                         _shared_state->agg_arena_pool.get(), rows);
             }
         }
-
-        if (_should_limit_output) {
-            _reach_limit = _get_hash_table_size() >=
-                           Base::_parent->template 
cast<AggSourceOperatorX>()._limit;
-        }
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index 1d1f564d41a..5b85ec0f238 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -114,9 +114,6 @@ protected:
     };
 
     executor _executor;
-
-    bool _should_limit_output = false;
-    bool _reach_limit = false;
 };
 
 class AggSourceOperatorX : public OperatorX<AggLocalState> {
diff --git a/regression-test/data/spill_p0/aggregate_spill.out b/regression-test/data/spill_p0/aggregate_spill.out
new file mode 100644
index 00000000000..e7251ff58e9
--- /dev/null
+++ b/regression-test/data/spill_p0/aggregate_spill.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !aggregate_spill --
+5      1
+
diff --git a/regression-test/suites/spill_p0/aggregate_spill.groovy b/regression-test/suites/spill_p0/aggregate_spill.groovy
new file mode 100644
index 00000000000..180ab37200f
--- /dev/null
+++ b/regression-test/suites/spill_p0/aggregate_spill.groovy
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("aggregate_spill") { // Regression test for #35403: GROUP BY ... LIMIT must return the correct result when aggregation spill is enabled.
+    sql """
+        set enable_agg_spill = true;
+    """ // session switch for aggregation spill
+    sql """
+        set enable_force_spill = true;
+    """ // presumably forces the spill path even without memory pressure — confirm
+    sql """
+        set min_revocable_mem = 1;
+    """ // presumably lets any amount of revocable memory qualify for spilling — confirm
+    sql """
+        set parallel_pipeline_task_num = 4;
+    """ // run several pipeline tasks in parallel (presumably several agg sink instances) — confirm
+    sql """
+        drop table if exists aggregate_spill_test;
+    """
+    sql """
+        CREATE TABLE `aggregate_spill_test` (k1 int, k2 int replace) 
distributed by hash(k1) properties("replication_num"="1");
+    """ // k2 is declared with REPLACE aggregation; data is hashed by k1
+    sql """
+        insert into aggregate_spill_test values(1, 1), (2, 1), (3, 1), (4, 1), 
(5, 1);
+    """ // five rows that all share k2 = 1, so they form a single group
+    qt_aggregate_spill """
+        select count(), k2 from aggregate_spill_test group by k2 limit 1;
+    """ // expected row (aggregate_spill.out): count() = 5, k2 = 1 — LIMIT must not truncate the group's count under spill
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to