This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/new_join by this push:
new a7bc11faea8 fix runtime filter performance (#26727)
a7bc11faea8 is described below
commit a7bc11faea85fb33e3b003034d0f8b510bd8fcae
Author: HappenLee <[email protected]>
AuthorDate: Fri Nov 10 10:14:05 2023 +0800
fix runtime filter performance (#26727)
---
be/src/exprs/bloom_filter_func.h | 29 ++++++++++++++--------------
be/src/exprs/runtime_filter_slots.h | 4 ++--
be/src/pipeline/exec/hashjoin_build_sink.cpp | 3 +--
be/src/pipeline/exec/hashjoin_build_sink.h | 1 -
be/src/vec/exec/join/vhash_join_node.cpp | 2 +-
be/src/vec/exec/join/vhash_join_node.h | 5 ++---
6 files changed, 21 insertions(+), 23 deletions(-)
diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index a9330a01169..3f1e25efdc8 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -94,24 +94,25 @@ public:
void set_build_bf_exactly(bool build_bf_exactly) { _build_bf_exactly =
build_bf_exactly; }
Status init_with_fixed_length() {
- // if (_build_bf_exactly) {
- // return Status::OK();
- // }
+ if (_build_bf_exactly) {
+ return Status::OK();
+ }
return init_with_fixed_length(_bloom_filter_length);
}
Status init_with_cardinality(const size_t build_bf_cardinality) {
- // if (_build_bf_exactly) {
- // // Use the same algorithm as
org.apache.doris.planner.RuntimeFilter#calculateFilterSize
- // constexpr double fpp = 0.05;
- // constexpr double k = 8; // BUCKET_WORDS
- // // m is the number of bits we would need to get the fpp
specified
- // double m = -k * build_bf_cardinality / std::log(1 -
std::pow(fpp, 1.0 / k));
- //
- // // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
- // int log_filter_size = std::max(0,
(int)(std::ceil(std::log(m / 8) / std::log(2))));
- // return init_with_fixed_length(((int64_t)1) <<
log_filter_size);
- // }
+ if (_build_bf_exactly) {
+ // Use the same algorithm as
org.apache.doris.planner.RuntimeFilter#calculateFilterSize
+ constexpr double fpp = 0.05;
+ constexpr double k = 8; // BUCKET_WORDS
+ // m is the number of bits we would need to get the fpp specified
+ double m = -k * build_bf_cardinality / std::log(1 - std::pow(fpp,
1.0 / k));
+
+ // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
+ int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8)
/ std::log(2))));
+ _bloom_filter_length = std::min(((int64_t)1) << log_filter_size,
_bloom_filter_length);
+ return init_with_fixed_length(_bloom_filter_length);
+ }
return Status::OK();
}
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 01155493f87..62cf0eab7d1 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -37,7 +37,7 @@ public:
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
: _build_expr_context(build_expr_ctxs),
_runtime_filter_descs(runtime_filter_descs) {}
- Status init(RuntimeState* state, int64_t hash_table_size, size_t
build_bf_cardinality) {
+ Status init(RuntimeState* state, int64_t hash_table_size) {
// runtime filter effect strategy
// 1. we will ignore IN filter when hash_table_size is too big
// 2. we will ignore BLOOM filter and MinMax filter when
hash_table_size
@@ -111,7 +111,7 @@ public:
}
if (runtime_filter->is_bloomfilter()) {
-
RETURN_IF_ERROR(runtime_filter->init_bloom_filter(build_bf_cardinality));
+
RETURN_IF_ERROR(runtime_filter->init_bloom_filter(hash_table_size));
}
// Note:
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 748246a8f27..2483bff5c80 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -483,8 +483,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
_build_expr_ctxs,
_runtime_filter_descs);
RETURN_IF_ERROR(local_state._runtime_filter_slots->init(
- state, arg.hash_table->size(),
- local_state._build_rf_cardinality));
+ state, arg.hash_table->size()));
RETURN_IF_ERROR(
local_state._runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index dc047e39848..634ee2c50e3 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -100,7 +100,6 @@ protected:
std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots = nullptr;
bool _has_set_need_null_map_for_build = false;
bool _build_side_ignore_null = false;
- size_t _build_rf_cardinality = 0;
std::unordered_set<const vectorized::Block*> _inserted_blocks;
std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index dc13bb44ee4..2b7447ee56b 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -816,7 +816,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
_build_expr_ctxs,
_runtime_filter_descs);
RETURN_IF_ERROR(_runtime_filter_slots->init(
- state, arg.hash_table->size(),
_build_rf_cardinality));
+ state, arg.hash_table->size()));
RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));
RETURN_IF_ERROR(_runtime_filter_slots->publish());
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 6d47528fefd..a58241234d8 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -83,8 +83,8 @@ struct ProcessRuntimeFilterBuild {
parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
parent->_build_expr_ctxs, parent->runtime_filter_descs());
- RETURN_IF_ERROR(parent->_runtime_filter_slots->init(
- state, hash_table_ctx.hash_table->size(),
parent->_build_rf_cardinality));
+ RETURN_IF_ERROR(
+ parent->_runtime_filter_slots->init(state,
hash_table_ctx.hash_table->size()));
if (!parent->_runtime_filter_slots->empty() &&
!parent->_inserted_blocks.empty()) {
{
@@ -445,7 +445,6 @@ private:
std::unordered_set<const Block*> _inserted_blocks;
std::vector<IRuntimeFilter*> _runtime_filters;
- size_t _build_rf_cardinality = 0;
std::atomic_bool _probe_open_finish = false;
};
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]