This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5759d685f6761ffb1a77749dac6b5970e08a726c
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Fri Sep 6 11:30:07 2024 +0800

    [opt](spill) Avoid occupying a large amount of memory in join build side
---
 be/src/pipeline/exec/operator.h                    | 19 +++++---
 .../exec/partitioned_aggregation_sink_operator.cpp |  6 +++
 .../partitioned_aggregation_source_operator.cpp    |  7 ++-
 .../exec/partitioned_aggregation_source_operator.h |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  | 53 ++++++++++++++++++----
 .../exec/partitioned_hash_join_probe_operator.h    |  9 +++-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 26 ++++++-----
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  5 ++
 .../pipeline/exec/spill_sort_source_operator.cpp   |  9 ++--
 be/src/pipeline/exec/spill_sort_source_operator.h  |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |  2 +-
 be/src/pipeline/pipeline_task.cpp                  | 25 ++++++----
 be/src/pipeline/pipeline_task.h                    | 21 +++++++--
 be/src/pipeline/task_scheduler.cpp                 | 38 +++++++++-------
 be/src/runtime/runtime_state.h                     |  6 ---
 be/src/vec/spill/spill_stream.h                    |  3 +-
 16 files changed, 161 insertions(+), 72 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 14cd56f5751..d111a2a7e24 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -417,11 +417,7 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
-    Status open(RuntimeState* state) override {
-        _spill_dependency = state->get_spill_dependency();
-        DCHECK(_spill_dependency != nullptr);
-        return Status::OK();
-    }
+    Status open(RuntimeState* state) override { return Status::OK(); }
 
     Status close(RuntimeState* state, Status exec_status) override;
 
@@ -449,7 +445,7 @@ public:
 
 protected:
     Dependency* _dependency = nullptr;
-    Dependency* _spill_dependency = nullptr;
+    std::shared_ptr<Dependency> _spill_dependency;
     SharedStateType* _shared_state = nullptr;
 
 private:
@@ -734,6 +730,17 @@ public:
         }
     }
 
+    size_t revocable_mem_size(RuntimeState* state) const override {
+        return (_child_x and !is_source()) ? 
_child_x->revocable_mem_size(state) : 0;
+    }
+
+    Status revoke_memory(RuntimeState* state) override {
+        if (_child_x and !is_source()) {
+            return _child_x->revoke_memory(state);
+        }
+        return Status::OK();
+    }
+
     virtual std::string debug_string(int indentation_level = 0) const;
 
     virtual std::string debug_string(RuntimeState* state, int 
indentation_level = 0) const;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 314806529b7..5c26cfb6b97 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -22,7 +22,9 @@
 
 #include "aggregation_sink_operator.h"
 #include "common/status.h"
+#include "pipeline/dependency.h"
 #include "pipeline/exec/spill_utils.h"
+#include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "vec/spill/spill_stream_manager.h"
 
@@ -58,6 +60,10 @@ Status 
PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
         
value_columns_.emplace_back(aggregate_evaluator->function()->create_serialize_column());
     }
 
+    _spill_dependency = Dependency::create_shared(parent.operator_id(), 
parent.node_id(),
+                                                  "AggSinkSpillDependency", 
true);
+    state->get_task()->add_spill_dependency(_spill_dependency.get());
+
     _finish_dependency->block();
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index bdbd395ee99..fa41723beba 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/spill_utils.h"
+#include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/runtime_profile.h"
 #include "vec/spill/spill_stream_manager.h"
@@ -38,6 +39,10 @@ Status PartitionedAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info)
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     _init_counters();
+    _spill_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                  "AggSourceSpillDependency", 
true);
+    state->get_task()->add_spill_dependency(_spill_dependency.get());
+
     return Status::OK();
 }
 
@@ -48,8 +53,6 @@ Status PartitionedAggLocalState::open(RuntimeState* state) {
         return Status::OK();
     }
     _opened = true;
-    _spill_dependency = state->get_spill_dependency();
-    DCHECK(_spill_dependency != nullptr);
     RETURN_IF_ERROR(setup_in_memory_agg_op(state));
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index c09046d840a..3505cf7eed8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -59,7 +59,7 @@ protected:
     bool _current_partition_eos = true;
     bool _is_merging = false;
 
