This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b37cf43fc [Bug](spill) fix wrong offset process on spill sort (#53672)
24b37cf43fc is described below

commit 24b37cf43fcdfb752b1004cfad3241712222927b
Author: Pxl <x...@selectdb.com>
AuthorDate: Mon Jul 28 14:27:41 2025 +0800

    [Bug](spill) fix wrong offset process on spill sort (#53672)
    
    fix wrong offset process on spill sort
---
 be/src/pipeline/common/partition_sort_utils.cpp    |   2 +-
 be/src/pipeline/dependency.h                       |   2 +
 .../pipeline/exec/partition_sort_sink_operator.cpp |   2 +-
 be/src/pipeline/exec/sort_sink_operator.cpp        |   4 +-
 be/src/pipeline/exec/sort_sink_operator.h          |   3 ++
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |   7 +++-
 .../pipeline/exec/spill_sort_source_operator.cpp   |  14 +++++--
 be/src/vec/common/sort/heap_sorter.cpp             |   5 ++-
 be/src/vec/common/sort/heap_sorter.h               |   2 +-
 be/src/vec/common/sort/partition_sorter.cpp        |   5 ++-
 be/src/vec/common/sort/partition_sorter.h          |   2 +-
 be/src/vec/common/sort/sorter.cpp                  |   7 +++-
 be/src/vec/common/sort/sorter.h                    |   6 ++-
 be/src/vec/common/sort/topn_sorter.cpp             |   5 ++-
 be/src/vec/common/sort/topn_sorter.h               |   2 +-
 be/test/testutil/mock/mock_sorter.h                |   2 +-
 be/test/vec/exec/sort/heap_sorter_test.cpp         |   2 +-
 be/test/vec/exec/sort/partition_sorter_test.cpp    |   6 +--
 be/test/vec/exec/sort/sort_test.cpp                |   2 +-
 .../data/query_p0/sort_spill/sort_spill.out        | Bin 0 -> 280 bytes
 .../suites/query_p0/sort_spill/sort_spill.groovy   |  41 +++++++++++++++++++++
 21 files changed, 97 insertions(+), 24 deletions(-)

diff --git a/be/src/pipeline/common/partition_sort_utils.cpp 
b/be/src/pipeline/common/partition_sort_utils.cpp
index b6fdbd5915e..cf6b20048db 100644
--- a/be/src/pipeline/common/partition_sort_utils.cpp
+++ b/be/src/pipeline/common/partition_sort_utils.cpp
@@ -75,7 +75,7 @@ Status PartitionBlocks::do_partition_topn_sort() {
         RETURN_IF_ERROR(_partition_topn_sorter->append_block(block.get()));
     }
     _blocks.clear();
-    RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read());
+    RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read(false));
     bool current_eos = false;
     while (!current_eos) {
         // output_block maybe need better way
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index eec06b115cc..3a3f497330c 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -529,6 +529,8 @@ struct SpillSortSharedState : public BasicSharedState,
     SortSharedState* in_mem_shared_state = nullptr;
     bool enable_spill = false;
     bool is_spilled = false;
+    int64_t limit = -1;
+    int64_t offset = 0;
     std::atomic_bool is_closed = false;
     std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
 
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index ea3b94d4320..cc759ad0547 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -152,7 +152,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                 RETURN_IF_ERROR(sorter->append_block(block.get()));
             }
             local_state._value_places[i]->_blocks.clear();
-            RETURN_IF_ERROR(sorter->prepare_for_read());
+            RETURN_IF_ERROR(sorter->prepare_for_read(false));
             INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> lc(
                     local_state._shared_state->prepared_finish_lock));
             sorter->set_prepared_finish();
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 8b72bf6b9d1..2a7b329a10b 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -160,7 +160,7 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* in
     }
 
     if (eos) {
-        RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read());
+        
RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read(false));
         local_state._dependency->set_ready_to_read();
     }
     return Status::OK();
