This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 29ca776 [Refactor] Refactor part of RuntimeFilter's code (#6998) 29ca776 is described below commit 29ca77622fe492ec767221b7d87b6dfcfff1aa8c Author: Pxl <952130...@qq.com> AuthorDate: Sun Nov 7 17:40:45 2021 +0800 [Refactor] Refactor part of RuntimeFilter's code (#6998) #6997 --- be/src/exec/hash_join_node.cpp | 5 +- be/src/exec/hash_join_node.h | 1 + be/src/exprs/bloomfilter_predicate.cpp | 22 ++++-- be/src/exprs/hybrid_set.cpp | 19 +++-- be/src/exprs/hybrid_set.h | 16 ++-- be/src/exprs/runtime_filter.cpp | 108 +++++--------------------- be/src/exprs/runtime_filter.h | 46 +---------- be/src/exprs/runtime_filter_slots.h | 137 +++++++++++++++++++++++++++++++++ be/src/runtime/primitive_type.h | 4 + 9 files changed, 197 insertions(+), 161 deletions(-) diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 3fc0dfe..b1b9356 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -242,8 +242,7 @@ Status HashJoinNode::open(RuntimeState* state) { _runtime_filter_descs); RETURN_IF_ERROR(thread_status.get_future().get()); - RETURN_IF_ERROR(runtime_filter_slots.init(state, _pool, expr_mem_tracker().get(), - _hash_tbl->size())); + RETURN_IF_ERROR(runtime_filter_slots.init(state, _hash_tbl->size())); { SCOPED_TIMER(_push_compute_timer); auto func = [&](TupleRow* row) { runtime_filter_slots.insert(row); }; @@ -252,7 +251,7 @@ Status HashJoinNode::open(RuntimeState* state) { COUNTER_UPDATE(_build_timer, _push_compute_timer->value()); { SCOPED_TIMER(_push_down_timer); - runtime_filter_slots.publish(this); + runtime_filter_slots.publish(); } Status open_status = child(0)->open(state); RETURN_IF_ERROR(open_status); diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h index 2379f77..f097a2c 100644 --- a/be/src/exec/hash_join_node.h +++ b/be/src/exec/hash_join_node.h @@ -25,6 +25,7 @@ #include "exec/exec_node.h" #include "exec/hash_table.h" +#include "exprs/runtime_filter_slots.h" #include "gen_cpp/PlanNodes_types.h" namespace doris { diff --git a/be/src/exprs/bloomfilter_predicate.cpp b/be/src/exprs/bloomfilter_predicate.cpp index 44906fd..4bc7584 100644 --- a/be/src/exprs/bloomfilter_predicate.cpp +++ b/be/src/exprs/bloomfilter_predicate.cpp @@ -40,31 +40,37 @@ IBloomFilterFuncBase* IBloomFilterFuncBase::create_bloom_filter(MemTracker* trac return new BloomFilterFunc<TYPE_INT, CurrentBloomFilterAdaptor>(tracker); case TYPE_BIGINT: return new BloomFilterFunc<TYPE_BIGINT, CurrentBloomFilterAdaptor>(tracker); + case TYPE_LARGEINT: + return new BloomFilterFunc<TYPE_LARGEINT, CurrentBloomFilterAdaptor>(tracker); + case TYPE_FLOAT: return new BloomFilterFunc<TYPE_FLOAT, CurrentBloomFilterAdaptor>(tracker); case TYPE_DOUBLE: return new BloomFilterFunc<TYPE_DOUBLE, CurrentBloomFilterAdaptor>(tracker); + + case TYPE_DECIMALV2: + return new BloomFilterFunc<TYPE_DECIMALV2, CurrentBloomFilterAdaptor>(tracker); + + case TYPE_TIME: + return new BloomFilterFunc<TYPE_TIME, CurrentBloomFilterAdaptor>(tracker); case TYPE_DATE: return new BloomFilterFunc<TYPE_DATE, CurrentBloomFilterAdaptor>(tracker); case TYPE_DATETIME: return new BloomFilterFunc<TYPE_DATETIME, CurrentBloomFilterAdaptor>(tracker); - case TYPE_DECIMALV2: - return new BloomFilterFunc<TYPE_DECIMALV2, CurrentBloomFilterAdaptor>(tracker); - case TYPE_LARGEINT: - return new BloomFilterFunc<TYPE_LARGEINT, CurrentBloomFilterAdaptor>(tracker); + case TYPE_CHAR: return new BloomFilterFunc<TYPE_CHAR, CurrentBloomFilterAdaptor>(tracker); case TYPE_VARCHAR: return new BloomFilterFunc<TYPE_VARCHAR, CurrentBloomFilterAdaptor>(tracker); case TYPE_STRING: - return new BloomFilterFunc<TYPE_STRING, CurrentBloomFilterAdaptor>(tracker); + return new BloomFilterFunc<TYPE_STRING, CurrentBloomFilterAdaptor>(tracker); + default: - return nullptr; + DCHECK(false) << "Invalid type."; } return nullptr; } - BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node) : Predicate(node), _is_prepare(false), @@ -74,7 +80,7 @@ BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node) BloomFilterPredicate::~BloomFilterPredicate() { VLOG_NOTICE << "bloom filter rows:" << _filtered_rows << ",scan_rows:" << _scan_rows - << ",rate:" << (double)_filtered_rows / _scan_rows; + << ",rate:" << (double)_filtered_rows / _scan_rows; } BloomFilterPredicate::BloomFilterPredicate(const BloomFilterPredicate& other) diff --git a/be/src/exprs/hybrid_set.cpp b/be/src/exprs/hybrid_set.cpp index 4ebdabc..1b3fd37 100644 --- a/be/src/exprs/hybrid_set.cpp +++ b/be/src/exprs/hybrid_set.cpp @@ -36,21 +36,22 @@ HybridSetBase* HybridSetBase::create_set(PrimitiveType type) { case TYPE_BIGINT: return new (std::nothrow) HybridSet<int64_t>(); + case TYPE_LARGEINT: + return new (std::nothrow) HybridSet<__int128>(); + case TYPE_FLOAT: return new (std::nothrow) HybridSet<float>(); + case TYPE_TIME: case TYPE_DOUBLE: return new (std::nothrow) HybridSet<double>(); - case TYPE_DATE: - case TYPE_DATETIME: - return new (std::nothrow) HybridSet<DateTimeValue>(); - case TYPE_DECIMALV2: return new (std::nothrow) HybridSet<DecimalV2Value>(); - case TYPE_LARGEINT: - return new (std::nothrow) HybridSet<__int128>(); + case TYPE_DATE: + case TYPE_DATETIME: + return new (std::nothrow) HybridSet<DateTimeValue>(); case TYPE_CHAR: case TYPE_VARCHAR: @@ -58,12 +59,10 @@ HybridSetBase* HybridSetBase::create_set(PrimitiveType type) { return new (std::nothrow) StringValueSet(); default: - return NULL; + DCHECK(false) << "Invalid type."; } - return NULL; + return nullptr; } } // namespace doris - -/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h index 47c02ea..6947f09 100644 --- a/be/src/exprs/hybrid_set.h +++ b/be/src/exprs/hybrid_set.h @@ -35,7 +35,7 @@ class HybridSetBase { public: HybridSetBase() = default; virtual ~HybridSetBase() = default; - virtual void insert(void* data) = 0; + virtual void insert(const void* data) = 0; // use in vectorize execute engine virtual void insert(void* data, size_t) = 0; @@ -66,17 +66,18 @@ public: ~HybridSet() override = default; - void insert(void* data) override { + void insert(const void* data) override { + if (data == nullptr) return; + if (sizeof(T) >= 16) { // for largeint, it will core dump with no memcpy T value; memcpy(&value, data, sizeof(T)); _set.insert(value); } else { - _set.insert(*reinterpret_cast<T*>(data)); + _set.insert(*reinterpret_cast<const T*>(data)); } } - void insert(void* data, size_t) override { insert(data); } void insert(HybridSetBase* set) override { @@ -124,12 +125,13 @@ public: ~StringValueSet() override = default; - void insert(void* data) override { - auto* value = reinterpret_cast<StringValue*>(data); + void insert(const void* data) override { + if (data == nullptr) return; + + const auto* value = reinterpret_cast<const StringValue*>(data); std::string str_value(value->ptr, value->len); _set.insert(str_value); } - void insert(void* data, size_t size) override { std::string str_value(reinterpret_cast<char*>(data), size); _set.insert(str_value); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 446eb10..17e7f3b 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -43,7 +43,7 @@ namespace doris { // only used in Runtime Filter class MinMaxFuncBase { public: - virtual void insert(void* data) = 0; + virtual void insert(const void* data) = 0; virtual bool find(void* data) = 0; virtual bool is_empty() = 0; virtual void* get_max() = 0; @@ -61,9 +61,9 @@ class MinMaxNumFunc : public MinMaxFuncBase { public: MinMaxNumFunc() = default; ~MinMaxNumFunc() = default; - virtual void insert(void* data) { + virtual void insert(const void* data) { if (data == nullptr) return; - T val_data = *reinterpret_cast<T*>(data); + const T val_data = *reinterpret_cast<const T*>(data); if (_empty) { _min = val_data; _max = val_data; @@ -101,7 +101,6 @@ public: _max.ptr = str->data(); _max.len = str->length(); } - } else { MinMaxNumFunc<T>* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func); if (other_minmax->_min < _min) { @@ -151,30 +150,33 @@ MinMaxFuncBase* MinMaxFuncBase::create_minmax_filter(PrimitiveType type) { case TYPE_BIGINT: return new (std::nothrow) MinMaxNumFunc<int64_t>(); + case TYPE_LARGEINT: + return new (std::nothrow) MinMaxNumFunc<__int128>(); + case TYPE_FLOAT: return new (std::nothrow) MinMaxNumFunc<float>(); + case TYPE_TIME: case TYPE_DOUBLE: return new (std::nothrow) MinMaxNumFunc<double>(); - case TYPE_DATE: - case TYPE_DATETIME: - return new (std::nothrow) MinMaxNumFunc<DateTimeValue>(); - case TYPE_DECIMALV2: return new (std::nothrow) MinMaxNumFunc<DecimalV2Value>(); - case TYPE_LARGEINT: - return new (std::nothrow) MinMaxNumFunc<__int128>(); + case TYPE_DATE: + case TYPE_DATETIME: + return new (std::nothrow) MinMaxNumFunc<DateTimeValue>(); case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_STRING: return new (std::nothrow) MinMaxNumFunc<StringValue>(); + default: DCHECK(false) << "Invalid type."; } - return NULL; + + return nullptr; } // PrimitiveType->TExprNodeType @@ -331,6 +333,8 @@ TTypeDesc create_type_desc(PrimitiveType type) { TScalarType scalarType; scalarType.__set_type(to_thrift(type)); scalarType.__set_len(-1); + scalarType.__set_precision(-1); + scalarType.__set_scale(-1); node_type.back().__set_scalar_type(scalarType); type_desc.__set_types(node_type); return type_desc; @@ -472,18 +476,15 @@ public: return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size); } default: - DCHECK(false); return Status::InvalidArgument("Unknown Filter type"); } return Status::OK(); } - void insert(void* data) { + void insert(const void* data) { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - if (data != nullptr) { - _hybrid_set->insert(data); - } + _hybrid_set->insert(data); break; } case RuntimeFilterType::MINMAX_FILTER: { @@ -491,7 +492,6 @@ public: break; } case RuntimeFilterType::BLOOM_FILTER: { - DCHECK(_bloomfilter_func != nullptr); _bloomfilter_func->insert(data); break; } @@ -747,12 +747,12 @@ Status IRuntimeFilter::create(RuntimeState* state, MemTracker* tracker, ObjectPo return (*res)->init_with_desc(desc, node_id); } -void IRuntimeFilter::insert(void* data) { +void IRuntimeFilter::insert(const void* data) { DCHECK(is_producer()); _wrapper->insert(data); } -Status IRuntimeFilter::publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx) { +Status IRuntimeFilter::publish() { DCHECK(is_producer()); if (_has_local_target) { IRuntimeFilter* consumer_filter = nullptr; @@ -1054,74 +1054,4 @@ Status IRuntimeFilter::consumer_close() { RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default; RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default; -Status RuntimeFilterSlots::init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker, - int64_t hash_table_size) { - DCHECK(_probe_expr_context.size() == _build_expr_context.size()); - - // runtime filter effect stragety - // 1. we will ignore IN filter when hash_table_size is too big - // 2. we will ignore BLOOM filter and MinMax filter when hash_table_size - // is too small and IN filter has effect - - std::map<int, bool> has_in_filter; - - auto ignore_filter = [state](int filter_id) { - IRuntimeFilter* consumer_filter = nullptr; - state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter); - DCHECK(consumer_filter != nullptr); - consumer_filter->set_ignored(); - consumer_filter->signal(); - }; - - for (auto& filter_desc : _runtime_filter_descs) { - IRuntimeFilter* runtime_filter = nullptr; - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id, - &runtime_filter)); - DCHECK(runtime_filter != nullptr); - DCHECK(runtime_filter->expr_order() >= 0); - DCHECK(runtime_filter->expr_order() < _probe_expr_context.size()); - - if (runtime_filter->type() == RuntimeFilterType::IN_FILTER && - hash_table_size >= state->runtime_filter_max_in_num()) { - ignore_filter(filter_desc.filter_id); - continue; - } - if (has_in_filter[runtime_filter->expr_order()] && !runtime_filter->has_remote_target() && - runtime_filter->type() != RuntimeFilterType::IN_FILTER && - hash_table_size < state->runtime_filter_max_in_num()) { - ignore_filter(filter_desc.filter_id); - continue; - } - has_in_filter[runtime_filter->expr_order()] = - (runtime_filter->type() == RuntimeFilterType::IN_FILTER); - _runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter); - } - - return Status::OK(); -} - -void RuntimeFilterSlots::ready_for_publish() { - for (auto& pair : _runtime_filters) { - for (auto filter : pair.second) { - filter->ready_for_publish(); - } - } -} - -void RuntimeFilterSlots::publish(HashJoinNode* hash_join_node) { - for (int i = 0; i < _probe_expr_context.size(); ++i) { - auto iter = _runtime_filters.find(i); - if (iter != _runtime_filters.end()) { - for (auto filter : iter->second) { - filter->publish(hash_join_node, _probe_expr_context[i]); - } - } - } - for (auto& pair : _runtime_filters) { - for (auto filter : pair.second) { - filter->publish_finally(); - } - } -} - } // namespace doris diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index b182c42..809c132 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -120,11 +120,11 @@ public: // insert data to build filter // only used for producer - void insert(void* data); + void insert(const void* data); // publish filter // push filter to remote node or push down it to scan_node - Status publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx); + Status publish(); void publish_finally(); @@ -209,7 +209,6 @@ protected: static Status _create_wrapper(const T* param, MemTracker* tracker, ObjectPool* pool, std::unique_ptr<RuntimePredicateWrapper>* wrapper); -protected: RuntimeState* _state; MemTracker* _mem_tracker; ObjectPool* _pool; @@ -280,47 +279,6 @@ private: WrapperPtr _wrapper; }; -/// this class used in a hash join node -/// Provide a unified interface for other classes -class RuntimeFilterSlots { -public: - RuntimeFilterSlots(const std::vector<ExprContext*>& prob_expr_ctxs, - const std::vector<ExprContext*>& build_expr_ctxs, - const std::vector<TRuntimeFilterDesc>& runtime_filter_descs) - : _probe_expr_context(prob_expr_ctxs), - _build_expr_context(build_expr_ctxs), - _runtime_filter_descs(runtime_filter_descs) {} - - Status init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker, - int64_t hash_table_size); - - void insert(TupleRow* row) { - for (int i = 0; i < _build_expr_context.size(); ++i) { - auto iter = _runtime_filters.find(i); - if (iter != _runtime_filters.end()) { - void* val = _build_expr_context[i]->get_value(row); - if (val != nullptr) { - for (auto filter : iter->second) { - filter->insert(val); - } - } - } - } - } - - // should call this method after insert - void ready_for_publish(); - // publish runtime filter - void publish(HashJoinNode* hash_join_node); - -private: - const std::vector<ExprContext*>& _probe_expr_context; - const std::vector<ExprContext*>& _build_expr_context; - const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs; - // prob_contition index -> [IRuntimeFilter] - std::map<int, std::list<IRuntimeFilter*>> _runtime_filters; -}; - } // namespace doris #endif diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h new file mode 100644 index 0000000..7f0957a --- /dev/null +++ b/be/src/exprs/runtime_filter_slots.h @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exprs/runtime_filter.h" +#include "runtime/runtime_filter_mgr.h" +#include "runtime/runtime_state.h" + +namespace doris { + +// this class used in a hash join node +// Provide a unified interface for other classes +template <typename ExprCtxType> +class RuntimeFilterSlotsBase { +public: + RuntimeFilterSlotsBase(const std::vector<ExprCtxType*>& prob_expr_ctxs, + const std::vector<ExprCtxType*>& build_expr_ctxs, + const std::vector<TRuntimeFilterDesc>& runtime_filter_descs) + : _probe_expr_context(prob_expr_ctxs), + _build_expr_context(build_expr_ctxs), + _runtime_filter_descs(runtime_filter_descs) {} + + Status init(RuntimeState* state, int64_t hash_table_size) { + DCHECK(_probe_expr_context.size() == _build_expr_context.size()); + + // runtime filter effect stragety + // 1. we will ignore IN filter when hash_table_size is too big + // 2. we will ignore BLOOM filter and MinMax filter when hash_table_size + // is too small and IN filter has effect + + std::map<int, bool> has_in_filter; + + auto ignore_filter = [state](int filter_id) { + IRuntimeFilter* consumer_filter = nullptr; + state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter); + DCHECK(consumer_filter != nullptr); + consumer_filter->set_ignored(); + consumer_filter->signal(); + }; + + for (auto& filter_desc : _runtime_filter_descs) { + IRuntimeFilter* runtime_filter = nullptr; + RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id, + &runtime_filter)); + DCHECK(runtime_filter != nullptr); + DCHECK(runtime_filter->expr_order() >= 0); + DCHECK(runtime_filter->expr_order() < _probe_expr_context.size()); + + // do not create 'in filter' when hash_table size over limit + bool over_max_in_num = (hash_table_size >= state->runtime_filter_max_in_num()); + + bool is_in_filter = (runtime_filter->type() == RuntimeFilterType::IN_FILTER); + + // do not create 'bloom filter' and 'minmax filter' when 'in filter' has created + bool pass_not_in = (has_in_filter[runtime_filter->expr_order()] && + !runtime_filter->has_remote_target()); + + if (over_max_in_num == is_in_filter && (is_in_filter || pass_not_in)) { + ignore_filter(filter_desc.filter_id); + continue; + } + + has_in_filter[runtime_filter->expr_order()] = + (runtime_filter->type() == RuntimeFilterType::IN_FILTER); + _runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter); + } + + return Status::OK(); + } + + void insert(TupleRow* row) { + for (int i = 0; i < _build_expr_context.size(); ++i) { + auto iter = _runtime_filters.find(i); + if (iter != _runtime_filters.end()) { + void* val = _build_expr_context[i]->get_value(row); + if (val != nullptr) { + for (auto filter : iter->second) { + filter->insert(val); + } + } + } + } + } + + // should call this method after insert + void ready_for_publish() { + for (auto& pair : _runtime_filters) { + for (auto filter : pair.second) { + filter->ready_for_publish(); + } + } + } + // publish runtime filter + void publish() { + for (int i = 0; i < _probe_expr_context.size(); ++i) { + auto iter = _runtime_filters.find(i); + if (iter != _runtime_filters.end()) { + for (auto filter : iter->second) { + filter->publish(); + } + } + } + for (auto& pair : _runtime_filters) { + for (auto filter : pair.second) { + filter->publish_finally(); + } + } + } + + bool empty() { return !_runtime_filters.size(); } + +private: + const std::vector<ExprCtxType*>& _probe_expr_context; + const std::vector<ExprCtxType*>& _build_expr_context; + const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs; + // prob_contition index -> [IRuntimeFilter] + std::map<int, std::list<IRuntimeFilter*>> _runtime_filters; +}; + +using RuntimeFilterSlots = RuntimeFilterSlotsBase<ExprContext>; + +} // namespace doris diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h index cdafb2c..36113a5 100644 --- a/be/src/runtime/primitive_type.h +++ b/be/src/runtime/primitive_type.h @@ -242,6 +242,10 @@ struct PrimitiveTypeTraits<TYPE_FLOAT> { using CppType = float; }; template <> +struct PrimitiveTypeTraits<TYPE_TIME> { + using CppType = double; +}; +template <> struct PrimitiveTypeTraits<TYPE_DOUBLE> { using CppType = double; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org