-    Dependency* _spill_dependency {nullptr};
+    std::shared_ptr<Dependency> _spill_dependency;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     RuntimeProfile::Counter* _get_results_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index d91b424440a..51b6e143b3c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -41,6 +41,10 @@ Status 
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
     _partitioned_blocks.resize(p._partition_count);
     _probe_spilling_streams.resize(p._partition_count);
 
+    _spill_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                  
"HashJoinProbeSpillDependency", true);
+    state->get_task()->add_spill_dependency(_spill_dependency.get());
+
     _spill_and_partition_label = ADD_LABEL_COUNTER(profile(), "Partition");
     _partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime", 
"Partition");
     _partition_shuffle_timer = ADD_CHILD_TIMER(profile(), 
"PartitionShuffleTime", "Partition");
@@ -144,8 +148,6 @@ void 
PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* ch
 
 Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
     RETURN_IF_ERROR(PipelineXSpillLocalState::open(state));
-    _spill_dependency = state->get_spill_dependency();
-    DCHECK(_spill_dependency != nullptr);
     return 
_parent->cast<PartitionedHashJoinProbeOperatorX>()._partitioner->clone(state,
                                                                                
   _partitioner);
 }
@@ -160,13 +162,16 @@ Status 
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
     return Status::OK();
 }
 
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state) {
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state, bool force) {
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     auto query_id = state->query_id();
 
+    const auto spill_size_threshold = force ? 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM
+                                            : 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
+
     MonotonicStopWatch submit_timer;
     submit_timer.start();
-    auto spill_func = [query_id, state, submit_timer, this] {
+    auto spill_func = [query_id, state, submit_timer, spill_size_threshold, 
this] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_spill_probe_timer);
 
@@ -175,8 +180,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
              ++partition_index) {
             auto& blocks = _probe_blocks[partition_index];
             auto& partitioned_block = _partitioned_blocks[partition_index];
-            if (partitioned_block && partitioned_block->allocated_bytes() >=
-                                             
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+            if (partitioned_block && partitioned_block->allocated_bytes() >= 
spill_size_threshold) {
                 blocks.emplace_back(partitioned_block->to_block());
                 partitioned_block.reset();
             }
@@ -756,6 +760,22 @@ bool 
PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state
 
 size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* 
state) const {
     auto& local_state = get_local_state(state);
+    if (local_state._child_eos) {
+        return 0;
+    }
+
+    auto revocable_size = _revocable_mem_size(state, true);
+    if (_child_x) {
+        revocable_size += _child_x->revocable_mem_size(state);
+    }
+    return revocable_size;
+}
+
+size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* 
state,
+                                                              bool force) 
const {
+    const auto spill_size_threshold = force ? 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM
+                                            : 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
+    auto& local_state = get_local_state(state);
     size_t mem_size = 0;
     auto& probe_blocks = local_state._probe_blocks;
     for (uint32_t i = 0; i < _partition_count; ++i) {
@@ -766,7 +786,7 @@ size_t 
PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
         auto& partitioned_block = local_state._partitioned_blocks[i];
         if (partitioned_block) {
             auto block_bytes = partitioned_block->allocated_bytes();
-            if (block_bytes >= 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+            if (block_bytes >= spill_size_threshold) {
                 mem_size += block_bytes;
             }
         }
@@ -774,6 +794,23 @@ size_t 
PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
     return mem_size;
 }
 
+Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe 
node: " << node_id()
+               << ", task: " << state->task_id() << ", child eos: " << 
local_state._child_eos;
+
+    if (local_state._child_eos) {
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
+
+    if (_child_x) {
+        return _child_x->revoke_memory(state);
+    }
+    return Status::OK();
+}
+
 Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
     auto& local_state = get_local_state(state);
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe 
node: " << node_id()
@@ -786,7 +823,7 @@ Status 
PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
 bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* 
state) const {
     auto& local_state = get_local_state(state);
     if (local_state._shared_state->need_to_spill) {
-        const auto revocable_size = revocable_mem_size(state);
+        const auto revocable_size = _revocable_mem_size(state);
         const auto min_revocable_size = state->min_revocable_mem();
         return revocable_size > min_revocable_size;
     }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index f1b635208eb..7a4ba1ed50b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <cstdint>
+#include <memory>
 
 #include "common/status.h"
 #include "operator.h"
@@ -46,7 +47,7 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
-    Status spill_probe_blocks(RuntimeState* state);
+    Status spill_probe_blocks(RuntimeState* state, bool force = false);
 
     Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t 
partition_index,
                                            bool& has_data);
@@ -89,7 +90,7 @@ private:
 
     bool _need_to_setup_internal_operators {true};
 
-    Dependency* _spill_dependency {nullptr};
+    std::shared_ptr<Dependency> _spill_dependency;
 
     RuntimeProfile::Counter* _spill_and_partition_label = nullptr;
     RuntimeProfile::Counter* _partition_timer = nullptr;
@@ -186,9 +187,13 @@ public:
         return _inner_probe_operator->require_data_distribution();
     }
 
