This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ae3bb027bce [Rf](exec) Support build exactly not cal by ndv (#30398)
ae3bb027bce is described below

commit ae3bb027bce651a0e10e5d496985d55ff23f666f
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Fri Jan 26 17:14:46 2024 +0800

    [Rf](exec) Support build exactly not cal by ndv (#30398)
---
 be/src/exprs/bloom_filter_func.h             | 17 +++--------------
 be/src/exprs/runtime_filter.cpp              | 19 ++++++++++++-------
 be/src/pipeline/exec/hashjoin_build_sink.cpp |  7 -------
 be/src/vec/exec/join/vhash_join_node.cpp     |  5 -----
 4 files changed, 15 insertions(+), 33 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 71dc3f6e663..ed4205a7e0d 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -83,21 +83,11 @@ class BloomFilterFuncBase : public FilterFuncBase {
 public:
     virtual ~BloomFilterFuncBase() = default;
 
-    Status init(int64_t expect_num, double fpp) {
-        size_t filter_size = BloomFilterAdaptor::optimal_bit_num(expect_num, fpp);
-        return init_with_fixed_length(filter_size);
-    }
-
     void set_length(int64_t bloom_filter_length) { _bloom_filter_length = bloom_filter_length; }
 
     void set_build_bf_exactly(bool build_bf_exactly) { _build_bf_exactly = build_bf_exactly; }
 
-    Status init_with_fixed_length() {
-        if (_build_bf_exactly) {
-            return Status::OK();
-        }
-        return init_with_fixed_length(_bloom_filter_length);
-    }
+    Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); }
 
     Status init_with_cardinality(const size_t build_bf_cardinality) {
         if (_build_bf_exactly) {
@@ -109,10 +99,9 @@ public:
 
             // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
             int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2))));
-            _bloom_filter_length = std::min(((int64_t)1) << log_filter_size, _bloom_filter_length);
-            return init_with_fixed_length(_bloom_filter_length);
+            _bloom_filter_length = (((int64_t)1) << log_filter_size);
         }
-        return Status::OK();
+        return init_with_fixed_length(_bloom_filter_length);
     }
 
     Status init_with_fixed_length(int64_t bloom_filter_length) {
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c509662a426..eef1e6a5245 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -338,15 +338,17 @@ public:
         return Status::OK();
     }
 
-    void change_to_bloom_filter() {
+    void change_to_bloom_filter(bool need_init_bf = false) {
         CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER)
                 << "Can not change to bloom filter because of runtime filter type is "
                 << IRuntimeFilter::to_string(_filter_type);
         _is_bloomfilter = true;
         BloomFilterFuncBase* bf = _context.bloom_filter_func.get();
-        // BloomFilter may be not init
-        static_cast<void>(bf->init_with_fixed_length());
-        insert_to_bloom_filter(bf);
+        if (need_init_bf) {
+            // BloomFilter may be not init
+            static_cast<void>(bf->init_with_fixed_length());
+            insert_to_bloom_filter(bf);
+        }
         // release in filter
         _context.hybrid_set.reset(create_set(_column_return_type));
     }
@@ -533,12 +535,12 @@ public:
                         VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id
                                    << ") because: in_num(" << _context.hybrid_set->size()
                                    << ") >= max_in_num(" << _max_in_num << ")";
-                        change_to_bloom_filter();
+                        change_to_bloom_filter(true);
                     }
                 } else {
                     VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id
                                << ") because: already exist a bloom filter";
-                    change_to_bloom_filter();
+                    change_to_bloom_filter(true);
                     RETURN_IF_ERROR(_context.bloom_filter_func->merge(
                             wrapper->_context.bloom_filter_func.get()));
                 }
@@ -1198,7 +1200,10 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
     // 1. Only 1 join key
     // 2. Do not have remote target (e.g. do not need to merge), or broadcast join
     // 3. Bloom filter
-    params.build_bf_exactly = build_bf_exactly && (!_has_remote_target || _is_broadcast_join) &&
+    // 4. FE do not use ndv stat to predict the bf size, only the row count. BE have more
+    // exactly row count stat
+    params.build_bf_exactly = build_bf_exactly && !desc->bloom_filter_size_calculated_by_ndv &&
+                              (!_has_remote_target || _is_broadcast_join) &&
                               (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER ||
                                _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);
     if (desc->__isset.bloom_filter_size_bytes) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 2711b0d8852..4ea3b006868 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -116,13 +116,6 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
-    auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
-
-    for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
-        if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
-            RETURN_IF_ERROR(bf->init_with_fixed_length());
-        }
-    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index e6c00d94a2f..c7b6b5d0411 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -662,11 +662,6 @@ Status HashJoinNode::alloc_resource(doris::RuntimeState* state) {
     SCOPED_TIMER(_exec_timer);
     SCOPED_TIMER(_allocate_resource_timer);
     RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state));
-    for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
-        if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
-            RETURN_IF_ERROR(bf->init_with_fixed_length());
-        }
-    }
     RETURN_IF_ERROR(VExpr::open(_build_expr_ctxs, state));
     RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
     for (auto& conjunct : _other_join_conjuncts) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to