This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f7d23a0ca1c066530103db9f0442908b17681a3c
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Fri Feb 21 09:23:57 2025 +0800

    [refactor](runtime filter) Simplify producer / slots (#48145)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  14 +--
 .../exec/nested_loop_join_build_operator.cpp       |   2 +-
 be/src/runtime_filter/role/producer.cpp            |  39 +++-----
 be/src/runtime_filter/role/producer.h              |  83 +++++++---------
 be/src/runtime_filter/role/runtime_filter.h        |   1 +
 be/src/runtime_filter/runtime_filter_slots.cpp     | 108 +++++++++++----------
 be/src/runtime_filter/runtime_filter_slots.h       |  53 +++-------
 be/src/runtime_filter/runtime_filter_slots_cross.h |   4 +-
 be/src/runtime_filter/runtime_filter_wrapper.h     |  12 +--
 9 files changed, 136 insertions(+), 180 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 85f65e153a8..ba935feb26a 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -88,8 +88,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
 
     // Hash Table Init
     RETURN_IF_ERROR(_hash_table_init(state));
-    _runtime_filter_slots = 
std::make_shared<RuntimeFilterSlots>(_build_expr_ctxs, profile(),
-                                                                 
_should_build_hash_table);
+    _runtime_filter_slots = std::make_shared<RuntimeFilterSlots>(
+            _build_expr_ctxs, profile(), _should_build_hash_table, 
p._is_broadcast_join);
     RETURN_IF_ERROR(_runtime_filter_slots->init(state, 
p._runtime_filter_descs));
     return Status::OK();
 }
@@ -102,8 +102,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
 #ifndef NDEBUG
     if (state->fuzzy_disable_runtime_filter_in_be()) {
         if ((_parent->operator_id() + random()) % 2 == 0) {
-            RETURN_IF_ERROR(
-                    _runtime_filter_slots->skip_runtime_filters_process(state, 
_finish_dependency));
+            _runtime_filter_slots->skip_runtime_filters();
         }
     }
 #endif
@@ -144,7 +143,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
 
     try {
         RETURN_IF_ERROR(_runtime_filter_slots->process(state, 
_shared_state->build_block.get(),
-                                                       _finish_dependency));
+                                                       _finish_dependency,
+                                                       
p._shared_hash_table_context));
     } catch (Exception& e) {
         bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
                                                    
p._shared_hashtable_controller &&
@@ -508,7 +508,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
             _shared_hash_table_context->block = 
local_state._shared_state->build_block;
             _shared_hash_table_context->build_indexes_null =
                     local_state._shared_state->build_indexes_null;
-            
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
         }
     } else if (!local_state._should_build_hash_table) {
         DCHECK(_shared_hashtable_controller != nullptr);
@@ -524,9 +523,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
             return _shared_hash_table_context->status;
         }
 
-        
RETURN_IF_ERROR(local_state._runtime_filter_slots->copy_from_shared_context(
-                _shared_hash_table_context));
-
         local_state.profile()->add_info_string(
                 "SharedHashTableFrom",
                 print_id(
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 157947028e7..0f9fa44fab8 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -42,7 +42,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
     }
 
     _runtime_filter_slots =
-            std::make_shared<RuntimeFilterSlotsCross>(_filter_src_expr_ctxs, 
profile(), true);
+            std::make_shared<RuntimeFilterSlotsCross>(_filter_src_expr_ctxs, 
profile());
     RETURN_IF_ERROR(_runtime_filter_slots->init(state, 
p._runtime_filter_descs));
     return Status::OK();
 }
diff --git a/be/src/runtime_filter/role/producer.cpp b/be/src/runtime_filter/role/producer.cpp
index b35320a50d6..d1c2286b41c 100644
--- a/be/src/runtime_filter/role/producer.cpp
+++ b/be/src/runtime_filter/role/producer.cpp
@@ -44,9 +44,10 @@ Status RuntimeFilterProducer::_send_to_local_targets(RuntimeFilter* merger_filte
     return Status::OK();
 };
 