@@ -178,7 +178,7 @@ size_t 
SortSinkOperatorX::get_revocable_mem_size(RuntimeState* state) const {
 
 Status SortSinkOperatorX::prepare_for_spill(RuntimeState* state) {
     auto& local_state = get_local_state(state);
-    return local_state._shared_state->sorter->prepare_for_read();
+    return local_state._shared_state->sorter->prepare_for_read(true);
 }
 
 Status SortSinkOperatorX::merge_sort_read_for_spill(RuntimeState* state,
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index be37cef07dc..a8d1e9005b2 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -101,6 +101,9 @@ public:
                                      int batch_size, bool* eos);
     void reset(RuntimeState* state);
 
+    int64_t limit() const { return _limit; }
+    int64_t offset() const { return _offset; }
+
 private:
     friend class SortSinkLocalState;
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index d6ba8ec6414..2a10baaf093 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -161,7 +161,8 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
             }
         } else {
             RETURN_IF_ERROR(
-                    
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
+                    
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read(
+                            false));
             local_state._dependency->set_ready_to_read();
         }
     }
@@ -176,8 +177,11 @@ size_t 
SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool e
 
 Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                               const 
std::shared_ptr<SpillContext>& spill_context) {
+    auto& parent = Base::_parent->template cast<Parent>();
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
+        _shared_state->limit = parent._sort_sink_operator->limit();
+        _shared_state->offset = parent._sort_sink_operator->offset();
         custom_profile()->add_info_string("Spilled", "true");
     }
 
@@ -193,7 +197,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state,
 
     _shared_state->sorted_streams.emplace_back(_spilling_stream);
 
