This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 2ae4c86c81e [fix](spill) Thread conflicts caused by non-sink operators spilling (#45796)
2ae4c86c81e is described below

commit 2ae4c86c81eef96b3c035f9fd076921b563b87f1
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Mon Dec 23 17:40:09 2024 +0800

    [fix](spill) Thread conflicts caused by non-sink operators spilling (#45796)
---
 be/src/pipeline/exec/operator.h                    |  8 ----
 .../exec/partitioned_hash_join_probe_operator.cpp  | 55 +++++++---------------
 .../exec/partitioned_hash_join_probe_operator.h    |  6 +--
 be/src/pipeline/exec/spill_utils.h                 | 39 +++------------
 be/src/pipeline/pipeline_task.cpp                  |  4 +-
 5 files changed, 25 insertions(+), 87 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index af13ded196e..3267416e4b2 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -857,14 +857,6 @@ public:
         return (_child and !is_source()) ? _child->revocable_mem_size(state) : 
0;
     }
 
-    Status revoke_memory(RuntimeState* state,
-                         const std::shared_ptr<SpillContext>& spill_context) 
override {
-        if (_child and !is_source()) {
-            return _child->revoke_memory(state, spill_context);
-        }
-        return Status::OK();
-    }
-
     // If this method is not overwrite by child, its default value is 1MB
     [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) {
         return state->minimum_operator_memory_required_bytes();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f6cea157cd5..f5568ba1022 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -172,8 +172,7 @@ Status 
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
     return Status::OK();
 }
 
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
-        RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state) {
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     auto query_id = state->query_id();
 
@@ -208,7 +207,9 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                         std::numeric_limits<size_t>::max(), 
_runtime_profile.get()));
             }
 
-            auto merged_block = 
vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
+            auto merged_block = 
vectorized::MutableBlock::create_unique(std::move(blocks.back()));
+            blocks.pop_back();
+
             while (!blocks.empty() && !state->is_cancelled()) {
                 auto block = std::move(blocks.back());
                 blocks.pop_back();
@@ -218,17 +219,9 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                     return Status::Error<INTERNAL_ERROR>(
                             "fault_inject partitioned_hash_join_probe 
spill_probe_blocks failed");
                 });
-
-                if (merged_block->allocated_bytes() >=
-                    vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
-                    COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
-                    RETURN_IF_ERROR(
-                            spilling_stream->spill_block(state, 
merged_block->to_block(), false));
-                    COUNTER_UPDATE(_spill_probe_blocks, 1);
-                }
             }
 
-            if (!merged_block->empty()) {
+            if (!merged_block->empty()) [[likely]] {
                 COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
                 RETURN_IF_ERROR(
                         spilling_stream->spill_block(state, 
merged_block->to_block(), false));
@@ -256,9 +249,6 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
         return status;
     };
 
-    if (spill_context) {
-        spill_context->on_non_sink_task_started();
-    }
     _spill_dependency->block();
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func",
 {
         return Status::Error<INTERNAL_ERROR>(
@@ -266,8 +256,8 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
     });
 
     auto spill_runnable = std::make_shared<SpillNonSinkRunnable>(
-            state, spill_context, _spill_dependency, _runtime_profile.get(),
-            _shared_state->shared_from_this(), exception_catch_func);
+            state, _spill_dependency, _runtime_profile.get(), 
_shared_state->shared_from_this(),
+            exception_catch_func);
     return spill_io_pool->submit(std::move(spill_runnable));
 }
 
@@ -856,27 +846,6 @@ size_t 
PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
     return size_to_reserve;
 }
 
-Status PartitionedHashJoinProbeOperatorX::revoke_memory(
-        RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
-    auto& local_state = get_local_state(state);
-    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe 
node: " << node_id()
-               << ", task: " << state->task_id() << ", child eos: " << 
local_state._child_eos;
-
-    if (local_state._child_eos) {
-        VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash 
probe node: " << node_id()
-                   << ", task: " << state->task_id() << ", child eos: " << 
local_state._child_eos
-                   << ", will not revoke size: " << revocable_mem_size(state);
-        return Status::OK();
-    }
-
-    RETURN_IF_ERROR(local_state.spill_probe_blocks(state, spill_context));
-
-    if (_child) {
-        return _child->revoke_memory(state, spill_context);
-    }
-    return Status::OK();
-}
-
 Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
     auto& local_state = get_local_state(state);
     VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe 
