This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 8d7f09a0c53 [fix](spill) runtime filter and add some counters (#46999)
8d7f09a0c53 is described below

commit 8d7f09a0c5348a663de8d2b0ffbd5f078ec8d916
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Thu Jan 16 21:17:55 2025 +0800

    [fix](spill) runtime filter and add some counters (#46999)
---
 .../exec/partitioned_hash_join_probe_operator.cpp  |  3 ++-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 29 +++++++++++++++-------
 .../exec/partitioned_hash_join_sink_operator.h     |  2 ++
 3 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 120c6bcbd06..565aeaa5fee 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -534,7 +534,9 @@ Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
     RETURN_IF_ERROR(_inner_probe_operator->set_child(child));
     DCHECK(_build_side_child != nullptr);
     _inner_probe_operator->set_build_side_child(_build_side_child);
+    RETURN_IF_ERROR(_inner_sink_operator->set_child(_build_side_child));
     RETURN_IF_ERROR(_inner_probe_operator->open(state));
+    RETURN_IF_ERROR(_inner_sink_operator->open(state));
     _child = std::move(child);
     RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
     RETURN_IF_ERROR(_partitioner->open(state));
@@ -948,7 +950,6 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
                     local_state._shared_state->inner_runtime_state.get(), block, eos));
             if (*eos) {
                 _update_profile_from_internal_states(local_state);
-                local_state._shared_state->inner_runtime_state.reset();
             }
         }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 4da7abec23e..3546818a1a9 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -24,6 +24,7 @@
 #include <mutex>
 
 #include "common/logging.h"
+#include "common/status.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
@@ -52,7 +53,7 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
                                                   "HashJoinBuildSpillDependency", true);
     state->get_task()->add_spill_dependency(_spill_dependency.get());
 
-    _internal_runtime_profile.reset(new RuntimeProfile("internal_profile"));
+    _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile");
 
     _partition_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionTime", 1);
     _partition_shuffle_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionShuffleTime", 1);
@@ -60,7 +61,6 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow", TUnit::UNIT, 1);
     _memory_usage_reserved =
             ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
-
     return Status::OK();
 }
 
@@ -70,6 +70,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
     _shared_state->setup_shared_profile(_profile);
     RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    RETURN_IF_ERROR(p._setup_internal_operator(state));
     for (uint32_t i = 0; i != p._partition_count; ++i) {
         auto& spilling_stream = _shared_state->spilled_streams[i];
         RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
@@ -87,6 +88,11 @@ Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec
         return Status::OK();
     }
     dec_running_big_mem_op_num(state);
+    auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    if (!_shared_state->need_to_spill && _shared_state->inner_runtime_state) {
+        RETURN_IF_ERROR(p._inner_sink_operator->close(_shared_state->inner_runtime_state.get(),
+                                                      exec_status));
+    }
     return PipelineXSpillSinkLocalState::close(state, exec_status);
 }
 
@@ -156,6 +162,15 @@ Status PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
     return size_to_reserve;
 }
 
+Dependency* PartitionedHashJoinSinkLocalState::finishdependency() {
+    if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) {
+        auto* inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state);
+        return inner_sink_state->finishdependency();
+    }
+    DCHECK(false) << "Should not reach here!";
+    return nullptr;
+}
+
 Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
@@ -176,6 +191,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
     if (inner_sink_state) {
         build_block = inner_sink_state->_build_side_mutable_block.to_block();
         block_old_mem = build_block.allocated_bytes();
+        // If spilling was triggered, constructing runtime filters is meaningless,
+        // therefore, all runtime filters are temporarily disabled.
         RETURN_IF_ERROR(inner_sink_state->disable_runtime_filters(
                 _shared_state->inner_runtime_state.get()));
     }
@@ -503,6 +520,7 @@ Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState*
     local_state._shared_state->inner_runtime_state = RuntimeState::create_unique(
             state->fragment_instance_id(), state->query_id(), state->fragment_id(),
             state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());
+    local_state._shared_state->inner_runtime_state->set_task(state->get_task());
     local_state._shared_state->inner_runtime_state->set_task_execution_context(
             state->get_task_execution_context().lock());
     local_state._shared_state->inner_runtime_state->set_be_number(state->be_number());
@@ -582,10 +600,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
             if (need_to_spill) {
                 return revoke_memory(state, nullptr);
             } else {
-                if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
-                    RETURN_IF_ERROR(_setup_internal_operator(state));
-                }
-
                 DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", {
                     return Status::Error<INTERNAL_ERROR>(
                             "fault_inject partitioned_hash_join_sink "
@@ -633,9 +647,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
             return revoke_memory(state, nullptr);
         }
     } else {
-        if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
-            RETURN_IF_ERROR(_setup_internal_operator(state));
-        }
         DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", {
             return Status::Error<INTERNAL_ERROR>(
                     "fault_inject partitioned_hash_join_sink "
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index e1a76fa17de..73955932427 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -52,6 +52,8 @@ public:
     [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
     void update_memory_usage();
 
+    Dependency* finishdependency() override;
+
 protected:
     PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to