+    Status revoke_memory(RuntimeState* state) override;
+
 private:
     Status _revoke_memory(RuntimeState* state);
 
+    size_t _revocable_mem_size(RuntimeState* state, bool force = false) const;
+
     friend class PartitionedHashJoinProbeLocalState;
 
     [[nodiscard]] Status 
_setup_internal_operators(PartitionedHashJoinProbeLocalState& local_state,
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index e6a14aaf603..a073d922769 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -23,6 +23,7 @@
 
 #include "common/logging.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
 #include "util/runtime_profile.h"
@@ -39,6 +40,10 @@ Status 
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _shared_state->partitioned_build_blocks.resize(p._partition_count);
     _shared_state->spilled_streams.resize(p._partition_count);
 
+    _spill_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                  
"HashJoinBuildSpillDependency", true);
+    state->get_task()->add_spill_dependency(_spill_dependency.get());
+
     _internal_runtime_profile.reset(new RuntimeProfile("internal_profile"));
 
     _partition_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionTime", 
"Spill", 1);
@@ -548,6 +553,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
                            << ", task id: " << state->task_id() << ", nonspill 
build usage: "
                            << _inner_sink_operator->get_memory_usage(
                                       
local_state._shared_state->inner_runtime_state.get());
+            } else {
+                return revoke_memory(state);
             }
 
             
std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
@@ -565,6 +572,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     if (need_to_spill) {
         RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, 
rows));
+        if (eos) {
+            return revoke_memory(state);
+        }
     } else {
         if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
             RETURN_IF_ERROR(_setup_internal_operator(state));
@@ -576,18 +586,12 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
         });
         RETURN_IF_ERROR(_inner_sink_operator->sink(
                 local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
-    }
 
-    if (eos) {
-        LOG(INFO) << "hash join sink " << node_id() << " sink eos, 
set_ready_to_read"
-                  << ", task id: " << state->task_id() << ", need spil: " << 
need_to_spill;
-        
std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
-                      
local_state._shared_state->partitioned_build_blocks.end(), [&](auto& block) {
-                          if (block) {
-                              COUNTER_UPDATE(local_state._in_mem_rows_counter, 
block->rows());
-                          }
-                      });
-        local_state._dependency->set_ready_to_read();
+        if (eos) {
+            LOG(INFO) << "hash join sink " << node_id() << " sink eos, 
set_ready_to_read"
+                      << ", task id: " << state->task_id();
+            local_state._dependency->set_ready_to_read();
+        }
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index c82404afb03..9dcb66240df 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -19,6 +19,7 @@
 
 #include "pipeline/exec/sort_sink_operator.h"
 #include "pipeline/exec/spill_utils.h"
+#include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "vec/spill/spill_stream_manager.h"
 
@@ -38,6 +39,10 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* 
state,
 
     _init_counters();
 
+    _spill_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                  "SortSinkSpillDependency", 
true);
+    state->get_task()->add_spill_dependency(_spill_dependency.get());
+
     RETURN_IF_ERROR(setup_in_memory_sort_op(state));
 
     Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index c3f9f633cd3..601188ae02e 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "pipeline/exec/spill_utils.h"
+#include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "sort_source_operator.h"
 #include "util/runtime_profile.h"
@@ -37,6 +38,11 @@ Status SpillSortLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
+
+    _spill_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                  "SortSourceSpillDependency", 
true);
+    state->get_task()->add_spill_dependency(_spill_dependency.get());
+
     _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
     _spill_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillMergeSortTime", "Spill", 1);
     _spill_merge_sort_timer =