node: " << node_id()
@@ -891,7 +860,15 @@ bool 
PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat
     if (local_state._shared_state->need_to_spill) {
         const auto revocable_size = _revocable_mem_size(state);
         const auto min_revocable_size = state->min_revocable_mem();
-        return revocable_size > min_revocable_size;
+
+        if (state->get_query_ctx()->low_memory_mode()) {
+            return revocable_size >
+                   std::min<int64_t>(min_revocable_size,
+                                     static_cast<int64_t>(
+                                             
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM));
+        } else {
+            return vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
+        }
     }
     return false;
 }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 7b77e1e6e3f..40c6b0fcef5 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -47,8 +47,7 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
-    Status spill_probe_blocks(RuntimeState* state,
-                              const std::shared_ptr<SpillContext>& 
spill_context = nullptr);
+    Status spill_probe_blocks(RuntimeState* state);
 
     Status recover_build_blocks_from_disk(RuntimeState* state, uint32_t 
partition_index,
                                           bool& has_data);
@@ -181,9 +180,6 @@ public:
         return _inner_probe_operator->require_data_distribution();
     }
 
-    Status revoke_memory(RuntimeState* state,
-                         const std::shared_ptr<SpillContext>& spill_context) 
override;
-
 private:
     Status _revoke_memory(RuntimeState* state);
 
diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h
index 84a3f8c2e29..c0bb47960c5 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -50,36 +50,18 @@ struct SpillContext {
 
     ~SpillContext() {
         if (running_tasks_count.load() != 0) {
-            LOG_EVERY_T(WARNING, 60) << "Query: " << print_id(query_id)
-                                     << " not all spill tasks finished, 
remaining tasks: "
-                                     << running_tasks_count.load();
-        }
-
-        if (_running_non_sink_tasks_count.load() != 0) {
-            LOG_EVERY_T(WARNING, 60)
-                    << "Query: " << print_id(query_id)
-                    << " not all spill tasks(non sink tasks) finished, 
remaining tasks: "
-                    << _running_non_sink_tasks_count.load();
+            LOG(WARNING) << "Query: " << print_id(query_id)
+                         << " not all spill tasks finished, remaining tasks: "
+                         << running_tasks_count.load();
         }
     }
 
     void on_task_finished() {
         auto count = running_tasks_count.fetch_sub(1);
-        if (count == 1 && _running_non_sink_tasks_count.load() == 0) {
-            all_tasks_finished_callback(this);
-        }
-    }
-
-    void on_non_sink_task_started() { 
_running_non_sink_tasks_count.fetch_add(1); }
-    void on_non_sink_task_finished() {
-        const auto count = _running_non_sink_tasks_count.fetch_sub(1);
-        if (count == 1 && running_tasks_count.load() == 0) {
+        if (count == 1) {
             all_tasks_finished_callback(this);
         }
     }
-
-private:
-    std::atomic_int _running_non_sink_tasks_count {0};
 };
 
 class SpillRunnable : public Runnable {
@@ -233,20 +215,13 @@ public:
 
 class SpillNonSinkRunnable : public SpillRunnable {
 public:
-    SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext> 
spill_context,
-                         std::shared_ptr<Dependency> spill_dependency, 
RuntimeProfile* profile,
+    SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<Dependency> 
spill_dependency,
+                         RuntimeProfile* profile,
                          const std::shared_ptr<BasicSpillSharedState>& 
shared_state,
                          std::function<Status()> spill_exec_func,
                          std::function<Status()> spill_fin_cb = {})
-            : SpillRunnable(state, spill_context, spill_dependency, profile, 
shared_state, true,
+            : SpillRunnable(state, nullptr, spill_dependency, profile, 
shared_state, true,
                             spill_exec_func, spill_fin_cb) {}
-
-protected:
-    void _on_task_finished() override {
-        if (_spill_context) {
-            _spill_context->on_non_sink_task_finished();
-        }
-    }
 };
 
 class SpillRecoverRunnable : public SpillRunnable {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 9d284b31861..0ddc329da3b 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -619,7 +619,7 @@ size_t PipelineTask::get_revocable_size() const {
         return 0;
     }
 
-    return _sink->revocable_mem_size(_state) + 
_root->revocable_mem_size(_state);
+    return _sink->revocable_mem_size(_state);
 }
 
 Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& 
spill_context) {
@@ -632,8 +632,6 @@ Status PipelineTask::revoke_memory(const 
std::shared_ptr<SpillContext>& spill_co
         return Status::OK();
     }
 
-    RETURN_IF_ERROR(_root->revoke_memory(_state, spill_context));
-
     const auto revocable_size = _sink->revocable_mem_size(_state);
     if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
         RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to