This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 24b37cf43fc [Bug](spill) fix wrong offset process on spill sort (#53672) 24b37cf43fc is described below commit 24b37cf43fcdfb752b1004cfad3241712222927b Author: Pxl <x...@selectdb.com> AuthorDate: Mon Jul 28 14:27:41 2025 +0800 [Bug](spill) fix wrong offset process on spill sort (#53672) fix wrong offset process on spill sort --- be/src/pipeline/common/partition_sort_utils.cpp | 2 +- be/src/pipeline/dependency.h | 2 + .../pipeline/exec/partition_sort_sink_operator.cpp | 2 +- be/src/pipeline/exec/sort_sink_operator.cpp | 4 +- be/src/pipeline/exec/sort_sink_operator.h | 3 ++ be/src/pipeline/exec/spill_sort_sink_operator.cpp | 7 +++- .../pipeline/exec/spill_sort_source_operator.cpp | 14 +++++-- be/src/vec/common/sort/heap_sorter.cpp | 5 ++- be/src/vec/common/sort/heap_sorter.h | 2 +- be/src/vec/common/sort/partition_sorter.cpp | 5 ++- be/src/vec/common/sort/partition_sorter.h | 2 +- be/src/vec/common/sort/sorter.cpp | 7 +++- be/src/vec/common/sort/sorter.h | 6 ++- be/src/vec/common/sort/topn_sorter.cpp | 5 ++- be/src/vec/common/sort/topn_sorter.h | 2 +- be/test/testutil/mock/mock_sorter.h | 2 +- be/test/vec/exec/sort/heap_sorter_test.cpp | 2 +- be/test/vec/exec/sort/partition_sorter_test.cpp | 6 +-- be/test/vec/exec/sort/sort_test.cpp | 2 +- .../data/query_p0/sort_spill/sort_spill.out | Bin 0 -> 280 bytes .../suites/query_p0/sort_spill/sort_spill.groovy | 41 +++++++++++++++++++++ 21 files changed, 97 insertions(+), 24 deletions(-) diff --git a/be/src/pipeline/common/partition_sort_utils.cpp b/be/src/pipeline/common/partition_sort_utils.cpp index b6fdbd5915e..cf6b20048db 100644 --- a/be/src/pipeline/common/partition_sort_utils.cpp +++ b/be/src/pipeline/common/partition_sort_utils.cpp @@ -75,7 +75,7 @@ Status PartitionBlocks::do_partition_topn_sort() { RETURN_IF_ERROR(_partition_topn_sorter->append_block(block.get())); } _blocks.clear(); - RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read()); + RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read(false)); bool current_eos = false; while (!current_eos) { // output_block maybe need better way diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index eec06b115cc..3a3f497330c 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -529,6 +529,8 @@ struct SpillSortSharedState : public BasicSharedState, SortSharedState* in_mem_shared_state = nullptr; bool enable_spill = false; bool is_spilled = false; + int64_t limit = -1; + int64_t offset = 0; std::atomic_bool is_closed = false; std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr; diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index ea3b94d4320..cc759ad0547 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -152,7 +152,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* RETURN_IF_ERROR(sorter->append_block(block.get())); } local_state._value_places[i]->_blocks.clear(); - RETURN_IF_ERROR(sorter->prepare_for_read()); + RETURN_IF_ERROR(sorter->prepare_for_read(false)); INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> lc( local_state._shared_state->prepared_finish_lock)); sorter->set_prepared_finish(); diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 8b72bf6b9d1..2a7b329a10b 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -160,7 +160,7 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in } if (eos) { - RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read()); + RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read(false)); local_state._dependency->set_ready_to_read(); } return Status::OK(); @@ -178,7 +178,7 @@ size_t SortSinkOperatorX::get_revocable_mem_size(RuntimeState* state) const { Status SortSinkOperatorX::prepare_for_spill(RuntimeState* state) { auto& local_state = get_local_state(state); - return local_state._shared_state->sorter->prepare_for_read(); + return local_state._shared_state->sorter->prepare_for_read(true); } Status SortSinkOperatorX::merge_sort_read_for_spill(RuntimeState* state, diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index be37cef07dc..a8d1e9005b2 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -101,6 +101,9 @@ public: int batch_size, bool* eos); void reset(RuntimeState* state); + int64_t limit() const { return _limit; } + int64_t offset() const { return _offset; } + private: friend class SortSinkLocalState; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index d6ba8ec6414..2a10baaf093 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -161,7 +161,8 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc } } else { RETURN_IF_ERROR( - local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read()); + local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read( + false)); local_state._dependency->set_ready_to_read(); } } @@ -176,8 +177,11 @@ size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool e Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { + auto& parent = Base::_parent->template cast<Parent>(); if (!_shared_state->is_spilled) { _shared_state->is_spilled = true; + _shared_state->limit = parent._sort_sink_operator->limit(); + _shared_state->offset = parent._sort_sink_operator->offset(); custom_profile()->add_info_string("Spilled", "true"); } @@ -193,7 +197,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, _shared_state->sorted_streams.emplace_back(_spilling_stream); - auto& parent = Base::_parent->template cast<Parent>(); auto query_id = state->query_id(); auto spill_func = [this, state, query_id, &parent] { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index de28bf60305..550e7789346 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -19,6 +19,8 @@ #include <glog/logging.h> +#include <cstdint> + #include "common/status.h" #include "pipeline/exec/spill_utils.h" #include "pipeline/pipeline_task.h" @@ -179,10 +181,16 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat Status SpillSortLocalState::_create_intermediate_merger( int num_blocks, const vectorized::SortDescription& sort_description) { std::vector<vectorized::BlockSupplier> child_block_suppliers; + int64_t limit = -1; + int64_t offset = 0; + if (num_blocks >= _shared_state->sorted_streams.size()) { + // final round use real limit and offset + limit = Base::_shared_state->limit; + offset = Base::_shared_state->offset; + } + _merger = std::make_unique<vectorized::VSortedRunMerger>( - sort_description, _runtime_state->batch_size(), - Base::_shared_state->in_mem_shared_state->sorter->limit(), - Base::_shared_state->in_mem_shared_state->sorter->offset(), custom_profile()); + sort_description, _runtime_state->batch_size(), limit, offset, custom_profile()); _current_merging_streams.clear(); for (int i = 0; i < num_blocks && !_shared_state->sorted_streams.empty(); ++i) { diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp index ec12e61ebf2..c1b6a735afd 100644 --- a/be/src/vec/common/sort/heap_sorter.cpp +++ b/be/src/vec/common/sort/heap_sorter.cpp @@ -51,7 +51,10 @@ Status HeapSorter::append_block(Block* block) { return Status::OK(); } -Status HeapSorter::prepare_for_read() { +Status HeapSorter::prepare_for_read(bool is_spill) { + if (is_spill) { + return Status::InternalError("HeapSorter does not support spill"); + } while (_queue.is_valid()) { auto [current, current_rows] = _queue.current(); if (current_rows) { diff --git a/be/src/vec/common/sort/heap_sorter.h b/be/src/vec/common/sort/heap_sorter.h index 51b14ff0f1b..35108cbafc2 100644 --- a/be/src/vec/common/sort/heap_sorter.h +++ b/be/src/vec/common/sort/heap_sorter.h @@ -34,7 +34,7 @@ public: Status append_block(Block* block) override; - Status prepare_for_read() override; + Status prepare_for_read(bool is_spill) override; Status get_next(RuntimeState* state, Block* block, bool* eos) override; diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 0e22bf2fc21..305a803c9e0 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -63,7 +63,10 @@ Status PartitionSorter::append_block(Block* input_block) { return Status::OK(); } -Status PartitionSorter::prepare_for_read() { +Status PartitionSorter::prepare_for_read(bool is_spill) { + if (is_spill) { + return Status::InternalError("PartitionSorter does not support spill"); + } auto& blocks = _state->get_sorted_block(); auto& queue = _state->get_queue(); std::vector<MergeSortCursor> cursors; diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h index d20ea1bd220..e7d3f37941f 100644 --- a/be/src/vec/common/sort/partition_sorter.h +++ b/be/src/vec/common/sort/partition_sorter.h @@ -88,7 +88,7 @@ public: Status append_block(Block* block) override; - Status prepare_for_read() override; + Status prepare_for_read(bool is_spill) override; Status get_next(RuntimeState* state, Block* block, bool* eos) override; diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 4901b4d77b0..951281f1383 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -262,7 +262,12 @@ Status FullSorter::append_block(Block* block) { return Status::OK(); } -Status FullSorter::prepare_for_read() { +Status FullSorter::prepare_for_read(bool is_spill) { + if (is_spill) { + _limit += _offset; + _offset = 0; + _state->ignore_offset(); + } if (_state->unsorted_block()->rows() > 0) { RETURN_IF_ERROR(_do_sort()); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index ca33a9eacfa..149939b9bd9 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -83,6 +83,8 @@ public: std::unique_ptr<Block>& unsorted_block() { return _unsorted_block; } + void ignore_offset() { _offset = 0; } + private: void _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos); @@ -129,7 +131,7 @@ public: virtual Status append_block(Block* block) = 0; - virtual Status prepare_for_read() = 0; + virtual Status prepare_for_read(bool is_spill) = 0; virtual Status get_next(RuntimeState* state, Block* block, bool* eos) = 0; @@ -182,7 +184,7 @@ public: Status append_block(Block* block) override; - Status prepare_for_read() override; + Status prepare_for_read(bool is_spill) override; Status get_next(RuntimeState* state, Block* block, bool* eos) override; diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index fe3cecca5cd..daacd064118 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -53,7 +53,10 @@ Status TopNSorter::append_block(Block* block) { return Status::OK(); } -Status TopNSorter::prepare_for_read() { +Status TopNSorter::prepare_for_read(bool is_spill) { + if (is_spill) { + return Status::InternalError("TopN sorter does not support spill"); + } return _state->build_merge_tree(_sort_description); } diff --git a/be/src/vec/common/sort/topn_sorter.h b/be/src/vec/common/sort/topn_sorter.h index 54a2e838ffc..80e280cd802 100644 --- a/be/src/vec/common/sort/topn_sorter.h +++ b/be/src/vec/common/sort/topn_sorter.h @@ -52,7 +52,7 @@ public: Status append_block(Block* block) override; - Status prepare_for_read() override; + Status prepare_for_read(bool is_spill) override; Status get_next(RuntimeState* state, Block* block, bool* eos) override; diff --git a/be/test/testutil/mock/mock_sorter.h b/be/test/testutil/mock/mock_sorter.h index 8b31484741c..30e87f741ca 100644 --- a/be/test/testutil/mock/mock_sorter.h +++ b/be/test/testutil/mock/mock_sorter.h @@ -24,7 +24,7 @@ struct MockSorter : public Sorter { MockSorter() = default; Status append_block(Block* block) override { return Status::OK(); } - Status prepare_for_read() override { return Status::OK(); } + Status prepare_for_read(bool is_spill) override { return Status::OK(); } Status get_next(RuntimeState* state, Block* block, bool* eos) override { *eos = true; diff --git a/be/test/vec/exec/sort/heap_sorter_test.cpp b/be/test/vec/exec/sort/heap_sorter_test.cpp index ed83368c294..14be58b1618 100644 --- a/be/test/vec/exec/sort/heap_sorter_test.cpp +++ b/be/test/vec/exec/sort/heap_sorter_test.cpp @@ -111,7 +111,7 @@ TEST_F(HeapSorterTest, test_topn_sorter1) { EXPECT_EQ(value, real); } - EXPECT_TRUE(sorter->prepare_for_read()); + EXPECT_TRUE(sorter->prepare_for_read(false)); { Block block; diff --git a/be/test/vec/exec/sort/partition_sorter_test.cpp b/be/test/vec/exec/sort/partition_sorter_test.cpp index 00baa50826a..9c8fad5ff47 100644 --- a/be/test/vec/exec/sort/partition_sorter_test.cpp +++ b/be/test/vec/exec/sort/partition_sorter_test.cpp @@ -94,7 +94,7 @@ TEST_F(PartitionSorterTest, test_partition_sorter_read_row_num) { } { - auto st = sorter->prepare_for_read(); + auto st = sorter->prepare_for_read(false); EXPECT_TRUE(st.ok()) << st.msg(); } { @@ -140,7 +140,7 @@ TEST_F(PartitionSorterTest, test_partition_sorter_DENSE_RANK) { } { - auto st = sorter->prepare_for_read(); + auto st = sorter->prepare_for_read(false); EXPECT_TRUE(st.ok()) << st.msg(); } { @@ -179,7 +179,7 @@ TEST_F(PartitionSorterTest, test_partition_sorter_RANK) { } { - auto st = sorter->prepare_for_read(); + auto st = sorter->prepare_for_read(false); EXPECT_TRUE(st.ok()) << st.msg(); } { diff --git a/be/test/vec/exec/sort/sort_test.cpp b/be/test/vec/exec/sort/sort_test.cpp index 505c0076174..ceea6bb3bf6 100644 --- a/be/test/vec/exec/sort/sort_test.cpp +++ b/be/test/vec/exec/sort/sort_test.cpp @@ -87,7 +87,7 @@ public: EXPECT_TRUE(sorter->append_block(&block).ok()); } - void prepare_for_read() { EXPECT_TRUE(sorter->prepare_for_read().ok()); } + void prepare_for_read() { EXPECT_TRUE(sorter->prepare_for_read(false).ok()); } void check_sort_column(ColumnPtr column) { MutableBlock sorted_block(VectorizedUtils::create_columns_with_type_and_name(*row_desc)); diff --git a/regression-test/data/query_p0/sort_spill/sort_spill.out b/regression-test/data/query_p0/sort_spill/sort_spill.out new file mode 100644 index 00000000000..00c0dd6f19a Binary files /dev/null and b/regression-test/data/query_p0/sort_spill/sort_spill.out differ diff --git a/regression-test/suites/query_p0/sort_spill/sort_spill.groovy b/regression-test/suites/query_p0/sort_spill/sort_spill.groovy new file mode 100644 index 00000000000..13f7cfa6fa9 --- /dev/null +++ b/regression-test/suites/query_p0/sort_spill/sort_spill.groovy @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("sort_spill") { + sql """ + drop table if exists d_table; + """ + sql """ + create table d_table( + k1 int, + k2 int, + ) distributed by random buckets 10 + properties ("replication_num"="1"); + """ + sql """ + insert into d_table select e1,e1 from (select 1 k1) as t lateral view explode_numbers(10000) tmp1 as e1; + """ + sql """ set parallel_pipeline_task_num = 2; """ + sql """ set batch_size = 100; """ + sql """ set enable_force_spill=true; """ + sql """ set enable_topn_lazy_materialization=false;""" + sql """ set enable_reserve_memory=true; """ + sql """ set force_sort_algorithm = "full"; """ + sql """ set enable_parallel_result_sink=true; """ + qt_select_1 "select k1,row_number () over (ORDER BY k2 DESC) from d_table order by k1 limit 10 offset 9900;" + qt_select_2 "select k1,row_number () over (ORDER BY k2 DESC) from d_table order by k1 limit 10;" +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org