This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch set-runtime-filter
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/set-runtime-filter by this 
push:
     new 3b7d5d3c716 support rf on set operator: be part
3b7d5d3c716 is described below

commit 3b7d5d3c71619fd11489ddd22a04debf1704810d
Author: BiteTheDDDDt <x...@selectdb.com>
AuthorDate: Tue Mar 25 14:11:52 2025 +0800

    support rf on set operator: be part
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 +-
 .../exec/nested_loop_join_build_operator.cpp       |  2 +-
 .../exec/nested_loop_join_build_operator.h         |  2 +-
 be/src/pipeline/exec/set_sink_operator.cpp         | 54 +++++++++++++-----
 be/src/pipeline/exec/set_sink_operator.h           | 19 +++++--
 .../runtime_filter_producer_helper_set.h           | 65 ++++++++++++++++++++++
 7 files changed, 121 insertions(+), 25 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 15153d1df40..3f2a262a84c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -92,7 +92,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
 
     // Hash Table Init
     RETURN_IF_ERROR(_hash_table_init(state));
-    _runtime_filter_producer_helper = 
std::make_shared<RuntimeFilterProducerHelper>(
+    _runtime_filter_producer_helper = 
std::make_unique<RuntimeFilterProducerHelper>(
             profile(), _should_build_hash_table, p._is_broadcast_join);
     RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, 
_build_expr_ctxs,
                                                           
p._runtime_filter_descs));
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 7ac62160bbd..44d19014d94 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -84,7 +84,7 @@ protected:
     size_t _build_side_rows = 0;
 
     vectorized::MutableBlock _build_side_mutable_block;
