This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit dfdb36cee59a0b071d8ee7a75cb14c24d886a2f0
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Sep 4 17:42:40 2024 +0800

    updated
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 10 ++++
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 +
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  7 ++-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  5 ++
 be/src/pipeline/pipeline_fragment_context.cpp      |  2 +-
 be/src/pipeline/pipeline_task.cpp                  | 53 +++++++++++++---------
 be/src/pipeline/task_scheduler.cpp                 | 42 +++++++++--------
 be/src/pipeline/task_scheduler.h                   | 29 +++++++++++-
 9 files changed, 106 insertions(+), 46 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 589af5e1f42..9bf6c422af4 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -114,6 +114,11 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
     if (!_should_build_hash_table) {
         return 0;
     }
+
+    if (_shared_state->build_block) {
+        return 0;
+    }
+
     size_t size_to_reserve = 0;
 
     if (!_build_side_mutable_block.empty()) {
@@ -660,4 +665,9 @@ size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
     return local_state.get_reserve_mem_size(state);
 }
 
+size_t HashJoinBuildSinkOperatorX::get_memory_usage(RuntimeState* state) const {
+    auto& local_state = get_local_state(state);
+    return local_state._mem_tracker->consumption();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index cb626a81cb1..9c95d0c10d6 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -128,6 +128,8 @@ public:
 
     size_t get_reserve_mem_size(RuntimeState* state) override;
 
+    [[nodiscard]] size_t get_memory_usage(RuntimeState* state) const;
+
     bool should_dry_run(RuntimeState* state) override {
         return _is_broadcast_join && !state->get_sink_local_state()
                                               ->cast<HashJoinBuildSinkLocalState>()
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 1a94d9077be..4a6abb32ed4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -331,6 +331,6 @@ private:
     friend class PartitionedAggSinkLocalState;
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
 
-    size_t _spill_partition_count_bits = 4;
+    size_t _spill_partition_count_bits = 6;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 9a939c62287..d91b424440a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -343,9 +343,10 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
 
         ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
         shared_state_sptr->spilled_streams[partition_index].reset();
+        const size_t rows = mutable_block ? mutable_block->rows() : 0;
         VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " 
<< _parent->node_id()
                    << ", task id: " << state->task_id() << ", partition: " << 
partition_index
-                   << ", recovery build data done";
+                   << ", recovery build data done, rows: " << rows;
     };
 
     auto exception_catch_func = [read_func, query_id, this]() {
@@ -664,7 +665,9 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
     VLOG_DEBUG << "query: " << print_id(state->query_id())
                << ", internal build operator finished, node id: " << node_id()
                << ", task id: " << state->task_id()
-               << ", partition: " << local_state._partition_cursor;
+               << ", partition: " << local_state._partition_cursor << "rows: " 
<< block.rows()
+               << ", usage: "
+               << 
_inner_sink_operator->get_memory_usage(local_state._runtime_state.get());
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 8d565bda4dc..e6a14aaf603 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -21,6 +21,7 @@
 
 #include <algorithm>
 
+#include "common/logging.h"
 #include "pipeline/exec/operator.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
@@ -543,6 +544,10 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 });
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), in_block, eos));
+                VLOG_DEBUG << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
+                           << ", task id: " << state->task_id() << ", nonspill build usage: "
+                           << _inner_sink_operator->get_memory_usage(
+                                      local_state._shared_state->inner_runtime_state.get());
             }
 
             std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 71dc601f014..70a3c0d01af 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1412,7 +1412,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             auto tnode_ = tnode;
             /// TODO: support rf in partitioned hash join
             tnode_.runtime_filters.clear();
-            const uint32_t partition_count = 32;
+            const uint32_t partition_count = 128;
             auto inner_probe_operator =
                     std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
             auto inner_sink_operator = std::make_shared<HashJoinBuildSinkOperatorX>(
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 5a987cba416..e591f58a163 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -250,6 +250,18 @@ bool PipelineTask::_wait_to_start() {
 }
 
 bool PipelineTask::_is_blocked() {
+    _blocked_dep = _spill_dependency->is_blocked_by(this);
+    if (_blocked_dep != nullptr) {
+        _blocked_dep->start_watcher();
+        return true;
+    }
+
+    _blocked_dep = _memory_sufficient_dependency->is_blocked_by(this);
+    if (_blocked_dep != nullptr) {
+        _blocked_dep->start_watcher();
+        return true;
+    }
+
     // `_dry_run = true` means we do not need data from source operator.
     if (!_dry_run) {
         for (int i = _read_dependencies.size() - 1; i >= 0; i--) {
@@ -263,22 +275,15 @@ bool PipelineTask::_is_blocked() {
             }
             // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
             if (!_operators[i]->need_more_input_data(_state)) {
-                if (VLOG_DEBUG_IS_ON) {
-                    VLOG_DEBUG << "query: " << print_id(_state->query_id())
-                               << ", task id: " << _index << ", operator " << i
-                               << " not need_more_input_data";
-                }
+                // if (VLOG_DEBUG_IS_ON) {
+                //     VLOG_DEBUG << "query: " << print_id(_state->query_id())
+                //                << ", task id: " << _index << ", operator " 
<< i
+                //                << " not need_more_input_data";
+                // }
                 break;
             }
         }
     }
-
-    _blocked_dep = _spill_dependency->is_blocked_by(this);
-    if (_blocked_dep != nullptr) {
-        _blocked_dep->start_watcher();
-        return true;
-    }
-
     for (auto* op_dep : _write_dependencies) {
         _blocked_dep = op_dep->is_blocked_by(this);
         if (_blocked_dep != nullptr) {
@@ -286,12 +291,6 @@ bool PipelineTask::_is_blocked() {
             return true;
         }
     }
-
-    _blocked_dep = _memory_sufficient_dependency->is_blocked_by(this);
-    if (_blocked_dep != nullptr) {
-        _blocked_dep->start_watcher();
-        return true;
-    }
     return false;
 }
 
@@ -390,10 +389,20 @@ Status PipelineTask::execute(bool* eos) {
             if (reserve_size > 0) {
                 auto st = thread_context()->try_reserve_memory(reserve_size);
                 if (!st.ok()) {
-                    LOG(INFO) << "query: " << print_id(query_id)
-                              << ", try to reserve: " << reserve_size
-                              << " failed: " << st.to_string()
-                              << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+                    VLOG_DEBUG << "query: " << print_id(query_id)
+                               << ", try to reserve: " << reserve_size << 
"(sink reserve size:("
+                               << sink_reserve_size << " )"
+                               << ", sink name: " << _sink->get_name()
+                               << ", node id: " << _sink->node_id()
+                               << ", is_wg_mem_high_water_mark: " << 
is_wg_mem_high_water_mark
+                               << " failed: " << st.to_string()
+                               << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+                    {
+                        _memory_sufficient_dependency->block();
+                        _state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this);
+                        RETURN_IF_ERROR(_sink->revoke_memory(_state));
+                        continue;
+                    }
                     has_enough_memory = false;
                 }
             }
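
Note on the pipeline_task.cpp hunk above: when try_reserve_memory() fails, the task now blocks its memory-sufficient dependency, registers itself with the scheduler via add_paused_task(), asks the sink to revoke (spill) memory, and retries, instead of only logging and clearing has_enough_memory. The following is a minimal, self-contained sketch of that reserve/pause/revoke/retry shape; Arbiter and Sink are simplified stand-ins for illustration, not Doris classes.

    #include <cstddef>
    #include <iostream>

    // Simplified stand-ins (not Doris classes) for the memory arbiter and a
    // spillable sink, just to show the new control flow.
    struct Arbiter {
        size_t available;
        bool try_reserve(size_t bytes) {   // plays the role of try_reserve_memory()
            if (bytes > available) return false;
            available -= bytes;
            return true;
        }
    };

    struct Sink {
        size_t spillable = 64;
        void revoke(Arbiter& a) {          // plays the role of revoke_memory()
            a.available += spillable;
            spillable = 0;
        }
    };

    int main() {
        Arbiter arbiter{32};
        Sink sink;
        const size_t reserve_size = 48;

        for (int attempt = 0; attempt < 3; ++attempt) {
            if (arbiter.try_reserve(reserve_size)) {
                std::cout << "reserved " << reserve_size << " bytes, executing\n";
                break;
            }
            // Real task: block _memory_sufficient_dependency, add_paused_task(this),
            // revoke sink memory, then continue the loop and try again.
            std::cout << "reservation failed, pause task and revoke sink memory\n";
            sink.revoke(arbiter);
        }
        return 0;
    }
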
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index f76affffb70..5752256ce57 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -203,7 +203,8 @@ void TaskScheduler::add_paused_task(PipelineTask* task) {
     auto wg = query_ctx_sptr->workload_group();
     auto&& [it, inserted] = _paused_queries_list[wg].emplace(std::move(query_ctx_sptr));
     if (inserted) {
-        LOG(INFO) << "here insert one new paused query: " << 
print_id(it->get()->query_id());
+        LOG(INFO) << "here insert one new paused query: " << it->query_id()
+                  << ", wg: " << (void*)(wg.get());
     }
 
     _paused_queries_cv.notify_all();
@@ -249,8 +250,8 @@ void TaskScheduler::_paused_queries_handler() {
                 if (!is_low_wartermark && !is_high_wartermark) {
                     LOG(INFO) << "**** there are " << queries_list.size() << " 
to resume";
                     for (const auto& query : queries_list) {
-                        LOG(INFO) << "**** resume paused query: " << 
print_id(query->query_id());
-                        query->set_memory_sufficient(true);
+                        LOG(INFO) << "**** resume paused query: " << 
query.query_id();
+                        query.query_ctx->set_memory_sufficient(true);
                     }
 
                     queries_list.clear();
@@ -269,7 +270,7 @@ void TaskScheduler::_paused_queries_handler() {
                 auto it_to_remove = queries_list.end();
 
                for (auto query_it = queries_list.begin(); query_it != queries_list.end();) {
-                    const auto& query_ctx = *query_it;
+                    const auto& query_ctx = query_it->query_ctx;
                     size_t revocable_size = 0;
                     size_t memory_usage = 0;
                     bool has_running_task = false;
@@ -324,24 +325,27 @@ void TaskScheduler::_paused_queries_handler() {
                 } else if (max_memory_usage_query) {
                     bool new_is_low_wartermark = false;
                     bool new_is_high_wartermark = false;
+                    const auto query_id = print_id(max_memory_usage_query->query_id());
                     wg->check_mem_used(&new_is_low_wartermark, &new_is_high_wartermark);
-                    if (new_is_high_wartermark) {
-                        LOG(INFO) << "memory insufficient and cannot find 
revocable query, cancel "
-                                     "the query: "
-                                  << 
print_id(max_memory_usage_query->query_id())
-                                  << ", usage: " << max_memory_usage
+                    if (!new_is_low_wartermark || it_to_remove->elapsed_time() < 2000) {
+                        LOG(INFO) << "memory insufficient and cannot find revocable query, "
+                                     "the max usage query: "
+                                  << query_id << ", usage: " << max_memory_usage
+                                  << ", elapsed: " << it_to_remove->elapsed_time()
                                   << ", wg info: " << wg->debug_string();
-                        max_memory_usage_query->cancel(Status::InternalError(
-                                "memory insufficient and cannot find revocable query, cancel the "
-                                "biggest usage({}) query({})",
-                                max_memory_usage, print_id(max_memory_usage_query->query_id())));
-                    } else {
-                        LOG(INFO) << "new_is_high_wartermark is false, resume 
max memory usage "
-                                     "paused query: "
-                                  << 
print_id(max_memory_usage_query->query_id());
-                        max_memory_usage_query->set_memory_sufficient(true);
-                        queries_list.erase(it_to_remove);
+                        continue;
                     }
+
+                    LOG(INFO) << "memory insufficient and cannot find 
revocable query, "
+                                 "cancel "
+                                 "the query: "
+                              << query_id << ", usage: " << max_memory_usage
+                              << ", wg info: " << wg->debug_string();
+                    max_memory_usage_query->cancel(Status::InternalError(
+                            "memory insufficient and cannot find revocable 
query, cancel "
+                            "the "
+                            "biggest usage({}) query({})",
+                            max_memory_usage, query_id));
                 }
             }
         }
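
Note on the _paused_queries_handler() changes above: assuming check_mem_used() reports whether usage is over the low/high watermarks, the handler resumes all paused queries once the workload group is back under both watermarks, and only cancels the largest paused query if the group is still over the low watermark after that query has been paused for roughly two seconds. A reduced sketch of just that decision; GroupState and Paused are illustrative types, not Doris ones.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <string>

    struct GroupState {                 // result of wg->check_mem_used(&low, &high)
        bool over_low_watermark;
        bool over_high_watermark;
    };

    struct Paused {                     // reduced view of one paused query
        std::string id;
        size_t mem_usage;
        int64_t paused_ms;              // PausedQuery::elapsed_time()
    };

    enum class Action { Resume, Wait, Cancel };

    // Decision for the biggest paused query of one workload group, mirroring
    // the conditions in the hunk above.
    Action arbitrate(const GroupState& g, const Paused& biggest) {
        if (!g.over_low_watermark && !g.over_high_watermark) {
            return Action::Resume;      // group has headroom again: resume all
        }
        if (!g.over_low_watermark || biggest.paused_ms < 2000) {
            return Action::Wait;        // give spilling/revocation time to help
        }
        return Action::Cancel;          // still over the low watermark after ~2s
    }

    int main() {
        Paused q{"query-1", size_t(1) << 30, 2500};
        std::cout << int(arbitrate({true, true}, q)) << "\n";   // 2: cancel biggest
        std::cout << int(arbitrate({false, false}, q)) << "\n"; // 0: resume
        return 0;
    }
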
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 987dcd81bd0..bed38887037 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <atomic>
+#include <chrono>
 #include <condition_variable>
 #include <cstddef>
 #include <list>
@@ -32,6 +33,7 @@
 #include "runtime/query_context.h"
 #include "runtime/workload_group/workload_group.h"
 #include "util/thread.h"
+#include "util/uid_util.h"
 
 namespace doris {
 class ExecEnv;
@@ -44,6 +46,31 @@ class TaskQueue;
 
 namespace doris::pipeline {
 
+struct PausedQuery {
+    std::shared_ptr<QueryContext> query_ctx;
+    std::chrono::system_clock::time_point enqueue_at;
+    size_t last_mem_usage {0};
+
+    PausedQuery(std::shared_ptr<QueryContext> query_ctx_)
+            : query_ctx(std::move(query_ctx_)), _query_id(print_id(query_ctx->query_id())) {
+        enqueue_at = std::chrono::system_clock::now();
+    }
+
+    int64_t elapsed_time() const {
+        auto now = std::chrono::system_clock::now();
+        return std::chrono::duration_cast<std::chrono::milliseconds>(now - enqueue_at).count();
+    }
+
+    std::string query_id() const { return _query_id; }
+
+    bool operator<(const PausedQuery& other) const { return _query_id < other._query_id; }
+
+    bool operator==(const PausedQuery& other) const { return _query_id == other._query_id; }
+
+private:
+    std::string _query_id;
+};
+
 class TaskScheduler {
 public:
     TaskScheduler(ExecEnv* exec_env, std::shared_ptr<TaskQueue> task_queue, std::string name,
@@ -72,7 +99,7 @@ private:
     std::string _name;
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
 
-    std::map<WorkloadGroupPtr, std::set<std::shared_ptr<QueryContext>>> _paused_queries_list;
+    std::map<WorkloadGroupPtr, std::set<PausedQuery>> _paused_queries_list;
     std::mutex _paused_queries_lock;
     std::condition_variable _paused_queries_cv;
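
Note on the new PausedQuery struct above: ordering and equality are defined on the printed query id, so the per-workload-group std::set keeps at most one entry per query while also recording when the query was paused (enqueue_at feeds elapsed_time(), which the ~2 s grace period in task_scheduler.cpp uses). A reduced, self-contained model of that pattern; Ctx stands in for QueryContext.

    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <set>
    #include <string>

    struct Ctx {                         // stand-in for QueryContext
        std::string id;
    };

    struct PausedEntry {
        std::shared_ptr<Ctx> ctx;
        std::chrono::system_clock::time_point enqueue_at = std::chrono::system_clock::now();

        explicit PausedEntry(std::shared_ptr<Ctx> c) : ctx(std::move(c)) {}

        int64_t elapsed_ms() const {
            using namespace std::chrono;
            return duration_cast<milliseconds>(system_clock::now() - enqueue_at).count();
        }

        // Comparing by id is what lets std::set deduplicate re-paused queries.
        bool operator<(const PausedEntry& other) const { return ctx->id < other.ctx->id; }
    };

    int main() {
        std::set<PausedEntry> paused;
        auto q = std::make_shared<Ctx>(Ctx{"q1"});

        auto [it1, inserted] = paused.emplace(q);   // first pause: inserted == true
        auto [it2, again] = paused.emplace(q);      // paused again: deduplicated
        std::cout << inserted << " " << again << " " << it1->elapsed_ms() << "ms\n";
        return 0;
    }
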
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
