This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit f8af843e0b547e4228052c6bd1a777845c5d5f49 Author: jacktengg <tengjianp...@selectdb.com> AuthorDate: Tue Dec 10 12:33:30 2024 +0800 fix spill cancel and cleanup some code --- be/src/pipeline/dependency.h | 2 -- be/src/pipeline/exec/multi_cast_data_streamer.cpp | 4 --- .../exec/partitioned_aggregation_sink_operator.cpp | 9 ------- .../partitioned_aggregation_source_operator.cpp | 2 -- .../exec/partitioned_hash_join_probe_operator.cpp | 3 --- .../exec/partitioned_hash_join_sink_operator.cpp | 12 +++------ be/src/pipeline/exec/spill_sort_sink_operator.cpp | 9 ------- .../pipeline/exec/spill_sort_source_operator.cpp | 2 -- be/src/pipeline/exec/spill_utils.h | 31 +++++++++++++++------- .../workload_group/workload_group_manager.cpp | 5 +++- 10 files changed, 29 insertions(+), 50 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index bb6cf4e6c91..7ce81a23dbd 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -426,8 +426,6 @@ private: struct BasicSpillSharedState { virtual ~BasicSpillSharedState() = default; - AtomicStatus _spill_status; - // These two counters are shared to spill source operators as the initial value // of 'SpillWriteFileCurrentBytes' and 'SpillWriteFileCurrentCount'. 
// Total bytes of spill data written to disk file(after serialized) diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index 0b4e6b4078c..8fc3bc6015c 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -284,10 +284,6 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block const auto block_mem_size = block->allocated_bytes(); - if (!_shared_state->_spill_status.ok()) { - return _shared_state->_spill_status.status(); - } - { std::lock_guard l(_mutex); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 88d39eb93df..8a29a88c82c 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -173,9 +173,6 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - if (!local_state._shared_state->_spill_status.ok()) { - return local_state._shared_state->_spill_status.status(); - } local_state._eos = eos; auto* runtime_state = local_state._runtime_state.get(); DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::sink", { @@ -214,9 +211,6 @@ Status PartitionedAggSinkOperatorX::revoke_memory( size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) const { auto& local_state = get_local_state(state); - if (!local_state._shared_state->_spill_status.ok()) { - return UINT64_MAX; - } auto* runtime_state = local_state._runtime_state.get(); auto size = _agg_sink_operator->get_revocable_mem_size(runtime_state); return size; @@ -265,9 +259,6 @@ Status PartitionedAggSinkLocalState::revoke_memory( << Base::_parent->node_id() << " 
revoke_memory, size: " << _parent->revocable_mem_size(state) << ", eos: " << _eos; - if (!_shared_state->_spill_status.ok()) { - return _shared_state->_spill_status.status(); - } if (!_shared_state->is_spilled) { _shared_state->is_spilled = true; profile()->add_info_string("Spilled", "true"); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 046f8df44ae..6bd601383f7 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -151,8 +151,6 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); - status = local_state._shared_state->_spill_status.status(); - RETURN_IF_ERROR(status); if (local_state._shared_state->is_spilled && local_state._need_to_merge_data_for_current_partition) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 1e3c1d18a71..1cdb5ce7be0 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -698,9 +698,6 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) const { auto& local_state = get_local_state(state); - if (!local_state._shared_state->_spill_status.ok()) { - return local_state._shared_state->_spill_status.status(); - } const auto partition_index = local_state._partition_cursor; auto& probe_blocks = local_state._probe_blocks[partition_index]; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index d88ff691b6c..abf81a068e7 100644 
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -479,16 +479,15 @@ Status PartitionedHashJoinSinkLocalState::_spill_to_disk( uint32_t partition_index, const vectorized::SpillStreamSPtr& spilling_stream) { auto& partitioned_block = _shared_state->partitioned_build_blocks[partition_index]; - Status status = _shared_state->_spill_status.status(); - if (status.ok()) { + if (!_state->is_cancelled()) { auto block = partitioned_block->to_block(); int64_t block_mem_usage = block.allocated_bytes(); Defer defer {[&]() { COUNTER_UPDATE(memory_used_counter(), -block_mem_usage); }}; partitioned_block = vectorized::MutableBlock::create_unique(block.clone_empty()); - status = spilling_stream->spill_block(state(), block, false); + return spilling_stream->spill_block(state(), block, false); + } else { + return _state->cancel_reason(); } - - return status; } PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* pool, @@ -580,9 +579,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B CHECK_EQ(local_state._spill_dependency->is_blocked_by(nullptr), nullptr); local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); - if (!local_state._shared_state->_spill_status.ok()) { - return local_state._shared_state->_spill_status.status(); - } local_state._child_eos = eos; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 0dfc157ad33..25205ab09fe 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -144,9 +144,6 @@ Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state, size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const { auto& local_state = get_local_state(state); - if (!local_state._shared_state->_spill_status.ok()) { - return UINT64_MAX; - 
} return _sort_sink_operator->get_revocable_mem_size(local_state._runtime_state.get()); } @@ -155,9 +152,6 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc auto& local_state = get_local_state(state); local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); - if (!local_state._shared_state->_spill_status.ok()) { - return local_state._shared_state->_spill_status.status(); - } COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0) { local_state._shared_state->update_spill_block_batch_row_count(in_block); @@ -203,9 +197,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " << Base::_parent->node_id() << " revoke_memory" << ", eos: " << _eos; - if (!_shared_state->_spill_status.ok()) { - return _shared_state->_spill_status.status(); - } auto status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( state, _spilling_stream, print_id(state->query_id()), "sort", _parent->node_id(), diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 447c306c9ba..3464ecd847f 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -262,8 +262,6 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc }}; local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); - status = local_state._shared_state->_spill_status.status(); - RETURN_IF_ERROR(status); if (local_state._shared_state->is_spilled) { if (!local_state._merger) { diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index 9986031ccc3..e669c0e343c 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -24,6 +24,7 @@ #include <functional> 
#include <utility> +#include "runtime/fragment_mgr.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/query_context.h" #include "runtime/runtime_state.h" @@ -48,14 +49,18 @@ struct SpillContext { all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {} ~SpillContext() { - LOG_IF(WARNING, running_tasks_count.load() != 0) - << "Query: " << print_id(query_id) - << " not all spill tasks finished, remaining tasks: " << running_tasks_count.load(); - - LOG_IF(WARNING, _running_non_sink_tasks_count.load() != 0) - << "Query: " << print_id(query_id) - << " not all spill tasks(non sink tasks) finished, remaining tasks: " - << _running_non_sink_tasks_count.load(); + if (running_tasks_count.load() != 0) { + LOG_EVERY_T(WARNING, 60) << "Query: " << print_id(query_id) + << " not all spill tasks finished, remaining tasks: " + << running_tasks_count.load(); + } + + if (_running_non_sink_tasks_count.load() != 0) { + LOG_EVERY_T(WARNING, 60) + << "Query: " << print_id(query_id) + << " not all spill tasks(non sink tasks) finished, remaining tasks: " + << _running_non_sink_tasks_count.load(); + } } void on_task_finished() { @@ -152,11 +157,17 @@ public: if (_state->is_cancelled()) { return; } - shared_state_holder->_spill_status.update(_spill_exec_func()); + auto status = _spill_exec_func(); + if (!status.ok()) { + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status); + } _on_task_finished(); if (_spill_fin_cb) { - shared_state_holder->_spill_status.update(_spill_fin_cb()); + auto status2 = _spill_fin_cb(); + if (!status2.ok()) { + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status2); + } } if (_spill_dependency) { diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 79379f5682e..86c33f5a334 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ 
b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -749,7 +749,10 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& } } else { SCOPED_ATTACH_TASK(query_ctx.get()); - RETURN_IF_ERROR(query_ctx->revoke_memory()); + auto status = query_ctx->revoke_memory(); + if (!status.ok()) { + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_ctx->query_id(), status); + } } return true; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org