-Status RuntimeFilterProducer::publish(RuntimeState* state, bool publish_local) 
{
+Status RuntimeFilterProducer::publish(RuntimeState* state, bool 
build_hash_table) {
     _check_state({State::READY_TO_PUBLISH});
 
+    // TODO: do we still need this if wrapper is disabled / ignored?
     auto do_merge = [&]() {
         // two case we need do local merge:
         // 1. has remote target
@@ -77,19 +78,17 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool publish_local) {
         // So for all runtime filters' producers, `publish` should notify all 
consumers in global RF mgr which manages local-merge RF and local RF mgr which 
manages others.
         RETURN_IF_ERROR(do_merge());
         RETURN_IF_ERROR(_send_to_local_targets(this, false));
-    } else if (!publish_local) {
+    } else if (build_hash_table) {
         if (_is_broadcast_join) {
             RETURN_IF_ERROR(_send_to_remote_targets(state, this));
         } else {
             RETURN_IF_ERROR(do_merge());
         }
     } else {
-        // remote broadcast join only push onetime in build shared hash table
-        // publish_local only set true on copy shared hash table
         DCHECK(_is_broadcast_join);
     }
 
-    _set_state(State::PUBLISHED);
+    set_state(State::PUBLISHED);
     return Status::OK();
 }
 
@@ -134,7 +133,7 @@ public:
             : Base(req, callback, context), 
_dependency(std::move(dependency)), _wrapper(wrapper) {}
 };
 
-Status RuntimeFilterProducer::send_filter_size(
+Status RuntimeFilterProducer::send_size(
         RuntimeState* state, uint64_t local_filter_size,
         const std::shared_ptr<pipeline::CountedFinishDependency>& dependency) {
     if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
@@ -143,7 +142,7 @@ Status RuntimeFilterProducer::send_filter_size(
     }
     _dependency = dependency;
     _dependency->add();
-    _set_state(State::WAITING_FOR_SYNCED_SIZE);
+    set_state(State::WAITING_FOR_SYNCED_SIZE);
 
     // two case we need do local merge:
     // 1. has remote target
@@ -221,10 +220,10 @@ void RuntimeFilterProducer::set_synced_size(uint64_t global_size) {
     if (_dependency) {
         _dependency->sub();
     }
-    _set_state(State::WAITING_FOR_DATA);
+    set_state(State::WAITING_FOR_DATA);
 }
 
-Status RuntimeFilterProducer::init_with_size(size_t local_size) {
+Status RuntimeFilterProducer::init(size_t local_size) {
     size_t real_size = _synced_size != -1 ? _synced_size : local_size;
     if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
         real_size > _wrapper->max_in_num()) {
@@ -236,23 +235,15 @@ Status RuntimeFilterProducer::init_with_size(size_t local_size) {
     }
     if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER &&
         real_size > _wrapper->max_in_num()) {
-        disable_and_ready_to_publish("reach max in num");
+        
set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED,
+                                               "reach max in num");
     }
-    return Status::OK();
-}
-
-void 
RuntimeFilterProducer::disable_meaningless_filters(std::unordered_set<int>& 
has_in_filter,
-                                                        bool 
collect_in_filters) {
-    if (_rf_state == State::READY_TO_PUBLISH ||
-        collect_in_filters != (_wrapper->get_real_type() == 
RuntimeFilterType::IN_FILTER)) {
-        return;
-    }
-
-    if (has_in_filter.contains(_expr_order)) {
-        disable_and_ready_to_publish("exist in_filter");
-    } else if (collect_in_filters) {
-        has_in_filter.insert(_expr_order);
+    if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER && 
!_callback.empty()) {
+        for (auto& call : _callback) {
+            call();
+        }
     }
+    return Status::OK();
 }
 
 } // namespace doris
