This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 5924d31ef47 fix sort spill, support low mem mod of data_stream_recvr and improve log
5924d31ef47 is described below

commit 5924d31ef47e7340f779d42a4d8d19b4dd92fc8b
Author: jacktengg <18241664+jackte...@users.noreply.github.com>
AuthorDate: Wed Oct 30 17:51:35 2024 +0800

    fix sort spill, support low mem mod of data_stream_recvr and improve log
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  9 +++---
 be/src/pipeline/exec/operator.h                    |  8 ++++--
 be/src/pipeline/exec/sort_sink_operator.cpp        |  5 ++++
 be/src/pipeline/exec/sort_sink_operator.h          |  4 +++
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 10 +++++++
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  3 ++
 .../workload_group/workload_group_manager.cpp      | 23 ++++++++++++----
 be/src/vec/common/sort/sorter.cpp                  | 32 ++++++++++++++++++++++
 be/src/vec/common/sort/sorter.h                    |  4 +++
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  3 ++
 10 files changed, 89 insertions(+), 12 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index a334f57859b..360e7087d15 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -23,6 +23,7 @@
 #include "exprs/bloom_filter_func.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/operator.h"
+#include "util/pretty_printer.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/utils/template_helpers.hpp"
@@ -129,10 +130,9 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo
         const auto allocated_bytes = _build_side_mutable_block.allocated_bytes();
         const auto bytes_per_row = bytes / build_block_rows;
         const auto estimated_size_of_next_block = bytes_per_row * state->batch_size();
-
-        // If the new size is greater than 95% of allocalted bytes, it maybe need to realloc.
+        // If the new size is greater than 85% of allocalted bytes, it maybe need to realloc.
         if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes) >= 85) {
-            size_to_reserve += bytes + estimated_size_of_next_block;
+            size_to_reserve += (size_t)(allocated_bytes * 1.15);
         }
     }
@@ -355,7 +355,8 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
     }
     LOG(INFO) << "build block rows: " << block.rows() << ", columns count: " << block.columns()
-              << ", bytes/allocated_bytes: " << block.bytes() << "/" << block.allocated_bytes();
+              << ", bytes/allocated_bytes: " << PrettyPrinter::print_bytes(block.bytes()) << "/"
+              << PrettyPrinter::print_bytes(block.allocated_bytes());
     COUNTER_UPDATE(_build_rows_counter, rows);
     block.replace_if_overflow();
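For context on the get_reserve_mem_size hunk above: the estimate now assumes that once the next batch would push the build block past roughly 85% of its current allocation, the columns are likely to reallocate, so it reserves about 1.15x the allocated size rather than just the appended bytes. A minimal standalone sketch of that heuristic (the free-function form and the name estimate_block_reserve are illustrative, not part of the patch):

    #include <cstddef>

    // If appending one more batch would push the block past ~85% of its current
    // allocation, assume a realloc and reserve ~1.15x the allocated bytes;
    // otherwise nothing extra is reserved for the block itself.
    size_t estimate_block_reserve(size_t bytes, size_t allocated_bytes, size_t rows,
                                  size_t batch_size) {
        if (rows == 0 || allocated_bytes == 0) {
            return 0;
        }
        const size_t bytes_per_row = bytes / rows;
        const size_t next_batch_bytes = bytes_per_row * batch_size;
        if ((next_batch_bytes + bytes) * 100 / allocated_bytes >= 85) {
            return static_cast<size_t>(allocated_bytes * 1.15);
        }
        return 0;
    }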
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 7c0bd44a664..054e969ac04 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -302,8 +302,9 @@ public:
             auto* read_file_bytes = Base::profile()->get_counter("SpillReadFileBytes");
             Base::_query_statistics->add_spill_bytes(
                     write_block_bytes ? write_block_bytes->value() : 0,
-                    write_file_bytes ? write_file_bytes->value() : 0, read_block_bytes->value(),
-                    read_file_bytes->value());
+                    write_file_bytes ? write_file_bytes->value() : 0,
+                    read_block_bytes ? read_block_bytes->value() : 0,
+                    read_file_bytes ? read_file_bytes->value() : 0);
         }
         return Base::close(state);
     }
@@ -747,7 +748,8 @@ public:
             auto* read_block_bytes = Base::profile()->get_counter("SpillReadBlockBytes");
             auto* read_file_bytes = Base::profile()->get_counter("SpillReadFileBytes");
             Base::_query_statistics->add_spill_bytes(
-                    write_block_bytes->value(), write_file_bytes->value(),
+                    write_block_bytes ? write_block_bytes->value() : 0,
+                    write_file_bytes ? write_file_bytes->value() : 0,
                     read_block_bytes ? read_block_bytes->value() : 0,
                     read_file_bytes ? read_file_bytes->value() : 0);
         }
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index ee8689a8084..d0f30ac48c1 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -148,6 +148,11 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
     return Status::OK();
 }
 
+size_t SortSinkOperatorX::get_reserve_mem_size_for_next_sink(RuntimeState* state, bool eos) {
+    auto& local_state = get_local_state(state);
+    return local_state._shared_state->sorter->get_reserve_mem_size(state, eos);
+}
+
 size_t SortSinkOperatorX::get_revocable_mem_size(RuntimeState* state) const {
     auto& local_state = get_local_state(state);
     return local_state._shared_state->sorter->data_size();
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index 8462472dd02..43e8e59f3de 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -36,6 +36,8 @@ public:
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
 
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
+
 private:
     friend class SortSinkOperatorX;
 
@@ -77,6 +79,8 @@ public:
 
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
+    size_t get_reserve_mem_size_for_next_sink(RuntimeState* state, bool eos);
+
     Status prepare_for_spill(RuntimeState* state);
 
     Status merge_sort_read_for_spill(RuntimeState* state, doris::vectorized::Block* block,
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 03e9f33553e..00a60a4c747 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -136,6 +136,10 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
     return _sort_sink_operator->open(state);
 }
 
+size_t SpillSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) {
+    auto& local_state = get_local_state(state);
+    return local_state.get_reserve_mem_size(state, eos);
+}
 Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state,
                                              const std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
@@ -190,6 +194,12 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
     return Status::OK();
 }
 
+size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) {
+    auto& parent = Base::_parent->template cast<Parent>();
+    return parent._sort_sink_operator->get_reserve_mem_size_for_next_sink(_runtime_state.get(),
+                                                                          eos);
+}
+
 Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                               const std::shared_ptr<SpillContext>& spill_context) {
     if (!_shared_state->is_spilled) {
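Related to the two operator.h hunks earlier in this diff: both spill-statistics call sites now tolerate profile counters that were never registered instead of dereferencing them unconditionally. A minimal sketch of that pattern (the Counter stand-in and the helper name are illustrative, not the actual RuntimeProfile API):

    #include <cstdint>

    // Stand-in for a profile counter that may or may not have been registered.
    struct Counter {
        int64_t value() const { return _value; }
        int64_t _value = 0;
    };

    // Null-safe read, mirroring the "counter ? counter->value() : 0" guards the
    // patch applies to every spill counter passed to add_spill_bytes().
    int64_t counter_value_or_zero(const Counter* counter) {
        return counter != nullptr ? counter->value() : 0;
    }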
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 086d93a970c..8984b1e43de 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -41,6 +41,7 @@ public:
     Dependency* finishdependency() override { return _finish_dependency.get(); }
     Status setup_in_memory_sort_op(RuntimeState* state);
 
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
     Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);
 
 private:
@@ -86,6 +87,8 @@ public:
         return _sort_sink_operator->set_child(child);
     }
 
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
+
     size_t revocable_mem_size(RuntimeState* state) const override;
 
     Status revoke_memory(RuntimeState* state,
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 853f3740551..b2c33f2d378 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -31,6 +31,7 @@
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/workload_group/workload_group.h"
 #include "util/mem_info.h"
+#include "util/pretty_printer.h"
 #include "util/threadpool.h"
 #include "util/time.h"
 #include "vec/core/block.h"
@@ -637,12 +638,24 @@ bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_
             return true;
         } else {
             // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic
-            query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                    "query({}) reserve memory failed, but could not find memory that "
-                    "could "
-                    "release or spill to disk(memory usage:{}, limit: {})",
+            auto msg1 = fmt::format(
+                    "query {} reserve memory failed, but could not find memory that could "
+                    "release or spill to disk. Query memory usage: {}, limit: {}, process "
+                    "memory info: {}"
+                    ", wg info: {}.",
                     query_id, PrettyPrinter::print_bytes(memory_usage),
-                    PrettyPrinter::print_bytes(query_ctx->get_mem_limit())));
+                    PrettyPrinter::print_bytes(query_ctx->get_mem_limit()),
+                    GlobalMemoryArbitrator::process_memory_used_details_str(),
+                    query_ctx->workload_group()->memory_debug_string());
+            auto msg2 = msg1 + fmt::format(
+                                       " Query Memory Tracker Summary: {}."
+                                       " Load Memory Tracker Summary: {}",
+                                       MemTrackerLimiter::make_type_trackers_profile_str(
+                                               MemTrackerLimiter::Type::QUERY),
+                                       MemTrackerLimiter::make_type_trackers_profile_str(
+                                               MemTrackerLimiter::Type::LOAD));
+            LOG(INFO) << msg2;
+            query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
         }
     } else {
         if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
+ " Load Memory Tracker Summary: {}", + MemTrackerLimiter::make_type_trackers_profile_str( + MemTrackerLimiter::Type::QUERY), + MemTrackerLimiter::make_type_trackers_profile_str( + MemTrackerLimiter::Type::LOAD)); + LOG(INFO) << msg2; + query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1)); } } else { if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) { diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 72bf35f3cba..0a9875c0019 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -215,6 +215,38 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), _state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)) {} +size_t FullSorter::get_reserve_mem_size(RuntimeState* state, bool eos) const { + size_t size_to_reserve = 0; + const auto rows = _state->unsorted_block_->rows(); + if (rows != 0) { + const auto bytes = _state->unsorted_block_->bytes(); + const auto allocated_bytes = _state->unsorted_block_->allocated_bytes(); + const auto bytes_per_row = bytes / rows; + const auto estimated_size_of_next_block = bytes_per_row * state->batch_size(); + auto new_block_bytes = estimated_size_of_next_block + bytes; + auto new_rows = rows + state->batch_size(); + // If the new size is greater than 85% of allocalted bytes, it maybe need to realloc. + if ((new_block_bytes * 100 / allocated_bytes) >= 85) { + size_to_reserve += (size_t)(allocated_bytes * 1.15); + } + auto sort = new_rows > buffered_block_size_ || new_block_bytes > buffered_block_bytes_; + if (sort) { + // new column is created when doing sort, reserve average size of one column + // for estimation + size_to_reserve += new_block_bytes / _state->unsorted_block_->columns(); + + // helping data structures used during sorting + size_to_reserve += new_rows * sizeof(IColumn::Permutation::value_type); + + auto sort_columns_count = _vsort_exec_exprs.lhs_ordering_expr_ctxs().size(); + if (1 != sort_columns_count) { + size_to_reserve += new_rows * sizeof(EqualRangeIterator); + } + } + } + return size_to_reserve; +} + Status FullSorter::append_block(Block* block) { DCHECK(block->rows() > 0); { diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index aa7d88dfbc2..f89f996fd36 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -121,6 +121,8 @@ public: virtual size_t data_size() const = 0; + virtual size_t get_reserve_mem_size(RuntimeState* state, bool eos) const { return 0; } + // for topn runtime predicate const SortDescription& get_sort_description() const { return _sort_description; } virtual Field get_top_value() { return Field {Field::Types::Null}; } @@ -171,6 +173,8 @@ public: size_t data_size() const override; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) const override; + Status merge_sort_read_for_spill(RuntimeState* state, doris::vectorized::Block* block, int batch_size, bool* eos) override; void reset() override; diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index a83f8d485a3..fa65175172f 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -405,6 +405,9 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_n int64_t packet_seq, ::google::protobuf::Closure** done, const int64_t wait_for_worker, const uint64_t 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index a83f8d485a3..fa65175172f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -405,6 +405,9 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_n
                                    int64_t packet_seq, ::google::protobuf::Closure** done,
                                    const int64_t wait_for_worker,
                                    const uint64_t time_to_find_recvr) {
+    if (_parent->state()->get_query_ctx()->low_memory_mode()) {
+        set_low_memory_mode();
+    }
     SCOPED_ATTACH_TASK(_query_thread_context);
     int use_sender_id = _is_merging ? sender_id : 0;
     return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org