github-actions[bot] commented on code in PR #31067:
URL: https://github.com/apache/doris/pull/31067#discussion_r1494114707


##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -89,28 +89,72 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
         }
     }
 
-    // TODO: union the remote opt and global two case as one case to one judge
-    bool remote_opt_or_global = (desc.__isset.opt_remote_rf && 
desc.opt_remote_rf) || is_global;
-
     if (!has_exist) {
         IRuntimeFilter* filter;
-        RETURN_IF_ERROR(IRuntimeFilter::create(
-                _state, remote_opt_or_global ? _state->obj_pool() : &_pool, 
&desc, &options,
-                RuntimeFilterRole::CONSUMER, node_id, &filter, 
build_bf_exactly, is_global));
+        RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+                                               RuntimeFilterRole::CONSUMER, 
node_id, &filter,
+                                               build_bf_exactly, 
need_local_merge));
         _consumer_map[key].emplace_back(node_id, filter);
         *consumer_filter = filter;
-    } else if (!remote_opt_or_global) {
+    } else if (!need_local_merge) {
         return Status::InvalidArgument("filter has registered");
     }
 
     return Status::OK();
 }
 
+Status RuntimeFilterMgr::register_local_merge_producer_filter(
+        const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& 
options,
+        doris::IRuntimeFilter** producer_filter, bool build_bf_exactly) {
+    SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
+    int32_t key = desc.filter_id;
+
+    decltype(_local_merge_producer_map.end()) iter;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        iter = _local_merge_producer_map.find(key);
+        if (iter == _local_merge_producer_map.end()) {
+            auto [new_iter, _] = _local_merge_producer_map.emplace(key, 
LocalMergeFilters {});
+            iter = new_iter;
+        }
+    }
+
+    DCHECK(_state != nullptr);
+    RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+                                           RuntimeFilterRole::PRODUCER, -1, 
producer_filter,
+                                           build_bf_exactly, true));
+    {
+        std::lock_guard<std::mutex> l(*iter->second.lock);
+        if (iter->second.filters.empty()) {
+            IRuntimeFilter* merge_filter = nullptr;
+            RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, 
&options,
+                                                   
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
+                                                   build_bf_exactly, true));
+            iter->second.filters.emplace_back(merge_filter);
+        }
+        iter->second.merge_time++;
+        iter->second.filters.emplace_back(*producer_filter);
+    }
+    return Status::OK();
+}
+
+Status RuntimeFilterMgr::get_local_merge_producer_filters(

Review Comment:
   warning: method 'get_local_merge_producer_filters' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/runtime/runtime_filter_mgr.h:91:
   ```diff
   -     Status get_local_merge_producer_filters(int filter_id, 
LocalMergeFilters** local_merge_filters);
   +     static Status get_local_merge_producer_filters(int filter_id, 
LocalMergeFilters** local_merge_filters);
   ```
   



##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -954,33 +954,34 @@ void IRuntimeFilter::insert_batch(const 
vectorized::ColumnPtr column, size_t sta
     _wrapper->insert_batch(column, start);
 }
 
-Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, 
int* merged_num) {
-    SCOPED_TIMER(_merge_local_rf_timer);
-    std::unique_lock lock(_local_merge_mutex);
-    if (_merged_rf_num == 0) {
-        _wrapper = wrapper;
-    } else {
-        RETURN_IF_ERROR(merge_from(wrapper));
-    }
-    *merged_num = ++_merged_rf_num;
-    return Status::OK();
-}
-
 Status IRuntimeFilter::publish(bool publish_local) {

Review Comment:
   warning: function 'publish' has cognitive complexity of 62 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
   Status IRuntimeFilter::publish(bool publish_local) {
                          ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/exprs/runtime_filter.cpp:958:** nesting level increased to 1
   ```cpp
       auto send_to_remote = [&](IRuntimeFilter* filter) {
                             ^
   ```
   **be/src/exprs/runtime_filter.cpp:961:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:961:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:964:** nesting level increased to 1
   ```cpp
       auto do_local_merge = [&]() {
                             ^
   ```
   **be/src/exprs/runtime_filter.cpp:966:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:966:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:969:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:969:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:971:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           if (local_merge_filters->merge_time == 0) {
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:972:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
               if (_has_local_target) {
               ^
   ```
   **be/src/exprs/runtime_filter.cpp:975:** +1, nesting level increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/exprs/runtime_filter.cpp:976:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
                   
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
                   ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:976:** +5, including nesting penalty of 4, 
nesting level increased to 5
   ```cpp
                   
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
                   ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:982:** +1, including nesting penalty of 0, 
nesting level increased to 1
   ```cpp
       if (_need_local_merge && _has_local_target) {
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:982:** +1
   ```cpp
       if (_need_local_merge && _has_local_target) {
                             ^
   ```
   **be/src/exprs/runtime_filter.cpp:983:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(do_local_merge());
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:983:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(do_local_merge());
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:984:** +1, nesting level increased to 1
   ```cpp
       } else if (_has_local_target) {
              ^
   ```
   **be/src/exprs/runtime_filter.cpp:986:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, 
filters));
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:986:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, 
filters));
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:993:** +1, nesting level increased to 1
   ```cpp
       } else if (!publish_local) {
              ^
   ```
   **be/src/exprs/runtime_filter.cpp:994:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           if (_is_broadcast_join) {
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:995:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(send_to_remote(this));
               ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:995:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(send_to_remote(this));
               ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:996:** +1, nesting level increased to 2
   ```cpp
           } else {
             ^
   ```
   **be/src/exprs/runtime_filter.cpp:997:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(do_local_merge());
               ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:997:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(do_local_merge());
               ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:999:** +1, nesting level increased to 1
   ```cpp
       } else {
         ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to