This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 3ba5406e00a [fix] Use separate memory sufficient dependency for each 
PipelineTask (#42198)
3ba5406e00a is described below

commit 3ba5406e00a353e342d936f5d3b077531bd515fd
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Oct 21 17:41:26 2024 +0800

    [fix] Use separate memory sufficient dependency for each PipelineTask 
(#42198)
    
    ## Proposed changes
    
    1. [opt] Limit the number of scanners in FileScanOperator
    2. [fix] avoid finishing spilling streams repeatedly in partitioned join
    3. [fix] Query blocking issue caused by pending block in PipelineTask
---
 be/src/pipeline/exec/file_scan_operator.cpp        | 15 ++++-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  5 ++
 .../exec/partitioned_hash_join_sink_operator.h     |  1 +
 be/src/pipeline/pipeline_fragment_context.cpp      | 13 ++++
 be/src/pipeline/pipeline_fragment_context.h        |  2 +
 be/src/pipeline/pipeline_task.cpp                  | 70 ++++++++++------------
 be/src/pipeline/pipeline_task.h                    |  6 +-
 be/src/runtime/query_context.cpp                   | 21 +++----
 be/src/runtime/query_context.h                     |  5 --
 .../workload_group/workload_group_manager.cpp      |  3 +-
 10 files changed, 82 insertions(+), 59 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 6fa7401e278..7018c279d35 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -60,9 +60,20 @@ std::string FileScanLocalState::name_suffix() const {
 
 void FileScanLocalState::set_scan_ranges(RuntimeState* state,
                                          const std::vector<TScanRangeParams>& 
scan_ranges) {
+    auto wg_ptr = state->get_query_ctx()->workload_group();
     _max_scanners =
             config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
-    _max_scanners = std::max(std::max(_max_scanners, 
state->parallel_scan_max_scanners_count()), 1);
+    if (wg_ptr && state->get_query_ctx()->enable_query_slot_hard_limit()) {
+        const auto total_slots = wg_ptr->total_query_slot_count();
+        const auto query_slots = state->get_query_ctx()->get_slot_count();
+        _max_scanners = _max_scanners * query_slots / total_slots;
+    }
+
+    const auto parallel_scan_max_scanners_count = 
state->parallel_scan_max_scanners_count();
+    if (parallel_scan_max_scanners_count > 0) {
+        _max_scanners =
+                std::max(std::min(_max_scanners, 
state->parallel_scan_max_scanners_count()), 1);
+    }
     // For select * from table limit 10; should just use one thread.
     if (should_run_serial()) {
         _max_scanners = 1;
@@ -82,7 +93,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
                 
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges, 
_max_scanners);
     }
     _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
-    if (scan_ranges.size() > 0 &&
+    if (!scan_ranges.empty() &&
         
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
         // for compatibility.
         // in new implement, the tuple id is set in prepare phase
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d3d010e0d7c..baa99d3fe14 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -412,6 +412,11 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
 }
 
 Status PartitionedHashJoinSinkLocalState::_finish_spilling() {
+    bool expected = false;
+    if (!_spilling_finished.compare_exchange_strong(expected, true)) {
+        return Status::OK();
+    }
+
     for (auto& stream : _shared_state->spilled_streams) {
         if (stream) {
             RETURN_IF_ERROR(stream->spill_eof());
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 8a844e69963..97c40d43a66 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -67,6 +67,7 @@ protected:
 
     friend class PartitionedHashJoinSinkOperatorX;
 
+    std::atomic<bool> _spilling_finished {false};
     vectorized::Block _pending_block;
 
     bool _child_eos {false};
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 6e6c2bd3e73..03db7e674f1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1872,6 +1872,19 @@ std::vector<PipelineTask*> 
PipelineFragmentContext::get_revocable_tasks() const
     return revocable_tasks;
 }
 
+void PipelineFragmentContext::set_memory_sufficient(bool sufficient) {
+    for (const auto& task_instances : _tasks) {
+        for (const auto& task : task_instances) {
+            auto* dependency = task->get_memory_sufficient_dependency();
+            if (sufficient) {
+                dependency->set_ready();
+            } else {
+                dependency->block();
+            }
+        }
+    }
+}
+
 std::string PipelineFragmentContext::debug_string() {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n");
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 9c2ed36b919..8d55d0ce285 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -121,6 +121,8 @@ public:
 
     [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
 
+    void set_memory_sufficient(bool sufficient);
+
     void instance_ids(std::vector<TUniqueId>& ins_ids) const {
         ins_ids.resize(_fragment_instance_ids.size());
         for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index b3282810d86..affb8c44382 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -17,6 +17,7 @@
 
 #include "pipeline_task.h"
 
+#include <fmt/core.h>
 #include <fmt/format.h>
 #include <gen_cpp/Metrics_types.h>
 #include <glog/logging.h>
@@ -71,15 +72,18 @@ PipelineTask::PipelineTask(
           _sink(pipeline->sink_shared_pointer()),
           _le_state_map(std::move(le_state_map)),
           _task_idx(task_idx),
-          _execution_dep(state->get_query_ctx()->get_execution_dependency()),
-          _memory_sufficient_dependency(
-                  state->get_query_ctx()->get_memory_sufficient_dependency()) {
+          _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
     _pipeline_task_watcher.start();
 
     auto shared_state = _sink->create_shared_state();
     if (shared_state) {
         _sink_shared_state = shared_state;
     }
+
+    const auto dependency_name =
+            fmt::format("MemorySufficientDependency_{}_{}", _sink->node_id(), 
task_id);
+    _memory_sufficient_dependency =
+            pipeline::Dependency::create_unique(-1, -1, dependency_name, true);
 }
 
 Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, 
const TDataSink& tsink,
@@ -317,6 +321,12 @@ Status PipelineTask::execute(bool* eos) {
     SCOPED_ATTACH_TASK(_state);
     _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
     *eos = _eos;
+
+    // If `_wake_up_by_downstream` is true, the pending block will not be sank.
+    if (_wake_up_by_downstream) {
+        _pending_block.reset();
+    }
+
     if (_eos && !_pending_block) {
         // If task is waken up by finish dependency, `_eos` is set to true by 
last execution, and we should return here.
         return Status::OK();
@@ -388,26 +398,22 @@ Status PipelineTask::execute(bool* eos) {
         // Every loop should check if memory is not enough.
         // _state->get_query_ctx()->update_low_memory_mode();
 
-        // `_dry_run` means sink operator need no more data
-        // `_sink->is_finished(_state)` means sink operator should be finished
-        int64_t reserve_size = 0;
-        bool has_enough_memory = true;
-        if (_dry_run || _sink->is_finished(_state)) {
-            *eos = true;
-            _eos = true;
-        } else if (_pending_block) [[unlikely]] {
+        if (_pending_block) [[unlikely]] {
             LOG(INFO) << "query: " << print_id(query_id)
                       << " has pending block, size: " << 
_pending_block->allocated_bytes();
             _block = std::move(_pending_block);
             block = _block.get();
+        }
+        // `_dry_run` means sink operator need no more data
+        // `_sink->is_finished(_state)` means sink operator should be finished
+        else if (_dry_run || _sink->is_finished(_state)) {
+            *eos = true;
+            _eos = true;
         } else {
             SCOPED_TIMER(_get_block_timer);
             DEFER_RELEASE_RESERVED();
             _get_block_counter->update(1);
-            // size_t sink_reserve_size = _sink->get_reserve_mem_size(_state);
-            // sink_reserve_size =
-            //         std::max(sink_reserve_size, 
_state->minimum_operator_memory_required_bytes());
-            reserve_size = _root->get_reserve_mem_size(_state);
+            const auto reserve_size = _root->get_reserve_mem_size(_state);
             _root->reset_reserve_mem_size(_state);
 
             auto workload_group = _state->get_query_ctx()->workload_group();
@@ -426,19 +432,14 @@ Status PipelineTask::execute(bool* eos) {
                               << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
 
                     _state->get_query_ctx()->update_paused_reason(st);
-                    // _state->get_query_ctx()->set_low_memory_mode();
-                    bool is_high_wartermark = false;
-                    bool is_low_wartermark = false;
-                    workload_group->check_mem_used(&is_low_wartermark, 
&is_high_wartermark);
-                    if (is_low_wartermark || is_high_wartermark) {
-                        
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                                _state->get_query_ctx()->shared_from_this(), 
reserve_size);
-                        continue;
-                    }
-                    has_enough_memory = false;
+                    _state->get_query_ctx()->set_low_memory_mode();
+                    
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                            _state->get_query_ctx()->shared_from_this(), 
reserve_size);
+                    continue;
                 }
             }
 
+            DCHECK_EQ(_pending_block.get(), nullptr);
             RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, 
eos));
         }
 
@@ -447,9 +448,10 @@ Status PipelineTask::execute(bool* eos) {
             Status status = Status::OK();
             DEFER_RELEASE_RESERVED();
             COUNTER_UPDATE(_memory_reserve_times, 1);
-            size_t sink_reserve_size = _sink->get_reserve_mem_size(_state, 
*eos);
+            const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, 
*eos);
             status = thread_context()->try_reserve_memory(sink_reserve_size);
             if (!status.ok()) {
+                COUNTER_UPDATE(_memory_reserve_failed_times, 1);
                 LOG(INFO) << "query: " << print_id(query_id) << ", try to 
reserve: "
                           << PrettyPrinter::print(sink_reserve_size, 
TUnit::BYTES)
                           << ", sink name: " << _sink->get_name()
@@ -457,11 +459,12 @@ Status PipelineTask::execute(bool* eos) {
                           << ", failed: " << status.to_string()
                           << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
                 _state->get_query_ctx()->update_paused_reason(status);
-                _memory_sufficient_dependency->block();
+                _state->get_query_ctx()->set_low_memory_mode();
                 ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
                         _state->get_query_ctx()->shared_from_this(), 
sink_reserve_size);
+                DCHECK_EQ(_pending_block.get(), nullptr);
                 _pending_block = std::move(_block);
-                _block = vectorized::Block::create_unique();
+                _block = 
vectorized::Block::create_unique(_pending_block->clone_empty());
                 _eos = *eos;
                 *eos = false;
                 continue;
@@ -484,17 +487,6 @@ Status PipelineTask::execute(bool* eos) {
                 return Status::OK();
             }
         }
-
-        if (!has_enough_memory) {
-            COUNTER_UPDATE(_yield_counts, 1);
-
-            LOG(INFO) << "query: " << print_id(query_id) << ", task: " << 
(void*)this
-                      << ", insufficient memory. reserve_size: "
-                      << PrettyPrinter::print(reserve_size, TUnit::BYTES);
-            ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                    _state->get_query_ctx()->shared_from_this(), reserve_size);
-            break;
-        }
     }
 
     static_cast<void>(get_task_queue()->push_back(this));
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 44dfdd7832a..a3505f7a407 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -244,6 +244,10 @@ public:
         _spill_dependencies.emplace_back(dependency);
     }
 
+    Dependency* get_memory_sufficient_dependency() const {
+        return _memory_sufficient_dependency.get();
+    }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
@@ -325,7 +329,7 @@ private:
     Dependency* _execution_dep = nullptr;
 
     std::atomic<bool> _wake_up_by_downstream = false;
-    Dependency* _memory_sufficient_dependency = nullptr;
+    std::unique_ptr<Dependency> _memory_sufficient_dependency;
 
     std::atomic<bool> _finalized {false};
     std::mutex _dependency_lock;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 527c7ca684d..a1c89394c7b 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -92,8 +92,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* 
exec_env,
     _query_watcher.start();
     _shared_hash_table_controller.reset(new 
vectorized::SharedHashTableController());
     _execution_dependency = pipeline::Dependency::create_unique(-1, -1, 
"ExecutionDependency");
-    _memory_sufficient_dependency =
-            pipeline::Dependency::create_unique(-1, -1, 
"MemorySufficientDependency", true);
 
     _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
             TUniqueId(), RuntimeFilterParamsContext::create(this), 
query_mem_tracker);
@@ -203,7 +201,6 @@ QueryContext::~QueryContext() {
     }
     _runtime_filter_mgr.reset();
     _execution_dependency.reset();
-    _memory_sufficient_dependency.reset();
     _shared_hash_table_controller.reset();
     _runtime_predicates.clear();
     file_scan_range_params_map.clear();
@@ -239,12 +236,18 @@ void QueryContext::set_memory_sufficient(bool sufficient) 
{
             _paused_timer.stop();
             _paused_period_secs += _paused_timer.elapsed_time() / (1000L * 
1000L * 1000L);
         }
-        _memory_sufficient_dependency->set_ready();
     } else {
-        _memory_sufficient_dependency->block();
         _paused_timer.start();
         ++_paused_count;
     }
+
+    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
+        auto fragment_ctx = fragment_wptr.lock();
+        if (!fragment_ctx) {
+            continue;
+        }
+        fragment_ctx->set_memory_sufficient(sufficient);
+    }
 }
 
 void QueryContext::cancel(Status new_status, int fragment_id) {
@@ -530,15 +533,13 @@ std::vector<pipeline::PipelineTask*> 
QueryContext::get_revocable_tasks() const {
 std::string QueryContext::debug_string() {
     std::lock_guard l(_paused_mutex);
     return fmt::format(
-            "QueryId={}, Memory [Used={}, Limit={}, Peak={}], "
-            "Spill[RunningSpillTaskCnt={}, TotalPausedPeriodSecs={}, "
-            "MemorySufficient={}, LatestPausedReason={}]",
+            "QueryId={}, Memory [Used={}, Limit={}, Peak={}], 
Spill[RunningSpillTaskCnt={}, "
+            "TotalPausedPeriodSecs={}, LatestPausedReason={}]",
             print_id(_query_id),
             PrettyPrinter::print(query_mem_tracker->consumption(), 
TUnit::BYTES),
             PrettyPrinter::print(query_mem_tracker->limit(), TUnit::BYTES),
             PrettyPrinter::print(query_mem_tracker->peak_consumption(), 
TUnit::BYTES),
-            _revoking_tasks_count, _paused_period_secs, 
_memory_sufficient_dependency->ready(),
-            _paused_reason.to_string());
+            _revoking_tasks_count, _paused_period_secs, 
_paused_reason.to_string());
 }
 
 std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 9dd75cb340d..f16bd0fcf95 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -184,10 +184,6 @@ public:
 
     pipeline::Dependency* get_execution_dependency() { return 
_execution_dependency.get(); }
 
-    pipeline::Dependency* get_memory_sufficient_dependency() {
-        return _memory_sufficient_dependency.get();
-    }
-
     std::vector<pipeline::PipelineTask*> get_revocable_tasks() const;
 
     Status revoke_memory();
@@ -402,7 +398,6 @@ private:
     vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
 
-    std::unique_ptr<pipeline::Dependency> _memory_sufficient_dependency;
     std::vector<std::weak_ptr<pipeline::PipelineTask>> _pipeline_tasks;
 
     std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index c2c51a35429..b42aeeb1b43 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -692,8 +692,7 @@ void 
WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
             
query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit);
         }
     }
-    LOG(INFO) << debug_msg;
-    //LOG_EVERY_T(INFO, 60) << debug_msg;
+    LOG_EVERY_T(INFO, 60) << debug_msg;
 }
 
 void WorkloadGroupMgr::stop() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to