This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f8af843e0b547e4228052c6bd1a777845c5d5f49
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Tue Dec 10 12:33:30 2024 +0800

    fix spill cancel and cleanup some code
---
 be/src/pipeline/dependency.h                       |  2 --
 be/src/pipeline/exec/multi_cast_data_streamer.cpp  |  4 ---
 .../exec/partitioned_aggregation_sink_operator.cpp |  9 -------
 .../partitioned_aggregation_source_operator.cpp    |  2 --
 .../exec/partitioned_hash_join_probe_operator.cpp  |  3 ---
 .../exec/partitioned_hash_join_sink_operator.cpp   | 12 +++------
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  9 -------
 .../pipeline/exec/spill_sort_source_operator.cpp   |  2 --
 be/src/pipeline/exec/spill_utils.h                 | 31 +++++++++++++++-------
 .../workload_group/workload_group_manager.cpp      |  5 +++-
 10 files changed, 29 insertions(+), 50 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index bb6cf4e6c91..7ce81a23dbd 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -426,8 +426,6 @@ private:
 struct BasicSpillSharedState {
     virtual ~BasicSpillSharedState() = default;
 
-    AtomicStatus _spill_status;
-
     // These two counters are shared to spill source operators as the initial value
     // of 'SpillWriteFileCurrentBytes' and 'SpillWriteFileCurrentCount'.
     // Total bytes of spill data written to disk file(after serialized)
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 0b4e6b4078c..8fc3bc6015c 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -284,10 +284,6 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block
 
     const auto block_mem_size = block->allocated_bytes();
 
-    if (!_shared_state->_spill_status.ok()) {
-        return _shared_state->_spill_status.status();
-    }
-
     {
         std::lock_guard l(_mutex);
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 88d39eb93df..8a29a88c82c 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -173,9 +173,6 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
-    if (!local_state._shared_state->_spill_status.ok()) {
-        return local_state._shared_state->_spill_status.status();
-    }
     local_state._eos = eos;
     auto* runtime_state = local_state._runtime_state.get();
     DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::sink", {
@@ -214,9 +211,6 @@ Status PartitionedAggSinkOperatorX::revoke_memory(
 
 size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
     auto& local_state = get_local_state(state);
-    if (!local_state._shared_state->_spill_status.ok()) {
-        return UINT64_MAX;
-    }
     auto* runtime_state = local_state._runtime_state.get();
     auto size = _agg_sink_operator->get_revocable_mem_size(runtime_state);
     return size;
@@ -265,9 +259,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(
                << Base::_parent->node_id()
                << " revoke_memory, size: " << _parent->revocable_mem_size(state)
                << ", eos: " << _eos;
-    if (!_shared_state->_spill_status.ok()) {
-        return _shared_state->_spill_status.status();
-    }
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
         profile()->add_info_string("Spilled", "true");
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 046f8df44ae..6bd601383f7 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -151,8 +151,6 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
 
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    status = local_state._shared_state->_spill_status.status();
-    RETURN_IF_ERROR(status);
 
     if (local_state._shared_state->is_spilled &&
         local_state._need_to_merge_data_for_current_partition) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 1e3c1d18a71..1cdb5ce7be0 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -698,9 +698,6 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
 Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
                                                vectorized::Block* output_block, bool* eos) const {
     auto& local_state = get_local_state(state);
-    if (!local_state._shared_state->_spill_status.ok()) {
-        return local_state._shared_state->_spill_status.status();
-    }
 
     const auto partition_index = local_state._partition_cursor;
     auto& probe_blocks = local_state._probe_blocks[partition_index];
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d88ff691b6c..abf81a068e7 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -479,16 +479,15 @@ Status PartitionedHashJoinSinkLocalState::_spill_to_disk(
         uint32_t partition_index, const vectorized::SpillStreamSPtr& spilling_stream) {
     auto& partitioned_block = _shared_state->partitioned_build_blocks[partition_index];
 
-    Status status = _shared_state->_spill_status.status();
-    if (status.ok()) {
+    if (!_state->is_cancelled()) {
         auto block = partitioned_block->to_block();
         int64_t block_mem_usage = block.allocated_bytes();
         Defer defer {[&]() { COUNTER_UPDATE(memory_used_counter(), -block_mem_usage); }};
         partitioned_block = vectorized::MutableBlock::create_unique(block.clone_empty());
-        status = spilling_stream->spill_block(state(), block, false);
+        return spilling_stream->spill_block(state(), block, false);
+    } else {
+        return _state->cancel_reason();
     }
-
-    return status;
 }
 
 PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* pool,
@@ -580,9 +579,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
     CHECK_EQ(local_state._spill_dependency->is_blocked_by(nullptr), nullptr);
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    if (!local_state._shared_state->_spill_status.ok()) {
-        return local_state._shared_state->_spill_status.status();
-    }
 
     local_state._child_eos = eos;
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 0dfc157ad33..25205ab09fe 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -144,9 +144,6 @@ Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state,
 
 size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
     auto& local_state = get_local_state(state);
-    if (!local_state._shared_state->_spill_status.ok()) {
-        return UINT64_MAX;
-    }
     return _sort_sink_operator->get_revocable_mem_size(local_state._runtime_state.get());
 }
 
@@ -155,9 +152,6 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
     auto& local_state = get_local_state(state);
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    if (!local_state._shared_state->_spill_status.ok()) {
-        return local_state._shared_state->_spill_status.status();
-    }
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
     if (in_block->rows() > 0) {
         local_state._shared_state->update_spill_block_batch_row_count(in_block);
@@ -203,9 +197,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node "
                << Base::_parent->node_id() << " revoke_memory"
                << ", eos: " << _eos;
-    if (!_shared_state->_spill_status.ok()) {
-        return _shared_state->_spill_status.status();
-    }
 
     auto status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
             state, _spilling_stream, print_id(state->query_id()), "sort", _parent->node_id(),
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 447c306c9ba..3464ecd847f 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -262,8 +262,6 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc
     }};
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    status = local_state._shared_state->_spill_status.status();
-    RETURN_IF_ERROR(status);
 
     if (local_state._shared_state->is_spilled) {
         if (!local_state._merger) {
diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h
index 9986031ccc3..e669c0e343c 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -24,6 +24,7 @@
 #include <functional>
 #include <utility>
 
+#include "runtime/fragment_mgr.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
@@ -48,14 +49,18 @@ struct SpillContext {
               all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
 
     ~SpillContext() {
-        LOG_IF(WARNING, running_tasks_count.load() != 0)
-                << "Query: " << print_id(query_id)
-                << " not all spill tasks finished, remaining tasks: " << running_tasks_count.load();
-
-        LOG_IF(WARNING, _running_non_sink_tasks_count.load() != 0)
-                << "Query: " << print_id(query_id)
-                << " not all spill tasks(non sink tasks) finished, remaining tasks: "
-                << _running_non_sink_tasks_count.load();
+        if (running_tasks_count.load() != 0) {
+            LOG_EVERY_T(WARNING, 60) << "Query: " << print_id(query_id)
+                                     << " not all spill tasks finished, remaining tasks: "
+                                     << running_tasks_count.load();
+        }
+
+        if (_running_non_sink_tasks_count.load() != 0) {
+            LOG_EVERY_T(WARNING, 60)
+                    << "Query: " << print_id(query_id)
+                    << " not all spill tasks(non sink tasks) finished, remaining tasks: "
+                    << _running_non_sink_tasks_count.load();
+        }
     }
 
     void on_task_finished() {
@@ -152,11 +157,17 @@ public:
         if (_state->is_cancelled()) {
             return;
         }
-        shared_state_holder->_spill_status.update(_spill_exec_func());
+        auto status = _spill_exec_func();
+        if (!status.ok()) {
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status);
+        }
 
         _on_task_finished();
         if (_spill_fin_cb) {
-            shared_state_holder->_spill_status.update(_spill_fin_cb());
+            auto status2 = _spill_fin_cb();
+            if (!status2.ok()) {
+                ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status2);
+            }
         }
 
         if (_spill_dependency) {
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 79379f5682e..86c33f5a334 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -749,7 +749,10 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
         }
     } else {
         SCOPED_ATTACH_TASK(query_ctx.get());
-        RETURN_IF_ERROR(query_ctx->revoke_memory());
+        auto status = query_ctx->revoke_memory();
+        if (!status.ok()) {
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_ctx->query_id(), status);
+        }
     }
     return true;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to