This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 8d7f09a0c53 [fix](spill) runtime filter and add some counters (#46999) 8d7f09a0c53 is described below commit 8d7f09a0c5348a663de8d2b0ffbd5f078ec8d916 Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Thu Jan 16 21:17:55 2025 +0800 [fix](spill) runtime filter and add some counters (#46999) --- .../exec/partitioned_hash_join_probe_operator.cpp | 3 ++- .../exec/partitioned_hash_join_sink_operator.cpp | 29 +++++++++++++++------- .../exec/partitioned_hash_join_sink_operator.h | 2 ++ 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 120c6bcbd06..565aeaa5fee 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -534,7 +534,9 @@ Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(_inner_probe_operator->set_child(child)); DCHECK(_build_side_child != nullptr); _inner_probe_operator->set_build_side_child(_build_side_child); + RETURN_IF_ERROR(_inner_sink_operator->set_child(_build_side_child)); RETURN_IF_ERROR(_inner_probe_operator->open(state)); + RETURN_IF_ERROR(_inner_sink_operator->open(state)); _child = std::move(child); RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); @@ -948,7 +950,6 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori local_state._shared_state->inner_runtime_state.get(), block, eos)); if (*eos) { _update_profile_from_internal_states(local_state); - local_state._shared_state->inner_runtime_state.reset(); } } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 4da7abec23e..3546818a1a9 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -24,6 +24,7 @@ #include <mutex> #include "common/logging.h" +#include "common/status.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/spill_utils.h" #include "pipeline/pipeline_task.h" @@ -52,7 +53,7 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, "HashJoinBuildSpillDependency", true); state->get_task()->add_spill_dependency(_spill_dependency.get()); - _internal_runtime_profile.reset(new RuntimeProfile("internal_profile")); + _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); _partition_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionTime", 1); _partition_shuffle_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionShuffleTime", 1); @@ -60,7 +61,6 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow", TUnit::UNIT, 1); _memory_usage_reserved = ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1); - return Status::OK(); } @@ -70,6 +70,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) { _shared_state->setup_shared_profile(_profile); RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state)); auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); + RETURN_IF_ERROR(p._setup_internal_operator(state)); for (uint32_t i = 0; i != p._partition_count; ++i) { auto& spilling_stream = _shared_state->spilled_streams[i]; RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( @@ -87,6 +88,11 @@ Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec return Status::OK(); } dec_running_big_mem_op_num(state); + auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); + if (!_shared_state->need_to_spill && _shared_state->inner_runtime_state) { + RETURN_IF_ERROR(p._inner_sink_operator->close(_shared_state->inner_runtime_state.get(), + exec_status)); + } return PipelineXSpillSinkLocalState::close(state, exec_status); } @@ -156,6 +162,15 @@ size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta return size_to_reserve; } +Dependency* PartitionedHashJoinSinkLocalState::finishdependency() { + if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) { + auto* inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state); + return inner_sink_state->finishdependency(); + } + DCHECK(false) << "Should not reach here!"; + return nullptr; +} + Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); @@ -176,6 +191,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( if (inner_sink_state) { build_block = inner_sink_state->_build_side_mutable_block.to_block(); block_old_mem = build_block.allocated_bytes(); + // If spilling was triggered, constructing runtime filters is meaningless, + // therefore, all runtime filters are temporarily disabled. RETURN_IF_ERROR(inner_sink_state->disable_runtime_filters( _shared_state->inner_runtime_state.get())); } @@ -503,6 +520,7 @@ Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* local_state._shared_state->inner_runtime_state = RuntimeState::create_unique( state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); + local_state._shared_state->inner_runtime_state->set_task(state->get_task()); local_state._shared_state->inner_runtime_state->set_task_execution_context( state->get_task_execution_context().lock()); local_state._shared_state->inner_runtime_state->set_be_number(state->be_number()); @@ -582,10 +600,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (need_to_spill) { return revoke_memory(state, nullptr); } else { - if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { - RETURN_IF_ERROR(_setup_internal_operator(state)); - } - DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", { return Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_hash_join_sink " @@ -633,9 +647,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B return revoke_memory(state, nullptr); } } else { - if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { - RETURN_IF_ERROR(_setup_internal_operator(state)); - } DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", { return Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_hash_join_sink " diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index e1a76fa17de..73955932427 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -52,6 +52,8 @@ public: [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos); void update_memory_usage(); + Dependency* finishdependency() override; + protected: PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org