-    std::shared_ptr<RuntimeFilterProducerHelper> 
_runtime_filter_producer_helper;
+    std::unique_ptr<RuntimeFilterProducerHelper> 
_runtime_filter_producer_helper;
 
     /*
      * The comparison result of a null value with any other value is null,
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 7b8647f2232..c6325f10102 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -41,7 +41,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkSta
         RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, 
_filter_src_expr_ctxs[i]));
     }
 
-    _runtime_filter_producer_helper = 
std::make_shared<RuntimeFilterProducerHelperCross>(profile());
+    _runtime_filter_producer_helper = 
std::make_unique<RuntimeFilterProducerHelperCross>(profile());
     RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, 
_filter_src_expr_ctxs,
                                                           
p._runtime_filter_descs));
     return Status::OK();
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h 
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 42274276fbb..2600d5085f6 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -47,7 +47,7 @@ private:
     friend class NestedLoopJoinBuildSinkOperatorX;
 
     vectorized::VExprContextSPtrs _filter_src_expr_ctxs;
-    std::shared_ptr<RuntimeFilterProducerHelperCross> 
_runtime_filter_producer_helper;
+    std::unique_ptr<RuntimeFilterProducerHelperCross> 
_runtime_filter_producer_helper;
 };
 
 class NestedLoopJoinBuildSinkOperatorX final
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 4faeb975ef9..f5236884c70 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -20,12 +20,35 @@
 #include <memory>
 
 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
 #include "vec/common/hash_table/hash_table_set_build.h"
 #include "vec/core/materialize_block.h"
 
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 
+template <bool is_intersect>
+Status SetSinkLocalState<is_intersect>::close(RuntimeState* state, Status 
exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    if (!_runtime_filter_producer_helper || state->is_cancelled() || !_eos) {
+        return Base::close(state, exec_status);
+    }
+
+    try {
+        RETURN_IF_ERROR(
+                _runtime_filter_producer_helper->process(state, 
&_shared_state->build_block));
+    } catch (Exception& e) {
+        return Status::InternalError(
+                "rf process meet error: {}, wake_up_early: {}, 
_finish_dependency: {}",
+                e.to_string(), state->get_task()->wake_up_early(),
+                _finish_dependency->debug_string());
+    }
+    return Base::close(state, exec_status);
+}
+
 template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, 
vectorized::Block* in_block,
                                             bool eos) {
@@ -57,23 +80,21 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
         local_state._mutable_block.clear();
 
         if (eos) {
-            if constexpr (is_intersect) {
-                valid_element_in_hash_tbl = 0;
-            } else {
-                std::visit(
-                        [&](auto&& arg) {
-                            using HashTableCtxType = 
std::decay_t<decltype(arg)>;
-                            if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                                valid_element_in_hash_tbl = 
arg.hash_table->size();
-                            }
-                        },
-                        
local_state._shared_state->hash_table_variants->method_variant);
-            }
+            uint64_t hash_table_size = 0;
+            std::visit(
+                    [&](auto&& arg) {
+                        using HashTableCtxType = std::decay_t<decltype(arg)>;
+                        if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                            hash_table_size = arg.hash_table->size();
+                        }
+                    },
+                    
local_state._shared_state->hash_table_variants->method_variant);
+            valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
+
             
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
                     ->set_ready();
-            if (_child_quantity == 1) {
-                local_state._dependency->set_ready_to_read();
-            }
+            
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
+                    state, hash_table_size, local_state._finish_dependency));
         }
     }
     return Status::OK();
@@ -175,6 +196,9 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* 
state, LocalSinkState
 
     RETURN_IF_ERROR(_shared_state->update_build_not_ignore_null(_child_exprs));
 
+    _runtime_filter_producer_helper = 
std::make_unique<RuntimeFilterProducerHelperSet>(profile());
+    RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _child_exprs,
+                                                          
parent._runtime_filter_descs));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 08f789f702a..c87007fabf8 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -17,10 +17,8 @@
 
 #pragma once
 
-#include <stdint.h>
-
-#include "olap/olap_common.h"
 #include "operator.h"
+#include "runtime_filter/runtime_filter_producer_helper_set.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -46,6 +44,7 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
 
 private:
     friend class SetSinkOperatorX<is_intersect>;
@@ -57,6 +56,9 @@ private:
 
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
     RuntimeProfile::Counter* _build_timer = nullptr;
+
+    std::unique_ptr<RuntimeFilterProducerHelperSet> 
_runtime_filter_producer_helper;
+    std::shared_ptr<CountedFinishDependency> _finish_dependency;
 };
 
 template <bool is_intersect>
@@ -71,14 +73,17 @@ public:
     SetSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool* pool,
                      const TPlanNode& tnode, const DescriptorTbl& descs)
             : Base(sink_id, tnode.node_id, dest_id),
-              _cur_child_id(child_id),
               _child_quantity(tnode.node_type == 
TPlanNodeType::type::INTERSECT_NODE
                                       ? 
tnode.intersect_node.result_expr_lists.size()
                                       : 
tnode.except_node.result_expr_lists.size()),
               _is_colocate(is_intersect ? tnode.intersect_node.is_colocate
                                         : tnode.except_node.is_colocate),
               _partition_exprs(is_intersect ? 
tnode.intersect_node.result_expr_lists[child_id]
-                                            : 
tnode.except_node.result_expr_lists[child_id]) {}
+                                            : 
tnode.except_node.result_expr_lists[child_id]),
+              _runtime_filter_descs(tnode.runtime_filters) {
+        DCHECK_EQ(child_id, _cur_child_id);
+        DCHECK_GT(_child_quantity, 1);
+    }
     ~SetSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TDataSink",
@@ -107,13 +112,15 @@ private:
                                  vectorized::Block& block, 
vectorized::ColumnRawPtrs& raw_ptrs,
                                  size_t& rows);
 
-    const int _cur_child_id;
+    const int _cur_child_id = 0;
     const size_t _child_quantity;
     // every child has its result expr list
     vectorized::VExprContextSPtrs _child_exprs;
     const bool _is_colocate;
     const std::vector<TExpr> _partition_exprs;
     using OperatorBase::_child;
+
+    const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
 };
 #include "common/compile_check_end.h"
 
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_set.h 
b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
new file mode 100644
index 00000000000..39739aabccf
--- /dev/null
+++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/runtime_state.h"
+#include "runtime_filter/runtime_filter.h"
+#include "runtime_filter/runtime_filter_mgr.h"
+#include "runtime_filter/runtime_filter_producer_helper.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+// this class used in set sink node
+class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper {
+public:
+    ~RuntimeFilterProducerHelperSet() override = default;
+
+    RuntimeFilterProducerHelperSet(RuntimeProfile* profile)
+            : RuntimeFilterProducerHelper(profile, true, false) {}
+
+    Status process(RuntimeState* state, const vectorized::Block* block) {
+        if (_skip_runtime_filters_process) {
+            return Status::OK();
+        }
+
+        bool wake_up_early = state->get_task()->wake_up_early();
+        // Runtime filter is ignored partially which has no effect on 
correctness.
+        auto wrapper_state = wake_up_early ? 
RuntimeFilterWrapper::State::IGNORED
+                                           : 
RuntimeFilterWrapper::State::READY;
+        if (!wake_up_early) {
+            // Hash table is completed and runtime filter has a global size 
now.
+            uint64_t hash_table_size = block ? block->rows() : 0;
+            RETURN_IF_ERROR(_init_filters(state, hash_table_size));
+            RETURN_IF_ERROR(_insert(block, 0));
+        }
+
+        for (const auto& filter : _producers) {
+            filter->set_wrapper_state_and_ready_to_publish(wrapper_state);
+        }
+
+        RETURN_IF_ERROR(_publish(state));
+        return Status::OK();
+    }
+};
+#include "common/compile_check_end.h"
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to