This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bcfafcc759c [refactor](spill) Refine logics in pipeline task (#50010) bcfafcc759c is described below commit bcfafcc759cd686cc435e5b9758ffd73fbb46584 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Mon Apr 14 21:55:14 2025 +0800 [refactor](spill) Refine logics in pipeline task (#50010) --- be/src/pipeline/exec/operator.h | 5 +- be/src/pipeline/pipeline_task.cpp | 130 +++++++++++++++----------------------- be/src/pipeline/pipeline_task.h | 4 ++ 3 files changed, 57 insertions(+), 82 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 75a767aaa83..b8b6577a843 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -104,6 +104,7 @@ public: [[nodiscard]] virtual Status prepare(RuntimeState* state) = 0; [[nodiscard]] virtual Status terminate(RuntimeState* state) = 0; [[nodiscard]] virtual Status close(RuntimeState* state); + [[nodiscard]] virtual int node_id() const = 0; [[nodiscard]] virtual Status set_child(OperatorPtr child) { if (_child && child != nullptr) { @@ -625,7 +626,7 @@ public: [[nodiscard]] int nereids_id() const { return _nereids_id; } - [[nodiscard]] int node_id() const { return _node_id; } + [[nodiscard]] int node_id() const override { return _node_id; } [[nodiscard]] std::string get_name() const override { return _name; } @@ -887,7 +888,7 @@ public: [[nodiscard]] virtual RowDescriptor& row_descriptor() { return _row_descriptor; } [[nodiscard]] int operator_id() const { return _operator_id; } - [[nodiscard]] int node_id() const { return _node_id; } + [[nodiscard]] int node_id() const override { return _node_id; } [[nodiscard]] int nereids_id() const { return _nereids_id; } [[nodiscard]] int64_t limit() const { return _limit; } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 3216c8e034d..f6988c175d4 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ 
b/be/src/pipeline/pipeline_task.cpp @@ -355,10 +355,6 @@ Status PipelineTask::execute(bool* done) { (fragment_context->is_canceled() || !_is_pending_finish())) { *done = true; } - // If this run is pended by a spilling request, the block will be output in next run. - if (!_spilling) { - _block->clear_column_data(_root->row_desc().num_materialized_slots()); - } }}; const auto query_id = _state->query_id(); // If this task is already EOS and block is empty (which means we already output all blocks), @@ -464,43 +460,8 @@ Status PipelineTask::execute(bool* done) { if (workload_group && _state->get_query_ctx()->enable_reserve_memory() && reserve_size > 0) { - auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size); - - COUNTER_UPDATE(_memory_reserve_times, 1); - if (!st.ok() && !_state->enable_force_spill()) { - COUNTER_UPDATE(_memory_reserve_failed_times, 1); - auto sink_revokable_mem_size = _sink->revocable_mem_size(_state); - auto debug_msg = fmt::format( - "Query: {} , try to reserve: {}, operator name: {}, operator " - "id: {}, task id: {}, root revocable mem size: {}, sink revocable mem" - "size: {}, failed: {}", - print_id(query_id), PrettyPrinter::print_bytes(reserve_size), - _root->get_name(), _root->node_id(), _state->task_id(), - PrettyPrinter::print_bytes(_root->revocable_mem_size(_state)), - PrettyPrinter::print_bytes(sink_revokable_mem_size), st.to_string()); - // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str - if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) { - debug_msg += fmt::format(", debug info: {}", - GlobalMemoryArbitrator::process_mem_log_str()); - } - LOG_EVERY_N(INFO, 100) << debug_msg; - // If sink has enough revocable memory, trigger revoke memory - if (sink_revokable_mem_size >= _state->spill_min_revocable_mem()) { - LOG(INFO) << fmt::format( - "Query: {} sink: {}, node id: {}, task id: " - "{}, revocable mem size: {}", - print_id(query_id), _sink->get_name(), _sink->node_id(), - 
_state->task_id(), - PrettyPrinter::print_bytes(sink_revokable_mem_size)); - ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( - _state->get_query_ctx()->shared_from_this(), reserve_size, st); - continue; - } else { - // If reserve failed, not add this query to paused list, because it is very small, will not - // consume a lot of memory. But need set low memory mode to indicate that the system should - // not use too much memory. - _state->get_query_ctx()->set_low_memory_mode(); - } + if (!_try_to_reserve_memory(reserve_size, _root)) { + continue; } } @@ -517,45 +478,8 @@ Status PipelineTask::execute(bool* done) { if (_state->get_query_ctx()->enable_reserve_memory() && workload_group && !(_wake_up_early || _dry_run)) { const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, _eos); - status = sink_reserve_size != 0 - ? thread_context()->thread_mem_tracker_mgr->try_reserve( - sink_reserve_size) - : Status::OK(); - - auto sink_revocable_mem_size = _sink->revocable_mem_size(_state); - if (status.ok() && _state->enable_force_spill() && _sink->is_spillable() && - sink_revocable_mem_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - status = Status(ErrorCode::QUERY_MEMORY_EXCEEDED, "Force Spill"); - } - - if (!status.ok()) { - COUNTER_UPDATE(_memory_reserve_failed_times, 1); - auto debug_msg = fmt::format( - "Query: {} try to reserve: {}, sink name: {}, node id: {}, task " - "id: " - "{}, sink revocable mem size: {}, failed: {}", - print_id(query_id), PrettyPrinter::print_bytes(sink_reserve_size), - _sink->get_name(), _sink->node_id(), _state->task_id(), - PrettyPrinter::print_bytes(sink_revocable_mem_size), - status.to_string()); - // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str - if (!status.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) { - debug_msg += fmt::format(", debug info: {}", - GlobalMemoryArbitrator::process_mem_log_str()); - } - // If the operator is not spillable or it is spillable but not has much 
memory to spill - // not need add to paused list, just let it go. - if (sink_revocable_mem_size >= - vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - VLOG_DEBUG << debug_msg; - ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( - _state->get_query_ctx()->shared_from_this(), sink_reserve_size, - status); - _spilling = true; - continue; - } else { - _state->get_query_ctx()->set_low_memory_mode(); - } + if (!_try_to_reserve_memory(sink_reserve_size, _sink.get())) { + continue; } } @@ -617,6 +541,52 @@ Status PipelineTask::execute(bool* done) { return Status::OK(); } +bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, OperatorBase* op) { + auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size); + COUNTER_UPDATE(_memory_reserve_times, 1); + auto sink_revocable_mem_size = + reserve_size > 0 ? _sink->revocable_mem_size(_state) : Status::OK(); + if (st.ok() && _state->enable_force_spill() && _sink->is_spillable() && + sink_revocable_mem_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + st = Status(ErrorCode::QUERY_MEMORY_EXCEEDED, "Force Spill"); + } + if (!st.ok()) { + COUNTER_UPDATE(_memory_reserve_failed_times, 1); + auto debug_msg = fmt::format( + "Query: {} , try to reserve: {}, operator name: {}, operator " + "id: {}, task id: {}, root revocable mem size: {}, sink revocable mem" + "size: {}, failed: {}", + print_id(_query_id), PrettyPrinter::print_bytes(reserve_size), op->get_name(), + op->node_id(), _state->task_id(), + PrettyPrinter::print_bytes(op->revocable_mem_size(_state)), + PrettyPrinter::print_bytes(sink_revocable_mem_size), st.to_string()); + // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str + if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) { + debug_msg += + fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str()); + } + LOG_EVERY_N(INFO, 100) << debug_msg; + // If sink has enough revocable memory, trigger revoke memory + if 
(sink_revocable_mem_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + LOG(INFO) << fmt::format( + "Query: {} sink: {}, node id: {}, task id: " + "{}, revocable mem size: {}", + print_id(_query_id), _sink->get_name(), _sink->node_id(), _state->task_id(), + PrettyPrinter::print_bytes(sink_revocable_mem_size)); + ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( + _state->get_query_ctx()->shared_from_this(), reserve_size, st); + _spilling = true; + return false; + } else { + // If reserve failed, not add this query to paused list, because it is very small, will not + // consume a lot of memory. But need set low memory mode to indicate that the system should + // not use too much memory. + _state->get_query_ctx()->set_low_memory_mode(); + } + } + return true; +} + void PipelineTask::stop_if_finished() { auto fragment = _fragment_context.lock(); if (!fragment) { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 36a85f7321e..4615e0869e1 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -218,6 +218,10 @@ private: void _fresh_profile_counter(); Status _open(); + // Operator `op` try to reserve memory before executing. Return false if reserve failed + // otherwise return true. + bool _try_to_reserve_memory(const size_t reserve_size, OperatorBase* op); + const TUniqueId _query_id; const uint32_t _index; PipelinePtr _pipeline; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org