This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 989f1894869 [test](join) Fuzzy disable runtime filters in BE  #45654 (#46931)
989f1894869 is described below

commit 989f1894869b5208aa8f2075b096fb2faf5fce03
Author: Pxl <x...@selectdb.com>
AuthorDate: Tue Jan 14 12:46:56 2025 +0800

    [test](join) Fuzzy disable runtime filters in BE  #45654 (#46931)
    
    Picked from #45654.
    
    ---------
    
    Co-authored-by: Jerry Hu <hushengg...@selectdb.com>
---
 be/src/exprs/runtime_filter.cpp                    | 59 +++++++++++++++-------
 be/src/exprs/runtime_filter.h                      |  3 ++
 be/src/exprs/runtime_filter_slots.h                | 27 +++++++---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 51 +++++++++++++++++--
 be/src/pipeline/exec/hashjoin_build_sink.h         |  4 ++
 be/src/runtime/runtime_filter_mgr.cpp              |  4 +-
 be/src/runtime/runtime_state.h                     |  5 ++
 be/src/vec/exec/join/vhash_join_node.h             |  2 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |  1 +
 .../java/org/apache/doris/qe/SessionVariable.java  | 12 +++++
 gensrc/proto/internal_service.proto                |  6 +++
 gensrc/thrift/PaloInternalService.thrift           |  2 +
 12 files changed, 142 insertions(+), 34 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 80d9494ac9e..5958fdf2724 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -474,9 +474,15 @@ public:
                           const TExpr& probe_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
-        if (wrapper->is_ignored()) {
+        if (wrapper->is_disabled()) {
+            set_disabled();
             return Status::OK();
         }
+
+        if (wrapper->is_ignored() || is_disabled()) {
+            return Status::OK();
+        }
+
         _context->ignored = false;
 
         bool can_not_merge_in_or_bloom =
@@ -495,15 +501,9 @@ public:
 
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
-            if (!_context->hybrid_set) {
-                _context->ignored = true;
-                return Status::OK();
-            }
             _context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
             if (_max_in_num >= 0 && _context->hybrid_set->size() >= 
_max_in_num) {
-                _context->ignored = true;
-                // release in filter
-                _context->hybrid_set.reset();
+                set_disabled();
             }
             break;
         }
@@ -957,6 +957,10 @@ public:
 
     void set_ignored() { _context->ignored = true; }
 
+    bool is_disabled() const { return _context->disabled; }
+
+    void set_disabled() { _context->disabled = true; }
+
     void batch_assign(const PInFilter* filter,
                       void (*assign_func)(std::shared_ptr<HybridSetBase>& 
_hybrid_set,
                                           PColumnValue&, ObjectPool*)) {
@@ -1217,9 +1221,10 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr, bool opt_remo
     merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
     merge_filter_callback->cntl_->ignore_eovercrowded();
 
-    if (get_ignored()) {
+    if (get_ignored() || get_disabled()) {
         merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
-        merge_filter_request->set_ignored(true);
+        merge_filter_request->set_ignored(get_ignored());
+        merge_filter_request->set_disabled(get_disabled());
     } else {
         RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len));
     }
@@ -1241,7 +1246,7 @@ Status 
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
                                           bool is_late_arrival) {
     DCHECK(is_consumer());
     auto origin_size = push_exprs.size();
-    if (!_wrapper->is_ignored()) {
+    if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) {
         _set_push_down(!is_late_arrival);
         RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, 
_probe_expr));
     }