diff --git a/be/src/runtime_filter/role/producer.h b/be/src/runtime_filter/role/producer.h
index a782f19277f..ae1b3708457 100644
--- a/be/src/runtime_filter/role/producer.h
+++ b/be/src/runtime_filter/role/producer.h
@@ -24,8 +24,19 @@
 
 namespace doris {
 
+/**
+ * init -> send_size -> insert -> publish
+ */
 class RuntimeFilterProducer : public RuntimeFilter {
 public:
+    using Callback = std::function<void()>;
+    enum class State {
+        WAITING_FOR_SEND_SIZE = 0,
+        WAITING_FOR_SYNCED_SIZE = 1,
+        WAITING_FOR_DATA = 2,
+        READY_TO_PUBLISH = 3,
+        PUBLISHED = 4
+    };
     static Status create(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
                          std::shared_ptr<RuntimeFilterProducer>* res,
                          RuntimeProfile* parent_profile) {
@@ -40,55 +51,35 @@ public:
         return Status::OK();
     }
 
+    Status init(size_t local_size);
+    Status send_size(RuntimeState* state, uint64_t local_filter_size,
+                     const std::shared_ptr<pipeline::CountedFinishDependency>& 
dependency);
     // insert data to build filter
-    void insert_batch(vectorized::ColumnPtr column, size_t start) {
+    void insert(vectorized::ColumnPtr column, size_t start) {
         if (_rf_state == State::READY_TO_PUBLISH || _rf_state == 
State::PUBLISHED) {
+            DCHECK(!_wrapper->is_valid());
             return;
         }
         _check_state({State::WAITING_FOR_DATA});
         _wrapper->insert_batch(column, start);
     }
-
-    int expr_order() const { return _expr_order; }
-
-    Status init_with_size(size_t local_size);
-
-    Status send_filter_size(RuntimeState* state, uint64_t local_filter_size,
-                            const 
std::shared_ptr<pipeline::CountedFinishDependency>& dependency);
-
-    Status publish(RuntimeState* state, bool publish_local);
-
-    void set_synced_size(uint64_t global_size);
-
+    Status publish(RuntimeState* state, bool build_hash_table);
     std::string debug_string() const override {
         return fmt::format("Producer: ({}, state: {}, dependency: {}, 
synced_size: {})",
                            _debug_string(), to_string(_rf_state),
                            _dependency ? _dependency->debug_string() : "none", 
_synced_size);
     }
 
-    enum class State {
-        WAITING_FOR_SEND_SIZE = 0,
-        WAITING_FOR_SYNCED_SIZE = 1,
-        WAITING_FOR_DATA = 2,
-        READY_TO_PUBLISH = 3,
-        PUBLISHED = 4
-    };
-
-    void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State 
state) {
-        if (_set_state(State::READY_TO_PUBLISH)) {
-            _wrapper->set_state(state);
-        }
-    }
-
-    void disable_and_ready_to_publish(std::string reason) {
-        if (_set_state(State::READY_TO_PUBLISH)) {
-            _wrapper->disable(reason);
+    void with_callback(Callback& callback) { _callback.emplace_back(callback); 
}
+    int expr_order() const { return _expr_order; }
+    void set_synced_size(uint64_t global_size);
+    void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State 
state,
+                                                std::string reason = "") {
+        if (set_state(State::READY_TO_PUBLISH)) {
+            _wrapper->set_state(state, reason);
         }
     }
 
-    void disable_meaningless_filters(std::unordered_set<int>& has_in_filter,
-                                     bool collect_in_filters);
-
     static std::string to_string(const State& state) {
         switch (state) {
         case State::WAITING_FOR_SEND_SIZE:
@@ -109,11 +100,20 @@ public:
     void copy_to_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
         context->runtime_filters[_wrapper->filter_id()] = _wrapper;
     }
-
     void copy_from_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
         _wrapper = context->runtime_filters[_wrapper->filter_id()];
     }
 
+    bool set_state(State state) {
+        if (_rf_state == State::PUBLISHED ||
+            (state != State::PUBLISHED && _rf_state == 
State::READY_TO_PUBLISH)) {
+            return false;
+        }
+        _rf_state = state;
+        _profile->add_info_string("Info", debug_string());
+        return true;
+    }
+
 private:
     RuntimeFilterProducer(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
                           RuntimeProfile* parent_profile)
@@ -137,24 +137,15 @@ private:
         }
     }
 
