This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 711b156a78dcebf7c39aede23f335fdd05b62b87 Author: HappenLee <happen...@hotmail.com> AuthorDate: Wed Jan 31 16:29:52 2024 +0800 [Refactor][Rf] remove useless code in RF (#30597) --- be/src/exprs/bloom_filter_func.h | 3 +- be/src/exprs/runtime_filter.cpp | 95 +++++++++++-------------------- be/src/exprs/runtime_filter.h | 6 +- be/src/exprs/runtime_filter_slots.h | 13 ++--- be/src/exprs/runtime_filter_slots_cross.h | 2 +- be/src/runtime/runtime_filter_mgr.cpp | 2 +- 6 files changed, 47 insertions(+), 74 deletions(-) diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index ed4205a7e0d..84e6eba1e44 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -169,10 +169,9 @@ public: return _bloom_filter->init(data, data_size); } - Status get_data(char** data, int* len) { + void get_data(char** data, int* len) { *data = _bloom_filter->data(); *len = _bloom_filter->size(); - return Status::OK(); } size_t get_size() const { return _bloom_filter ? 
_bloom_filter->size() : 0; } diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index bf1db5ff867..06d1c452fdd 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -67,7 +67,6 @@ namespace doris { // PrimitiveType-> PColumnType -// TODO: use constexpr if we use c++14 PColumnType to_proto(PrimitiveType type) { switch (type) { case TYPE_BOOLEAN: @@ -118,7 +117,6 @@ PColumnType to_proto(PrimitiveType type) { } // PColumnType->PrimitiveType -// TODO: use constexpr if we use c++14 PrimitiveType to_primitive_type(PColumnType type) { switch (type) { case PColumnType::COLUMN_TYPE_BOOL: @@ -281,12 +279,8 @@ class RuntimePredicateWrapper { public: RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool* pool, const RuntimeFilterParams* params) - : _state(state), - _be_exec_version(_state->be_exec_version), - _pool(pool), - _column_return_type(params->column_return_type), - _filter_type(params->filter_type), - _filter_id(params->filter_id) {} + : RuntimePredicateWrapper(state, pool, params->column_return_type, params->filter_type, + params->filter_id) {}; // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool* pool, @@ -350,7 +344,7 @@ public: insert_to_bloom_filter(bf); } // release in filter - _context.hybrid_set.reset(create_set(_column_return_type)); + _context.hybrid_set.reset(); } Status init_bloom_filter(const size_t build_bf_cardinality) { @@ -475,12 +469,12 @@ public: break; } else if (wrapper->_is_ignored_in_filter) { VLOG_DEBUG << " ignore merge runtime filter(in filter id " << _filter_id - << ") because: " << *(wrapper->get_ignored_in_filter_msg()); + << ") because: " << wrapper->get_ignored_in_filter_msg(); _is_ignored_in_filter = true; _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg; // release in filter - _context.hybrid_set.reset(create_set(_column_return_type)); + 
_context.hybrid_set.reset(); break; } // try insert set @@ -491,14 +485,14 @@ public: msg << " ignore merge runtime filter(in filter id " << _filter_id << ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num(" << _max_in_num << ")"; - _ignored_in_filter_msg = _pool->add(new std::string(msg.str())); + _ignored_in_filter_msg = std::string(msg.str()); #else - _ignored_in_filter_msg = _pool->add(new std::string("ignored")); + _ignored_in_filter_msg = std::string("ignored"); #endif _is_ignored_in_filter = true; // release in filter - _context.hybrid_set.reset(create_set(_column_return_type)); + _context.hybrid_set.reset(); } break; } @@ -529,7 +523,7 @@ public: CHECK(!wrapper->_is_ignored_in_filter) << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " - << *(wrapper->get_ignored_in_filter_msg()); + << wrapper->get_ignored_in_filter_msg(); _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id @@ -549,7 +543,7 @@ public: CHECK(!wrapper->_is_ignored_in_filter) << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " - << *(wrapper->get_ignored_in_filter_msg()); + << wrapper->get_ignored_in_filter_msg(); wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get()); // bloom filter merge bloom filter } else { @@ -566,14 +560,15 @@ public: } Status assign(const PInFilter* in_filter) { - PrimitiveType type = to_primitive_type(in_filter->column_type()); if (in_filter->has_ignored_msg()) { VLOG_DEBUG << "Ignore in filter(id=" << _filter_id << ") because: " << in_filter->ignored_msg(); _is_ignored_in_filter = true; - _ignored_in_filter_msg = _pool->add(new std::string(in_filter->ignored_msg())); + _ignored_in_filter_msg = in_filter->ignored_msg(); return 
Status::OK(); } + + PrimitiveType type = to_primitive_type(in_filter->column_type()); _context.hybrid_set.reset(create_set(type)); switch (type) { case TYPE_BOOLEAN: { @@ -883,19 +878,15 @@ public: return Status::InvalidArgument("not support!"); } - Status get_in_filter_iterator(HybridSetBase::IteratorBase** it) { - *it = _context.hybrid_set->begin(); - return Status::OK(); - } + HybridSetBase::IteratorBase* get_in_filter_iterator() { return _context.hybrid_set->begin(); } - Status get_bloom_filter_desc(char** data, int* filter_length) { - return _context.bloom_filter_func->get_data(data, filter_length); + void get_bloom_filter_desc(char** data, int* filter_length) { + _context.bloom_filter_func->get_data(data, filter_length); } - Status get_minmax_filter_desc(void** min_data, void** max_data) { + void get_minmax_filter_desc(void** min_data, void** max_data) { *min_data = _context.minmax_func->get_min(); *max_data = _context.minmax_func->get_max(); - return Status::OK(); } PrimitiveType column_type() { return _column_return_type; } @@ -904,7 +895,7 @@ public: bool is_ignored_in_filter() const { return _is_ignored_in_filter; } - std::string* get_ignored_in_filter_msg() const { return _ignored_in_filter_msg; } + const std::string& get_ignored_in_filter_msg() const { return _ignored_in_filter_msg; } void batch_assign(const PInFilter* filter, void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, @@ -948,7 +939,7 @@ private: vectorized::SharedRuntimeFilterContext _context; bool _is_bloomfilter = false; bool _is_ignored_in_filter = false; - std::string* _ignored_in_filter_msg = nullptr; + std::string _ignored_in_filter_msg; uint32_t _filter_id; }; @@ -962,13 +953,8 @@ Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* poo is_global ? 
false : build_bf_exactly); } -void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) { - context = _wrapper->_context; -} - -Status IRuntimeFilter::copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context) { - _wrapper->_context = context; - return Status::OK(); +vectorized::SharedRuntimeFilterContext& IRuntimeFilter::get_shared_context_ref() { + return _wrapper->_context; } void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) { @@ -1170,7 +1156,7 @@ void IRuntimeFilter::set_ignored(const std::string& msg) { _is_ignored = true; if (_wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { _wrapper->_is_ignored_in_filter = true; - _wrapper->_ignored_in_filter_msg = _pool->add(new std::string(msg)); + _wrapper->_ignored_in_filter_msg = msg; } } @@ -1363,14 +1349,12 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { if (!_is_ignored && wrapper->is_ignored_in_filter()) { - std::string* msg = wrapper->get_ignored_in_filter_msg(); - set_ignored(msg ? *msg : ""); + set_ignored(wrapper->get_ignored_in_filter_msg()); } auto origin_type = _wrapper->get_real_type(); Status status = _wrapper->merge(wrapper); if (!_is_ignored && _wrapper->is_ignored_in_filter()) { - std::string* msg = _wrapper->get_ignored_in_filter_msg(); - set_ignored(msg ? 
*msg : ""); + set_ignored(_wrapper->get_ignored_in_filter_msg()); } if (origin_type != _wrapper->get_real_type()) { update_runtime_filter_type_to_profile(); @@ -1403,7 +1387,7 @@ Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) { auto in_filter = request->mutable_in_filter(); to_protobuf(in_filter); } else if (real_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) { - RETURN_IF_ERROR(_wrapper->get_bloom_filter_desc((char**)data, len)); + _wrapper->get_bloom_filter_desc((char**)data, len); DCHECK(data != nullptr); request->mutable_bloom_filter()->set_filter_length(*len); request->mutable_bloom_filter()->set_always_true(false); @@ -1427,8 +1411,7 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) { return; } - HybridSetBase::IteratorBase* it; - static_cast<void>(_wrapper->get_in_filter_iterator(&it)); + auto it = _wrapper->get_in_filter_iterator(); DCHECK(it != nullptr); switch (column_type) { @@ -1554,7 +1537,7 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) { void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) { void* min_data = nullptr; void* max_data = nullptr; - static_cast<void>(_wrapper->get_minmax_filter_desc(&min_data, &max_data)); + _wrapper->get_minmax_filter_desc(&min_data, &max_data); DCHECK(min_data != nullptr && max_data != nullptr); filter->set_column_type(to_proto(_wrapper->column_type())); @@ -1673,7 +1656,8 @@ bool IRuntimeFilter::is_bloomfilter() { return _wrapper->is_bloomfilter(); } -Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { +template <typename T> +Status IRuntimeFilter::_update_filter(const T* param) { if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { const PInFilter in_filter = param->request->in_filter(); set_ignored(in_filter.ignored_msg()); @@ -1691,26 +1675,15 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { return Status::OK(); } +Status IRuntimeFilter::update_filter(const 
UpdateRuntimeFilterParams* param) { + return _update_filter(param); +} + Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply) { - if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { - const PInFilter in_filter = param->request->in_filter(); - set_ignored(in_filter.ignored_msg()); - } - - std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper; - RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, &tmp_wrapper)); - auto origin_type = _wrapper->get_real_type(); - RETURN_IF_ERROR(_wrapper->merge(tmp_wrapper.get())); - if (origin_type != _wrapper->get_real_type()) { - update_runtime_filter_type_to_profile(); - } - this->signal(); - - _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms"); _profile->add_info_string("UpdateTime", std::to_string(MonotonicMillis() - start_apply) + " ms"); - return Status::OK(); + return _update_filter(param); } Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index fc324c1c1be..bc487bfe9c9 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -218,8 +218,7 @@ public: bool build_bf_exactly = false, bool is_global = false, int parallel_tasks = 0); - void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context); - Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context); + vectorized::SharedRuntimeFilterContext& get_shared_context_ref(); void copy_from_other(IRuntimeFilter* other); @@ -367,6 +366,9 @@ protected: void to_protobuf(PInFilter* filter); void to_protobuf(PMinMaxFilter* filter); + template <class T> + Status _update_filter(const T* param); + template <class T> Status serialize_impl(T* request, void** data, int* len); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 
4859734a6a4..8f5dab22f8c 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -91,6 +91,10 @@ public: std::sort(sorted_runtime_filter_descs.begin(), sorted_runtime_filter_descs.end(), compare_desc); + // do not create 'in filter' when hash_table size over limit + const auto max_in_num = state->runtime_filter_max_in_num(); + const bool over_max_in_num = (hash_table_size >= max_in_num); + for (auto& filter_desc : sorted_runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id, @@ -103,10 +107,6 @@ public: runtime_filter->expr_order(), _build_expr_context.size()); } - // do not create 'in filter' when hash_table size over limit - auto max_in_num = state->runtime_filter_max_in_num(); - bool over_max_in_num = (hash_table_size >= max_in_num); - bool is_in_filter = (runtime_filter->type() == RuntimeFilterType::IN_FILTER); if (over_max_in_num && @@ -213,8 +213,7 @@ public: void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { for (auto& it : _runtime_filters) { for (auto& filter : it.second) { - auto& target = context->runtime_filters[filter->filter_id()]; - filter->copy_to_shared_context(target); + context->runtime_filters[filter->filter_id()] = filter->get_shared_context_ref(); } } } @@ -227,7 +226,7 @@ public: if (ret == context->runtime_filters.end()) { return Status::Aborted("invalid runtime filter id: {}", filter_id); } - RETURN_IF_ERROR(filter->copy_from_shared_context(ret->second)); + filter->get_shared_context_ref() = ret->second; } } return Status::OK(); diff --git a/be/src/exprs/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter_slots_cross.h index 76b6085bab9..7b1a2063d15 100644 --- a/be/src/exprs/runtime_filter_slots_cross.h +++ b/be/src/exprs/runtime_filter_slots_cross.h @@ -82,7 +82,7 @@ public: return Status::OK(); } - bool empty() { return _runtime_filters.empty(); } + bool empty() const { 
return _runtime_filters.empty(); } private: const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 29bf22535ed..3a01368b583 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -116,7 +116,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc } } - // TODO: make the two case as one case to judge + // TODO: union the remote opt and global two case as one case to one judge bool remote_opt_or_global = (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets && desc.type == TRuntimeFilterType::BLOOM) || --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org