This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9fc0b9c44629efd2c0a09ade2262e0186a084856 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Fri Oct 11 15:59:38 2024 +0800 [feat] reserve memory separately for sink and source operators (#41706) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 6 +- be/src/pipeline/exec/aggregation_sink_operator.h | 4 +- be/src/pipeline/exec/analytic_sink_operator.cpp | 2 +- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- be/src/pipeline/exec/exchange_sink_buffer.h | 4 +- be/src/pipeline/exec/exchange_source_operator.cpp | 4 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 80 ++++++++++++++++++---- be/src/pipeline/exec/hashjoin_build_sink.h | 4 +- be/src/pipeline/exec/operator.h | 14 ++-- .../pipeline/exec/partition_sort_sink_operator.cpp | 2 +- .../pipeline/exec/partition_sort_sink_operator.h | 2 +- .../exec/partitioned_aggregation_sink_operator.cpp | 4 +- .../exec/partitioned_aggregation_sink_operator.h | 2 +- .../exec/partitioned_hash_join_probe_operator.cpp | 42 ++++++++++-- .../exec/partitioned_hash_join_probe_operator.h | 4 ++ .../exec/partitioned_hash_join_sink_operator.cpp | 8 +-- .../exec/partitioned_hash_join_sink_operator.h | 7 +- be/src/pipeline/exec/set_probe_sink_operator.cpp | 2 +- be/src/pipeline/exec/set_probe_sink_operator.h | 2 +- be/src/pipeline/exec/set_sink_operator.cpp | 2 +- be/src/pipeline/exec/set_sink_operator.h | 2 +- be/src/pipeline/pipeline_task.cpp | 55 +++++++++++---- be/src/pipeline/pipeline_task.h | 4 +- be/src/runtime/query_context.cpp | 10 +-- be/src/vec/common/hash_table/hash_map_context.h | 56 +++++++++++++++ be/src/vec/runtime/vdata_stream_recvr.cpp | 3 +- 26 files changed, 255 insertions(+), 72 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index bcbf83f6290..3dd43c3c4d8 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -718,7 +718,7 @@ Status AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& return Status::OK(); } -size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state) const { +size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) const { size_t size_to_reserve = std::visit( [&](auto&& arg) -> size_t { using HashTableCtxType = std::decay_t<decltype(arg)>; @@ -891,9 +891,9 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* state) { return Status::OK(); } -size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { +size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) { auto& local_state = get_local_state(state); - return local_state.get_reserve_mem_size(state); + return local_state.get_reserve_mem_size(state, eos); } Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 477df890b4f..8bf3a8493fc 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -97,7 +97,7 @@ protected: Status _create_agg_status(vectorized::AggregateDataPtr data); size_t _memory_usage() const; - size_t get_reserve_mem_size(RuntimeState* state) const; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) const; RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; @@ -170,7 +170,7 @@ public: Status reset_hash_table(RuntimeState* state); - size_t get_reserve_mem_size(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; using DataSinkOperatorX<AggSinkLocalState>::node_id; using DataSinkOperatorX<AggSinkLocalState>::operator_id; diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp index e11ffe0c48a..63b93ad4a59 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -332,7 +332,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block return Status::OK(); } -size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { +size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) { auto& local_state = get_local_state(state); return local_state._reserve_mem_size; } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index dee7ee27c30..f5b53314450 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -95,7 +95,7 @@ public: return !_partition_by_eq_expr_ctxs.empty() && _order_by_eq_expr_ctxs.empty(); } - size_t get_reserve_mem_size(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 23ba2220bec..60193b1d7e6 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -91,7 +91,7 @@ public: void set_low_memory_mode() { _total_queue_buffer_size_limit = 1024 * 1024; - _total_queue_blocks_count_limit = 1; + _total_queue_blocks_count_limit = 8; } void acquire(BroadcastPBlockHolder& holder); @@ -207,7 +207,7 @@ public: _set_ready_to_finish(_busy_channels == 0); } - void set_low_memory_mode() { _queue_capacity = 1; } + void set_low_memory_mode() { _queue_capacity = 8; } private: friend class ExchangeSinkLocalState; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index ff25a252cca..dd7b62bcadf 
100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -17,6 +17,8 @@ #include "exchange_source_operator.h" +#include <fmt/core.h> + #include <memory> #include "pipeline/exec/operator.h" @@ -71,7 +73,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, timer_name, 1); for (size_t i = 0; i < queues.size(); i++) { deps[i] = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "SHUFFLE_DATA_DEPENDENCY"); + fmt::format("SHUFFLE_DATA_DEPENDENCY_{}", i)); queues[i]->set_dependency(deps[i]); metrics[i] = _runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i), TUnit ::TIME_NS, timer_name, 1); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 3132b062af3..79c068a83c1 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -19,9 +19,11 @@ #include <string> +#include "common/exception.h" #include "exprs/bloom_filter_func.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/operator.h" +#include "vec/core/block.h" #include "vec/data_types/data_type_nullable.h" #include "vec/utils/template_helpers.hpp" @@ -110,7 +112,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { return Status::OK(); } -size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) { +size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) { if (!_should_build_hash_table) { return 0; } @@ -121,26 +123,74 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) { size_t size_to_reserve = 0; - if (!_build_side_mutable_block.empty()) { + const size_t build_block_rows = _build_side_mutable_block.rows(); + if (build_block_rows != 0) { const auto bytes = _build_side_mutable_block.bytes(); 
const auto allocated_bytes = _build_side_mutable_block.allocated_bytes(); - if (allocated_bytes != 0 && ((bytes * 100) / allocated_bytes) >= 85) { - size_to_reserve += bytes; + const auto bytes_per_row = bytes / build_block_rows; + const auto estimated_size_of_next_block = bytes_per_row * state->batch_size(); + + // If the new size is greater than 95% of allocated bytes, it may need to realloc. + if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes) >= 95) { + size_to_reserve += bytes + estimated_size_of_next_block; } } - const size_t rows = _build_side_mutable_block.rows() + state->batch_size(); - size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows); + if (eos) { + const size_t rows = build_block_rows + state->batch_size(); + size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows); - size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first - size_to_reserve += rows * sizeof(uint32_t); // JoinHashTable::next + size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first + size_to_reserve += rows * sizeof(uint32_t); // JoinHashTable::next - auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); - if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == TJoinOp::RIGHT_OUTER_JOIN || - p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op == TJoinOp::RIGHT_SEMI_JOIN) { - size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited + auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); + if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == TJoinOp::RIGHT_OUTER_JOIN || + p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op == TJoinOp::RIGHT_SEMI_JOIN) { + size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited + } + size_to_reserve += _evaluate_mem_usage; + + vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size()); + + if (build_block_rows > 0) { + auto block = _build_side_mutable_block.to_block(); + Defer defer([&]() { + _build_side_mutable_block = 
vectorized::MutableBlock(std::move(block)); + }); + vectorized::ColumnUInt8::MutablePtr null_map_val; + if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN) { + _convert_block_to_null(block); + // first row is mocked + for (int i = 0; i < block.columns(); i++) { + auto [column, is_const] = unpack_if_const(block.safe_get_by_position(i).column); + assert_cast<vectorized::ColumnNullable*>(column->assume_mutable().get()) + ->get_null_map_column() + .get_data() + .data()[0] = 1; + } + } + + null_map_val = vectorized::ColumnUInt8::create(); + null_map_val->get_data().assign(build_block_rows, (uint8_t)0); + + // Get the key column that needs to be built + Status st = _extract_join_column(block, null_map_val, raw_ptrs, _build_col_ids); + if (!st.ok()) { + throw Exception(st); + } + + std::visit(vectorized::Overload {[&](std::monostate& arg) { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + }, + [&](auto&& hash_map_context) { + size_to_reserve += hash_map_context.estimated_size( + raw_ptrs, block.rows(), true, true, + bucket_size); + }}, + *_shared_state->hash_table_variants); + } } - size_to_reserve += _evaluate_mem_usage; return size_to_reserve; } @@ -702,9 +752,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* return Status::OK(); } -size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { +size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) { auto& local_state = get_local_state(state); - return local_state.get_reserve_mem_size(state); + return local_state.get_reserve_mem_size(state, eos); } size_t HashJoinBuildSinkOperatorX::get_memory_usage(RuntimeState* state) const { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 0ced9a77207..b4a60fa362d 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -54,7 +54,7 @@ 
public: Status close(RuntimeState* state, Status exec_status) override; - [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state); + [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos); protected: void _hash_table_init(RuntimeState* state); @@ -124,7 +124,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; - size_t get_reserve_mem_size(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; [[nodiscard]] size_t get_memory_usage(RuntimeState* state) const; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 984d2f00bad..649bc70c238 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -109,11 +109,6 @@ public: virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; } - // If this method is not overwrite by child, its default value is 1MB - [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { - return state->minimum_operator_memory_required_bytes(); - } - virtual Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { return Status::OK(); @@ -480,6 +475,10 @@ public: [[nodiscard]] virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0; + [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state, bool eos) { + return state->minimum_operator_memory_required_bytes(); + } + template <class TARGET> TARGET& cast() { DCHECK(dynamic_cast<TARGET*>(this)) @@ -749,6 +748,11 @@ public: return Status::OK(); } + // If this method is not overwrite by child, its default value is 1MB + [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { + return state->minimum_operator_memory_required_bytes(); + } + virtual std::string debug_string(int indentation_level = 0) const; virtual std::string debug_string(RuntimeState* state, int indentation_level = 0) const; diff --git 
a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index f62d5d13600..129743c3d7d 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -249,7 +249,7 @@ Status PartitionSortSinkOperatorX::_split_block_by_partition( return Status::OK(); } -size_t PartitionSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { +size_t PartitionSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) { auto& local_state = get_local_state(state); auto rows = state->batch_size(); size_t reserve_mem_size = std::visit( diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 5eccb6ac9ca..0f1f3cd1b2c 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -274,7 +274,7 @@ public: return {ExchangeType::PASSTHROUGH}; } - size_t get_reserve_mem_size(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; private: friend class PartitionSortSinkLocalState; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index f84a3636800..1a86fdb2a9d 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -254,10 +254,10 @@ Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state) return sink_local_state->open(state); } -size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { +size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) { auto& local_state = get_local_state(state); auto* runtime_state = local_state._runtime_state.get(); - auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state); + auto size = 
_agg_sink_operator->get_reserve_mem_size(runtime_state, eos); COUNTER_SET(local_state._memory_usage_reserved, int64_t(size)); return size; } diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 1124ba48c35..9b70da54943 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -346,7 +346,7 @@ public: Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) override; - size_t get_reserve_mem_size(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; private: friend class PartitionedAggSinkLocalState; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index bb65f098151..7b60c9a3e2f 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -23,6 +23,7 @@ #include <algorithm> #include <utility> +#include "common/logging.h" #include "common/status.h" #include "pipeline/pipeline_task.h" #include "runtime/fragment_mgr.h" @@ -67,6 +68,8 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI _spill_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillProbeTime", 1); _recovery_probe_blocks = ADD_COUNTER(profile(), "SpillRecoveryProbeBlocks", TUnit::UNIT); _recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillRecoveryProbeTime", 1); + _get_child_next_timer = ADD_TIMER_WITH_LEVEL(profile(), "GetChildNextTime", 1); + _memory_usage_reserved = ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::UNIT, 1); @@ -870,6 +873,35 @@ size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* stat return mem_size; } +size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* 
state) { + auto& local_state = get_local_state(state); + const auto need_to_spill = local_state._shared_state->need_to_spill; + if (!need_to_spill || !local_state._child_eos) { + return Base::get_reserve_mem_size(state); + } + + size_t size_to_reserve = vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM; + + if (local_state._need_to_setup_internal_operators) { + const size_t rows = + (local_state._recovered_build_block ? local_state._recovered_build_block->rows() + : 0) + + state->batch_size(); + size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows); + + size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first + size_to_reserve += rows * sizeof(uint32_t); // JoinHashTable::next + + if (_join_op == TJoinOp::FULL_OUTER_JOIN || _join_op == TJoinOp::RIGHT_OUTER_JOIN || + _join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN) { + size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited + } + } + + COUNTER_SET(local_state._memory_usage_reserved, int64_t(size_to_reserve)); + return size_to_reserve; +} + Status PartitionedHashJoinProbeOperatorX::revoke_memory( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { auto& local_state = get_local_state(state); @@ -925,7 +957,6 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori bool* eos) { *eos = false; auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.exec_time_counter()); const auto need_to_spill = local_state._shared_state->need_to_spill; #ifndef NDEBUG Defer eos_check_defer([&] { @@ -944,16 +975,19 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori }); if (need_more_input_data(state)) { - RETURN_IF_ERROR(_child->get_block_after_projects(state, local_state._child_block.get(), - &local_state._child_eos)); + { + SCOPED_TIMER(local_state._get_child_next_timer); + RETURN_IF_ERROR(_child->get_block_after_projects(state, local_state._child_block.get(), + 
&local_state._child_eos)); + } + SCOPED_TIMER(local_state.exec_time_counter()); if (local_state._child_block->rows() == 0 && !local_state._child_eos) { return Status::OK(); } Defer defer([&] { local_state._child_block->clear_column_data(); }); if (need_to_spill) { - SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(push(state, local_state._child_block.get(), local_state._child_eos)); if (_should_revoke_memory(state)) { return _revoke_memory(state); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 0ffa446f181..e66b730685b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -144,6 +144,8 @@ private: RuntimeProfile::Counter* _join_filter_timer = nullptr; RuntimeProfile::Counter* _build_output_block_timer = nullptr; RuntimeProfile::Counter* _memory_usage_reserved = nullptr; + + RuntimeProfile::Counter* _get_child_next_timer = nullptr; }; class PartitionedHashJoinProbeOperatorX final @@ -183,6 +185,8 @@ public: size_t revocable_mem_size(RuntimeState* state) const override; + size_t get_reserve_mem_size(RuntimeState* state) override; + void set_inner_operators(const std::shared_ptr<HashJoinBuildSinkOperatorX>& sink_operator, const std::shared_ptr<HashJoinProbeOperatorX>& probe_operator) { _inner_sink_operator = sink_operator; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index b02c7ee0971..f7d38fe9d5d 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -117,7 +117,7 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state return mem_size; } -size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* state) { +size_t 
PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) { size_t size_to_reserve = 0; auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); if (_shared_state->need_to_spill) { @@ -125,7 +125,7 @@ size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta } else { if (_shared_state->inner_runtime_state) { size_to_reserve = p._inner_sink_operator->get_reserve_mem_size( - _shared_state->inner_runtime_state.get()); + _shared_state->inner_runtime_state.get(), eos); } } @@ -680,9 +680,9 @@ Status PartitionedHashJoinSinkOperatorX::revoke_memory( return local_state.revoke_memory(state, spill_context); } -size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { +size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) { auto& local_state = get_local_state(state); - return local_state.get_reserve_mem_size(state); + return local_state.get_reserve_mem_size(state, eos); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index d8cb67ca08d..d3725997882 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -25,6 +25,7 @@ #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/join_build_sink_operator.h" #include "pipeline/exec/spill_utils.h" +#include "vec/core/block.h" #include "vec/runtime/partitioner.h" namespace doris { @@ -45,7 +46,7 @@ public: Status close(RuntimeState* state, Status exec_status) override; Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context); size_t revocable_mem_size(RuntimeState* state) const; - [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state); + [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos); protected: 
PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) @@ -69,6 +70,8 @@ protected: std::atomic<bool> _spill_status_ok {true}; std::mutex _spill_lock; + vectorized::Block _pending_block; + bool _child_eos {false}; Status _spill_status; @@ -110,7 +113,7 @@ public: Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) override; - size_t get_reserve_mem_size(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 7e62892f516..1db187ef307 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -196,7 +196,7 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe( } template <bool is_intersect> -size_t SetProbeSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state) { +size_t SetProbeSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state, bool eos) { auto& local_state = get_local_state(state); return local_state._estimate_memory_usage; } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 542f6652dd4..5ba248d9fda 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -102,7 +102,7 @@ public: std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; } - size_t get_reserve_mem_size(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; private: void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state); diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 
e15cecd22ed..df5d8d44f0a 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -207,7 +207,7 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState } template <bool is_intersect> -size_t SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state) { +size_t SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state, bool eos) { auto& local_state = get_local_state(state); size_t size_to_reserve = std::visit( [&](auto&& arg) -> size_t { diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index be600ff6fd5..ed9aa1afead 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -95,7 +95,7 @@ public: } bool require_shuffled_data_distribution() const override { return true; } - size_t get_reserve_mem_size(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; private: template <class HashTableContext, bool is_intersected> diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 5509fcb19ed..b3282810d86 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -43,6 +43,7 @@ #include "util/mem_info.h" #include "util/runtime_profile.h" #include "util/uid_util.h" +#include "vec/core/block.h" #include "vec/spill/spill_stream.h" namespace doris { @@ -316,7 +317,7 @@ Status PipelineTask::execute(bool* eos) { SCOPED_ATTACH_TASK(_state); _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream; *eos = _eos; - if (_eos) { + if (_eos && !_pending_block) { // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here. 
return Status::OK(); } @@ -384,10 +385,8 @@ Status PipelineTask::execute(bool* eos) { Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed"); return status; }); - - DEFER_RELEASE_RESERVED(); // Every loop should check if memory is not enough. - _state->get_query_ctx()->update_low_memory_mode(); + // _state->get_query_ctx()->update_low_memory_mode(); // `_dry_run` means sink operator need no more data // `_sink->is_finished(_state)` means sink operator should be finished @@ -396,13 +395,19 @@ Status PipelineTask::execute(bool* eos) { if (_dry_run || _sink->is_finished(_state)) { *eos = true; _eos = true; + } else if (_pending_block) [[unlikely]] { + LOG(INFO) << "query: " << print_id(query_id) + << " has pending block, size: " << _pending_block->allocated_bytes(); + _block = std::move(_pending_block); + block = _block.get(); } else { SCOPED_TIMER(_get_block_timer); + DEFER_RELEASE_RESERVED(); _get_block_counter->update(1); - size_t sink_reserve_size = _sink->get_reserve_mem_size(_state); - sink_reserve_size = - std::max(sink_reserve_size, _state->minimum_operator_memory_required_bytes()); - reserve_size = _root->get_reserve_mem_size(_state) + sink_reserve_size; + // size_t sink_reserve_size = _sink->get_reserve_mem_size(_state); + // sink_reserve_size = + // std::max(sink_reserve_size, _state->minimum_operator_memory_required_bytes()); + reserve_size = _root->get_reserve_mem_size(_state); _root->reset_reserve_mem_size(_state); auto workload_group = _state->get_query_ctx()->workload_group(); @@ -414,14 +419,14 @@ Status PipelineTask::execute(bool* eos) { COUNTER_UPDATE(_memory_reserve_failed_times, 1); LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: " << PrettyPrinter::print(reserve_size, TUnit::BYTES) - << "(sink reserve size:(" - << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES) - << "), sink name: " << _sink->get_name() - << ", node id: " << _sink->node_id() << " failed: " << st.to_string() + << ", sink name: " << 
_sink->get_name() + << ", node id: " << _sink->node_id() + << ", task id: " << _state->task_id() + << ", failed: " << st.to_string() << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); _state->get_query_ctx()->update_paused_reason(st); - _state->get_query_ctx()->set_low_memory_mode(); + // _state->get_query_ctx()->set_low_memory_mode(); bool is_high_wartermark = false; bool is_low_wartermark = false; workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark); @@ -440,6 +445,28 @@ Status PipelineTask::execute(bool* eos) { if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); Status status = Status::OK(); + DEFER_RELEASE_RESERVED(); + COUNTER_UPDATE(_memory_reserve_times, 1); + size_t sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos); + status = thread_context()->try_reserve_memory(sink_reserve_size); + if (!status.ok()) { + LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: " + << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES) + << ", sink name: " << _sink->get_name() + << ", node id: " << _sink->node_id() << ", task id: " << _state->task_id() + << ", failed: " << status.to_string() + << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); + _state->get_query_ctx()->update_paused_reason(status); + _memory_sufficient_dependency->block(); + ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( + _state->get_query_ctx()->shared_from_this(), sink_reserve_size); + _pending_block = std::move(_block); + _block = vectorized::Block::create_unique(); + _eos = *eos; + *eos = false; + continue; + } + // Define a lambda function to catch sink exception, because sink will check // return error status with EOF, it is special, could not return directly. 
auto sink_function = [&]() -> Status { @@ -570,7 +597,7 @@ std::string PipelineTask::debug_string() { } size_t PipelineTask::get_revocable_size() const { - if (_running || _eos) { + if (_running || (_eos && !_pending_block)) { return 0; } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 448b2ec5f6f..44dfdd7832a 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -261,7 +261,9 @@ private: RuntimeState* _state = nullptr; int _previous_schedule_id = -1; uint32_t _schedule_time = 0; - std::unique_ptr<doris::vectorized::Block> _block; + std::unique_ptr<vectorized::Block> _block; + std::unique_ptr<vectorized::Block> _pending_block; + PipelineFragmentContext* _fragment_context = nullptr; TaskQueue* _task_queue = nullptr; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 2ea548fd6f5..61d961b6c0e 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -472,7 +472,7 @@ Status QueryContext::revoke_memory() { // Do not use memlimit, use current memory usage. 
// For example, if current limit is 1.6G, but current used is 1G, if reserve failed // should free 200MB memory, not 300MB - //const int64_t target_revoking_size = (int64_t)(query_mem_tracker->consumption() * 0.2); + const auto target_revoking_size = (int64_t)(query_mem_tracker->consumption() * 0.2); size_t revoked_size = 0; std::vector<pipeline::PipelineTask*> chosen_tasks; @@ -481,10 +481,10 @@ Status QueryContext::revoke_memory() { revoked_size += revocable_size; // Only revoke the largest task to ensure memory is used as much as possible - break; - //if (revoked_size >= target_revoking_size) { - // break; - //} + // break; + if (revoked_size >= target_revoking_size) { + break; + } } std::weak_ptr<QueryContext> this_ctx = shared_from_this(); diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 2d0b46150b1..bd5bab0be97 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -73,6 +73,10 @@ struct MethodBaseInner { const uint8_t* null_map = nullptr, bool is_join = false, bool is_build = false, uint32_t bucket_size = 0) = 0; + [[nodiscard]] virtual size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, + bool is_join = false, bool is_build = false, + uint32_t bucket_size = 0) = 0; + virtual size_t serialized_keys_size(bool is_build) const { return 0; } void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size, const uint8_t* null_map) { @@ -215,6 +219,22 @@ struct MethodSerialized : public MethodBase<TData> { return {begin, sum_size}; } + size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, bool is_join, + bool is_build, uint32_t bucket_size) override { + size_t size = 0; + for (const auto& column : key_columns) { + size += column->byte_size(); + } + + size += sizeof(StringRef) * num_rows; // stored_keys + if (is_join) { + size += sizeof(uint32_t) * num_rows; // bucket_nums + } else { + size += 
sizeof(size_t) * num_rows; // hash_values + } + return size; + } + void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows, DorisVector<StringRef>& input_keys, Arena& input_arena) { input_arena.clear(); @@ -299,6 +319,18 @@ struct MethodStringNoCache : public MethodBase<TData> { : (_stored_keys.size() * sizeof(StringRef)); } + size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, bool is_join, + bool is_build, uint32_t bucket_size) override { + size_t size = 0; + size += sizeof(StringRef) * num_rows; // stored_keys + if (is_join) { + size += sizeof(uint32_t) * num_rows; // bucket_nums + } else { + size += sizeof(size_t) * num_rows; // hash_values + } + return size; + } + void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows, DorisVector<StringRef>& stored_keys) { const IColumn& column = *key_columns[0]; @@ -354,6 +386,17 @@ struct MethodOneNumber : public MethodBase<TData> { using State = ColumnsHashing::HashMethodOneNumber<typename Base::Value, typename Base::Mapped, FieldType>; + size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, bool is_join, + bool is_build, uint32_t bucket_size) override { + size_t size = 0; + if (is_join) { + size += sizeof(uint32_t) * num_rows; // bucket_nums + } else { + size += sizeof(size_t) * num_rows; // hash_values + } + return size; + } + void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, const uint8_t* null_map = nullptr, bool is_join = false, bool is_build = false, uint32_t bucket_size = 0) override { @@ -468,6 +511,19 @@ struct MethodKeysFixed : public MethodBase<TData> { return (is_build ? 
build_stored_keys.size() : stored_keys.size()) * sizeof(typename Base::Key); } + + size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, bool is_join, + bool is_build, uint32_t bucket_size) override { + size_t size = 0; + size += sizeof(StringRef) * num_rows; // stored_keys + if (is_join) { + size += sizeof(uint32_t) * num_rows; // bucket_nums + } else { + size += sizeof(size_t) * num_rows; // hash_values + } + return size; + } + void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, const uint8_t* null_map = nullptr, bool is_join = false, bool is_build = false, uint32_t bucket_size = 0) override { diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 6ff1fcc0aca..a83f8d485a3 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -347,7 +347,8 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::Exchang _sender_to_local_channel_dependency.resize(num_queues); for (size_t i = 0; i < num_queues; i++) { _sender_to_local_channel_dependency[i] = pipeline::Dependency::create_shared( - _dest_node_id, _dest_node_id, "LocalExchangeChannelDependency", true); + _dest_node_id, _dest_node_id, fmt::format("LocalExchangeChannelDependency_{}", i), + true); } _sender_queues.reserve(num_queues); int num_sender_per_queue = is_merging ? 1 : num_senders; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org