This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4ba4767eef2 [improvement](scan) make global runtime filter support in-list filter (#29394) 4ba4767eef2 is described below commit 4ba4767eef28eca4d1c64fbf45a9c2ebc594d2e1 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Thu Jan 4 09:12:30 2024 +0800 [improvement](scan) make global runtime filter support in-list filter (#29394) --- be/src/exprs/runtime_filter.cpp | 45 ++++++++++++++-------- be/src/exprs/runtime_filter.h | 4 +- be/src/exprs/runtime_filter_slots.h | 5 +-- .../processor/post/RuntimeFilterGenerator.java | 6 +-- 4 files changed, 32 insertions(+), 28 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f52a9574bf6..bf09adc53f8 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -453,9 +453,11 @@ public: std::vector<vectorized::VExprSPtr>& push_exprs, const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { - bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && - (wrapper->_filter_type != RuntimeFilterType::IN_FILTER && - wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER); + bool can_not_merge_in_or_bloom = + _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && + (wrapper->_filter_type != RuntimeFilterType::IN_FILTER && + wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER && + wrapper->_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER); bool can_not_merge_other = _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER && _filter_type != wrapper->_filter_type; @@ -513,8 +515,15 @@ public: case RuntimeFilterType::IN_OR_BLOOM_FILTER: { auto real_filter_type = _is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER : RuntimeFilterType::IN_FILTER; + + auto other_filter_type = wrapper->_filter_type; + if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { + other_filter_type = wrapper->_is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER + : RuntimeFilterType::IN_FILTER; + } + if (real_filter_type == RuntimeFilterType::IN_FILTER) { - if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in + if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in CHECK(!wrapper->_is_ignored_in_filter) << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " @@ -526,7 +535,6 @@ public: << ") >= max_in_num(" << _max_in_num << ")"; change_to_bloom_filter(); } - // in merge bloom filter } else { VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id << ") because: already exist a bloom filter"; @@ -535,8 +543,7 @@ public: wrapper->_context.bloom_filter_func.get())); } } else { - if (wrapper->_filter_type == - RuntimeFilterType::IN_FILTER) { // bloom filter merge in + if (other_filter_type == RuntimeFilterType::IN_FILTER) { // bloom filter merge in CHECK(!wrapper->_is_ignored_in_filter) << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " @@ -1157,6 +1164,14 @@ void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTim _filter_timer.push_back(timer); } +void IRuntimeFilter::set_ignored(const std::string& msg) { + _is_ignored = true; + if (_wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { + _wrapper->_is_ignored_in_filter = true; + _wrapper->_ignored_in_filter_msg = _pool->add(new std::string(msg)); + } +} + BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { return _wrapper->get_bloomfilter(); } @@ -1344,14 +1359,14 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { if (!_is_ignored && wrapper->is_ignored_in_filter()) { - set_ignored(); - set_ignored_msg(*(wrapper->get_ignored_in_filter_msg())); + std::string* msg = wrapper->get_ignored_in_filter_msg(); + 
set_ignored(msg ? *msg : ""); } auto origin_type = _wrapper->get_real_type(); Status status = _wrapper->merge(wrapper); if (!_is_ignored && _wrapper->is_ignored_in_filter()) { - set_ignored(); - set_ignored_msg(*(_wrapper->get_ignored_in_filter_msg())); + std::string* msg = _wrapper->get_ignored_in_filter_msg(); + set_ignored(msg ? *msg : ""); } if (origin_type != _wrapper->get_real_type()) { update_runtime_filter_type_to_profile(); @@ -1656,10 +1671,8 @@ bool IRuntimeFilter::is_bloomfilter() { Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { - set_ignored(); const PInFilter in_filter = param->request->in_filter(); - auto msg = param->pool->add(new std::string(in_filter.ignored_msg())); - set_ignored_msg(*msg); + set_ignored(in_filter.ignored_msg()); } std::unique_ptr<RuntimePredicateWrapper> wrapper; RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, &wrapper)); @@ -1677,10 +1690,8 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply) { if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { - set_ignored(); const PInFilter in_filter = param->request->in_filter(); - auto msg = param->pool->add(new std::string(in_filter.ignored_msg())); - set_ignored_msg(*msg); + set_ignored(in_filter.ignored_msg()); } std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper; diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 2673308ae77..fc324c1c1be 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -292,9 +292,7 @@ public: Status update_filter(const UpdateRuntimeFilterParams* param); Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply); - void set_ignored() { _is_ignored = true; } - - void 
set_ignored_msg(std::string& msg) { _ignored_msg = msg; } + void set_ignored(const std::string& msg); // for ut bool is_bloomfilter(); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index d539e295ae8..495ac28e762 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -58,15 +58,14 @@ public: filter_id); } for (auto filter : filters) { - filter->set_ignored(); + filter->set_ignored(""); filter->signal(); } return Status::OK(); }; auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) { - runtime_filter->set_ignored(); - runtime_filter->set_ignored_msg(msg); + runtime_filter->set_ignored(msg); RETURN_IF_ERROR(runtime_filter->publish()); return Status::OK(); }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 45a14d3b633..12db27793d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -369,11 +369,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()) .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); - if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) { - // If storage data distribution is ignored, we use BLOOM filter. 
- legalTypes.clear(); - legalTypes.add(TRuntimeFilterType.BLOOM); - } + List<EqualTo> hashJoinConjuncts = join.getEqualToConjuncts(); for (int i = 0; i < hashJoinConjuncts.size(); i++) { EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org