-    bool _set_state(State state) {
-        if (_rf_state == State::PUBLISHED ||
-            (state != State::PUBLISHED && _rf_state == 
State::READY_TO_PUBLISH)) {
-            return false;
-        }
-        _rf_state = state;
-        _profile->add_info_string("Info", debug_string());
-        return true;
-    }
-
-    bool _is_broadcast_join;
-    int _expr_order;
+    const bool _is_broadcast_join;
+    const int _expr_order;
 
     int64_t _synced_size = -1;
     std::shared_ptr<pipeline::CountedFinishDependency> _dependency;
 
     std::atomic<State> _rf_state;
     std::unique_ptr<RuntimeProfile> _profile;
+    std::vector<Callback> _callback;
 };
 
 } // namespace doris
diff --git a/be/src/runtime_filter/role/runtime_filter.h b/be/src/runtime_filter/role/runtime_filter.h
index 0c9c80c0513..41623fbc74d 100644
--- a/be/src/runtime_filter/role/runtime_filter.h
+++ b/be/src/runtime_filter/role/runtime_filter.h
@@ -38,6 +38,7 @@ class RuntimeFilter {
 public:
     virtual ~RuntimeFilter() = default;
 
+    RuntimeFilterWrapper* impl() const { return _wrapper.get(); }
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
     bool has_local_target() const { return _has_local_target; }
diff --git a/be/src/runtime_filter/runtime_filter_slots.cpp b/be/src/runtime_filter/runtime_filter_slots.cpp
index add42cd1f88..5031c59a04b 100644
--- a/be/src/runtime_filter/runtime_filter_slots.cpp
+++ b/be/src/runtime_filter/runtime_filter_slots.cpp
@@ -24,37 +24,41 @@
 
 namespace doris {
 
-Status RuntimeFilterSlots::send_filter_size(
-        RuntimeState* state, uint64_t hash_table_size,
-        std::shared_ptr<pipeline::CountedFinishDependency> dependency) {
-    if (_skip_runtime_filters_process) {
-        return Status::OK();
+Status RuntimeFilterSlots::init(RuntimeState* state,
+                                const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs) {
+    _runtime_filters.resize(runtime_filter_descs.size());
+    std::unordered_map<int, RuntimeFilterProducer*> id_to_in_filter;
+    for (size_t i = 0; i < runtime_filter_descs.size(); i++) {
+        RETURN_IF_ERROR(state->register_producer_runtime_filter(
+                runtime_filter_descs[i], &_runtime_filters[i], 
_profile.get()));
+        if (runtime_filter_descs[i].type == TRuntimeFilterType::IN) {
+            id_to_in_filter.insert({runtime_filter_descs[i].expr_order, 
_runtime_filters[i].get()});
+        } else if (runtime_filter_descs[i].type == 
TRuntimeFilterType::IN_OR_BLOOM &&
+                   
!id_to_in_filter.contains(runtime_filter_descs[i].expr_order)) {
+            id_to_in_filter.insert({runtime_filter_descs[i].expr_order, 
_runtime_filters[i].get()});
+        }
     }
-
-    dependency->add(); // add count at start to avoid dependency ready 
multiple times
-    Defer defer {[&]() { dependency->sub(); }}; // remove the initial external 
add
-    for (auto runtime_filter : _runtime_filters) {
-        RETURN_IF_ERROR(runtime_filter->send_filter_size(state, 
hash_table_size, dependency));
+    for (size_t i = 0; i < runtime_filter_descs.size(); i++) {
+        if (id_to_in_filter.contains(_runtime_filters[i]->expr_order()) &&
+            _runtime_filters[i].get() != 
id_to_in_filter[_runtime_filters[i]->expr_order()]) {
+            RuntimeFilterProducer::Callback callback =
+                    [&, filter = _runtime_filters[i].get()]() -> void {
+                filter->set_wrapper_state_and_ready_to_publish(
+                        RuntimeFilterWrapper::State::DISABLED, "exist 
in_filter");
+            };
+            
id_to_in_filter[_runtime_filters[i]->expr_order()]->with_callback(callback);
+        }
     }
     return Status::OK();
 }
 
-/**
-    Disable meaningless filters, such as filters:
-        RF1: col1 in (1, 3, 5)
-        RF2: col1 min: 1, max: 5
-    We consider RF2 is meaningless, because RF1 has already filtered out all 
values that RF2 can filter.
-*/
-Status RuntimeFilterSlots::_disable_meaningless_filters(RuntimeState* state) {
-    // process ignore duplicate IN_FILTER
-    std::unordered_set<int> has_in_filter;
-    for (auto filter : _runtime_filters) {
-        filter->disable_meaningless_filters(has_in_filter, true);
-    }
-
-    // process ignore filter when it has IN_FILTER on same expr
-    for (auto filter : _runtime_filters) {
-        filter->disable_meaningless_filters(has_in_filter, false);
+Status RuntimeFilterSlots::send_filter_size(
+        RuntimeState* state, uint64_t hash_table_size,
+        std::shared_ptr<pipeline::CountedFinishDependency> dependency) {
+    // TODO: dependency is not needed if `_skip_runtime_filters_process` is 
true
+    for (auto runtime_filter : _runtime_filters) {
+        RETURN_IF_ERROR(runtime_filter->send_size(
+                state, _skip_runtime_filters_process ? 0 : hash_table_size, 
dependency));
     }
     return Status::OK();
 }
@@ -62,7 +66,7 @@ Status RuntimeFilterSlots::_disable_meaningless_filters(RuntimeState* state) {
 Status RuntimeFilterSlots::_init_filters(RuntimeState* state, uint64_t 
local_hash_table_size) {
     // process IN_OR_BLOOM_FILTER's real type
     for (auto filter : _runtime_filters) {
-        RETURN_IF_ERROR(filter->init_with_size(local_hash_table_size));
+        RETURN_IF_ERROR(filter->init(local_hash_table_size));
     }
     return Status::OK();
 }
@@ -70,51 +74,51 @@ Status RuntimeFilterSlots::_init_filters(RuntimeState* state, uint64_t local_has
 void RuntimeFilterSlots::_insert(const vectorized::Block* block, size_t start) 
{
     SCOPED_TIMER(_runtime_filter_compute_timer);
     for (auto& filter : _runtime_filters) {
+        if (!filter->impl()->is_valid()) {
+            // Skip building if ignored or disabled.
+            continue;
+        }
         int result_column_id =
                 
_build_expr_context[filter->expr_order()]->get_last_result_column_id();
         const auto& column = block->get_by_position(result_column_id).column;
-        filter->insert_batch(column, start);
+        filter->insert(column, start);
     }
 }
 
 Status RuntimeFilterSlots::process(
         RuntimeState* state, const vectorized::Block* block,
-        std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency) {
-    if (_skip_runtime_filters_process) {
-        return Status::OK();
-    }
-
-    auto wrapper_state = RuntimeFilterWrapper::State::READY;
-    if (state->get_task()->wake_up_early()) {
-        // partitial ignore rf to make global rf work
+        std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency,
+        vectorized::SharedHashTableContextPtr& shared_hash_table_ctx) {
+    auto wrapper_state = _skip_runtime_filters_process ? 
RuntimeFilterWrapper::State::DISABLED
+                                                       : 
RuntimeFilterWrapper::State::READY;
+    if (state->get_task()->wake_up_early() && !_skip_runtime_filters_process) {
+        // Runtime filter is ignored partially which has no effect on 
correctness.
         wrapper_state = RuntimeFilterWrapper::State::IGNORED;
-    } else if (_should_build_hash_table) {
+    } else if (_should_build_hash_table && !_skip_runtime_filters_process) {
+        // Hash table is completed and runtime filter has a global size now.
         uint64_t hash_table_size = block ? block->rows() : 0;
-        {
-            RETURN_IF_ERROR(_init_filters(state, hash_table_size));
-            RETURN_IF_ERROR(_disable_meaningless_filters(state));
-        }
+        RETURN_IF_ERROR(_init_filters(state, hash_table_size));
         if (hash_table_size > 1) {
             _insert(block, 1);
         }
     }
 
     for (auto filter : _runtime_filters) {
-        filter->set_wrapper_state_and_ready_to_publish(wrapper_state);
+        if (shared_hash_table_ctx && _should_build_hash_table) {
+            filter->copy_to_shared_context(shared_hash_table_ctx);
+        } else if (shared_hash_table_ctx) {
+            filter->copy_from_shared_context(shared_hash_table_ctx);
+        }
+        if (_should_build_hash_table) {
+            filter->set_wrapper_state_and_ready_to_publish(
+                    wrapper_state, _skip_runtime_filters_process ? "skip all 
rf process" : "");
+        } else {
+            filter->set_state(RuntimeFilterProducer::State::READY_TO_PUBLISH);
+        }
     }
 
     RETURN_IF_ERROR(_publish(state));
     return Status::OK();
 }
 
-Status RuntimeFilterSlots::skip_runtime_filters_process(
-        RuntimeState* state, 
std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency) {
-    RETURN_IF_ERROR(send_filter_size(state, 0, finish_dependency));
-    for (auto filter : _runtime_filters) {
-        filter->disable_and_ready_to_publish("skip all rf process");
-    }
-    RETURN_IF_ERROR(_publish(state));
-    _skip_runtime_filters_process = true;
-    return Status::OK();
-}
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_slots.h b/be/src/runtime_filter/runtime_filter_slots.h
index 7f76e33d250..166a012f7e2 100644
--- a/be/src/runtime_filter/runtime_filter_slots.h
+++ b/be/src/runtime_filter/runtime_filter_slots.h
@@ -28,74 +28,49 @@
 
 namespace doris {
 // this class used in hash join node
+/**
+ * init -> (skip_runtime_filters ->) send_filter_size -> process
+ */
 class RuntimeFilterSlots {
 public:
     RuntimeFilterSlots(const vectorized::VExprContextSPtrs& build_expr_ctxs,
-                       RuntimeProfile* profile, bool should_build_hash_table)
+                       RuntimeProfile* profile, bool should_build_hash_table,
+                       bool is_broadcast_join)
             : _build_expr_context(build_expr_ctxs),
               _should_build_hash_table(should_build_hash_table),
-              _profile(new RuntimeProfile("RuntimeFilterSlots")) {
+              _profile(new RuntimeProfile("RuntimeFilterSlots")),
+              _is_broadcast_join(is_broadcast_join) {
         profile->add_child(_profile.get(), true, nullptr);
         _publish_runtime_filter_timer = ADD_TIMER_WITH_LEVEL(_profile, 
"PublishTime", 1);
         _runtime_filter_compute_timer = ADD_TIMER_WITH_LEVEL(_profile, 
"BuildTime", 1);
     }
-
-    Status init(RuntimeState* state, const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs) {
-        _runtime_filters.resize(runtime_filter_descs.size());
-        for (size_t i = 0; i < runtime_filter_descs.size(); i++) {
-            RETURN_IF_ERROR(state->register_producer_runtime_filter(
-                    runtime_filter_descs[i], &_runtime_filters[i], 
_profile.get()));
-        }
-        return Status::OK();
-    }
-
+    Status init(RuntimeState* state, const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs);
     Status send_filter_size(RuntimeState* state, uint64_t hash_table_size,
                             std::shared_ptr<pipeline::CountedFinishDependency> 
dependency);
-
-    Status skip_runtime_filters_process(
-            RuntimeState* state,
-            std::shared_ptr<pipeline::CountedFinishDependency> 
finish_dependency);
-
+    void skip_runtime_filters() { _skip_runtime_filters_process = true; }
     Status process(RuntimeState* state, const vectorized::Block* block,
-                   std::shared_ptr<pipeline::CountedFinishDependency> 
finish_dependency);
-
-    void copy_to_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
-        for (auto& filter : _runtime_filters) {
-            filter->copy_to_shared_context(context);
-        }
-    }
-
-    Status copy_from_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
-        for (auto& filter : _runtime_filters) {
-            filter->copy_from_shared_context(context);
-        }
-        return Status::OK();
-    }
+                   std::shared_ptr<pipeline::CountedFinishDependency> 
finish_dependency,
+                   vectorized::SharedHashTableContextPtr& 
shared_hash_table_ctx);
 
 protected:
-    Status _disable_meaningless_filters(RuntimeState* state);
     Status _init_filters(RuntimeState* state, uint64_t local_hash_table_size);
     void _insert(const vectorized::Block* block, size_t start);
     Status _publish(RuntimeState* state) {
-        if (_skip_runtime_filters_process) {
-            return Status::OK();
-        }
         SCOPED_TIMER(_publish_runtime_filter_timer);
         for (auto& filter : _runtime_filters) {
-            RETURN_IF_ERROR(filter->publish(state, !_should_build_hash_table));
+            RETURN_IF_ERROR(filter->publish(state, _should_build_hash_table));
         }
         return Status::OK();
     }
 
     const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
_build_expr_context;
     std::vector<std::shared_ptr<RuntimeFilterProducer>> _runtime_filters;
-    bool _should_build_hash_table;
-
+    const bool _should_build_hash_table;
     RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
     RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
     std::unique_ptr<RuntimeProfile> _profile;
-
     bool _skip_runtime_filters_process = false;
+    const bool _is_broadcast_join;
 };
 
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_slots_cross.h b/be/src/runtime_filter/runtime_filter_slots_cross.h
index 74a5fc6c2d8..f66f5537224 100644
--- a/be/src/runtime_filter/runtime_filter_slots_cross.h
+++ b/be/src/runtime_filter/runtime_filter_slots_cross.h
@@ -33,8 +33,8 @@ namespace doris {
 class RuntimeFilterSlotsCross : public RuntimeFilterSlots {
 public:
     RuntimeFilterSlotsCross(const vectorized::VExprContextSPtrs& 
build_expr_ctxs,
-                            RuntimeProfile* profile, bool 
should_build_hash_table)
-            : RuntimeFilterSlots(build_expr_ctxs, profile, 
should_build_hash_table) {}
+                            RuntimeProfile* profile)
+            : RuntimeFilterSlots(build_expr_ctxs, profile, true, false) {}
 
     Status process(RuntimeState* state, vectorized::Blocks& blocks) {
         for (auto& block : blocks) {
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h
index c51ad4484aa..b41e72c7cea 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -45,6 +45,7 @@ public:
 
     Status change_to_bloom_filter();
 
+    bool is_valid() const { return _state != State::DISABLED && _state != 
State::IGNORED; }
     int filter_id() const { return _filter_id; }
 
     int max_in_num() const { return _max_in_num; }
@@ -145,19 +146,16 @@ public:
 
     std::string debug_string() const;
 
-    void set_state(State state) {
-        DCHECK(state != State::DISABLED);
+    void set_state(State state, std::string reason = "") {
         if (_state == State::DISABLED) {
             return;
+        } else if (state == State::DISABLED) {
+            _disabled_reason = reason;
         }
-
         _state = state;
     }
 
-    void disable(std::string reason) {
-        _state = State::DISABLED;
-        _disabled_reason = reason;
-    }
+    void disable(std::string reason) { set_state(State::DISABLED, reason); }
 
     State get_state() const { return _state; }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to