This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6d7d1976064c4fe8aa2ce3926b07fe965fe78977 Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Feb 8 09:59:29 2024 +0800 opt the rf code and remove rf unless code (#30861) --- be/src/exprs/runtime_filter.cpp | 110 +++++++++++++--------------- be/src/exprs/runtime_filter.h | 20 +---- be/src/exprs/runtime_filter_slots.h | 5 +- be/src/pipeline/exec/datagen_operator.cpp | 8 +- be/src/runtime/fragment_mgr.cpp | 8 +- be/src/runtime/runtime_filter_mgr.cpp | 69 +++++++---------- be/src/runtime/runtime_filter_mgr.h | 10 +-- be/src/vec/exec/runtime_filter_consumer.cpp | 15 ++-- be/src/vec/exec/vdata_gen_scan_node.cpp | 8 +- 9 files changed, 97 insertions(+), 156 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 06d1c452fdd..47157cf74d2 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -366,11 +366,9 @@ public: BloomFilterFuncBase* get_bloomfilter() const { return _context.bloom_filter_func.get(); } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) { + DCHECK(!is_ignored()); switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - if (_is_ignored_in_filter) { - break; - } _context.hybrid_set->insert_fixed_len(column, start); break; } @@ -465,14 +463,15 @@ public: switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - if (_is_ignored_in_filter) { + // only in filter can set ignore in merge time + if (_ignored) { break; - } else if (wrapper->_is_ignored_in_filter) { + } else if (wrapper->_ignored) { VLOG_DEBUG << " ignore merge runtime filter(in filter id " << _filter_id - << ") because: " << wrapper->get_ignored_in_filter_msg(); + << ") because: " << wrapper->ignored_msg(); - _is_ignored_in_filter = true; - _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg; + _ignored = true; + _ignored_msg = wrapper->_ignored_msg; // release in filter _context.hybrid_set.reset(); break; @@ -480,17 +479,11 @@ public: // try insert set 
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { -#ifdef VLOG_DEBUG_IS_ON - std::stringstream msg; - msg << " ignore merge runtime filter(in filter id " << _filter_id - << ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num(" - << _max_in_num << ")"; - _ignored_in_filter_msg = std::string(msg.str()); -#else - _ignored_in_filter_msg = std::string("ignored"); -#endif - _is_ignored_in_filter = true; - + _ignored_msg = fmt::format( + " ignore merge runtime filter(in filter id {})" + "because: in_num({}) >= max_in_num({})", + _filter_id, _context.hybrid_set->size(), _max_in_num); + _ignored = true; // release in filter _context.hybrid_set.reset(); } @@ -520,10 +513,10 @@ public: if (real_filter_type == RuntimeFilterType::IN_FILTER) { if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in - CHECK(!wrapper->_is_ignored_in_filter) + CHECK(!wrapper->_ignored) << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " - << wrapper->get_ignored_in_filter_msg(); + << wrapper->ignored_msg(); _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id @@ -540,10 +533,10 @@ public: } } else { if (other_filter_type == RuntimeFilterType::IN_FILTER) { // bloom filter merge in - CHECK(!wrapper->_is_ignored_in_filter) + CHECK(!wrapper->_ignored) << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " - << wrapper->get_ignored_in_filter_msg(); + << wrapper->ignored_msg(); wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get()); // bloom filter merge bloom filter } else { @@ -563,8 +556,8 @@ public: if (in_filter->has_ignored_msg()) { VLOG_DEBUG << "Ignore in filter(id=" 
<< _filter_id << ") because: " << in_filter->ignored_msg(); - _is_ignored_in_filter = true; - _ignored_in_filter_msg = in_filter->ignored_msg(); + _ignored = true; + _ignored_msg = in_filter->ignored_msg(); return Status::OK(); } @@ -893,9 +886,9 @@ public: bool is_bloomfilter() const { return _is_bloomfilter; } - bool is_ignored_in_filter() const { return _is_ignored_in_filter; } + bool is_ignored() const { return _ignored; } - const std::string& get_ignored_in_filter_msg() const { return _ignored_in_filter_msg; } + const std::string& ignored_msg() const { return _ignored_msg; } void batch_assign(const PInFilter* filter, void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, @@ -938,8 +931,8 @@ private: vectorized::SharedRuntimeFilterContext _context; bool _is_bloomfilter = false; - bool _is_ignored_in_filter = false; - std::string _ignored_in_filter_msg; + bool _ignored = false; + std::string _ignored_msg; uint32_t _filter_id; }; @@ -1022,7 +1015,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr std::vector<vectorized::VExprSPtr>& push_exprs, bool is_late_arrival) { DCHECK(is_consumer()); - if (_is_ignored) { + if (_wrapper->is_ignored()) { return Status::OK(); } if (!is_late_arrival) { @@ -1153,11 +1146,16 @@ void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTim } void IRuntimeFilter::set_ignored(const std::string& msg) { - _is_ignored = true; - if (_wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { - _wrapper->_is_ignored_in_filter = true; - _wrapper->_ignored_in_filter_msg = msg; - } + _wrapper->_ignored = true; + _wrapper->_ignored_msg = msg; +} + +std::string IRuntimeFilter::_format_status() const { + return fmt::format( + "[IsPushDown = {}, RuntimeFilterState = {}, IsIgnored = {}, HasRemoteTarget = {}, " + "HasLocalTarget = {}]", + _is_push_down, _get_explain_state_string(), _wrapper->is_ignored(), _has_remote_target, + _has_local_target); } BloomFilterFuncBase* 
IRuntimeFilter::get_bloomfilter() const { @@ -1223,7 +1221,6 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue } _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params)); - return _wrapper->init(&params); } @@ -1348,14 +1345,11 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { - if (!_is_ignored && wrapper->is_ignored_in_filter()) { - set_ignored(wrapper->get_ignored_in_filter_msg()); + if (!_wrapper->is_ignored() && wrapper->is_ignored()) { + set_ignored(wrapper->ignored_msg()); } auto origin_type = _wrapper->get_real_type(); Status status = _wrapper->merge(wrapper); - if (!_is_ignored && _wrapper->is_ignored_in_filter()) { - set_ignored(_wrapper->get_ignored_in_filter_msg()); - } if (origin_type != _wrapper->get_real_type()) { update_runtime_filter_type_to_profile(); } @@ -1406,8 +1400,8 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) { auto column_type = _wrapper->column_type(); filter->set_column_type(to_proto(column_type)); - if (_is_ignored) { - filter->set_ignored_msg(_ignored_msg); + if (_wrapper->is_ignored()) { + filter->set_ignored_msg(_wrapper->ignored_msg()); return; } @@ -1704,22 +1698,20 @@ Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex auto real_filter_type = get_real_type(); switch (real_filter_type) { case RuntimeFilterType::IN_FILTER: { - if (!_is_ignored_in_filter) { - TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); - type_desc.__set_is_nullable(false); - TExprNode node; - node.__set_type(type_desc); - node.__set_node_type(TExprNodeType::IN_PRED); - node.in_predicate.__set_is_not_in(false); - node.__set_opcode(TExprOpcode::FILTER_IN); - node.__set_is_nullable(false); - - auto in_pred = vectorized::VDirectInPredicate::create_shared(node); - in_pred->set_filter(_context.hybrid_set); - in_pred->add_child(probe_ctx->root()); - auto wrapper = 
vectorized::VRuntimeFilterWrapper::create_shared(node, in_pred); - container.push_back(wrapper); - } + TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); + type_desc.__set_is_nullable(false); + TExprNode node; + node.__set_type(type_desc); + node.__set_node_type(TExprNodeType::IN_PRED); + node.in_predicate.__set_is_not_in(false); + node.__set_opcode(TExprOpcode::FILTER_IN); + node.__set_is_nullable(false); + + auto in_pred = vectorized::VDirectInPredicate::create_shared(node); + in_pred->set_filter(_context.hybrid_set); + in_pred->add_child(probe_ctx->root()); + auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, in_pred); + container.push_back(wrapper); break; } case RuntimeFilterType::MIN_FILTER: { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 9a7c1a2ae3c..f68c0ec250c 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -199,8 +199,6 @@ public: _rf_state_atomic(RuntimeFilterState::NOT_READY), _role(RuntimeFilterRole::PRODUCER), _expr_order(-1), - _always_true(false), - _is_ignored(false), registration_time_(MonotonicMillis()), _wait_infinitely(_state->runtime_filter_wait_infinitely), _rf_wait_time_ms(_state->runtime_filter_wait_time_ms), @@ -381,15 +379,9 @@ protected: void _set_push_down() { _is_push_down = true; } - std::string _format_status() { - return fmt::format( - "[IsPushDown = {}, RuntimeFilterState = {}, IsIgnored = {}, HasRemoteTarget = {}, " - "HasLocalTarget = {}]", - _is_push_down, _get_explain_state_string(), _is_ignored, _has_remote_target, - _has_local_target); - } + std::string _format_status() const; - std::string _get_explain_state_string() { + std::string _get_explain_state_string() const { if (_enable_pipeline_exec) { return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY ? 
"READY" @@ -430,16 +422,8 @@ protected: bool _is_push_down = false; - // if set always_true = true - // this filter won't filter any data - bool _always_true; - TExpr _probe_expr; - // Indicate whether runtime filter expr has been ignored - bool _is_ignored; - std::string _ignored_msg; - struct RPCContext; std::shared_ptr<RPCContext> _rpc_context; diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 8f5dab22f8c..e1a1f871b95 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -48,11 +48,14 @@ public: std::map<int, bool> has_in_filter; auto ignore_local_filter = [&](int filter_id) { + // Now pipeline x have bug in ignore, after fix the problem enable ignore logic in pipeline x if (_is_global) { return Status::OK(); } + auto runtime_filter_mgr = state->runtime_filter_mgr(); + std::vector<IRuntimeFilter*> filters; - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filters(filter_id, filters)); + RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters)); if (filters.empty()) { throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, filter_id={}", filter_id); diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 46a0dacb78e..916ce62aa26 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -102,14 +102,10 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { IRuntimeFilter* runtime_filter = nullptr; if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), p.node_id(), false)); - RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, p.node_id(), &runtime_filter)); + filter_desc, state->query_options(), p.node_id(), &runtime_filter, false)); } else 
{ RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), p.node_id(), false)); - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, p.node_id(), &runtime_filter)); + filter_desc, state->query_options(), p.node_id(), &runtime_filter, false)); } runtime_filter->init_profile(_runtime_profile.get()); } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 3a9a7c2a88d..2eff857a3d3 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -635,11 +635,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline); _set_scan_concurrency(params, query_ctx.get()); - - bool is_pipeline = false; - if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) { - is_pipeline = true; - } + const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>; if (params.__isset.workload_groups && !params.workload_groups.empty()) { uint64_t tg_id = params.workload_groups[0].id; @@ -1333,7 +1329,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, int64_t start_apply = MonotonicMillis(); const auto& fragment_instance_ids = request->fragment_instance_ids(); - if (fragment_instance_ids.size() > 0) { + if (!fragment_instance_ids.empty()) { UniqueId fragment_instance_id = fragment_instance_ids[0]; TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 3a01368b583..8281ae2ea6d 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -69,23 +69,6 @@ Status RuntimeFilterMgr::get_producer_filter(const int filter_id, IRuntimeFilter return Status::OK(); } -Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int node_id, - 
IRuntimeFilter** consumer_filter) { - std::lock_guard<std::mutex> l(_lock); - auto iter = _consumer_map.find(filter_id); - if (iter != _consumer_map.cend()) { - for (auto& item : iter->second) { - if (item.node_id == node_id) { - *consumer_filter = item.filter; - return Status::OK(); - } - } - } - - return Status::InvalidArgument("unknown filter, filter_id: {}, node_id: {}, role: CONSUMER", - filter_id, node_id); -} - Status RuntimeFilterMgr::get_consume_filters(const int filter_id, std::vector<IRuntimeFilter*>& consumer_filters) { std::lock_guard<std::mutex> l(_lock); @@ -101,16 +84,17 @@ Status RuntimeFilterMgr::get_consume_filters(const int filter_id, Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, + IRuntimeFilter** consumer_filter, bool build_bf_exactly, bool is_global) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; + bool has_exist = false; std::lock_guard<std::mutex> l(_lock); - auto iter = _consumer_map.find(key); - bool has_exist = false; - if (iter != _consumer_map.end()) { + if (auto iter = _consumer_map.find(key); iter != _consumer_map.end()) { for (auto holder : iter->second) { if (holder.node_id == node_id) { + *consumer_filter = holder.filter; has_exist = true; } } @@ -128,6 +112,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc _state, remote_opt_or_global ? 
_state->obj_pool() : &_pool, &desc, &options, RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly, is_global)); _consumer_map[key].emplace_back(node_id, filter); + *consumer_filter = filter; } else if (!remote_opt_or_global) { return Status::InvalidArgument("filter has registered"); } @@ -201,7 +186,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; - // LOG(INFO) << "entity filter id:" << filter_id; RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1, false)); _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, std::make_unique<std::mutex>()}); @@ -224,7 +208,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; - // LOG(INFO) << "entity filter id:" << filter_id; RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options)); _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, std::make_unique<std::mutex>()}); return Status::OK(); @@ -281,7 +264,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ butil::IOBufAsZeroCopyInputStream* attach_data, bool opt_remote_rf) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - std::shared_ptr<RuntimeFilterCntlVal> cntVal; + std::shared_ptr<RuntimeFilterCntlVal> cnt_val; int merged_size = 0; int64_t merge_time = 0; int64_t start_merge = MonotonicMillis(); @@ -296,39 +279,39 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ std::to_string(request->filter_id())); } } - cntVal = iter->second.first; + cnt_val = iter->second.first; { std::lock_guard<std::mutex> l(*iter->second.second); // Skip the other broadcast join runtime filter - if (cntVal->arrive_id.size() == 1 && 
cntVal->runtime_filter_desc.is_broadcast_join) { + if (cnt_val->arrive_id.size() == 1 && cnt_val->runtime_filter_desc.is_broadcast_join) { return Status::OK(); } MergeRuntimeFilterParams params(request, attach_data); - ObjectPool* pool = cntVal->pool.get(); + ObjectPool* pool = cnt_val->pool.get(); RuntimeFilterWrapperHolder holder; RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, &params, pool, holder.getHandle())); - RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get())); - cntVal->arrive_id.insert(UniqueId(request->fragment_instance_id())); - merged_size = cntVal->arrive_id.size(); + RETURN_IF_ERROR(cnt_val->filter->merge_from(holder.getHandle()->get())); + cnt_val->arrive_id.insert(UniqueId(request->fragment_instance_id())); + merged_size = cnt_val->arrive_id.size(); // TODO: avoid log when we had acquired a lock - VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size; - DCHECK_LE(merged_size, cntVal->producer_size); - cntVal->merge_time += (MonotonicMillis() - start_merge); - merge_time = cntVal->merge_time; + VLOG_ROW << "merge size:" << merged_size << ":" << cnt_val->producer_size; + DCHECK_LE(merged_size, cnt_val->producer_size); + cnt_val->merge_time += (MonotonicMillis() - start_merge); + merge_time = cnt_val->merge_time; } - if (merged_size == cntVal->producer_size) { + if (merged_size == cnt_val->producer_size) { if (opt_remote_rf) { - DCHECK_GT(cntVal->targetv2_info.size(), 0); - DCHECK(cntVal->filter->is_bloomfilter()); + DCHECK_GT(cnt_val->targetv2_info.size(), 0); + DCHECK(cnt_val->filter->is_bloomfilter()); // Optimize merging phase iff: // 1. All BE has been upgraded (e.g. _opt_remote_rf) - // 2. FE has been upgraded (e.g. cntVal->targetv2_info.size() > 0) + // 2. FE has been upgraded (e.g. cnt_val->targetv2_info.size() > 0) // 3. 
This filter is bloom filter (only bloom filter should be used for merging) using PPublishFilterRpcContext = AsyncRPCContext<PPublishFilterRequestV2, PPublishFilterResponse>; std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts; - rpc_contexts.reserve(cntVal->targetv2_info.size()); + rpc_contexts.reserve(cnt_val->targetv2_info.size()); butil::IOBuf request_attachment; @@ -337,13 +320,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ void* data = nullptr; int len = 0; bool has_attachment = false; - RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, &len)); + RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); if (data != nullptr && len > 0) { request_attachment.append(data, len); has_attachment = true; } - std::vector<TRuntimeFilterTargetParamsV2>& targets = cntVal->targetv2_info; + std::vector<TRuntimeFilterTargetParamsV2>& targets = cnt_val->targetv2_info; for (size_t i = 0; i < targets.size(); i++) { rpc_contexts.emplace_back(new PPublishFilterRpcContext); size_t cur = rpc_contexts.size() - 1; @@ -393,7 +376,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ using PPublishFilterRpcContext = AsyncRPCContext<PPublishFilterRequest, PPublishFilterResponse>; std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts; - rpc_contexts.reserve(cntVal->target_info.size()); + rpc_contexts.reserve(cnt_val->target_info.size()); butil::IOBuf request_attachment; @@ -402,13 +385,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ void* data = nullptr; int len = 0; bool has_attachment = false; - RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, &len)); + RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); if (data != nullptr && len > 0) { request_attachment.append(data, len); has_attachment = true; } - std::vector<TRuntimeFilterTargetParams>& targets = cntVal->target_info; + 
std::vector<TRuntimeFilterTargetParams>& targets = cnt_val->target_info; for (size_t i = 0; i < targets.size(); i++) { rpc_contexts.emplace_back(new PPublishFilterRpcContext); size_t cur = rpc_contexts.size() - 1; diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 24ab78464db..de55e34fc1e 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -71,17 +71,14 @@ public: ~RuntimeFilterMgr() = default; - Status get_consume_filter(const int filter_id, const int node_id, - IRuntimeFilter** consumer_filter); - Status get_consume_filters(const int filter_id, std::vector<IRuntimeFilter*>& consumer_filters); Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter); // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - int node_id, bool build_bf_exactly = false, - bool is_global = false); + int node_id, IRuntimeFilter** consumer_filter, + bool build_bf_exactly = false, bool is_global = false); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, bool build_bf_exactly = false, bool is_global = false, int parallel_tasks = 0); @@ -149,9 +146,6 @@ public: std::shared_ptr<ObjectPool> pool; }; -public: - RuntimeFilterCntlVal* get_filter(int id) { return _filter_map[id].first.get(); } - private: Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options, diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index b7146c7c6a7..5e2d90bf62d 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -58,20 +58,17 @@ Status RuntimeFilterConsumer::_register_runtime_filter(bool is_global) { // 1. All BE and FE has been upgraded (e.g. opt_remote_rf) // 2. 
This filter is bloom filter (only bloom filter should be used for merging) RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), _filter_id, false, is_global)); - RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, _filter_id, &runtime_filter)); + filter_desc, _state->query_options(), _filter_id, &runtime_filter, false, + is_global)); } else if (is_global) { // For pipelineX engine, runtime filter is global iff data distribution is ignored. RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), _filter_id, false, is_global)); - RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, _filter_id, &runtime_filter)); + filter_desc, _state->query_options(), _filter_id, &runtime_filter, false, + is_global)); } else { RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), _filter_id, false, is_global)); - RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, _filter_id, &runtime_filter)); + filter_desc, _state->query_options(), _filter_id, &runtime_filter, false, + is_global)); } _runtime_filter_ctxs.emplace_back(runtime_filter); _runtime_filter_ready_flag.emplace_back(false); diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp index 6c5db2c0161..42f6250a030 100644 --- a/be/src/vec/exec/vdata_gen_scan_node.cpp +++ b/be/src/vec/exec/vdata_gen_scan_node.cpp @@ -83,14 +83,10 @@ Status VDataGenFunctionScanNode::prepare(RuntimeState* state) { IRuntimeFilter* runtime_filter = nullptr; if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), id(), 
false)); - RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, id(), &runtime_filter)); + filter_desc, state->query_options(), id(), &runtime_filter, false)); } else { RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), id(), false)); - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, - id(), &runtime_filter)); + filter_desc, state->query_options(), id(), &runtime_filter, false)); } runtime_filter->init_profile(_runtime_profile.get()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org