This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 226dbd06e4b [Feature](runtime-filter) support rf on set operator: be part (#49679)
226dbd06e4b is described below

commit 226dbd06e4b5153220d8864a407de263ddc426a0
Author: Pxl <x...@selectdb.com>
AuthorDate: Thu Apr 3 15:22:07 2025 +0800

    [Feature](runtime-filter) support rf on set operator: be part (#49679)
    
    ### What problem does this PR solve?
    Support runtime filters (rf) on set operators (INTERSECT / EXCEPT). This commit is the backend (BE) part.
    
    ```sql
    create table tx (
            k1 int null,
            k2 int not null,
            k3 bigint null,
      k4 varchar(100) null
    )
    duplicate key (k1)
    properties("replication_num" = "1");
    
    insert into tx select e1,e1,e1,e1 from (select 1 k1) as t lateral view explode_numbers(x) tmp1 as e1;
    
    
    select k1 from t1000 intersect select k1 from t10000000;
    10.03 sec -> 0.13 sec
    
    select k1 from t1000 intersect select k1 from t1000000;
    1.17 sec  -> 0.13 sec
    
    select k1 from t10000 intersect select k1 from t100000;
    0.23 sec -> 0.18 sec
    
    select k1 from t10000 intersect select * from( select k1 from t100000 where k1 > 10000)t;
    0.15 sec -> 0.16 sec
    
    select k1 from t10000 intersect select * from( select k1 from t100000 where k1 < 10000)t;
    0.13 sec -> 0.18 sec
    ```
    
    part of https://github.com/apache/doris/pull/49573
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [x] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [x] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 be/src/pipeline/exec/set_sink_operator.cpp         | 76 ++++++++++++++++------
 be/src/pipeline/exec/set_sink_operator.h           | 21 ++++--
 be/src/runtime/fragment_mgr.cpp                    |  4 +-
 .../runtime_filter_producer_helper_set.h           | 59 +++++++++++++++++
 4 files changed, 134 insertions(+), 26 deletions(-)

diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 4faeb975ef9..8d28fe1e369 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -26,6 +26,49 @@
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 
+uint64_t get_hash_table_size(const auto& hash_table_variant) {
+    uint64_t hash_table_size = 0;
+    std::visit(
+            [&](auto&& arg) {
+                using HashTableCtxType = std::decay_t<decltype(arg)>;
+                if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                    hash_table_size = arg.hash_table->size();
+                }
+            },
+            hash_table_variant);
+    return hash_table_size;
+}
+
+template <bool is_intersect>
+Status SetSinkLocalState<is_intersect>::terminate(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    if (_terminated) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_runtime_filter_producer_helper->terminate(state));
+    return Base::terminate(state);
+}
+
+template <bool is_intersect>
+Status SetSinkLocalState<is_intersect>::close(RuntimeState* state, Status 
exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    if (!_terminated && _runtime_filter_producer_helper && 
!state->is_cancelled()) {
+        try {
+            RETURN_IF_ERROR(_runtime_filter_producer_helper->process(
+                    state, &_shared_state->build_block,
+                    
get_hash_table_size(_shared_state->hash_table_variants->method_variant)));
+        } catch (Exception& e) {
+            return Status::InternalError(
+                    "rf process meet error: {}, _terminated: {}, 
_finish_dependency: {}",
+                    e.to_string(), _terminated, 
_finish_dependency->debug_string());
+        }
+    }
+    return Base::close(state, exec_status);
+}
+
 template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, 
vectorized::Block* in_block,
                                             bool eos) {
@@ -57,23 +100,15 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
         local_state._mutable_block.clear();
 
         if (eos) {
-            if constexpr (is_intersect) {
-                valid_element_in_hash_tbl = 0;
-            } else {
-                std::visit(
-                        [&](auto&& arg) {
-                            using HashTableCtxType = 
std::decay_t<decltype(arg)>;
-                            if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                                valid_element_in_hash_tbl = 
arg.hash_table->size();
-                            }
-                        },
-                        
local_state._shared_state->hash_table_variants->method_variant);
-            }
+            uint64_t hash_table_size = get_hash_table_size(
+                    
local_state._shared_state->hash_table_variants->method_variant);
+            valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
+
             
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
                     ->set_ready();
-            if (_child_quantity == 1) {
-                local_state._dependency->set_ready_to_read();
-            }
+            
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
+                    state, hash_table_size, local_state._finish_dependency));
+            local_state._eos = true;
         }
     }
     return Status::OK();
@@ -113,16 +148,16 @@ template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::_extract_build_column(
         SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
         vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) {
-    std::vector<int> result_locs(_child_exprs.size(), -1);
+    std::vector<int> result_locs(local_state._child_exprs.size(), -1);
     bool is_all_const = true;
 
-    for (size_t i = 0; i < _child_exprs.size(); ++i) {
-        RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_locs[i]));
+    for (size_t i = 0; i < local_state._child_exprs.size(); ++i) {
+        RETURN_IF_ERROR(local_state._child_exprs[i]->execute(&block, 
&result_locs[i]));
         is_all_const &= 
is_column_const(*block.get_by_position(result_locs[i]).column);
     }
     rows = is_all_const ? 1 : rows;
 