@@ -61,9 +67,6 @@ Status SpillSortLocalState::open(RuntimeState* state) {
         return Status::OK();
     }
 
-    _spill_dependency = state->get_spill_dependency();
-    DCHECK(_spill_dependency != nullptr);
-
     RETURN_IF_ERROR(setup_in_memory_sort_op(state));
     return Base::open(state);
 }
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h
index 5674e18ef69..e372a9039bf 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -60,7 +60,7 @@ protected:
     std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
 
-    Dependency* _spill_dependency {nullptr};
+    std::shared_ptr<Dependency> _spill_dependency;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     // counters for spill merge sort
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index d4a300e58a2..5d1b6aaaa1b 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1412,7 +1412,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             auto tnode_ = tnode;
             /// TODO: support rf in partitioned hash join
             tnode_.runtime_filters.clear();
-            const uint32_t partition_count = 128;
+            const uint32_t partition_count = 32;
             auto inner_probe_operator =
                     std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, 
descs);
             auto inner_sink_operator = 
std::make_shared<HashJoinBuildSinkOperatorX>(
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index c34c40580a9..e27d73be62e 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -72,9 +72,6 @@ PipelineTask::PipelineTask(
                   state->get_query_ctx()->get_memory_sufficient_dependency()) {
     _pipeline_task_watcher.start();
 
-    _spill_dependency = Dependency::create_shared(-1, -1, 
"PipelineTaskSpillDependency", true);
-
-    _state->set_spill_dependency(_spill_dependency.get());
     auto shared_state = _sink->create_shared_state();
     if (shared_state) {
         _sink_shared_state = shared_state;
@@ -250,10 +247,12 @@ bool PipelineTask::_wait_to_start() {
 }
 
 bool PipelineTask::_is_blocked() {
-    _blocked_dep = _spill_dependency->is_blocked_by(this);
-    if (_blocked_dep != nullptr) {
-        _blocked_dep->start_watcher();
-        return true;
+    for (auto* spill_dependency : _spill_dependencies) {
+        _blocked_dep = spill_dependency->is_blocked_by(this);
+        if (_blocked_dep != nullptr) {
+            _blocked_dep->start_watcher();
+            return true;
+        }
     }
 
     _blocked_dep = _memory_sufficient_dependency->is_blocked_by(this);
@@ -528,11 +527,19 @@ std::string PipelineTask::debug_string() {
 }
 
 size_t PipelineTask::get_revocable_size() const {
-    return (_running || _eos) ? 0 : _sink->revocable_mem_size(_state);
+    if (_running || _eos) {
+        return 0;
+    }
+
+    auto revocable_size = _root->revocable_mem_size(_state);
+    revocable_size += _sink->revocable_mem_size(_state);
+
+    return revocable_size;
 }
 
 Status PipelineTask::revoke_memory() {
-    return _sink->revoke_memory(_state);
+    RETURN_IF_ERROR(_sink->revoke_memory(_state));
+    return _root->revoke_memory(_state);
 }
 
 void PipelineTask::wake_up() {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 79da888cda6..943366b4b70 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -141,7 +141,10 @@ public:
         if (!_finalized) {
             _execution_dep->set_always_ready();
             _memory_sufficient_dependency->set_always_ready();
-            _spill_dependency->set_always_ready();
+            for (auto* dep : _spill_dependencies) {
+                dep->set_always_ready();
+            }
+
             for (auto* dep : _filter_dependencies) {
                 dep->set_always_ready();
             }
@@ -198,7 +201,14 @@ public:
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
 
     bool is_running() { return _running.load(); }
-    bool is_revoking() { return _spill_dependency->is_blocked_by(nullptr) != 
nullptr; }
+    bool is_revoking() {
+        for (auto* dep : _spill_dependencies) {
+            if (dep->is_blocked_by(nullptr) != nullptr) {
+                return true;
+            }
+        }
+        return false;
+    }
     bool set_running(bool running) { return _running.exchange(running); }
 
     bool is_exceed_debug_timeout() {
@@ -240,6 +250,10 @@ public:
     [[nodiscard]] size_t get_revocable_size() const;
     [[nodiscard]] Status revoke_memory();
 
+    void add_spill_dependency(Dependency* dependency) {
+        _spill_dependencies.emplace_back(dependency);
+    }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
@@ -298,6 +312,7 @@ private:
 
     // `_read_dependencies` is stored as same order as `_operators`
     std::vector<std::vector<Dependency*>> _read_dependencies;
+    std::vector<Dependency*> _spill_dependencies;
     std::vector<Dependency*> _write_dependencies;
     std::vector<Dependency*> _finish_dependencies;
     std::vector<Dependency*> _filter_dependencies;
@@ -317,8 +332,6 @@ private:
 
     Dependency* _memory_sufficient_dependency = nullptr;
 
-    std::shared_ptr<Dependency> _spill_dependency;
-
     std::atomic<bool> _finalized {false};
     std::mutex _dependency_lock;
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 0aabeac4933..a4379f73b64 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -331,26 +331,30 @@ void TaskScheduler::_paused_queries_handler() {
                     bool new_is_high_wartermark = false;
                     const auto query_id = 
print_id(max_memory_usage_query->query_id());
                     wg->check_mem_used(&new_is_low_wartermark, 
&new_is_high_wartermark);
-                    if (!new_is_low_wartermark || it_to_remove->elapsed_time() 
< 2000) {
-                        LOG(INFO) << "memory insufficient and cannot find 
revocable query, "
-                                     "the max usage query: "
+                    if (new_is_high_wartermark) {
+                        if (it_to_remove->elapsed_time() < 2000) {
+                            LOG(INFO) << "memory insufficient and cannot find 
revocable query, "
+                                         "the max usage query: "
+                                      << query_id << ", usage: " << 
max_memory_usage
+                                      << ", elapsed: " << 
it_to_remove->elapsed_time()
+                                      << ", wg info: " << wg->debug_string();
+                            continue;
+                        }
+                        max_memory_usage_query->cancel(Status::InternalError(
+                                "memory insufficient and cannot find revocable 
query, cancel "
+                                "the "
+                                "biggest usage({}) query({})",
+                                max_memory_usage, query_id));
+                        queries_list.erase(it_to_remove);
+
+                    } else {
+                        LOG(INFO) << "non high water mark, resume "
+                                     "the query: "
                                   << query_id << ", usage: " << 
max_memory_usage
-                                  << ", elapsed: " << 
it_to_remove->elapsed_time()
                                   << ", wg info: " << wg->debug_string();
-                        continue;
+                        max_memory_usage_query->set_memory_sufficient(true);
+                        queries_list.erase(it_to_remove);
                     }
-
-                    LOG(INFO) << "memory insufficient and cannot find 
revocable query, "
-                                 "cancel "
-                                 "the query: "
-                              << query_id << ", usage: " << max_memory_usage
-                              << ", wg info: " << wg->debug_string();
-                    // Should use memory exceed error code, so that FE may do 
retry for this error
-                    max_memory_usage_query->cancel(Status::MemoryLimitExceeded(
-                            "memory insufficient and cannot find revocable 
query, cancel "
-                            "the "
-                            "biggest usage({}) query({})",
-                            max_memory_usage, query_id));
                 }
             }
         }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 87c54564ae5..442268850bf 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -603,10 +603,6 @@ public:
         return _partial_update_auto_inc_column;
     }
 
-    void set_spill_dependency(pipeline::Dependency* dependency) { 
_spill_dependency = dependency; }
-
-    pipeline::Dependency* get_spill_dependency() { return _spill_dependency; }
-
 private:
     Status create_error_log_file();
 
@@ -702,8 +698,6 @@ private:
     int _task_id = -1;
     int _task_num = 0;
 
-    pipeline::Dependency* _spill_dependency;
-
     std::vector<THivePartitionUpdate> _hive_partition_updates;
 
     std::vector<TIcebergCommitData> _iceberg_commit_datas;
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index ad30a0bbd1d..7a4bb4980b1 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -35,7 +35,8 @@ class SpillDataDir;
 class SpillStream {
 public:
     // to avoid too many small file writes
-    static constexpr int MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024;
+    static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024;
+    static constexpr size_t MAX_SPILL_WRITE_BATCH_MEM = 32 * 1024 * 1024;
     SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
                 std::string spill_dir, size_t batch_rows, size_t batch_bytes,
                 RuntimeProfile* profile);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to