github-actions[bot] commented on code in PR #31067: URL: https://github.com/apache/doris/pull/31067#discussion_r1494114707
########## be/src/runtime/runtime_filter_mgr.cpp: ########## @@ -89,28 +89,72 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc } } - // TODO: union the remote opt and global two case as one case to one judge - bool remote_opt_or_global = (desc.__isset.opt_remote_rf && desc.opt_remote_rf) || is_global; - if (!has_exist) { IRuntimeFilter* filter; - RETURN_IF_ERROR(IRuntimeFilter::create( - _state, remote_opt_or_global ? _state->obj_pool() : &_pool, &desc, &options, - RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly, is_global)); + RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, + RuntimeFilterRole::CONSUMER, node_id, &filter, + build_bf_exactly, need_local_merge)); _consumer_map[key].emplace_back(node_id, filter); *consumer_filter = filter; - } else if (!remote_opt_or_global) { + } else if (!need_local_merge) { return Status::InvalidArgument("filter has registered"); } return Status::OK(); } +Status RuntimeFilterMgr::register_local_merge_producer_filter( + const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options, + doris::IRuntimeFilter** producer_filter, bool build_bf_exactly) { + SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); + int32_t key = desc.filter_id; + + decltype(_local_merge_producer_map.end()) iter; + { + std::lock_guard<std::mutex> l(_lock); + iter = _local_merge_producer_map.find(key); + if (iter == _local_merge_producer_map.end()) { + auto [new_iter, _] = _local_merge_producer_map.emplace(key, LocalMergeFilters {}); + iter = new_iter; + } + } + + DCHECK(_state != nullptr); + RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, + RuntimeFilterRole::PRODUCER, -1, producer_filter, + build_bf_exactly, true)); + { + std::lock_guard<std::mutex> l(*iter->second.lock); + if (iter->second.filters.empty()) { + IRuntimeFilter* merge_filter = nullptr; + RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, + RuntimeFilterRole::PRODUCER, -1, 
&merge_filter, + build_bf_exactly, true)); + iter->second.filters.emplace_back(merge_filter); + } + iter->second.merge_time++; + iter->second.filters.emplace_back(*producer_filter); + } + return Status::OK(); +} + +Status RuntimeFilterMgr::get_local_merge_producer_filters( Review Comment: warning: method 'get_local_merge_producer_filters' can be made static [readability-convert-member-functions-to-static] be/src/runtime/runtime_filter_mgr.h:91: ```diff - Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters); + static Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters); ``` ########## be/src/exprs/runtime_filter.cpp: ########## @@ -954,33 +954,34 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta _wrapper->insert_batch(column, start); } -Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num) { - SCOPED_TIMER(_merge_local_rf_timer); - std::unique_lock lock(_local_merge_mutex); - if (_merged_rf_num == 0) { - _wrapper = wrapper; - } else { - RETURN_IF_ERROR(merge_from(wrapper)); - } - *merged_num = ++_merged_rf_num; - return Status::OK(); -} - Status IRuntimeFilter::publish(bool publish_local) { Review Comment: warning: function 'publish' has cognitive complexity of 62 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status IRuntimeFilter::publish(bool publish_local) { ^ ``` <details> <summary>Additional context</summary> **be/src/exprs/runtime_filter.cpp:958:** nesting level increased to 1 ```cpp auto send_to_remote = [&](IRuntimeFilter* filter) { ^ ``` **be/src/exprs/runtime_filter.cpp:961:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/exprs/runtime_filter.cpp:961:** +3, including nesting penalty of 
2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/exprs/runtime_filter.cpp:964:** nesting level increased to 1 ```cpp auto do_local_merge = [&]() { ^ ``` **be/src/exprs/runtime_filter.cpp:966:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters( ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/exprs/runtime_filter.cpp:966:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters( ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/exprs/runtime_filter.cpp:969:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/exprs/runtime_filter.cpp:969:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/exprs/runtime_filter.cpp:971:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (local_merge_filters->merge_time == 0) { ^ ``` **be/src/exprs/runtime_filter.cpp:972:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (_has_local_target) { ^ ``` **be/src/exprs/runtime_filter.cpp:975:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/exprs/runtime_filter.cpp:976:** +4, including nesting 
penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0])); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/exprs/runtime_filter.cpp:976:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0])); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/exprs/runtime_filter.cpp:982:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_need_local_merge && _has_local_target) { ^ ``` **be/src/exprs/runtime_filter.cpp:982:** +1 ```cpp if (_need_local_merge && _has_local_target) { ^ ``` **be/src/exprs/runtime_filter.cpp:983:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(do_local_merge()); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/exprs/runtime_filter.cpp:983:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(do_local_merge()); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/exprs/runtime_filter.cpp:984:** +1, nesting level increased to 1 ```cpp } else if (_has_local_target) { ^ ``` **be/src/exprs/runtime_filter.cpp:986:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/exprs/runtime_filter.cpp:986:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if 
(UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/exprs/runtime_filter.cpp:993:** +1, nesting level increased to 1 ```cpp } else if (!publish_local) { ^ ``` **be/src/exprs/runtime_filter.cpp:994:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (_is_broadcast_join) { ^ ``` **be/src/exprs/runtime_filter.cpp:995:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(send_to_remote(this)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/exprs/runtime_filter.cpp:995:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(send_to_remote(this)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/exprs/runtime_filter.cpp:996:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/exprs/runtime_filter.cpp:997:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(do_local_merge()); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/exprs/runtime_filter.cpp:997:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(do_local_merge()); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/exprs/runtime_filter.cpp:999:** +1, nesting level increased to 1 ```cpp } else { ^ ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at: users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org