-    for (size_t i = 0; i < _child_exprs.size(); ++i) {
+    for (size_t i = 0; i < local_state._child_exprs.size(); ++i) {
         size_t result_col_id = result_locs[i];
 
         if (is_all_const) {
@@ -175,6 +210,9 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* 
state, LocalSinkState
 
     RETURN_IF_ERROR(_shared_state->update_build_not_ignore_null(_child_exprs));
 
+    _runtime_filter_producer_helper = 
std::make_shared<RuntimeFilterProducerHelperSet>(profile());
+    RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _child_exprs,
+                                                          
parent._runtime_filter_descs));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index b2795c23a5b..aadfc7ee6d2 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -17,10 +17,8 @@
 
 #pragma once
 
-#include <stdint.h>
-
-#include "olap/olap_common.h"
 #include "operator.h"
+#include "runtime_filter/runtime_filter_producer_helper_set.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -46,6 +44,8 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
+    Status terminate(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
 
 private:
     friend class SetSinkOperatorX<is_intersect>;
@@ -57,6 +57,9 @@ private:
 
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
     RuntimeProfile::Counter* _build_timer = nullptr;
+
+    std::shared_ptr<RuntimeFilterProducerHelperSet> 
_runtime_filter_producer_helper;
+    std::shared_ptr<CountedFinishDependency> _finish_dependency;
 };
 
 template <bool is_intersect>
@@ -71,14 +74,17 @@ public:
     SetSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool* pool,
                      const TPlanNode& tnode, const DescriptorTbl& descs)
             : Base(sink_id, tnode.node_id, dest_id),
-              _cur_child_id(child_id),
               _child_quantity(tnode.node_type == 
TPlanNodeType::type::INTERSECT_NODE
                                       ? 
tnode.intersect_node.result_expr_lists.size()
                                       : 
tnode.except_node.result_expr_lists.size()),
               _is_colocate(is_intersect ? tnode.intersect_node.is_colocate
                                         : tnode.except_node.is_colocate),
               _partition_exprs(is_intersect ? 
tnode.intersect_node.result_expr_lists[child_id]
-                                            : 
tnode.except_node.result_expr_lists[child_id]) {}
+                                            : 
tnode.except_node.result_expr_lists[child_id]),
+              _runtime_filter_descs(tnode.runtime_filters) {
+        DCHECK_EQ(child_id, _cur_child_id);
+        DCHECK_GT(_child_quantity, 1);
+    }
 
 #ifdef BE_TEST
     SetSinkOperatorX(int _child_quantity)
@@ -87,6 +93,7 @@ public:
               _is_colocate(false),
               _partition_exprs() {}
 #endif
+
     ~SetSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TDataSink",
@@ -115,13 +122,15 @@ private:
                                  vectorized::Block& block, 
vectorized::ColumnRawPtrs& raw_ptrs,
                                  size_t& rows);
 
-    const int _cur_child_id;
+    const int _cur_child_id = 0;
     const size_t _child_quantity;
     // every child has its result expr list
     vectorized::VExprContextSPtrs _child_exprs;
     const bool _is_colocate;
     const std::vector<TExpr> _partition_exprs;
     using OperatorBase::_child;
+
+    const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
 };
 #include "common/compile_check_end.h"
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3d81bee17d5..f7980e41c91 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1343,7 +1343,9 @@ Status FragmentMgr::merge_filter(const 
PMergeFilterRequest* request,
     query_id.__set_lo(queryid.lo);
     if (auto q_ctx = get_query_ctx(query_id)) {
         SCOPED_ATTACH_TASK(q_ctx.get());
-        std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
+        if (!q_ctx->get_merge_controller_handler()) {
+            return Status::InternalError("Merge filter failed: Merge 
controller handler is null");
+        }
         return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, 
attach_data);
     } else {
         return Status::EndOfFile(
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_set.h 
b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
new file mode 100644
index 00000000000..2e4e5bfe86a
--- /dev/null
+++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/runtime_state.h"
+#include "runtime_filter/runtime_filter.h"
+#include "runtime_filter/runtime_filter_mgr.h"
+#include "runtime_filter/runtime_filter_producer_helper.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+// this class used in set sink node
+class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper {
+public:
+    ~RuntimeFilterProducerHelperSet() override = default;
+
+    RuntimeFilterProducerHelperSet(RuntimeProfile* profile)
+            : RuntimeFilterProducerHelper(profile, true, false) {}
+
+    Status process(RuntimeState* state, const vectorized::Block* block, 
uint64_t cardinality) {
+        if (_skip_runtime_filters_process) {
+            return Status::OK();
+        }
+
+        RETURN_IF_ERROR(_init_filters(state, cardinality));
+        if (cardinality != 0) {
+            RETURN_IF_ERROR(_insert(block, 0));
+        }
+
+        for (const auto& filter : _producers) {
+            
filter->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
+        }
+
+        RETURN_IF_ERROR(_publish(state));
+        return Status::OK();
+    }
+};
+#include "common/compile_check_end.h"
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to