This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit ca5bf9842abb014c089b4d212932aa09fd2d24ce Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Sat Aug 24 21:49:42 2024 +0800 [feat]reserve and spill --- be/src/clucene | 2 +- be/src/pipeline/exec/aggregation_sink_operator.cpp | 19 +++ be/src/pipeline/exec/aggregation_sink_operator.h | 6 + .../pipeline/exec/aggregation_source_operator.cpp | 43 ++--- be/src/pipeline/exec/analytic_sink_operator.cpp | 10 ++ be/src/pipeline/exec/analytic_sink_operator.h | 4 + be/src/pipeline/exec/analytic_source_operator.cpp | 4 +- be/src/pipeline/exec/assert_num_rows_operator.cpp | 4 +- be/src/pipeline/exec/datagen_operator.cpp | 4 +- .../distinct_streaming_aggregation_operator.cpp | 3 +- be/src/pipeline/exec/exchange_source_operator.cpp | 3 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 39 +++++ be/src/pipeline/exec/hashjoin_build_sink.h | 6 + be/src/pipeline/exec/hashjoin_probe_operator.cpp | 14 +- be/src/pipeline/exec/hashjoin_probe_operator.h | 2 + .../exec/multi_cast_data_stream_source.cpp | 4 +- .../exec/nested_loop_join_probe_operator.cpp | 8 +- be/src/pipeline/exec/operator.cpp | 22 ++- be/src/pipeline/exec/operator.h | 79 ++++++++- .../pipeline/exec/partition_sort_sink_operator.cpp | 13 ++ .../pipeline/exec/partition_sort_sink_operator.h | 2 + .../exec/partition_sort_source_operator.cpp | 14 +- .../exec/partitioned_aggregation_sink_operator.cpp | 24 ++- .../exec/partitioned_aggregation_sink_operator.h | 5 + .../exec/partitioned_hash_join_probe_operator.cpp | 22 +-- .../exec/partitioned_hash_join_probe_operator.h | 3 + .../exec/partitioned_hash_join_sink_operator.cpp | 90 ++++++++--- .../exec/partitioned_hash_join_sink_operator.h | 4 + be/src/pipeline/exec/repeat_operator.cpp | 9 +- be/src/pipeline/exec/schema_scan_operator.cpp | 4 +- be/src/pipeline/exec/select_operator.h | 3 +- be/src/pipeline/exec/set_probe_sink_operator.cpp | 7 + be/src/pipeline/exec/set_probe_sink_operator.h | 4 + be/src/pipeline/exec/set_sink_operator.cpp | 27 ++++ be/src/pipeline/exec/set_sink_operator.h | 2 + be/src/pipeline/exec/set_source_operator.cpp | 5 +- be/src/pipeline/exec/sort_source_operator.cpp | 2 + be/src/pipeline/exec/spill_sort_sink_operator.cpp | 10 +- .../exec/streaming_aggregation_operator.cpp | 6 +- be/src/pipeline/exec/table_function_operator.cpp | 3 +- be/src/pipeline/exec/union_source_operator.cpp | 3 + be/src/pipeline/pipeline_fragment_context.cpp | 36 +++++ be/src/pipeline/pipeline_fragment_context.h | 4 + be/src/pipeline/pipeline_task.cpp | 81 +++++++++- be/src/pipeline/pipeline_task.h | 26 ++- be/src/pipeline/task_scheduler.cpp | 177 ++++++++++++++++++++- be/src/pipeline/task_scheduler.h | 17 +- be/src/runtime/query_context.cpp | 68 +++++++- be/src/runtime/query_context.h | 26 ++- be/src/runtime/runtime_state.h | 9 +- be/src/vec/common/hash_table/hash_table.h | 12 ++ be/src/vec/common/hash_table/ph_hash_map.h | 9 ++ be/src/vec/common/hash_table/string_hash_table.h | 26 +++ be/src/vec/exprs/vectorized_fn_call.cpp | 18 +++ be/src/vec/exprs/vectorized_fn_call.h | 2 + be/src/vec/exprs/vexpr.cpp | 18 +++ be/src/vec/exprs/vexpr.h | 2 + be/src/vec/exprs/vexpr_context.cpp | 46 +++++- be/src/vec/exprs/vexpr_context.h | 14 ++ be/src/vec/exprs/vin_predicate.cpp | 20 +++ be/src/vec/exprs/vin_predicate.h | 1 + be/src/vec/exprs/vslot_ref.h | 2 + 62 files changed, 1011 insertions(+), 141 deletions(-) diff --git a/be/src/clucene b/be/src/clucene index c5d02a7e411..fdbf2204031 160000 --- a/be/src/clucene +++ b/be/src/clucene @@ -1 +1 @@ -Subproject commit c5d02a7e41194b02444be6d684e3aeb4ff1b5595 +Subproject commit fdbf2204031128b2bd8505fc73c06403b7c1a815 diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 260a599a947..51734bbd5ee 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -23,6 +23,7 @@ #include "common/status.h" #include "pipeline/exec/operator.h" #include "runtime/primitive_type.h" +#include "runtime/thread_context.h" #include "vec/common/hash_table/hash.h" #include "vec/exprs/vectorized_agg_fn.h" @@ -176,6 +177,8 @@ Status AggSinkLocalState::_create_agg_status(vectorized::AggregateDataPtr data) Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) { DCHECK(_agg_data->without_key != nullptr); SCOPED_TIMER(_build_timer); + _memory_usage_last_executing = 0; + ScopedMemTracker mem_tracker(_memory_usage_last_executing); for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add( block, @@ -187,6 +190,8 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) { } Status AggSinkLocalState::_merge_with_serialized_key(vectorized::Block* block) { + _memory_usage_last_executing = 0; + ScopedMemTracker mem_tracker(_memory_usage_last_executing); if (_shared_state->reach_limit) { return _merge_with_serialized_key_helper<true, false>(block); } else { @@ -394,6 +399,9 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) { SCOPED_TIMER(_merge_timer); DCHECK(_agg_data->without_key != nullptr); + + _memory_usage_last_executing = 0; + ScopedMemTracker mem_tracker(_memory_usage_last_executing); for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) { int col_id = AggSharedState::get_slot_column_id( @@ -431,6 +439,8 @@ void AggSinkLocalState::_update_memusage_without_key() { } Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) { + _memory_usage_last_executing = 0; + ScopedMemTracker mem_tracker(_memory_usage_last_executing); if (_shared_state->reach_limit) { return _execute_with_serialized_key_helper<true>(block); } else { @@ -708,6 +718,10 @@ Status AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& return Status::OK(); } +size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state) const { + return _memory_usage(); +} + AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id), @@ -864,6 +878,11 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* state) { return Status::OK(); } +size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { + auto& local_state = get_local_state(state); + return local_state.get_reserve_mem_size(state); +} + Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_close_timer); diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 97440de3f09..129aea5eb76 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -96,6 +96,8 @@ protected: Status _create_agg_status(vectorized::AggregateDataPtr data); size_t _memory_usage() const; + size_t get_reserve_mem_size(RuntimeState* state) const; + RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; RuntimeProfile::Counter* _hash_table_limit_compute_timer = nullptr; @@ -123,6 +125,8 @@ protected: std::unique_ptr<vectorized::Arena> _agg_profile_arena; std::unique_ptr<ExecutorBase> _executor = nullptr; + + size_t _memory_usage_last_executing = 0; }; class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> { @@ -163,6 +167,8 @@ public: Status reset_hash_table(RuntimeState* state); + size_t get_reserve_mem_size(RuntimeState* state) override; + using DataSinkOperatorX<AggSinkLocalState>::node_id; using DataSinkOperatorX<AggSinkLocalState>::operator_id; using DataSinkOperatorX<AggSinkLocalState>::get_local_state; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index a5f40a431c5..c1cb187f3e6 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -22,17 +22,13 @@ #include "common/exception.h" #include "pipeline/exec/operator.h" +#include "runtime/thread_context.h" #include "vec/exprs/vectorized_agg_fn.h" +#include "vec/exprs/vexpr_fwd.h" namespace doris::pipeline { -AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent) - : Base(state, parent), - _get_results_timer(nullptr), - _serialize_result_timer(nullptr), - _hash_table_iterate_timer(nullptr), - _insert_keys_to_column_timer(nullptr), - _serialize_data_timer(nullptr) {} +AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {} Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); @@ -53,23 +49,27 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { auto& p = _parent->template cast<AggSourceOperatorX>(); if (p._without_key) { if (p._needs_finalize) { - _executor.get_result = std::bind<Status>(&AggLocalState::_get_without_key_result, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); + _executor.get_result = [this](RuntimeState* state, vectorized::Block* block, + bool* eos) { + return _get_without_key_result(state, block, eos); + }; } else { - _executor.get_result = std::bind<Status>(&AggLocalState::_serialize_without_key, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); + _executor.get_result = [this](RuntimeState* state, vectorized::Block* block, + bool* eos) { + return _serialize_without_key(state, block, eos); + }; } } else { if (p._needs_finalize) { - _executor.get_result = std::bind<Status>( - &AggLocalState::_get_with_serialized_key_result, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3); + _executor.get_result = [this](RuntimeState* state, vectorized::Block* block, + bool* eos) { + return _get_with_serialized_key_result(state, block, eos); + }; } else { - _executor.get_result = std::bind<Status>( - &AggLocalState::_serialize_with_serialized_key_result, this, - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + _executor.get_result = [this](RuntimeState* state, vectorized::Block* block, + bool* eos) { + return _serialize_with_serialized_key_result(state, block, eos); + }; } } @@ -440,11 +440,11 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); + ScopedMemTracker scoped_tracker(local_state._estimate_memory_usage); RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); local_state.do_agg_limit(block, eos); return Status::OK(); } @@ -482,6 +482,7 @@ void AggLocalState::make_nullable_output_key(vectorized::Block* block) { template <bool limit> Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block) { SCOPED_TIMER(_merge_timer); + ScopedMemTracker scoped_tracker(_estimate_memory_usage); size_t key_size = Base::_shared_state->probe_expr_ctxs.size(); vectorized::ColumnRawPtrs key_columns(key_size); diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 85d7773bdbd..6e3aeec283e 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -21,6 +21,7 @@ #include <string> #include "pipeline/exec/operator.h" +#include "runtime/runtime_state.h" #include "vec/exprs/vectorized_agg_fn.h" namespace doris::pipeline { @@ -263,6 +264,10 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); + + local_state._reserve_mem_size = 0; + ScopedMemTracker mem_tracker(local_state._reserve_mem_size); + local_state._shared_state->input_eos = eos; if (local_state._shared_state->input_eos && input_block->rows() == 0) { local_state._dependency->set_ready_to_read(); @@ -325,6 +330,11 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block return Status::OK(); } +size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { + auto& local_state = get_local_state(state); + return local_state._reserve_mem_size; +} + Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, vectorized::IColumn* dst_column, size_t length) { diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index b8615717198..a82a1e41044 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -61,6 +61,8 @@ private: RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr; std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs; + + size_t _reserve_mem_size = 0; }; class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> { @@ -93,6 +95,8 @@ public: return !_partition_by_eq_expr_ctxs.empty() && _order_by_eq_expr_ctxs.empty(); } + size_t get_reserve_mem_size(RuntimeState* state) override; + private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, vectorized::IColumn* dst_column, size_t length); diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index b521a9b583f..2661314592d 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -512,6 +512,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); if (local_state._shared_state->input_eos && (local_state._output_block_index == local_state._shared_state->input_blocks.size() || local_state._shared_state->input_total_rows == 0)) { @@ -539,8 +540,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block } } RETURN_IF_ERROR(local_state.output_current_block(block)); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index 5aa27b51c45..6c6a28029e2 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -42,6 +42,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); + ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage()); local_state.add_num_rows_returned(block->rows()); int64_t num_rows_returned = local_state.num_rows_returned(); bool assert_res = false; @@ -116,8 +117,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc } COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned()); COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); return Status::OK(); } diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 93b3d058154..03fe9db37bd 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -68,9 +68,9 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); + ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage()); Status res = local_state._table_func->get_next(state, block, eos); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); local_state.reached_limit(block, eos); return res; } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 70b73225f06..29975be396a 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -456,8 +456,7 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc local_state._make_nullable_output_key(block); if (!_is_streaming_preagg) { // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); } local_state.add_num_rows_returned(block->rows()); COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index cf2055ec47b..9db0bca0c43 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -148,8 +148,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block return Status::OK(); } auto status = local_state.stream_recvr->get_next(block, eos); - RETURN_IF_ERROR(doris::vectorized::VExprContext::filter_block(local_state.conjuncts(), block, - block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block, block->columns())); // In vsortrunmerger, it will set eos=true, and block not empty // so that eos==true, could not make sure that block not have valid data if (!*eos || block->rows() > 0) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 8f7b176a979..589af5e1f42 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -110,6 +110,37 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { return Status::OK(); } +size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) { + if (!_should_build_hash_table) { + return 0; + } + size_t size_to_reserve = 0; + + if (!_build_side_mutable_block.empty()) { + size_to_reserve += _build_side_mutable_block.allocated_bytes(); + + // estimating for serialized key + for (auto id : _build_col_ids) { + size_to_reserve += _build_side_mutable_block.get_column_by_position(id)->byte_size(); + } + } + + const size_t rows = _build_side_mutable_block.rows() + state->batch_size(); + size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows); + + size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first + size_to_reserve += rows * sizeof(uint32_t); // JoinHashTable::next + + auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); + if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == TJoinOp::RIGHT_OUTER_JOIN || + p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op == TJoinOp::RIGHT_SEMI_JOIN) { + size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited + } + size_to_reserve += _evaluate_mem_usage; + + return size_to_reserve; +} + Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) { auto p = _parent->cast<HashJoinBuildSinkOperatorX>(); Defer defer {[&]() { @@ -170,6 +201,7 @@ Status HashJoinBuildSinkLocalState::_do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs, RuntimeProfile::Counter& expr_call_timer, std::vector<int>& res_col_ids) { + auto origin_size = block.allocated_bytes(); for (size_t i = 0; i < exprs.size(); ++i) { int result_col_id = -1; // execute build column @@ -183,6 +215,8 @@ Status HashJoinBuildSinkLocalState::_do_evaluate(vectorized::Block& block, block.get_by_position(result_col_id).column->convert_to_full_column_if_const(); res_col_ids[i] = result_col_id; } + + _evaluate_mem_usage = block.allocated_bytes() - origin_size; return Status::OK(); } @@ -621,4 +655,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* return Status::OK(); } +size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { + auto& local_state = get_local_state(state); + return local_state.get_reserve_mem_size(state); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index cf677833fb5..cb626a81cb1 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -54,6 +54,8 @@ public: Status close(RuntimeState* state, Status exec_status) override; + [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state); + protected: void _hash_table_init(RuntimeState* state); void _set_build_ignore_flag(vectorized::Block& block, const std::vector<int>& res_col_ids); @@ -77,6 +79,8 @@ protected: int64_t _build_side_mem_used = 0; int64_t _build_side_last_mem_used = 0; + size_t _evaluate_mem_usage = 0; + size_t _build_side_rows = 0; vectorized::MutableBlock _build_side_mutable_block; @@ -122,6 +126,8 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; + size_t get_reserve_mem_size(RuntimeState* state) override; + bool should_dry_run(RuntimeState* state) override { return _is_broadcast_join && !state->get_sink_local_state() ->cast<HashJoinBuildSinkLocalState>() diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index f91e1eaa2a1..44d51c2416b 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -347,6 +347,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc return st; } + local_state._estimate_memory_usage += temp_block.allocated_bytes(); RETURN_IF_ERROR( local_state.filter_data_and_build_output(state, output_block, eos, &temp_block)); // Here make _join_block release the columns' ptr @@ -445,8 +446,7 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state } { SCOPED_TIMER(_join_filter_timer); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, temp_block, - temp_block->columns())); + RETURN_IF_ERROR(filter_block(_conjuncts, temp_block, temp_block->columns())); } RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false)); @@ -487,8 +487,12 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu auto& local_state = get_local_state(state); local_state.prepare_for_next(); local_state._probe_eos = eos; - if (input_block->rows() > 0) { - COUNTER_UPDATE(local_state._probe_rows_counter, input_block->rows()); + + const auto rows = input_block->rows(); + size_t origin_size = input_block->allocated_bytes(); + + if (rows > 0) { + COUNTER_UPDATE(local_state._probe_rows_counter, rows); std::vector<int> res_col_ids(local_state._probe_expr_ctxs.size()); RETURN_IF_ERROR(_do_evaluate(*input_block, local_state._probe_expr_ctxs, *local_state._probe_expr_call_timer, res_col_ids)); @@ -499,6 +503,8 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu RETURN_IF_ERROR(local_state._extract_join_column(*input_block, res_col_ids)); + local_state._estimate_memory_usage += (input_block->allocated_bytes() - origin_size); + if (&local_state._probe_block != input_block) { input_block->swap(local_state._probe_block); } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index d3bca8fa7cd..e03fcf9dbdb 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -116,6 +116,8 @@ private: std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>(); + ssize_t _estimated_mem_in_push = -1; + RuntimeProfile::Counter* _probe_expr_call_timer = nullptr; RuntimeProfile::Counter* _probe_next_timer = nullptr; RuntimeProfile::Counter* _probe_side_output_timer = nullptr; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 1028bca7ce2..6dce8e60874 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -90,8 +90,8 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, output_block, eos)); if (!local_state._conjuncts.empty()) { - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, - output_block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block, + output_block->columns())); } if (!local_state._output_expr_contexts.empty() && output_block->rows() > 0) { diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 9546ed8df56..bb53755edb2 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -466,6 +466,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized bool eos) const { auto& local_state = get_local_state(state); COUNTER_UPDATE(local_state._probe_rows_counter, block->rows()); + ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage()); local_state._cur_probe_row_visited_flags.resize(block->rows()); std::fill(local_state._cur_probe_row_visited_flags.begin(), local_state._cur_probe_row_visited_flags.end(), 0); @@ -492,10 +493,12 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block bool* eos) const { auto& local_state = get_local_state(state); if (_is_output_left_side_only) { + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block)); *eos = local_state._shared_state->left_side_eos; local_state._need_more_input_data = !local_state._shared_state->left_side_eos; } else { + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); *eos = ((_match_all_build || _is_right_semi_anti) ? local_state._output_null_idx_build_side == local_state._shared_state->build_blocks.size() && @@ -511,8 +514,8 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block local_state.add_tuple_is_null_column(&tmp_block); { SCOPED_TIMER(local_state._join_filter_timer); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block( - local_state._conjuncts, &tmp_block, tmp_block.columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, &tmp_block, + tmp_block.columns())); } RETURN_IF_ERROR(local_state._build_output_block(&tmp_block, block, false)); local_state._reset_tuple_is_null_column(); @@ -528,6 +531,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block state, join_op_variants); }; SCOPED_TIMER(local_state._loop_join_timer); + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); RETURN_IF_ERROR(std::visit( func, local_state._shared_state->join_op_variants, vectorized::make_bool_variant(_match_all_build || _is_right_semi_anti), diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 4a93bac67fe..cf1e82f57dd 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -242,6 +242,14 @@ void PipelineXLocalStateBase::clear_origin_block() { _origin_block.clear_column_data(_parent->intermediate_row_desc().num_materialized_slots()); } +Status PipelineXLocalStateBase::filter_block(const vectorized::VExprContextSPtrs& expr_contexts, + vectorized::Block* block, int column_to_keep) { + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(expr_contexts, block, column_to_keep)); + + _estimate_memory_usage += vectorized::VExprContext::get_memory_usage(expr_contexts); + return Status::OK(); +} + Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block, vectorized::Block* output_block) const { auto* local_state = state->get_local_state(operator_id()); @@ -254,11 +262,14 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori vectorized::Block input_block = *origin_block; std::vector<int> result_column_ids; + size_t bytes_usage = 0; for (const auto& projections : local_state->_intermediate_projections) { result_column_ids.resize(projections.size()); for (int i = 0; i < projections.size(); i++) { RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i])); } + + bytes_usage += input_block.allocated_bytes(); input_block.shuffle_columns(result_column_ids); } @@ -269,12 +280,14 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to); null_column.get_nested_column().insert_range_from(*from, 0, rows); null_column.get_null_map_column().get_data().resize_fill(rows, 0); + bytes_usage += null_column.allocated_bytes(); } else { to = make_nullable(from, false)->assume_mutable(); } } else { if (_keep_origin || !from->is_exclusive()) { to->insert_range_from(*from, 0, rows); + bytes_usage += from->allocated_bytes(); } else { to = from->assume_mutable(); } @@ -287,18 +300,24 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori *_output_row_descriptor); if (rows != 0) { auto& mutable_columns = mutable_block.mutable_columns(); + const size_t origin_columns_count = input_block.columns(); DCHECK(mutable_columns.size() == local_state->_projections.size()); for (int i = 0; i < mutable_columns.size(); ++i) { auto result_column_id = -1; RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, &result_column_id)); auto column_ptr = input_block.get_by_position(result_column_id) .column->convert_to_full_column_if_const(); + if (result_column_id >= origin_columns_count) { + bytes_usage += column_ptr->allocated_bytes(); + } insert_column_datas(mutable_columns[i], column_ptr, rows); } DCHECK(mutable_block.rows() == rows); output_block->set_columns(std::move(mutable_columns)); } + local_state->_estimate_memory_usage += bytes_usage; + return Status::OK(); } @@ -322,6 +341,7 @@ Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized:: if (UNLIKELY(!status.ok())) { return status; } + return do_projections(state, &local_state->_origin_block, block); } local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption()); @@ -365,7 +385,7 @@ std::string DataSinkOperatorXBase::debug_string(RuntimeState* state, int indenta Status DataSinkOperatorXBase::init(const TDataSink& tsink) { std::string op_name = "UNKNOWN_SINK"; - std::map<int, const char*>::const_iterator it = _TDataSinkType_VALUES_TO_NAMES.find(tsink.type); + auto it = _TDataSinkType_VALUES_TO_NAMES.find(tsink.type); if (it != _TDataSinkType_VALUES_TO_NAMES.end()) { op_name = it->second; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 48f8a2d1836..14cd56f5751 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -20,6 +20,7 @@ #include <fmt/format.h> #include <glog/logging.h> +#include <atomic> #include <cstdint> #include <functional> #include <memory> @@ -32,8 +33,10 @@ #include "pipeline/dependency.h" #include "pipeline/exec/operator.h" #include "pipeline/local_exchange/local_exchanger.h" +#include "runtime/memory/mem_tracker.h" #include "runtime/query_context.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "util/runtime_profile.h" #include "vec/core/block.h" #include "vec/runtime/vdata_stream_recvr.h" @@ -178,11 +181,21 @@ public: std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; } + Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts, + vectorized::Block* block, int column_to_keep); + + void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage += usage; } + + size_t& estimate_memory_usage() { return _estimate_memory_usage; } + + void reset_estimate_memory_usage() { _estimate_memory_usage = 0; } + protected: friend class OperatorXBase; ObjectPool* _pool = nullptr; int64_t _num_rows_returned {0}; + size_t _estimate_memory_usage {0}; std::unique_ptr<RuntimeProfile> _runtime_profile; @@ -215,6 +228,24 @@ protected: vectorized::Block _origin_block; }; +class ScopedMemTracker { +public: + ScopedMemTracker(size_t& counter) : _counter(counter), _mem_tracker("ScopedMemTracker") { + thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(&_mem_tracker); + _peak_usage = _mem_tracker.peak_consumption(); + } + + ~ScopedMemTracker() { + thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); + _counter += (_mem_tracker.peak_consumption() - _peak_usage); + } + +private: + size_t& _counter; + size_t _peak_usage = 0; + MemTracker _mem_tracker; +}; + template <typename SharedStateArg = FakeSharedState> class PipelineXLocalState : public PipelineXLocalStateBase { public: @@ -386,7 +417,11 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - Status open(RuntimeState* state) override { return Status::OK(); } + Status open(RuntimeState* state) override { + _spill_dependency = state->get_spill_dependency(); + DCHECK(_spill_dependency != nullptr); + return Status::OK(); + } Status close(RuntimeState* state, Status exec_status) override; @@ -414,6 +449,7 @@ public: protected: Dependency* _dependency = nullptr; + Dependency* _spill_dependency = nullptr; SharedStateType* _shared_state = nullptr; private: @@ -424,13 +460,13 @@ private: class DataSinkOperatorXBase : public OperatorBase { public: DataSinkOperatorXBase(const int operator_id, const int node_id) - : OperatorBase(), _operator_id(operator_id), _node_id(node_id), _dests_id({1}) {} + : _operator_id(operator_id), _node_id(node_id), _dests_id({1}) {} DataSinkOperatorXBase(const int operator_id, const int node_id, const int dest_id) - : OperatorBase(), _operator_id(operator_id), _node_id(node_id), _dests_id({dest_id}) {} + : _operator_id(operator_id), _node_id(node_id), _dests_id({dest_id}) {} DataSinkOperatorXBase(const int operator_id, const int node_id, std::vector<int>& sources) - : OperatorBase(), _operator_id(operator_id), _node_id(node_id), _dests_id(sources) {} + : _operator_id(operator_id), _node_id(node_id), _dests_id(sources) {} ~DataSinkOperatorXBase() override = default; @@ -511,6 +547,13 @@ public: [[nodiscard]] std::string get_name() const override { return _name; } + [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { return 0; } + + [[nodiscard]] virtual bool try_reserve_memory(RuntimeState* state, vectorized::Block* block, + bool eos) { + return true; + } + virtual bool should_dry_run(RuntimeState* state) { return false; } protected: @@ -582,6 +625,11 @@ public: return Status::OK(); } + std::vector<Dependency*> dependencies() const override { + auto dependencies = Base::dependencies(); + return dependencies; + } + RuntimeProfile::Counter* _spill_counters = nullptr; RuntimeProfile::Counter* _spill_timer = nullptr; RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; @@ -739,6 +787,10 @@ public: void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; } int parallel_tasks() const { return _parallel_tasks; } + [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { return 0; } + + virtual void reset_reserve_mem_size(RuntimeState* state) {} + protected: template <typename Dependency> friend class PipelineXLocalState; @@ -770,6 +822,7 @@ protected: int64_t _limit; // -1: no limit uint32_t _debug_point_count = 0; + std::atomic_uint32_t _bytes_per_row = 0; std::string _op_name; bool _ignore_data_distribution = false; @@ -795,6 +848,24 @@ public: [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const { return state->get_local_state(operator_id())->template cast<LocalState>(); } + + size_t get_reserve_mem_size(RuntimeState* state) override { + auto& local_state = get_local_state(state); + auto estimated_size = local_state.estimate_memory_usage(); + if (!is_source() && _child_x) { + estimated_size += _child_x->get_reserve_mem_size(state); + } + return estimated_size; + } + + void reset_reserve_mem_size(RuntimeState* state) override { + auto& local_state = get_local_state(state); + local_state.reset_estimate_memory_usage(); + + if (!is_source() && _child_x) { + _child_x->reset_reserve_mem_size(state); + } + } }; /** diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 94c51e160da..caa2ac8a134 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -244,6 +244,19 @@ Status PartitionSortSinkOperatorX::_split_block_by_partition( return Status::OK(); } +size_t PartitionSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { + auto& local_state = get_local_state(state); + auto rows = state->batch_size(); + size_t reserve_mem_size = std::visit( + vectorized::Overload {[&](std::monostate&) -> size_t { return 0; }, + [&](auto& agg_method) -> size_t { + return agg_method.hash_table->estimate_memory(rows); + }}, + local_state._partitioned_data->method_variant); + reserve_mem_size += rows * sizeof(size_t); // hash values + return reserve_mem_size; +} + Status PartitionSortSinkOperatorX::_emplace_into_hash_table( const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block, PartitionSortSinkLocalState& local_state, bool eos) { diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index e58ac5fea9e..8f660c7e094 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -269,6 +269,8 @@ public: return {ExchangeType::PASSTHROUGH}; } + size_t get_reserve_mem_size(RuntimeState* state) override; + private: friend class PartitionSortSinkLocalState; ObjectPool* _pool = nullptr; diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index 2f94a652a89..48822650dca 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -42,18 +42,18 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: output_block->clear_column_data(); { std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex); - if (local_state._shared_state->blocks_buffer.empty() == false) { + if (!local_state._shared_state->blocks_buffer.empty()) { local_state._shared_state->blocks_buffer.front().swap(*output_block); local_state._shared_state->blocks_buffer.pop(); //if buffer have no data and sink not eos, block reading and wait for signal again - RETURN_IF_ERROR(vectorized::VExprContext::filter_block( - local_state._conjuncts, output_block, output_block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block, + output_block->columns())); if (local_state._shared_state->blocks_buffer.empty() && - local_state._shared_state->sink_eos == false) { + !local_state._shared_state->sink_eos) { // add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos. // so have to hold mutex to set block(), avoid to sink have set eos and set ready, but here set block() by mistake std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock); - if (local_state._shared_state->sink_eos == false) { + if (!local_state._shared_state->sink_eos) { local_state._dependency->block(); } } @@ -71,8 +71,8 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: // if we move the _blocks_buffer output at last(behind 286 line), // it's maybe eos but not output all data: when _blocks_buffer.empty() and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0) RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state)); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, - output_block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block, + output_block->columns())); { std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 469716b7a22..314806529b7 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -32,7 +32,7 @@ PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase : Base(parent, state) { _finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(), - parent->get_name() + "_SPILL_DEPENDENCY", true); + parent->get_name() + "_FINISH_DEPENDENCY", true); } Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, @@ -228,9 +228,16 @@ Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state) return sink_local_state->open(state); } +size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { + auto& local_state = get_local_state(state); + auto* runtime_state = local_state._runtime_state.get(); + return _agg_sink_operator->get_reserve_mem_size(runtime_state); +} + Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " - << Base::_parent->node_id() << " revoke_memory" + << Base::_parent->node_id() + << " revoke_memory, size: " << _parent->revocable_mem_size(state) << ", eos: " << _eos; RETURN_IF_ERROR(Base::_shared_state->sink_status); if (!_shared_state->is_spilled) { @@ -240,14 +247,14 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { // TODO: spill thread may set_ready before the task::execute thread put the task to blocked state if (!_eos) { - Base::_dependency->Dependency::block(); + Base::_spill_dependency->Dependency::block(); } auto& parent = Base::_parent->template cast<Parent>(); Status status; Defer defer {[&]() { if (!status.ok()) { if (!_eos) { - Base::_dependency->Dependency::set_ready(); + Base::_spill_dependency->Dependency::set_ready(); } } }}; @@ -262,6 +269,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { return status; }); + state->get_query_ctx()->increase_revoking_tasks_count(); auto spill_runnable = std::make_shared<SpillRunnable>( state, _shared_state->shared_from_this(), [this, &parent, state, query_id, submit_timer] { @@ -285,16 +293,16 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { _shared_state->close(); } else { VLOG_DEBUG << "query " << print_id(query_id) << " agg node " - << Base::_parent->node_id() << " revoke_memory finish" - << ", eos: " << _eos; + << Base::_parent->node_id() << " revoke_memory finish, size: " + << _parent->revocable_mem_size(state) << ", eos: " << _eos; } if (_eos) { Base::_dependency->set_ready_to_read(); _finish_dependency->set_ready(); - } else { - Base::_dependency->Dependency::set_ready(); } + Base::_spill_dependency->Dependency::set_ready(); + state->get_query_ctx()->decrease_revoking_tasks_count(); }}; auto* runtime_state = _runtime_state.get(); auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 259d7580877..1a94d9077be 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -16,7 +16,10 @@ // under the License. #pragma once +#include <memory> + #include "aggregation_sink_operator.h" +#include "pipeline/dependency.h" #include "pipeline/exec/operator.h" #include "vec/exprs/vectorized_agg_fn.h" #include "vec/exprs/vexpr.h" @@ -322,6 +325,8 @@ public: Status revoke_memory(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state) override; + private: friend class PartitionedAggSinkLocalState; std::unique_ptr<AggSinkOperatorX> _agg_sink_operator; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 018d63a6dee..9a939c62287 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -17,6 +17,8 @@ #include "partitioned_hash_join_probe_operator.h" +#include <glog/logging.h> + #include "pipeline/pipeline_task.h" #include "runtime/fragment_mgr.h" #include "util/mem_info.h" @@ -142,6 +144,8 @@ void PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* ch Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(PipelineXSpillLocalState::open(state)); + _spill_dependency = state->get_spill_dependency(); + DCHECK(_spill_dependency != nullptr); return _parent->cast<PartitionedHashJoinProbeOperatorX>()._partitioner->clone(state, _partitioner); } @@ -221,10 +225,10 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat _spill_status_ok = false; _spill_status = std::move(status); } - _dependency->set_ready(); + _spill_dependency->set_ready(); }; - _dependency->block(); + _spill_dependency->block(); DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func", { return Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_hash_join_probe spill_probe_blocks submit_func failed"); @@ -361,12 +365,12 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti _spill_status_ok = false; _spill_status = std::move(status); } - _dependency->set_ready(); + _spill_dependency->set_ready(); }; auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); has_data = true; - _dependency->block(); + _spill_dependency->block(); { auto* pipeline_task = state->get_task(); if (pipeline_task) { @@ -477,12 +481,12 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti _spill_status_ok = false; _spill_status = std::move(status); } - _dependency->set_ready(); + _spill_dependency->set_ready(); }; auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); DCHECK(spill_io_pool != nullptr); - _dependency->block(); + _spill_dependency->block(); has_data = true; DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_probe_blocks_submit_func", { @@ -655,6 +659,7 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( return Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_hash_join_probe sink failed"); }); + RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), &block, true)); VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", internal build operator finished, node id: " << node_id() @@ -777,11 +782,8 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) { bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* state) const { auto& local_state = get_local_state(state); - const auto revocable_size = revocable_mem_size(state); - if (PipelineTask::should_revoke_memory(state, revocable_size)) { - return true; - } if (local_state._shared_state->need_to_spill) { + const auto revocable_size = revocable_mem_size(state); const auto min_revocable_size = state->min_revocable_mem(); return revocable_size > min_revocable_size; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 8cccc9f8fae..f1b635208eb 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -21,6 +21,7 @@ #include "common/status.h" #include "operator.h" +#include "pipeline/dependency.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/join_build_sink_operator.h" @@ -88,6 +89,8 @@ private: bool _need_to_setup_internal_operators {true}; + Dependency* _spill_dependency {nullptr}; + RuntimeProfile::Counter* _spill_and_partition_label = nullptr; RuntimeProfile::Counter* _partition_timer = nullptr; RuntimeProfile::Counter* _partition_shuffle_timer = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index a7297be493f..8d565bda4dc 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -17,9 +17,14 @@ #include "partitioned_hash_join_sink_operator.h" +#include <glog/logging.h> + +#include <algorithm> + #include "pipeline/exec/operator.h" #include "runtime/fragment_mgr.h" #include "util/mem_info.h" +#include "util/runtime_profile.h" #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { @@ -39,6 +44,8 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _partition_shuffle_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionShuffleTime", "Spill", 1); _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", "Spill", 1); + _in_mem_rows_counter = + ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "InMemRow", TUnit::UNIT, "Spill", 1); return Status::OK(); } @@ -76,9 +83,9 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state /// If no need to spill, all rows were sunk into the `_inner_sink_operator` without partitioned. if (!_shared_state->need_to_spill) { if (_shared_state->inner_shared_state) { - auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state(); + auto* inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state(); if (inner_sink_state_) { - auto inner_sink_state = + auto* inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_); return inner_sink_state->_build_side_mem_used; } @@ -99,6 +106,20 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state return mem_size; } +size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* state) { + size_t size_to_reserve = 0; + auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); + if (_shared_state->need_to_spill) { + size_to_reserve = p._partition_count * vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM; + } else { + if (_shared_state->inner_runtime_state) { + size_to_reserve = p._inner_sink_operator->get_reserve_mem_size( + _shared_state->inner_runtime_state.get()); + } + } + return size_to_reserve; +} + Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) { auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); _shared_state->inner_shared_state->hash_table_variants.reset(); @@ -139,7 +160,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta std::unique_lock<std::mutex> lock(_spill_lock); _spill_status = status; _spill_status_ok = false; - _dependency->set_ready(); + _spill_dependency->set_ready(); return false; } return true; @@ -187,7 +208,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta std::unique_lock<std::mutex> lock(_spill_lock); _spill_status = st; _spill_status_ok = false; - _dependency->set_ready(); + _spill_dependency->set_ready(); return; } partitions_indexes[partition_idx].clear(); @@ -203,7 +224,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta } } - _dependency->set_ready(); + _spill_dependency->set_ready(); }; auto exception_catch_func = [spill_func, this]() mutable { @@ -216,7 +237,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta std::unique_lock<std::mutex> lock(_spill_lock); _spill_status = status; _spill_status_ok = false; - _dependency->set_ready(); + _spill_dependency->set_ready(); } }; @@ -225,7 +246,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); - _dependency->block(); + _spill_dependency->block(); DBUG_EXECUTE_IF( "fault_inject::partitioned_hash_join_sink::revoke_unpartitioned_block_submit_func", { return Status::Error<INTERNAL_ERROR>( @@ -239,6 +260,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { LOG(INFO) << "hash join sink " << _parent->node_id() << " revoke_memory" << ", eos: " << _child_eos; DCHECK_EQ(_spilling_streams_count, 0); + CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr); if (!_shared_state->need_to_spill) { profile()->add_info_string("Spilled", "true"); @@ -275,9 +297,10 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { "fault_inject partitioned_hash_join_sink revoke_memory submit_func failed"); }); + state->get_query_ctx()->increase_revoking_tasks_count(); auto spill_runnable = std::make_shared<SpillRunnable>( state, _shared_state->shared_from_this(), - [this, query_id, spilling_stream, i, submit_timer] { + [this, state, query_id, spilling_stream, i, submit_timer] { DBUG_EXECUTE_IF( "fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( @@ -294,12 +317,14 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { return Status::OK(); }(); - if (!status.OK()) { + if (!status.ok()) { std::unique_lock<std::mutex> lock(_spill_lock); - _dependency->set_ready(); + _spill_dependency->set_ready(); _spill_status_ok = false; _spill_status = std::move(status); } + + state->get_query_ctx()->decrease_revoking_tasks_count(); }); if (st.ok()) { st = spill_io_pool->submit(std::move(spill_runnable)); @@ -314,10 +339,16 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { if (_spilling_streams_count > 0) { std::unique_lock<std::mutex> lock(_spill_lock); if (_spilling_streams_count > 0) { - _dependency->block(); + _spill_dependency->block(); } else if (_child_eos) { LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read" << ", task id: " << state->task_id(); + std::for_each(_shared_state->partitioned_build_blocks.begin(), + _shared_state->partitioned_build_blocks.end(), [&](auto& block) { + if (block) { + COUNTER_UPDATE(_in_mem_rows_counter, block->rows()); + } + }); _dependency->set_ready_to_read(); } } @@ -384,10 +415,16 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk( if (num == 1) { std::unique_lock<std::mutex> lock(_spill_lock); - _dependency->set_ready(); + _spill_dependency->set_ready(); if (_child_eos) { LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read" << ", task id: " << state()->task_id(); + std::for_each(_shared_state->partitioned_build_blocks.begin(), + _shared_state->partitioned_build_blocks.end(), [&](auto& block) { + if (block) { + COUNTER_UPDATE(_in_mem_rows_counter, block->rows()); + } + }); _dependency->set_ready_to_read(); } } @@ -477,6 +514,7 @@ Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); + CHECK_EQ(local_state._spill_dependency->is_blocked_by(nullptr), nullptr); local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); if (!local_state._spill_status_ok) { @@ -492,7 +530,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (rows == 0) { if (eos) { LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read" - << ", task id: " << state->task_id(); + << ", task id: " << state->task_id() << ", need spil: " << need_to_spill; if (!need_to_spill) { if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { @@ -506,6 +544,14 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); } + + std::for_each(local_state._shared_state->partitioned_build_blocks.begin(), + local_state._shared_state->partitioned_build_blocks.end(), + [&](auto& block) { + if (block) { + COUNTER_UPDATE(local_state._in_mem_rows_counter, block->rows()); + } + }); local_state._dependency->set_ready_to_read(); } return Status::OK(); @@ -514,11 +560,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (need_to_spill) { RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows)); - - const auto revocable_size = revocable_mem_size(state); - if (revocable_size > state->min_revocable_mem()) { - return local_state.revoke_memory(state); - } } else { if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { RETURN_IF_ERROR(_setup_internal_operator(state)); @@ -534,7 +575,13 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (eos) { LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read" - << ", task id: " << state->task_id(); + << ", task id: " << state->task_id() << ", need spil: " << need_to_spill; + std::for_each(local_state._shared_state->partitioned_build_blocks.begin(), + local_state._shared_state->partitioned_build_blocks.end(), [&](auto& block) { + if (block) { + COUNTER_UPDATE(local_state._in_mem_rows_counter, block->rows()); + } + }); local_state._dependency->set_ready_to_read(); } @@ -553,4 +600,9 @@ Status PartitionedHashJoinSinkOperatorX::revoke_memory(RuntimeState* state) { return local_state.revoke_memory(state); } +size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* state) { + auto& local_state = get_local_state(state); + return local_state.get_reserve_mem_size(state); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 1376964663f..b2c79967b97 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -45,6 +45,7 @@ public: Status close(RuntimeState* state, Status exec_status) override; Status revoke_memory(RuntimeState* state); size_t revocable_mem_size(RuntimeState* state) const; + [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state); protected: PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) @@ -76,6 +77,7 @@ protected: RuntimeProfile::Counter* _partition_timer = nullptr; RuntimeProfile::Counter* _partition_shuffle_timer = nullptr; RuntimeProfile::Counter* _spill_build_timer = nullptr; + RuntimeProfile::Counter* _in_mem_rows_counter = nullptr; }; class PartitionedHashJoinSinkOperatorX @@ -102,6 +104,8 @@ public: Status revoke_memory(RuntimeState* state) override; + size_t get_reserve_mem_size(RuntimeState* state) override; + DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index d355d99c2e3..3706a9262fb 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -171,6 +171,7 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block auto& _expr_ctxs = local_state._expr_ctxs; DCHECK(!_intermediate_block || _intermediate_block->rows() == 0); if (input_block->rows() > 0) { + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); _intermediate_block = vectorized::Block::create_unique(); for (auto& expr : _expr_ctxs) { @@ -196,6 +197,9 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp auto& _child_eos = local_state._child_eos; auto& _intermediate_block = local_state._intermediate_block; RETURN_IF_CANCELLED(state); + + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + DCHECK(_repeat_id_idx >= 0); for (const std::vector<int64_t>& v : _grouping_list) { DCHECK(_repeat_id_idx <= (int)v.size()); @@ -227,8 +231,9 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp } _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); } - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, - output_block->columns())); + + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block, + output_block->columns())); *eos = _child_eos && _child_block.rows() == 0; local_state.reached_limit(output_block, eos); COUNTER_SET(local_state._rows_returned_counter, local_state._num_rows_returned); diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index fcc1ed2bbb1..556cf2a4988 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -259,8 +259,8 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl *src_block.get_by_name(dest_slot_desc->col_name()).column, 0, src_block.rows()); } - RETURN_IF_ERROR(vectorized::VExprContext::filter_block( - local_state._conjuncts, block, _dest_tuple_desc->slots().size())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, + _dest_tuple_desc->slots().size())); src_block.clear(); } } while (block->rows() == 0 && !*eos); diff --git a/be/src/pipeline/exec/select_operator.h b/be/src/pipeline/exec/select_operator.h index 5370cd9e293..e5e57e57203 100644 --- a/be/src/pipeline/exec/select_operator.h +++ b/be/src/pipeline/exec/select_operator.h @@ -46,8 +46,7 @@ public: auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 955f956f60d..9d20456e093 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -68,6 +68,7 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + ScopedMemTracker mem_tracker(local_state._estimate_memory_usage); auto probe_rows = in_block->rows(); if (probe_rows > 0) { @@ -194,6 +195,12 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe( } } +template <bool is_intersect> +size_t SetProbeSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state) { + auto& local_state = get_local_state(state); + return local_state._estimate_memory_usage; +} + template <bool is_intersect> void SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table( SetProbeSinkLocalState<is_intersect>& local_state) { diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index ab53f5358c2..a19479e281a 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -55,6 +55,8 @@ private: template <class HashTableContext, bool is_intersected> friend struct vectorized::HashTableProbe; + size_t _estimate_memory_usage = 0; + //record insert column id during probe std::vector<uint16_t> _probe_column_inserted_id; vectorized::ColumnRawPtrs _probe_columns; @@ -100,6 +102,8 @@ public: std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; } + size_t get_reserve_mem_size(RuntimeState* state) override; + private: void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state); Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>& local_state, diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 38667293d48..e96340741e5 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -206,6 +206,33 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState return Status::OK(); } +template <bool is_intersect> +size_t SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state) { + auto& local_state = get_local_state(state); + size_t size_to_reserve = std::visit( + [&](auto&& arg) -> size_t { + using HashTableCtxType = std::decay_t<decltype(arg)>; + if constexpr (std::is_same_v<HashTableCtxType, std::monostate>) { + return 0; + } else { + return arg.hash_table->estimate_memory(state->batch_size()); + } + }, + *local_state._shared_state->hash_table_variants); + + size_to_reserve += local_state._mutable_block.allocated_bytes(); + for (auto& _child_expr : _child_exprs) { + size_to_reserve += _child_expr->root()->estimate_memory(state->batch_size()); + } + return size_to_reserve; +} + +template <bool is_intersect> +Status SetSinkOperatorX<is_intersect>::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + return vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc()); +} + template <bool is_intersect> Status SetSinkOperatorX<is_intersect>::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 1c08eddc141..be600ff6fd5 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -95,6 +95,8 @@ public: } bool require_shuffled_data_distribution() const override { return true; } + size_t get_reserve_mem_size(RuntimeState* state) override; + private: template <class HashTableContext, bool is_intersected> friend struct HashTableBuild; diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index 554a58caf14..6c3260ba850 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -74,6 +74,8 @@ Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + _create_mutable_cols(local_state, block); auto st = std::visit( [&](auto&& arg) -> Status { @@ -88,8 +90,7 @@ Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz }, *local_state._shared_state->hash_table_variants); RETURN_IF_ERROR(st); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 02a99e183c8..7e657859ce9 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -53,6 +53,8 @@ Status SortSourceOperatorX::open(RuntimeState* state) { Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, eos)); local_state.reached_limit(block, eos); return Status::OK(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 4bf1ab04efb..c82404afb03 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -205,7 +205,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { // TODO: spill thread may set_ready before the task::execute thread put the task to blocked state if (!_eos) { - Base::_dependency->Dependency::block(); + Base::_spill_dependency->Dependency::block(); } auto query_id = state->query_id(); @@ -236,8 +236,10 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { _dependency->set_ready_to_read(); _finish_dependency->set_ready(); } else { - _dependency->Dependency::set_ready(); + _spill_dependency->Dependency::set_ready(); } + + state->get_query_ctx()->decrease_revoking_tasks_count(); }}; _shared_state->sink_status = @@ -288,15 +290,17 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { "revoke_memory submit_func failed"); }); if (status.ok()) { + state->get_query_ctx()->increase_revoking_tasks_count(); status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), exception_catch_func)); } if (!status.ok()) { if (!_eos) { - Base::_dependency->Dependency::set_ready(); + Base::_spill_dependency->Dependency::set_ready(); } } return status; } + } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index dfbe42c637e..38dfee5c46d 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1274,14 +1274,14 @@ Status StreamingAggLocalState::close(RuntimeState* state) { Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, bool* eos) const { auto& local_state = get_local_state(state); + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); if (!local_state._pre_aggregated_block->empty()) { local_state._pre_aggregated_block->swap(*block); } else { RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); } local_state.reached_limit(block, eos); @@ -1291,6 +1291,8 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block, bool eos) const { auto& local_state = get_local_state(state); + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + local_state._input_num_rows += in_block->rows(); if (in_block->rows() > 0) { RETURN_IF_ERROR(local_state.do_pre_agg(in_block, local_state._pre_aggregated_block.get())); diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index ff9dfe632fa..364670877f2 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -198,8 +198,7 @@ Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, } // 3. eval conjuncts - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, - output_block->columns())); + RETURN_IF_ERROR(filter_block(_conjuncts, output_block, output_block->columns())); *eos = _child_eos && _cur_child_offset == -1; return Status::OK(); diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index ecaaf22922b..24d172f6708 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -140,6 +140,9 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Blo DCHECK_EQ(state->per_fragment_instance_idx(), 0); auto& local_state = state->get_local_state(operator_id())->cast<UnionSourceLocalState>(); DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size()); + + ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + auto& _const_expr_list_idx = local_state._const_expr_list_idx; vectorized::MutableBlock mblock = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 8fb750b9e97..e082bb1980f 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -22,6 +22,7 @@ #include <gen_cpp/PlanNodes_types.h> #include <pthread.h> +#include <algorithm> #include <cstdlib> // IWYU pragma: no_include <bits/chrono.h> #include <fmt/format.h> @@ -1818,6 +1819,41 @@ Status PipelineFragmentContext::send_report(bool done) { req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this())); } +size_t PipelineFragmentContext::get_revocable_size(bool& has_running_task) const { + size_t revocable_size = 0; + for (const auto& task_instances : _tasks) { + for (const auto& task : task_instances) { + if (task->is_running() || task->is_revoking()) { + LOG_EVERY_N(INFO, 50) << "query: " << print_id(_query_id) + << " is running, task: " << (void*)task.get() + << ", task->is_revoking(): " << task->is_revoking() << ", " + << task->is_running(); + has_running_task = true; + return 0; + } + + size_t revocable_size_ = task->get_revocable_size(); + if (revocable_size_ > _runtime_state->min_revocable_mem()) { + revocable_size += task->get_revocable_size(); + } + } + } + return revocable_size; +} + +std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const { + std::vector<PipelineTask*> revocable_tasks; + for (const auto& task_instances : _tasks) { + for (const auto& task : task_instances) { + size_t revocable_size_ = task->get_revocable_size(); + if (revocable_size_ > _runtime_state->min_revocable_mem()) { + revocable_tasks.emplace_back(task.get()); + } + } + } + return revocable_tasks; +} + std::string PipelineFragmentContext::debug_string() { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n"); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index f95eb03fb12..7ea39c7377b 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -117,6 +117,10 @@ public: [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; } + [[nodiscard]] size_t get_revocable_size(bool& has_running_task) const; + + [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const; + void instance_ids(std::vector<TUniqueId>& ins_ids) const { ins_ids.resize(_fragment_instance_ids.size()); for (size_t i = 0; i < _fragment_instance_ids.size(); i++) { diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 4f362ac5042..5a987cba416 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -20,17 +20,19 @@ #include <fmt/format.h> #include <gen_cpp/Metrics_types.h> #include <glog/logging.h> -#include <stddef.h> +#include <cstddef> #include <ostream> #include <vector> #include "common/status.h" +#include "pipeline/dependency.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/scan_operator.h" #include "pipeline/pipeline.h" #include "pipeline/pipeline_fragment_context.h" #include "pipeline/task_queue.h" +#include "pipeline/task_scheduler.h" #include "runtime/descriptors.h" #include "runtime/query_context.h" #include "runtime/thread_context.h" @@ -38,6 +40,7 @@ #include "util/defer_op.h" #include "util/mem_info.h" #include "util/runtime_profile.h" +#include "util/uid_util.h" namespace doris { class RuntimeState; @@ -64,9 +67,14 @@ PipelineTask::PipelineTask( _sink(pipeline->sink_shared_pointer()), _le_state_map(std::move(le_state_map)), _task_idx(task_idx), - _execution_dep(state->get_query_ctx()->get_execution_dependency()) { + _execution_dep(state->get_query_ctx()->get_execution_dependency()), + _memory_sufficient_dependency( + state->get_query_ctx()->get_memory_sufficient_dependency()) { _pipeline_task_watcher.start(); + _spill_dependency = Dependency::create_shared(-1, -1, "PipelineTaskSpillDependency", true); + + _state->set_spill_dependency(_spill_dependency.get()); auto shared_state = _sink->create_shared_state(); if (shared_state) { _sink_shared_state = shared_state; @@ -265,6 +273,12 @@ bool PipelineTask::_is_blocked() { } } + _blocked_dep = _spill_dependency->is_blocked_by(this); + if (_blocked_dep != nullptr) { + _blocked_dep->start_watcher(); + return true; + } + for (auto* op_dep : _write_dependencies) { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { @@ -272,6 +286,12 @@ bool PipelineTask::_is_blocked() { return true; } } + + _blocked_dep = _memory_sufficient_dependency->is_blocked_by(this); + if (_blocked_dep != nullptr) { + _blocked_dep->start_watcher(); + return true; + } return false; } @@ -312,6 +332,8 @@ Status PipelineTask::execute(bool* eos) { RETURN_IF_ERROR(_open()); } + const auto query_id = _state->query_id(); + while (!_fragment_context->is_canceled()) { if (_is_blocked()) { return Status::OK(); @@ -332,24 +354,50 @@ Status PipelineTask::execute(bool* eos) { auto* block = _block.get(); auto sink_revocable_mem_size = _sink->revocable_mem_size(_state); - if (should_revoke_memory(_state, sink_revocable_mem_size)) { + + bool is_wg_mem_low_water_mark = false; + bool is_wg_mem_high_water_mark = false; + if (should_revoke_memory(_state, sink_revocable_mem_size, is_wg_mem_low_water_mark, + is_wg_mem_high_water_mark)) { RETURN_IF_ERROR(_sink->revoke_memory(_state)); continue; } + *eos = _eos; DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", { Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed"); return status; }); + + DEFER_RELEASE_RESERVED(); + // `_dry_run` means sink operator need no more data // `_sink->is_finished(_state)` means sink operator should be finished + size_t reserve_size = 0; + bool has_enough_memory = true; if (_dry_run || _sink->is_finished(_state)) { *eos = true; _eos = true; } else { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); + size_t sink_reserve_size = _sink->get_reserve_mem_size(_state); + reserve_size = _root->get_reserve_mem_size(_state) + sink_reserve_size; + _root->reset_reserve_mem_size(_state); + DCHECK_EQ(_root->get_reserve_mem_size(_state), 0); + + if (reserve_size > 0) { + auto st = thread_context()->try_reserve_memory(reserve_size); + if (!st.ok()) { + LOG(INFO) << "query: " << print_id(query_id) + << ", try to reserve: " << reserve_size + << " failed: " << st.to_string() + << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); + has_enough_memory = false; + } + } + RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos)); } @@ -373,13 +421,25 @@ Status PipelineTask::execute(bool* eos) { return Status::OK(); } } + + if (!has_enough_memory) { + COUNTER_UPDATE(_yield_counts, 1); + + LOG(INFO) << "query: " << print_id(query_id) << ", task: " << (void*)this + << ", insufficient memory. reserve_size: " << reserve_size; + _memory_sufficient_dependency->block(); + _state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this); + break; + } } static_cast<void>(get_task_queue()->push_back(this)); return Status::OK(); } -bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes) { +bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes, + bool& is_wg_mem_low_water_mark, + bool& is_wg_mem_high_water_mark) { auto* query_ctx = state->get_query_ctx(); auto wg = query_ctx->workload_group(); if (!wg) { @@ -395,8 +455,6 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m } } - bool is_wg_mem_low_water_mark = false; - bool is_wg_mem_high_water_mark = false; wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark); if (is_wg_mem_high_water_mark) { if (revocable_mem_bytes > min_revocable_mem_bytes) { @@ -507,6 +565,9 @@ std::string PipelineTask::debug_string() { } } + fmt::format_to(debug_string_buffer, "{}. {}\n", i, + _memory_sufficient_dependency->debug_string(i++)); + fmt::format_to(debug_string_buffer, "Write Dependency Information: \n"); for (size_t j = 0; j < _write_dependencies.size(); j++, i++) { fmt::format_to(debug_string_buffer, "{}. {}\n", i, @@ -527,6 +588,14 @@ std::string PipelineTask::debug_string() { return fmt::to_string(debug_string_buffer); } +size_t PipelineTask::get_revocable_size() const { + return (_running || _eos) ? 0 : _sink->revocable_mem_size(_state); +} + +Status PipelineTask::revoke_memory() { + return _sink->revoke_memory(_state); +} + void PipelineTask::wake_up() { // call by dependency static_cast<void>(get_task_queue()->push_back(this)); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index dd2ead4b5dc..79da888cda6 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -17,8 +17,7 @@ #pragma once -#include <stdint.h> - +#include <cstdint> #include <memory> #include <string> #include <vector> @@ -141,6 +140,8 @@ public: std::unique_lock<std::mutex> lc(_dependency_lock); if (!_finalized) { _execution_dep->set_always_ready(); + _memory_sufficient_dependency->set_always_ready(); + _spill_dependency->set_always_ready(); for (auto* dep : _filter_dependencies) { dep->set_always_ready(); } @@ -180,10 +181,14 @@ public: /** * Return true if: * 1. `enable_force_spill` is true which forces this task to spill data. - * 2. Or memory consumption reaches the high water mark of current workload group (80% of memory limitation by default) and revocable_mem_bytes is bigger than min_revocable_mem_bytes. - * 3. Or memory consumption is higher than the low water mark of current workload group (50% of memory limitation by default) and `query_weighted_consumption >= query_weighted_limit` and revocable memory is big enough. + * 2. Or memory consumption reaches the high water mark of current workload group (80% of memory limitation by default) + and revocable_mem_bytes is bigger than min_revocable_mem_bytes. + * 3. Or memory consumption is higher than the low water mark of current workload group (50% of memory limitation by default) + and `query_weighted_consumption >= query_weighted_limit` and revocable memory is big enough. */ - static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes); + static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes, + bool& is_wg_mem_low_water_mark, + bool& is_wg_mem_high_water_mark); void put_in_runnable_queue() { _schedule_time++; @@ -193,7 +198,8 @@ public: void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } bool is_running() { return _running.load(); } - void set_running(bool running) { _running = running; } + bool is_revoking() { return _spill_dependency->is_blocked_by(nullptr) != nullptr; } + bool set_running(bool running) { return _running.exchange(running); } bool is_exceed_debug_timeout() { if (_has_exceed_timeout) { @@ -231,6 +237,9 @@ public: } } + [[nodiscard]] size_t get_revocable_size() const; + [[nodiscard]] Status revoke_memory(); + private: friend class RuntimeFilterDependency; bool _is_blocked(); @@ -306,11 +315,16 @@ private: Dependency* _execution_dep = nullptr; + Dependency* _memory_sufficient_dependency = nullptr; + + std::shared_ptr<Dependency> _spill_dependency; + std::atomic<bool> _finalized {false}; std::mutex _dependency_lock; std::atomic<bool> _running {false}; std::atomic<bool> _eos {false}; + std::atomic<bool> _revoking {false}; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 8be30773ee1..f76affffb70 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -24,19 +24,25 @@ #include <sched.h> // IWYU pragma: no_include <bits/chrono.h> +#include <algorithm> #include <chrono> // IWYU pragma: keep +#include <cstddef> #include <functional> +#include <memory> +#include <mutex> #include <ostream> #include <string> #include <thread> #include <utility> #include "common/logging.h" +#include "common/status.h" #include "pipeline/pipeline_task.h" #include "pipeline/task_queue.h" #include "pipeline_fragment_context.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" +#include "runtime/thread_context.h" #include "util/thread.h" #include "util/threadpool.h" #include "util/time.h" @@ -53,16 +59,17 @@ TaskScheduler::~TaskScheduler() { Status TaskScheduler::start() { int cores = _task_queue->cores(); RETURN_IF_ERROR(ThreadPoolBuilder(_name) - .set_min_threads(cores) - .set_max_threads(cores) + .set_min_threads(cores + 1) + .set_max_threads(cores + 1) .set_max_queue_size(0) .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) .build(&_fix_thread_pool)); LOG_INFO("TaskScheduler set cores").tag("size", cores); - _markers.resize(cores, true); for (size_t i = 0; i < cores; ++i) { RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); })); } + + RETURN_IF_ERROR(_fix_thread_pool->submit_func([this] { _paused_queries_handler(); })); return Status::OK(); } @@ -98,17 +105,19 @@ void _close_task(PipelineTask* task, Status exec_status) { } void TaskScheduler::_do_work(size_t index) { - while (_markers[index]) { + while (!_need_to_stop) { auto* task = _task_queue->take(index); if (!task) { continue; } + if (task->is_running()) { static_cast<void>(_task_queue->push_back(task, index)); continue; } - task->log_detail_if_need(); task->set_running(true); + + task->log_detail_if_need(); task->set_task_queue(_task_queue.get()); auto* fragment_ctx = task->fragment_context(); bool canceled = fragment_ctx->is_canceled(); @@ -187,15 +196,167 @@ void TaskScheduler::_do_work(size_t index) { } } +void TaskScheduler::add_paused_task(PipelineTask* task) { + std::lock_guard<std::mutex> lock(_paused_queries_lock); + auto query_ctx_sptr = task->runtime_state()->get_query_ctx()->shared_from_this(); + DCHECK(query_ctx_sptr != nullptr); + auto wg = query_ctx_sptr->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace(std::move(query_ctx_sptr)); + if (inserted) { + LOG(INFO) << "here insert one new paused query: " << print_id(it->get()->query_id()); + } + + _paused_queries_cv.notify_all(); +} + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group is below low water mark, we make all queries in this wg runnable. + * strategy 3: Pick the query which has the max revocable size to revoke memory. + * strategy 4: If all queries are not revocable and they all have not any running task, + * we choose the max memory usage query to cancel. + */ +void TaskScheduler::_paused_queries_handler() { + while (!_need_to_stop) { + { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list.empty()) { + _paused_queries_cv.wait(lock, [&] { return !_paused_queries_list.empty(); }); + } + + if (_need_to_stop) { + break; + } + + if (_paused_queries_list.empty()) { + continue; + } + + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + const auto& wg = it->first; + if (queries_list.empty()) { + LOG(INFO) << "wg: " << wg->debug_string() << " has no paused query"; + it = _paused_queries_list.erase(it); + continue; + } + + bool is_low_wartermark = false; + bool is_high_wartermark = false; + + wg->check_mem_used(&is_low_wartermark, &is_high_wartermark); + + if (!is_low_wartermark && !is_high_wartermark) { + LOG(INFO) << "**** there are " << queries_list.size() << " to resume"; + for (const auto& query : queries_list) { + LOG(INFO) << "**** resume paused query: " << print_id(query->query_id()); + query->set_memory_sufficient(true); + } + + queries_list.clear(); + it = _paused_queries_list.erase(it); + continue; + } else { + ++it; + } + + std::shared_ptr<QueryContext> max_revocable_query; + std::shared_ptr<QueryContext> max_memory_usage_query; + std::shared_ptr<QueryContext> running_query; + bool has_running_query = false; + size_t max_revocable_size = 0; + size_t max_memory_usage = 0; + auto it_to_remove = queries_list.end(); + + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + const auto& query_ctx = *query_it; + size_t revocable_size = 0; + size_t memory_usage = 0; + bool has_running_task = false; + + if (query_ctx->is_cancelled()) { + LOG(INFO) << "query: " << print_id(query_ctx->query_id()) + << "was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + query_ctx->get_revocable_info(revocable_size, memory_usage, has_running_task); + if (has_running_task) { + has_running_query = true; + running_query = query_ctx; + break; + } else if (revocable_size > max_revocable_size) { + max_revocable_query = query_ctx; + max_revocable_size = revocable_size; + it_to_remove = query_it; + } else if (memory_usage > max_memory_usage) { + max_memory_usage_query = query_ctx; + max_memory_usage = memory_usage; + it_to_remove = query_it; + } + + ++query_it; + } + + if (has_running_query) { + LOG(INFO) << "has running task, query: " << print_id(running_query->query_id()); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } else if (max_revocable_query) { + queries_list.erase(it_to_remove); + queries_list.insert(queries_list.begin(), max_revocable_query); + + auto revocable_tasks = max_revocable_query->get_revocable_tasks(); + DCHECK(!revocable_tasks.empty()); + + LOG(INFO) << "query: " << print_id(max_revocable_query->query_id()) << ", has " + << revocable_tasks.size() + << " tasks to revoke memory, max revocable size: " + << max_revocable_size; + SCOPED_ATTACH_TASK(max_revocable_query.get()); + for (auto* task : revocable_tasks) { + auto st = task->revoke_memory(); + if (!st.ok()) { + max_revocable_query->cancel(st); + break; + } + } + } else if (max_memory_usage_query) { + bool new_is_low_wartermark = false; + bool new_is_high_wartermark = false; + wg->check_mem_used(&new_is_low_wartermark, &new_is_high_wartermark); + if (new_is_high_wartermark) { + LOG(INFO) << "memory insufficient and cannot find revocable query, cancel " + "the query: " + << print_id(max_memory_usage_query->query_id()) + << ", usage: " << max_memory_usage + << ", wg info: " << wg->debug_string(); + max_memory_usage_query->cancel(Status::InternalError( + "memory insufficient and cannot find revocable query, cancel the " + "biggest usage({}) query({})", + max_memory_usage, print_id(max_memory_usage_query->query_id()))); + } else { + LOG(INFO) << "new_is_high_wartermark is false, resume max memory usage " + "paused query: " + << print_id(max_memory_usage_query->query_id()); + max_memory_usage_query->set_memory_sufficient(true); + queries_list.erase(it_to_remove); + } + } + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } +} + void TaskScheduler::stop() { if (!_shutdown) { if (_task_queue) { _task_queue->close(); } if (_fix_thread_pool) { - for (size_t i = 0; i < _markers.size(); ++i) { - _markers[i] = false; - } + _need_to_stop = true; + _paused_queries_cv.notify_all(); _fix_thread_pool->shutdown(); _fix_thread_pool->wait(); } diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 9a20807ea26..987dcd81bd0 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -17,10 +17,9 @@ #pragma once -#include <stddef.h> - #include <atomic> #include <condition_variable> +#include <cstddef> #include <list> #include <memory> #include <mutex> @@ -30,6 +29,7 @@ #include "common/status.h" #include "gutil/ref_counted.h" #include "pipeline_task.h" +#include "runtime/query_context.h" #include "runtime/workload_group/workload_group.h" #include "util/thread.h" @@ -49,7 +49,6 @@ public: TaskScheduler(ExecEnv* exec_env, std::shared_ptr<TaskQueue> task_queue, std::string name, CgroupCpuCtl* cgroup_cpu_ctl) : _task_queue(std::move(task_queue)), - _shutdown(false), _name(std::move(name)), _cgroup_cpu_ctl(cgroup_cpu_ctl) {} @@ -63,14 +62,22 @@ public: std::vector<int> thread_debug_info() { return _fix_thread_pool->debug_info(); } + void add_paused_task(PipelineTask* task); + private: std::unique_ptr<ThreadPool> _fix_thread_pool; std::shared_ptr<TaskQueue> _task_queue; - std::vector<bool> _markers; - bool _shutdown; + bool _need_to_stop = false; + bool _shutdown = false; std::string _name; CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; + std::map<WorkloadGroupPtr, std::set<std::shared_ptr<QueryContext>>> _paused_queries_list; + std::mutex _paused_queries_lock; + std::condition_variable _paused_queries_cv; + void _do_work(size_t index); + + void _paused_queries_handler(); }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 10f5ca19add..9a04658876d 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -77,7 +77,7 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe, QuerySource query_source) : _timeout_second(-1), - _query_id(query_id), + _query_id(std::move(query_id)), _exec_env(exec_env), _is_pipeline(is_pipeline), _is_nereids(is_nereids), @@ -88,6 +88,9 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, _query_watcher.start(); _shared_hash_table_controller.reset(new vectorized::SharedHashTableController()); _execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency"); + _memory_sufficient_dependency = + pipeline::Dependency::create_unique(-1, -1, "MemorySufficientDependency", true); + _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>( TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker); @@ -191,6 +194,7 @@ QueryContext::~QueryContext() { } _runtime_filter_mgr.reset(); _execution_dependency.reset(); + _memory_sufficient_dependency.reset(); _shared_hash_table_controller.reset(); _runtime_predicates.clear(); file_scan_range_params_map.clear(); @@ -218,6 +222,14 @@ void QueryContext::set_execution_dependency_ready() { _execution_dependency->set_ready(); } +void QueryContext::set_memory_sufficient(bool sufficient) { + if (sufficient) { + _memory_sufficient_dependency->set_ready(); + } else { + _memory_sufficient_dependency->block(); + } +} + void QueryContext::cancel(Status new_status, int fragment_id) { if (!_exec_status.update(new_status)) { return; @@ -386,11 +398,63 @@ void QueryContext::_report_query_profile() { ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile(); } +void QueryContext::get_revocable_info(size_t& revocable_size, size_t& memory_usage, + bool& has_running_task) const { + revocable_size = 0; + for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) { + auto fragment_ctx = fragment_wptr.lock(); + if (!fragment_ctx) { + continue; + } + + revocable_size += fragment_ctx->get_revocable_size(has_running_task); + + // Should wait for all tasks are not running before revoking memory. + if (has_running_task) { + break; + } + } + + memory_usage = query_mem_tracker->consumption(); +} + +size_t QueryContext::get_revocable_size() const { + size_t revocable_size = 0; + for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) { + auto fragment_ctx = fragment_wptr.lock(); + if (!fragment_ctx) { + continue; + } + + bool has_running_task = false; + revocable_size += fragment_ctx->get_revocable_size(has_running_task); + + // Should wait for all tasks are not running before revoking memory. + if (has_running_task) { + return 0; + } + } + return revocable_size; +} + +std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const { + std::vector<pipeline::PipelineTask*> tasks; + for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) { + auto fragment_ctx = fragment_wptr.lock(); + if (!fragment_ctx) { + continue; + } + auto tasks_of_fragment = fragment_ctx->get_revocable_tasks(); + tasks.insert(tasks.end(), tasks_of_fragment.cbegin(), tasks_of_fragment.cend()); + } + return tasks; +} + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> QueryContext::_collect_realtime_query_profile() const { std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res; - for (auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) { + for (const auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) { if (auto fragment_ctx = fragment_ctx_wptr.lock()) { if (fragment_ctx == nullptr) { std::string msg = diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 7a6d6d3c53d..1edb204f049 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -30,6 +30,7 @@ #include "common/config.h" #include "common/factory_creator.h" #include "common/object_pool.h" +#include "pipeline/dependency.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/query_statistics.h" @@ -45,6 +46,7 @@ namespace doris { namespace pipeline { class PipelineFragmentContext; +class PipelineTask; } // namespace pipeline struct ReportStatusRequest { @@ -76,7 +78,7 @@ const std::string toString(QuerySource query_source); // Some components like DescriptorTbl may be very large // that will slow down each execution of fragments when DeSer them every time. class DescriptorTbl; -class QueryContext { +class QueryContext : public std::enable_shared_from_this<QueryContext> { ENABLE_FACTORY_CREATOR(QueryContext); public: @@ -118,6 +120,8 @@ public: void set_execution_dependency_ready(); + void set_memory_sufficient(bool sufficient); + void set_ready_to_execute_only(); std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() { @@ -181,6 +185,12 @@ public: pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); } + pipeline::Dependency* get_memory_sufficient_dependency() { + return _memory_sufficient_dependency.get(); + } + + std::vector<pipeline::PipelineTask*> get_revocable_tasks() const; + void register_query_statistics(std::shared_ptr<QueryStatistics> qs); std::shared_ptr<QueryStatistics> get_query_statistics(); @@ -218,6 +228,16 @@ public: return _running_big_mem_op_num.load(std::memory_order_relaxed); } + void increase_revoking_tasks_count() { _revoking_tasks_count.fetch_add(1); } + + void decrease_revoking_tasks_count() { _revoking_tasks_count.fetch_sub(1); } + + int get_revoking_tasks_count() const { return _revoking_tasks_count.load(); } + + void get_revocable_info(size_t& revocable_size, size_t& memory_usage, + bool& has_running_task) const; + size_t get_revocable_size() const; + void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; } int64_t spill_threshold() { return _spill_threshold; } DescriptorTbl* desc_tbl = nullptr; @@ -253,6 +273,7 @@ private: bool _is_pipeline = false; bool _is_nereids = false; std::atomic<int> _running_big_mem_op_num = 0; + std::atomic<int> _revoking_tasks_count = 0; // A token used to submit olap scanner to the "_limited_scan_thread_pool", // This thread pool token is created from "_limited_scan_thread_pool" from exec env. @@ -280,6 +301,9 @@ private: vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr; std::unique_ptr<pipeline::Dependency> _execution_dependency; + std::unique_ptr<pipeline::Dependency> _memory_sufficient_dependency; + std::vector<std::weak_ptr<pipeline::PipelineTask>> _pipeline_tasks; + std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr; // This shared ptr is never used. It is just a reference to hold the object. // There is a weak ptr in runtime filter manager to reference this object. diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f43d0a163df..87c54564ae5 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -56,6 +56,7 @@ class PipelineXLocalStateBase; class PipelineXSinkLocalStateBase; class PipelineFragmentContext; class PipelineTask; +class Dependency; } // namespace pipeline class DescriptorTbl; @@ -600,7 +601,11 @@ public: vectorized::ColumnInt64* partial_update_auto_inc_column() { return _partial_update_auto_inc_column; - }; + } + + void set_spill_dependency(pipeline::Dependency* dependency) { _spill_dependency = dependency; } + + pipeline::Dependency* get_spill_dependency() { return _spill_dependency; } private: Status create_error_log_file(); @@ -697,6 +702,8 @@ private: int _task_id = -1; int _task_num = 0; + pipeline::Dependency* _spill_dependency; + std::vector<THivePartitionUpdate> _hive_partition_updates; std::vector<TIcebergCommitData> _iceberg_commit_datas; diff --git a/be/src/vec/common/hash_table/hash_table.h b/be/src/vec/common/hash_table/hash_table.h index 490cd501692..ea912da32f1 100644 --- a/be/src/vec/common/hash_table/hash_table.h +++ b/be/src/vec/common/hash_table/hash_table.h @@ -807,6 +807,18 @@ public: } } + size_t estimate_memory(size_t num_elem) const { + if (!add_elem_size_overflow(num_elem)) { + return 0; + } + + auto new_size = num_elem + grower.buf_size(); + Grower new_grower = grower; + new_grower.set(new_size); + + return new_grower.buf_size() * sizeof(Cell); + } + /// Insert a value. In the case of any more complex values, it is better to use the `emplace` function. std::pair<LookupResult, bool> ALWAYS_INLINE insert(const value_type& x) { std::pair<LookupResult, bool> res; diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h index 8f61ee966c5..10896c73a77 100644 --- a/be/src/vec/common/hash_table/ph_hash_map.h +++ b/be/src/vec/common/hash_table/ph_hash_map.h @@ -219,6 +219,15 @@ public: return (_hash_map.size() + row) > (capacity * 7 / 8); } + size_t estimate_memory(size_t num_elem) const { + if (!add_elem_size_overflow(num_elem)) { + return 0; + } + + auto new_size = _hash_map.capacity() * 2 + 1; + return new_size * sizeof(typename HashMapImpl::slot_type); + } + size_t size() const { return _hash_map.size(); } template <typename MappedType> char* get_null_key_data() { diff --git a/be/src/vec/common/hash_table/string_hash_table.h b/be/src/vec/common/hash_table/string_hash_table.h index 74be1e85e1e..8ae1f235613 100644 --- a/be/src/vec/common/hash_table/string_hash_table.h +++ b/be/src/vec/common/hash_table/string_hash_table.h @@ -679,4 +679,30 @@ public: m3.add_elem_size_overflow(add_size) || m4.add_elem_size_overflow(add_size) || ms.add_elem_size_overflow(add_size); } + + size_t estimate_memory(size_t num_elem) const { + size_t estimate_size = 0; + + if (m1.add_elem_size_overflow(num_elem)) { + estimate_size = m1.estimate_memory(num_elem); + } + + if (m2.add_elem_size_overflow(num_elem)) { + estimate_size = std::max(estimate_size, m2.estimate_memory(num_elem)); + } + + if (m3.add_elem_size_overflow(num_elem)) { + estimate_size = std::max(estimate_size, m3.estimate_memory(num_elem)); + } + + if (m4.add_elem_size_overflow(num_elem)) { + estimate_size = std::max(estimate_size, m4.estimate_memory(num_elem)); + } + + if (ms.add_elem_size_overflow(num_elem)) { + estimate_size = std::max(estimate_size, ms.estimate_memory(num_elem)); + } + + return estimate_size; + } }; diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp index 65d4230488a..9ff45c5dad9 100644 --- a/be/src/vec/exprs/vectorized_fn_call.cpp +++ b/be/src/vec/exprs/vectorized_fn_call.cpp @@ -189,6 +189,24 @@ Status VectorizedFnCall::_do_execute(doris::vectorized::VExprContext* context, return Status::OK(); } +size_t VectorizedFnCall::estimate_memory(const size_t rows) { + if (is_const_and_have_executed()) { // const have execute in open function + return 0; + } + + size_t estimate_size = 0; + for (auto& child : _children) { + estimate_size += child->estimate_memory(rows); + } + + if (_data_type->have_maximum_size_of_value()) { + estimate_size += rows * _data_type->get_maximum_size_of_value_in_memory(); + } else { + estimate_size += rows * 512; /// FIXME: estimated value... + } + return estimate_size; +} + Status VectorizedFnCall::execute_runtime_fitler(doris::vectorized::VExprContext* context, doris::vectorized::Block* block, int* result_column_id, std::vector<size_t>& args) { diff --git a/be/src/vec/exprs/vectorized_fn_call.h b/be/src/vec/exprs/vectorized_fn_call.h index bae996136dd..b78b27e9764 100644 --- a/be/src/vec/exprs/vectorized_fn_call.h +++ b/be/src/vec/exprs/vectorized_fn_call.h @@ -72,6 +72,8 @@ public: bool can_push_down_to_index() const override; bool equals(const VExpr& other) override; + size_t estimate_memory(const size_t rows) override; + protected: FunctionBasePtr _function; std::string _expr_name; diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index da496cdd9f0..be869e4cc8f 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -731,6 +731,24 @@ Status VExpr::_evaluate_inverted_index(VExprContext* context, const FunctionBase return Status::OK(); } +size_t VExpr::estimate_memory(const size_t rows) { + if (is_const_and_have_executed()) { + return 0; + } + + size_t estimate_size = 0; + for (auto& child : _children) { + estimate_size += child->estimate_memory(rows); + } + + if (_data_type->have_maximum_size_of_value()) { + estimate_size += rows * _data_type->get_maximum_size_of_value_in_memory(); + } else { + estimate_size += rows * 64; /// TODO: need a more reasonable value + } + return estimate_size; +} + bool VExpr::fast_execute(doris::vectorized::VExprContext* context, doris::vectorized::Block* block, int* result_column_id) { if (context->get_inverted_index_context() && diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 382713b2afc..573f5c38b59 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -126,6 +126,8 @@ public: Status _evaluate_inverted_index(VExprContext* context, const FunctionBasePtr& function, uint32_t segment_num_rows); + virtual size_t estimate_memory(const size_t rows); + // Only the 4th parameter is used in the runtime filter. In and MinMax need overwrite the // interface virtual Status execute_runtime_fitler(VExprContext* context, Block* block, diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index bcfd7cda102..4db2033983b 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -17,6 +17,7 @@ #include "vec/exprs/vexpr_context.h" +#include <algorithm> #include <ostream> #include <string> @@ -136,7 +137,9 @@ Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block, int col return Status::OK(); } int result_column_id = -1; + size_t origin_size = block->allocated_bytes(); RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id)); + vexpr_ctx->_memory_usage = (block->allocated_bytes() - origin_size); return Block::filter_block(block, result_column_id, column_to_keep); } @@ -283,14 +286,14 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts, Block for (const auto& conjunct : conjuncts) { int result_column_id = -1; RETURN_IF_ERROR(conjunct->execute(block, &result_column_id)); - auto& filter_column = + const auto& filter_column = unpack_if_const(block->get_by_position(result_column_id).column).first; - if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { + if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); const IColumn::Filter& result = assert_cast<const ColumnUInt8&>(*nested_column).get_data(); - auto* __restrict filter_data = result.data(); - auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); + const auto* __restrict filter_data = result.data(); + const auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); DCHECK_EQ(rows, nullable_column->size()); for (size_t i = 0; i != rows; ++i) { @@ -302,7 +305,8 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts, Block final_filter_ptr[i] = final_filter_ptr[i] & filter_data[i]; } } else { - auto* filter_data = assert_cast<const ColumnUInt8&>(*filter_column).get_data().data(); + const auto* filter_data = + assert_cast<const ColumnUInt8&>(*filter_column).get_data().data(); for (size_t i = 0; i != rows; ++i) { final_filter_ptr[i] = final_filter_ptr[i] & filter_data[i]; } @@ -318,11 +322,19 @@ Status VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs& int column_to_keep) { IColumn::Filter result_filter(block->rows(), 1); bool can_filter_all; + + _reset_memory_usage(ctxs); + RETURN_IF_ERROR( execute_conjuncts(ctxs, nullptr, false, block, &result_filter, &can_filter_all)); + + // Accumulate the usage of `result_filter` into the first context. + if (!ctxs.empty()) { + ctxs[0]->_memory_usage += result_filter.allocated_bytes(); + } if (can_filter_all) { for (auto& col : columns_to_filter) { - std::move(*block->get_by_position(col).column).assume_mutable()->clear(); + block->get_by_position(col).column->assume_mutable()->clear(); } } else { try { @@ -349,12 +361,18 @@ Status VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs& std::vector<uint32_t>& columns_to_filter, int column_to_keep, IColumn::Filter& filter) { + _reset_memory_usage(ctxs); filter.resize_fill(block->rows(), 1); bool can_filter_all; RETURN_IF_ERROR(execute_conjuncts(ctxs, nullptr, false, block, &filter, &can_filter_all)); + + // Accumulate the usage of `result_filter` into the first context. + if (!ctxs.empty()) { + ctxs[0]->_memory_usage += filter.allocated_bytes(); + } if (can_filter_all) { for (auto& col : columns_to_filter) { - std::move(*block->get_by_position(col).column).assume_mutable()->clear(); + block->get_by_position(col).column->assume_mutable()->clear(); } } else { RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, filter)); @@ -375,13 +393,20 @@ Status VExprContext::get_output_block_after_execute_exprs( auto rows = input_block.rows(); vectorized::Block tmp_block(input_block.get_columns_with_type_and_name()); vectorized::ColumnsWithTypeAndName result_columns; + _reset_memory_usage(output_vexpr_ctxs); + for (const auto& vexpr_ctx : output_vexpr_ctxs) { int result_column_id = -1; + int origin_columns = tmp_block.columns(); + size_t origin_usage = tmp_block.allocated_bytes(); RETURN_IF_ERROR(vexpr_ctx->execute(&tmp_block, &result_column_id)); DCHECK(result_column_id != -1); + + vexpr_ctx->_memory_usage = tmp_block.allocated_bytes() - origin_usage; const auto& col = tmp_block.get_by_position(result_column_id); - if (do_projection) { + if (do_projection && origin_columns <= result_column_id) { result_columns.emplace_back(col.column->clone_resized(rows), col.type, col.name); + vexpr_ctx->_memory_usage += result_columns.back().column->allocated_bytes(); } else { result_columns.emplace_back(tmp_block.get_by_position(result_column_id)); } @@ -390,4 +415,9 @@ Status VExprContext::get_output_block_after_execute_exprs( return Status::OK(); } +void VExprContext::_reset_memory_usage(const VExprContextSPtrs& contexts) { + std::for_each(contexts.begin(), contexts.end(), + [](auto&& context) { context->_memory_usage = 0; }); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index d022cf6169e..02628c94a17 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -19,6 +19,8 @@ #include <glog/logging.h> +#include <algorithm> +#include <cstddef> #include <memory> #include <utility> #include <vector> @@ -271,10 +273,21 @@ public: return *this; } + [[nodiscard]] static size_t get_memory_usage(const VExprContextSPtrs& contexts) { + size_t usage = 0; + std::for_each(contexts.cbegin(), contexts.cend(), + [&usage](auto&& context) { usage += context->_memory_usage; }); + return usage; + } + + [[nodiscard]] size_t get_memory_usage() const { return _memory_usage; } + private: // Close method is called in vexpr context dector, not need call expicility void close(); + static void _reset_memory_usage(const VExprContextSPtrs& contexts); + friend class VExpr; /// The expr tree this context is for. @@ -301,5 +314,6 @@ private: bool _force_materialize_slot = false; std::shared_ptr<InvertedIndexContext> _inverted_index_context; + size_t _memory_usage = 0; }; } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vin_predicate.cpp b/be/src/vec/exprs/vin_predicate.cpp index 9e00a3afbae..3e15138fe59 100644 --- a/be/src/vec/exprs/vin_predicate.cpp +++ b/be/src/vec/exprs/vin_predicate.cpp @@ -155,6 +155,26 @@ Status VInPredicate::execute(VExprContext* context, Block* block, int* result_co return Status::OK(); } +size_t VInPredicate::estimate_memory(const size_t rows) { + if (is_const_and_have_executed()) { + return 0; + } + + size_t estimate_size = 0; + + for (int i = 0; i < _children.size(); ++i) { + estimate_size += _children[i]->estimate_memory(rows); + } + + if (_data_type->is_nullable()) { + estimate_size += rows * sizeof(uint8_t); + } + + estimate_size += rows * sizeof(uint8_t); + + return estimate_size; +} + const std::string& VInPredicate::expr_name() const { return _expr_name; } diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h index 024ad68f2ba..10caa3e7543 100644 --- a/be/src/vec/exprs/vin_predicate.h +++ b/be/src/vec/exprs/vin_predicate.h @@ -43,6 +43,7 @@ public: VInPredicate(const TExprNode& node); ~VInPredicate() override = default; Status execute(VExprContext* context, Block* block, int* result_column_id) override; + size_t estimate_memory(const size_t rows) override; Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override; Status open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h index 2bea6ea5c06..f9165ee1e35 100644 --- a/be/src/vec/exprs/vslot_ref.h +++ b/be/src/vec/exprs/vslot_ref.h @@ -54,6 +54,8 @@ public: bool equals(const VExpr& other) override; + size_t estimate_memory(const size_t rows) override { return 0; } + private: int _slot_id; int _column_id; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org