-    auto& parent = Base::_parent->template cast<Parent>();
     auto query_id = state->query_id();
 
     auto spill_func = [this, state, query_id, &parent] {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index de28bf60305..550e7789346 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -19,6 +19,8 @@
 
 #include <glog/logging.h>
 
+#include <cstdint>
+
 #include "common/status.h"
 #include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
@@ -179,10 +181,16 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
 Status SpillSortLocalState::_create_intermediate_merger(
         int num_blocks, const vectorized::SortDescription& sort_description) {
     std::vector<vectorized::BlockSupplier> child_block_suppliers;
+    int64_t limit = -1;
+    int64_t offset = 0;
+    if (num_blocks >= _shared_state->sorted_streams.size()) {
+        // final round use real limit and offset
+        limit = Base::_shared_state->limit;
+        offset = Base::_shared_state->offset;
+    }
+
     _merger = std::make_unique<vectorized::VSortedRunMerger>(
-            sort_description, _runtime_state->batch_size(),
-            Base::_shared_state->in_mem_shared_state->sorter->limit(),
-            Base::_shared_state->in_mem_shared_state->sorter->offset(), 
custom_profile());
+            sort_description, _runtime_state->batch_size(), limit, offset, 
custom_profile());
 
     _current_merging_streams.clear();
     for (int i = 0; i < num_blocks && !_shared_state->sorted_streams.empty(); 
++i) {
diff --git a/be/src/vec/common/sort/heap_sorter.cpp 
b/be/src/vec/common/sort/heap_sorter.cpp
index ec12e61ebf2..c1b6a735afd 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -51,7 +51,10 @@ Status HeapSorter::append_block(Block* block) {
     return Status::OK();
 }
 
-Status HeapSorter::prepare_for_read() {
+Status HeapSorter::prepare_for_read(bool is_spill) {
+    if (is_spill) {
+        return Status::InternalError("HeapSorter does not support spill");
+    }
     while (_queue.is_valid()) {
         auto [current, current_rows] = _queue.current();
         if (current_rows) {
diff --git a/be/src/vec/common/sort/heap_sorter.h 
b/be/src/vec/common/sort/heap_sorter.h
index 51b14ff0f1b..35108cbafc2 100644
--- a/be/src/vec/common/sort/heap_sorter.h
+++ b/be/src/vec/common/sort/heap_sorter.h
@@ -34,7 +34,7 @@ public:
 
     Status append_block(Block* block) override;
 
-    Status prepare_for_read() override;
+    Status prepare_for_read(bool is_spill) override;
 
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
 
diff --git a/be/src/vec/common/sort/partition_sorter.cpp 
b/be/src/vec/common/sort/partition_sorter.cpp
index 0e22bf2fc21..305a803c9e0 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -63,7 +63,10 @@ Status PartitionSorter::append_block(Block* input_block) {
     return Status::OK();
 }
 
-Status PartitionSorter::prepare_for_read() {
+Status PartitionSorter::prepare_for_read(bool is_spill) {
+    if (is_spill) {
+        return Status::InternalError("PartitionSorter does not support spill");
+    }
     auto& blocks = _state->get_sorted_block();
     auto& queue = _state->get_queue();
     std::vector<MergeSortCursor> cursors;
diff --git a/be/src/vec/common/sort/partition_sorter.h 
b/be/src/vec/common/sort/partition_sorter.h
index d20ea1bd220..e7d3f37941f 100644
--- a/be/src/vec/common/sort/partition_sorter.h
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -88,7 +88,7 @@ public:
 
     Status append_block(Block* block) override;
 
-    Status prepare_for_read() override;
+    Status prepare_for_read(bool is_spill) override;
 
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
 
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index 4901b4d77b0..951281f1383 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -262,7 +262,12 @@ Status FullSorter::append_block(Block* block) {
     return Status::OK();
 }
 
-Status FullSorter::prepare_for_read() {
+Status FullSorter::prepare_for_read(bool is_spill) {
+    if (is_spill) {
+        _limit += _offset;
+        _offset = 0;
+        _state->ignore_offset();
+    }
     if (_state->unsorted_block()->rows() > 0) {
         RETURN_IF_ERROR(_do_sort());
     }
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index ca33a9eacfa..149939b9bd9 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -83,6 +83,8 @@ public:
 
     std::unique_ptr<Block>& unsorted_block() { return _unsorted_block; }
 
+    void ignore_offset() { _offset = 0; }
+
 private:
     void _merge_sort_read_impl(int batch_size, doris::vectorized::Block* 
block, bool* eos);
 
@@ -129,7 +131,7 @@ public:
 
     virtual Status append_block(Block* block) = 0;
 
-    virtual Status prepare_for_read() = 0;
+    virtual Status prepare_for_read(bool is_spill) = 0;
 
     virtual Status get_next(RuntimeState* state, Block* block, bool* eos) = 0;
 
@@ -182,7 +184,7 @@ public:
 
     Status append_block(Block* block) override;
 
-    Status prepare_for_read() override;
+    Status prepare_for_read(bool is_spill) override;
 
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
 
diff --git a/be/src/vec/common/sort/topn_sorter.cpp 
b/be/src/vec/common/sort/topn_sorter.cpp
index fe3cecca5cd..daacd064118 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -53,7 +53,10 @@ Status TopNSorter::append_block(Block* block) {
     return Status::OK();
 }
 
-Status TopNSorter::prepare_for_read() {
+Status TopNSorter::prepare_for_read(bool is_spill) {
+    if (is_spill) {
+        return Status::InternalError("TopN sorter does not support spill");
+    }
     return _state->build_merge_tree(_sort_description);
 }
 
diff --git a/be/src/vec/common/sort/topn_sorter.h 
b/be/src/vec/common/sort/topn_sorter.h
index 54a2e838ffc..80e280cd802 100644
--- a/be/src/vec/common/sort/topn_sorter.h
+++ b/be/src/vec/common/sort/topn_sorter.h
@@ -52,7 +52,7 @@ public:
 
     Status append_block(Block* block) override;
 
-    Status prepare_for_read() override;
+    Status prepare_for_read(bool is_spill) override;
 
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
 
diff --git a/be/test/testutil/mock/mock_sorter.h 
b/be/test/testutil/mock/mock_sorter.h
index 8b31484741c..30e87f741ca 100644
--- a/be/test/testutil/mock/mock_sorter.h
+++ b/be/test/testutil/mock/mock_sorter.h
@@ -24,7 +24,7 @@ struct MockSorter : public Sorter {
     MockSorter() = default;
     Status append_block(Block* block) override { return Status::OK(); }
 
-    Status prepare_for_read() override { return Status::OK(); }
+    Status prepare_for_read(bool is_spill) override { return Status::OK(); }
 
     Status get_next(RuntimeState* state, Block* block, bool* eos) override {
         *eos = true;
diff --git a/be/test/vec/exec/sort/heap_sorter_test.cpp 
b/be/test/vec/exec/sort/heap_sorter_test.cpp
index ed83368c294..14be58b1618 100644
--- a/be/test/vec/exec/sort/heap_sorter_test.cpp
+++ b/be/test/vec/exec/sort/heap_sorter_test.cpp
@@ -111,7 +111,7 @@ TEST_F(HeapSorterTest, test_topn_sorter1) {
         EXPECT_EQ(value, real);
     }
 
-    EXPECT_TRUE(sorter->prepare_for_read());
+    EXPECT_TRUE(sorter->prepare_for_read(false));
 
     {
         Block block;
diff --git a/be/test/vec/exec/sort/partition_sorter_test.cpp 
b/be/test/vec/exec/sort/partition_sorter_test.cpp
index 00baa50826a..9c8fad5ff47 100644
--- a/be/test/vec/exec/sort/partition_sorter_test.cpp
+++ b/be/test/vec/exec/sort/partition_sorter_test.cpp
@@ -94,7 +94,7 @@ TEST_F(PartitionSorterTest, 
test_partition_sorter_read_row_num) {
     }
 
     {
-        auto st = sorter->prepare_for_read();
+        auto st = sorter->prepare_for_read(false);
         EXPECT_TRUE(st.ok()) << st.msg();
     }
     {
@@ -140,7 +140,7 @@ TEST_F(PartitionSorterTest, 
test_partition_sorter_DENSE_RANK) {
     }
 
     {
-        auto st = sorter->prepare_for_read();
+        auto st = sorter->prepare_for_read(false);
         EXPECT_TRUE(st.ok()) << st.msg();
     }
     {
@@ -179,7 +179,7 @@ TEST_F(PartitionSorterTest, test_partition_sorter_RANK) {
     }
 
     {
-        auto st = sorter->prepare_for_read();
+        auto st = sorter->prepare_for_read(false);
         EXPECT_TRUE(st.ok()) << st.msg();
     }
     {
diff --git a/be/test/vec/exec/sort/sort_test.cpp 
b/be/test/vec/exec/sort/sort_test.cpp
index 505c0076174..ceea6bb3bf6 100644
--- a/be/test/vec/exec/sort/sort_test.cpp
+++ b/be/test/vec/exec/sort/sort_test.cpp
@@ -87,7 +87,7 @@ public:
         EXPECT_TRUE(sorter->append_block(&block).ok());
     }
 
-    void prepare_for_read() { EXPECT_TRUE(sorter->prepare_for_read().ok()); }
+    void prepare_for_read() { 
EXPECT_TRUE(sorter->prepare_for_read(false).ok()); }
 
     void check_sort_column(ColumnPtr column) {
         MutableBlock 
sorted_block(VectorizedUtils::create_columns_with_type_and_name(*row_desc));
diff --git a/regression-test/data/query_p0/sort_spill/sort_spill.out 
b/regression-test/data/query_p0/sort_spill/sort_spill.out
new file mode 100644
index 00000000000..00c0dd6f19a
Binary files /dev/null and 
b/regression-test/data/query_p0/sort_spill/sort_spill.out differ
diff --git a/regression-test/suites/query_p0/sort_spill/sort_spill.groovy 
b/regression-test/suites/query_p0/sort_spill/sort_spill.groovy
new file mode 100644
index 00000000000..13f7cfa6fa9
--- /dev/null
+++ b/regression-test/suites/query_p0/sort_spill/sort_spill.groovy
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("sort_spill") {
+    sql """
+        drop table if exists d_table;
+    """
+    sql """
+       create table d_table(
+        k1 int,
+        k2 int,
+        ) distributed by random buckets 10
+        properties ("replication_num"="1");
+    """
+    sql """
+        insert into d_table select e1,e1 from (select 1 k1) as t lateral view 
explode_numbers(10000) tmp1 as e1;
+    """
+    sql """ set parallel_pipeline_task_num = 2; """
+    sql """ set batch_size = 100; """
+    sql """ set enable_force_spill=true; """
+    sql """ set enable_topn_lazy_materialization=false;"""
+    sql """ set enable_reserve_memory=true; """
+    sql """ set force_sort_algorithm = "full"; """
+    sql """ set enable_parallel_result_sink=true; """
+    qt_select_1 "select k1,row_number () over (ORDER BY k2 DESC) from d_table 
order by k1 limit 10 offset 9900;"
+    qt_select_2 "select k1,row_number () over (ORDER BY k2 DESC) from d_table 
order by k1 limit 10;"
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to