This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/new_join by this push:
     new a7bc11faea8 fix runtime filter performance (#26727)
a7bc11faea8 is described below

commit a7bc11faea85fb33e3b003034d0f8b510bd8fcae
Author: HappenLee <[email protected]>
AuthorDate: Fri Nov 10 10:14:05 2023 +0800

    fix runtime filter performance (#26727)
---
 be/src/exprs/bloom_filter_func.h             | 29 ++++++++++++++--------------
 be/src/exprs/runtime_filter_slots.h          |  4 ++--
 be/src/pipeline/exec/hashjoin_build_sink.cpp |  3 +--
 be/src/pipeline/exec/hashjoin_build_sink.h   |  1 -
 be/src/vec/exec/join/vhash_join_node.cpp     |  2 +-
 be/src/vec/exec/join/vhash_join_node.h       |  5 ++---
 6 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index a9330a01169..3f1e25efdc8 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -94,24 +94,25 @@ public:
     void set_build_bf_exactly(bool build_bf_exactly) { _build_bf_exactly = 
build_bf_exactly; }
 
     Status init_with_fixed_length() {
-        //        if (_build_bf_exactly) {
-        //            return Status::OK();
-        //        }
+        if (_build_bf_exactly) {
+            return Status::OK();
+        }
         return init_with_fixed_length(_bloom_filter_length);
     }
 
     Status init_with_cardinality(const size_t build_bf_cardinality) {
-        //        if (_build_bf_exactly) {
-        //            // Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize
-        //            constexpr double fpp = 0.05;
-        //            constexpr double k = 8; // BUCKET_WORDS
-        //            // m is the number of bits we would need to get the fpp specified
-        //            double m = -k * build_bf_cardinality / std::log(1 - 
std::pow(fpp, 1.0 / k));
-        //
-        //            // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
-        //            int log_filter_size = std::max(0, 
(int)(std::ceil(std::log(m / 8) / std::log(2))));
-        //            return init_with_fixed_length(((int64_t)1) << 
log_filter_size);
-        //        }
+        if (_build_bf_exactly) {
+            // Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize
+            constexpr double fpp = 0.05;
+            constexpr double k = 8; // BUCKET_WORDS
+            // m is the number of bits we would need to get the fpp specified
+            double m = -k * build_bf_cardinality / std::log(1 - std::pow(fpp, 
1.0 / k));
+
+            // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
+            int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) 
/ std::log(2))));
+            _bloom_filter_length = std::min(((int64_t)1) << log_filter_size, 
_bloom_filter_length);
+            return init_with_fixed_length(_bloom_filter_length);
+        }
         return Status::OK();
     }
 
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 01155493f87..62cf0eab7d1 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -37,7 +37,7 @@ public:
             const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
             : _build_expr_context(build_expr_ctxs), 
_runtime_filter_descs(runtime_filter_descs) {}
 
-    Status init(RuntimeState* state, int64_t hash_table_size, size_t 
build_bf_cardinality) {
+    Status init(RuntimeState* state, int64_t hash_table_size) {
         // runtime filter effect strategy
         // 1. we will ignore IN filter when hash_table_size is too big
         // 2. we will ignore BLOOM filter and MinMax filter when hash_table_size
@@ -111,7 +111,7 @@ public:
             }
 
             if (runtime_filter->is_bloomfilter()) {
-                
RETURN_IF_ERROR(runtime_filter->init_bloom_filter(build_bf_cardinality));
+                
RETURN_IF_ERROR(runtime_filter->init_bloom_filter(hash_table_size));
             }
 
             // Note:
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 748246a8f27..2483bff5c80 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -483,8 +483,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                                                 _build_expr_ctxs, 
_runtime_filter_descs);
 
                                 
RETURN_IF_ERROR(local_state._runtime_filter_slots->init(
-                                        state, arg.hash_table->size(),
-                                        local_state._build_rf_cardinality));
+                                        state, arg.hash_table->size()));
                                 RETURN_IF_ERROR(
                                         
local_state._runtime_filter_slots->copy_from_shared_context(
                                                 _shared_hash_table_context));
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index dc047e39848..634ee2c50e3 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -100,7 +100,6 @@ protected:
     std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots = nullptr;
     bool _has_set_need_null_map_for_build = false;
     bool _build_side_ignore_null = false;
-    size_t _build_rf_cardinality = 0;
     std::unordered_set<const vectorized::Block*> _inserted_blocks;
     std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index dc13bb44ee4..2b7447ee56b 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -816,7 +816,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
                                           _build_expr_ctxs, 
_runtime_filter_descs);
 
                                   RETURN_IF_ERROR(_runtime_filter_slots->init(
-                                          state, arg.hash_table->size(), 
_build_rf_cardinality));
+                                          state, arg.hash_table->size()));
                                   
RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
                                           _shared_hash_table_context));
                                   
RETURN_IF_ERROR(_runtime_filter_slots->publish());
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 6d47528fefd..a58241234d8 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -83,8 +83,8 @@ struct ProcessRuntimeFilterBuild {
         parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
                 parent->_build_expr_ctxs, parent->runtime_filter_descs());
 
-        RETURN_IF_ERROR(parent->_runtime_filter_slots->init(
-                state, hash_table_ctx.hash_table->size(), 
parent->_build_rf_cardinality));
+        RETURN_IF_ERROR(
+                parent->_runtime_filter_slots->init(state, 
hash_table_ctx.hash_table->size()));
 
         if (!parent->_runtime_filter_slots->empty() && 
!parent->_inserted_blocks.empty()) {
             {
@@ -445,7 +445,6 @@ private:
     std::unordered_set<const Block*> _inserted_blocks;
 
     std::vector<IRuntimeFilter*> _runtime_filters;
-    size_t _build_rf_cardinality = 0;
     std::atomic_bool _probe_open_finish = false;
 };
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to