@@ -1367,6 +1372,7 @@ PrimitiveType IRuntimeFilter::column_type() const {
 
 void IRuntimeFilter::signal() {
     DCHECK(is_consumer());
+
     if (_enable_pipeline_exec) {
         _rf_state_atomic.store(RuntimeFilterState::READY);
         if (!_filter_timer.empty()) {
@@ -1420,16 +1426,21 @@ bool IRuntimeFilter::get_ignored() {
     return _wrapper->is_ignored();
 }
 
-std::string IRuntimeFilter::formatted_state() const {
-    return fmt::format(
-            "[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
-            "HasLocalTarget = {}, Ignored = {}]",
-            _is_push_down, _get_explain_state_string(), _has_remote_target, 
_has_local_target,
-            _wrapper->_context->ignored);
+void IRuntimeFilter::set_disabled() {
+    _wrapper->set_disabled();
 }
 
-BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
-    return _wrapper->get_bloomfilter();
+bool IRuntimeFilter::get_disabled() const {
+    return _wrapper->is_disabled();
+}
+
+std::string IRuntimeFilter::formatted_state() const {
+    return fmt::format(
+            "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, 
HasRemoteTarget = {}, "
+            "HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, 
WaitTimeMS = {}]",
+            _filter_id, _is_push_down, _get_explain_state_string(), 
_has_remote_target,
+            _has_local_target, _wrapper->_context->ignored, 
_wrapper->_context->disabled,
+            _wrapper->get_real_type(), wait_time_ms());
 }
 
 Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const 
TQueryOptions* options,
@@ -1536,6 +1547,11 @@ Status IRuntimeFilter::create_wrapper(const 
UpdateRuntimeFilterParamsV2* param,
     *wrapper = param->pool->add(new RuntimePredicateWrapper(
             param->pool, column_type, get_type(filter_type), 
param->request->filter_id()));
 
+    if (param->request->has_disabled() && param->request->disabled()) {
+        (*wrapper)->set_disabled();
+        return Status::OK();
+    }
+
     if (param->request->has_ignored() && param->request->ignored()) {
         (*wrapper)->set_ignored();
         return Status::OK();
@@ -1582,6 +1598,11 @@ Status IRuntimeFilter::_create_wrapper(const T* param, 
ObjectPool* pool,
     *wrapper = std::make_unique<RuntimePredicateWrapper>(pool, column_type, 
get_type(filter_type),
                                                          
param->request->filter_id());
 
+    if (param->request->has_disabled() && param->request->disabled()) {
+        (*wrapper)->set_disabled();
+        return Status::OK();
+    }
+
     if (param->request->has_ignored() && param->request->ignored()) {
         (*wrapper)->set_ignored();
         return Status::OK();
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index b71bbd0648c..6f7695796bf 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -304,6 +304,9 @@ public:
 
     bool get_ignored();
 
+    void set_disabled();
+    bool get_disabled() const;
+
     RuntimeFilterType get_real_type();
 
     bool need_sync_filter_size();
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 2b9773ce89f..c06ac946283 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -67,11 +67,17 @@ public:
                                                    : hash_table_size;
     }
 
-    Status ignore_filters(RuntimeState* state) {
+    /**
+        Disable meaningless filters, such as filters:
+            RF1: col1 in (1, 3, 5)
+            RF2: col1 min: 1, max: 5
+        We consider RF2 meaningless, because RF1 has already filtered out all values that RF2 can filter.
+     */
+    Status disable_meaningless_filters(RuntimeState* state) {
         // process ignore duplicate IN_FILTER
         std::unordered_set<int> has_in_filter;
-        for (auto* filter : _runtime_filters) {
-            if (filter->get_ignored()) {
+        for (auto filter : _runtime_filters) {
+            if (filter->get_ignored() || filter->get_disabled()) {
                 continue;
             }
             if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
@@ -82,7 +88,7 @@ public:
                 continue;
             }
             if (has_in_filter.contains(filter->expr_order())) {
-                filter->set_ignored();
+                filter->set_disabled();
                 continue;
             }
             has_in_filter.insert(filter->expr_order());
@@ -90,14 +96,14 @@ public:
 
         // process ignore filter when it has IN_FILTER on same expr
         for (auto filter : _runtime_filters) {
-            if (filter->get_ignored()) {
+            if (filter->get_ignored() || filter->get_disabled()) {
                 continue;
             }
             if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
                 !has_in_filter.contains(filter->expr_order())) {
                 continue;
             }
-            filter->set_ignored();
+            filter->set_disabled();
         }
         return Status::OK();
     }
@@ -109,6 +115,13 @@ public:
         return Status::OK();
     }
 
+    Status disable_all_filters() {
+        for (auto filter : _runtime_filters) {
+            filter->set_disabled();
+        }
+        return Status::OK();
+    }
+
     Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
         // process IN_OR_BLOOM_FILTER's real type
         for (auto* filter : _runtime_filters) {
@@ -139,7 +152,7 @@ public:
             int result_column_id = 
_build_expr_context[i]->get_last_result_column_id();
             const auto& column = 
block->get_by_position(result_column_id).column;
             for (auto* filter : iter->second) {
-                if (filter->get_ignored()) {
+                if (filter->get_ignored() || filter->get_disabled()) {
                     continue;
                 }
                 filter->insert_batch(column, 1);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 87c8d7caea3..c764b8d1a73 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -17,6 +17,7 @@
 
 #include "hashjoin_build_sink.h"
 
+#include <cstdlib>
 #include <string>
 
 #include "exprs/bloom_filter_func.h"
@@ -119,6 +120,15 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
+
+#ifndef NDEBUG
+    if (state->fuzzy_disable_runtime_filter_in_be()) {
+        if ((_parent->operator_id() + random()) % 2 == 0) {
+            RETURN_IF_ERROR(disable_runtime_filters(state));
+        }
+    }
+#endif
+
     return Status::OK();
 }
 
@@ -137,7 +147,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
         }
     }};
 
-    if (!_runtime_filter_slots || _runtime_filters.empty() || 
state->is_cancelled() || !_eos) {
+    if (!_runtime_filter_slots || _runtime_filters.empty() || 
state->is_cancelled() || !_eos ||
+        _runtime_filters_disabled) {
         return Base::close(state, exec_status);
     }
 
@@ -152,7 +163,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
             {
                 SCOPED_TIMER(_runtime_filter_init_timer);
                 RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, 
hash_table_size));
-                RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
+                
RETURN_IF_ERROR(_runtime_filter_slots->disable_meaningless_filters(state));
             }
             if (hash_table_size > 1) {
                 SCOPED_TIMER(_runtime_filter_compute_timer);
@@ -181,6 +192,33 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
     return Base::close(state, exec_status);
 }
 
+Status HashJoinBuildSinkLocalState::disable_runtime_filters(RuntimeState* 
state) {
+    if (_runtime_filters_disabled) {
+        return Status::OK();
+    }
+
+    if (_runtime_filters.empty()) {
+        return Status::OK();
+    }
+
+    if (!_should_build_hash_table) {
+        return Status::OK();
+    }
+
+    if (_runtime_filters.empty()) {
+        return Status::OK();
+    }
+
+    DCHECK(_runtime_filter_slots) << "_runtime_filter_slots should be 
initialized";
+
+    _runtime_filters_disabled = true;
+    RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, 
_finish_dependency));
+    RETURN_IF_ERROR(_runtime_filter_slots->disable_all_filters());
+
+    SCOPED_TIMER(_publish_runtime_filter_timer);
+    return _runtime_filter_slots->publish(!_should_build_hash_table);
+}
+
 bool HashJoinBuildSinkLocalState::build_unique() const {
     return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
 }
@@ -605,9 +643,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
         local_state._shared_state->build_block = 
std::make_shared<vectorized::Block>(
                 local_state._build_side_mutable_block.to_block());
 
-        RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size(
-                state, local_state._shared_state->build_block->rows(),
-                local_state._finish_dependency));
+        if (!local_state._runtime_filters_disabled) {
+            
RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size(
+                    state, local_state._shared_state->build_block->rows(),
+                    local_state._finish_dependency));
+        }
+
         RETURN_IF_ERROR(
                 local_state.process_build_block(state, 
(*local_state._shared_state->build_block)));
         if (_shared_hashtable_controller) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 3b55dc5e44d..60251d33055 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -74,6 +74,8 @@ public:
 
     Status close(RuntimeState* state, Status exec_status) override;
 
+    Status disable_runtime_filters(RuntimeState* state);
+
 protected:
     void _hash_table_init(RuntimeState* state);
     void _set_build_ignore_flag(vectorized::Block& block, const 
std::vector<int>& res_col_ids);
@@ -97,6 +99,8 @@ protected:
     int64_t _build_side_mem_used = 0;
     int64_t _build_side_last_mem_used = 0;
 
+    bool _runtime_filters_disabled = false;
+
     size_t _build_side_rows = 0;
     std::vector<vectorized::Block> _build_blocks;
 
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 640cece8fb3..89975973597 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -456,7 +456,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             void* data = nullptr;
             int len = 0;
             bool has_attachment = false;
-            if (!cnt_val->filter->get_ignored()) {
+            if (!cnt_val->filter->get_ignored() && 
!cnt_val->filter->get_disabled()) {
                 RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, 
&data, &len));
             } else {
                 apply_request.set_ignored(true);
@@ -535,7 +535,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             void* data = nullptr;
             int len = 0;
             bool has_attachment = false;
-            if (!cnt_val->filter->get_ignored()) {
+            if (!cnt_val->filter->get_ignored() && 
!cnt_val->filter->get_disabled()) {
                 RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, 
&data, &len));
             } else {
                 apply_request.set_ignored(true);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 96633f7215e..4deb8babd10 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -652,6 +652,11 @@ public:
         return _query_options.__isset.enable_force_spill && 
_query_options.enable_force_spill;
     }
 
+    bool fuzzy_disable_runtime_filter_in_be() const {
+        return _query_options.__isset.fuzzy_disable_runtime_filter_in_be &&
+               _query_options.fuzzy_disable_runtime_filter_in_be;
+    }
+
     int64_t min_revocable_mem() const {
         if (_query_options.__isset.min_revocable_mem) {
             return _query_options.min_revocable_mem;
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index f23a992049f..5cfac9f9aba 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -82,7 +82,7 @@ Status process_runtime_filter_build(RuntimeState* state, 
Block* block, Parent* p
     {
         SCOPED_TIMER(parent->_runtime_filter_init_timer);
         RETURN_IF_ERROR(parent->_runtime_filter_slots->init_filters(state, 
rows));
-        RETURN_IF_ERROR(parent->_runtime_filter_slots->ignore_filters(state));
+        
RETURN_IF_ERROR(parent->_runtime_filter_slots->disable_meaningless_filters(state));
     }
 
     if (!parent->_runtime_filter_slots->empty() && rows > 1) {
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index aba441f282a..ea26333a3aa 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -46,6 +46,7 @@ struct RuntimeFilterContext {
     std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
     std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func;
     bool ignored = false;
+    bool disabled = false;
     std::string err_msg;
 };
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 894de847158..f68fd1423c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -542,6 +542,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String ENABLE_FORCE_SPILL = "enable_force_spill";
     public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
 
+    public static final String FUZZY_DISABLE_RUNTIME_FILTER_IN_BE = 
"fuzzy_disable_runtime_filter_in_be";
+
     public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
 
     public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
@@ -2128,6 +2130,13 @@ public class SessionVariable implements Serializable, 
Writable {
             needForward = true, fuzzy = true)
     public long dataQueueMaxBlocks = 1;
 
+    @VariableMgr.VarAttr(
+            name = FUZZY_DISABLE_RUNTIME_FILTER_IN_BE,
+            description = {"在 BE 上开启禁用 runtime filter 的随机开关,用于测试",
+                    "Disable the runtime filter on the BE for testing 
purposes."},
+            needForward = true, fuzzy = false)
+    public boolean fuzzyDisableRuntimeFilterInBE = false;
+
     // If the memory consumption of sort node exceed this limit, will trigger 
spill to disk;
     // Set to 0 to disable; min: 128M
     public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@@ -2376,6 +2385,8 @@ public class SessionVariable implements Serializable, 
Writable {
                     this.batchSize = 50;
                     this.enableFoldConstantByBe = false;
                 }
+
+                this.fuzzyDisableRuntimeFilterInBE = true;
             }
         }
 
@@ -3781,6 +3792,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setEnableForceSpill(enableForceSpill);
         tResult.setMinRevocableMem(minRevocableMem);
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
+        
tResult.setFuzzyDisableRuntimeFilterInBe(fuzzyDisableRuntimeFilterInBE);
 
         tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
         tResult.setSerdeDialect(getSerdeDialect());
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 72b11e6e2ed..1ea3b798f4a 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -573,6 +573,8 @@ message PMergeFilterRequest {
     optional PColumnType column_type = 10;
     optional bool contain_null = 11;
     optional bool ignored = 12;
+    optional uint64 local_merge_time = 13;
+    optional bool disabled = 14;
 };
 
 message PMergeFilterResponse {
@@ -593,6 +595,7 @@ message PPublishFilterRequest {
     optional PColumnType column_type = 10;
     optional bool contain_null = 11;
     optional bool ignored = 12;
+    optional bool disabled = 13;
 };
 
 message PPublishFilterRequestV2 {
@@ -607,6 +610,9 @@ message PPublishFilterRequestV2 {
     optional int64 merge_time = 9;
     optional bool contain_null = 10;
     optional bool ignored = 11;
+    repeated int32 fragment_ids = 12;
+    optional uint64 local_merge_time = 13;
+    optional bool disabled = 14;
 };
 
 message PPublishFilterResponse {
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index b0e3ad456fc..c612826836e 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -339,6 +339,8 @@ struct TQueryOptions {
   139: optional i64 orc_once_max_read_bytes = 8388608;
   140: optional i64 orc_max_merge_distance_bytes = 1048576;
 
+  146: optional bool fuzzy_disable_runtime_filter_in_be = false;
+
   // upgrade options. keep them same in every branch.
   200: optional bool new_is_ip_address_in_range = false;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to