This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 52c45e38aff4c88670f789db2f275b6d8c1dede1
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Fri Feb 23 14:42:30 2024 +0800

    [Refactor](RF) refactor the profile of rf and pipeline-x support local 
ignore (#31287)
    
    * [Refactor](RF) refactor the profile of rf and pipeline-x support local 
ignore
    
    * fix local merge filter
---
 be/src/agent/heartbeat_server.cpp                  |  2 +-
 be/src/agent/heartbeat_server.h                    |  2 +-
 be/src/exprs/runtime_filter.cpp                    | 76 +++++++---------------
 be/src/exprs/runtime_filter.h                      |  2 +-
 be/src/exprs/runtime_filter_slots.h                |  7 +-
 be/src/pipeline/exec/scan_operator.cpp             |  3 +-
 be/src/pipeline/pipeline_fragment_context.h        |  9 ---
 be/src/pipeline/pipeline_x/dependency.cpp          | 43 ++++++------
 be/src/pipeline/pipeline_x/dependency.h            | 13 ++--
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  2 -
 be/src/runtime/fragment_mgr.cpp                    | 38 ++++-------
 be/src/runtime/plan_fragment_executor.h            |  8 ---
 be/src/runtime/query_context.h                     |  9 ++-
 be/src/runtime/runtime_filter_mgr.cpp              |  5 +-
 be/src/runtime/runtime_filter_mgr.h                | 11 +---
 be/src/service/backend_options.cpp                 |  4 ++
 be/src/service/backend_options.h                   |  1 +
 be/src/service/doris_main.cpp                      |  1 -
 be/src/vec/exec/runtime_filter_consumer.cpp        |  6 +-
 be/src/vec/exec/scan/vscan_node.cpp                |  3 +-
 20 files changed, 92 insertions(+), 153 deletions(-)

diff --git a/be/src/agent/heartbeat_server.cpp 
b/be/src/agent/heartbeat_server.cpp
index a47a02dc938..a3783a07e07 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -215,8 +215,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
 
     if (master_info.__isset.backend_id) {
         _master_info->__set_backend_id(master_info.backend_id);
+        BackendOptions::set_backend_id(master_info.backend_id);
     }
-
     if (master_info.__isset.frontend_infos) {
         ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos);
     } else {
diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h
index 928efb5c620..ce7d60c5b97 100644
--- a/be/src/agent/heartbeat_server.h
+++ b/be/src/agent/heartbeat_server.h
@@ -39,7 +39,7 @@ public:
     explicit HeartbeatServer(TMasterInfo* master_info);
     ~HeartbeatServer() override = default;
 
-    virtual void init_cluster_id();
+    void init_cluster_id();
 
     // Master send heartbeat to this server
     //
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1ec66bf2a87..786876cc796 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -462,19 +462,6 @@ public:
 
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
-            // only in filter can set ignore in merge time
-            if (_ignored) {
-                break;
-            } else if (wrapper->_ignored) {
-                VLOG_DEBUG << " ignore merge runtime filter(in filter id " << 
_filter_id
-                           << ") because: " << wrapper->ignored_msg();
-
-                _ignored = true;
-                _ignored_msg = wrapper->_ignored_msg;
-                // release in filter
-                _context.hybrid_set.reset();
-                break;
-            }
             // try insert set
             _context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
             if (_max_in_num >= 0 && _context.hybrid_set->size() >= 
_max_in_num) {
@@ -1032,8 +1019,8 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr, bool opt_remo
     pquery_id->set_lo(_state->query_id.lo());
 
     auto pfragment_instance_id = 
merge_filter_request->mutable_fragment_instance_id();
-    pfragment_instance_id->set_hi(_state->fragment_instance_id().hi());
-    pfragment_instance_id->set_lo(_state->fragment_instance_id().lo());
+    pfragment_instance_id->set_hi(BackendOptions::get_local_backend().id);
+    pfragment_instance_id->set_lo((int64_t)this);
 
     merge_filter_request->set_filter_id(_filter_id);
     merge_filter_request->set_opt_remote_rf(opt_remote_rf);
@@ -1061,14 +1048,12 @@ Status 
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
                                           std::vector<vectorized::VExprSPtr>& 
push_exprs,
                                           bool is_late_arrival) {
     DCHECK(is_consumer());
-    if (_wrapper->is_ignored()) {
-        return Status::OK();
-    }
-    if (!is_late_arrival) {
-        _set_push_down();
+    if (!_wrapper->is_ignored()) {
+        _set_push_down(!is_late_arrival);
+        RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, 
_probe_expr));
     }
     _profile->add_info_string("Info", _format_status());
-    return _wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr);
+    return Status::OK();
 }
 
 bool IRuntimeFilter::await() {
@@ -1202,9 +1187,9 @@ void IRuntimeFilter::set_ignored(const std::string& msg) {
 
 std::string IRuntimeFilter::_format_status() const {
     return fmt::format(
-            "[IsPushDown = {}, RuntimeFilterState = {}, IsIgnored = {}, 
HasRemoteTarget = {}, "
+            "[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, 
HasRemoteTarget = {}, "
             "HasLocalTarget = {}]",
-            _is_push_down, _get_explain_state_string(), 
_wrapper->is_ignored(), _has_remote_target,
+            _is_push_down, _get_explain_state_string(), 
_wrapper->ignored_msg(), _has_remote_target,
             _has_local_target);
 }
 
@@ -1323,11 +1308,7 @@ Status IRuntimeFilter::create_wrapper(const 
UpdateRuntimeFilterParamsV2* param,
 }
 
 void IRuntimeFilter::change_to_bloom_filter() {
-    auto origin_type = _wrapper->get_real_type();
     _wrapper->change_to_bloom_filter();
-    if (origin_type != _wrapper->get_real_type()) {
-        update_runtime_filter_type_to_profile();
-    }
 }
 
 Status IRuntimeFilter::init_bloom_filter(const size_t build_bf_cardinality) {
@@ -1367,32 +1348,24 @@ Status IRuntimeFilter::_create_wrapper(const T* param, 
ObjectPool* pool,
 void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
     if (_profile_init) {
         parent_profile->add_child(_profile.get(), true, nullptr);
-        return;
-    }
-    _profile_init = true;
-    parent_profile->add_child(_profile.get(), true, nullptr);
-    _profile->add_info_string("Info", _format_status());
-    if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-        update_runtime_filter_type_to_profile();
+    } else {
+        _profile_init = true;
+        parent_profile->add_child(_profile.get(), true, nullptr);
+        _profile->add_info_string("Info", _format_status());
     }
 }
 
 void IRuntimeFilter::update_runtime_filter_type_to_profile() {
-    if (_profile != nullptr) {
-        _profile->add_info_string("RealRuntimeFilterType", 
to_string(_wrapper->get_real_type()));
-    }
+    _profile->add_info_string("RealRuntimeFilterType", 
to_string(_wrapper->get_real_type()));
 }
 
 Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
-    if (!_wrapper->is_ignored() && wrapper->is_ignored()) {
+    if (wrapper->is_ignored()) {
         set_ignored(wrapper->ignored_msg());
+    } else if (!_wrapper->is_ignored()) {
+        return _wrapper->merge(wrapper);
     }
-    auto origin_type = _wrapper->get_real_type();
-    Status status = _wrapper->merge(wrapper);
-    if (origin_type != _wrapper->get_real_type()) {
-        update_runtime_filter_type_to_profile();
-    }
-    return status;
+    return Status::OK();
 }
 
 template <typename T>
@@ -1695,12 +1668,10 @@ Status IRuntimeFilter::update_filter(const 
UpdateRuntimeFilterParams* param) {
     if (param->request->has_in_filter() && 
param->request->in_filter().has_ignored_msg()) {
         const PInFilter in_filter = param->request->in_filter();
         set_ignored(in_filter.ignored_msg());
-    }
-    std::unique_ptr<RuntimePredicateWrapper> wrapper;
-    RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _pool, &wrapper));
-    auto origin_type = _wrapper->get_real_type();
-    RETURN_IF_ERROR(_wrapper->merge(wrapper.get()));
-    if (origin_type != _wrapper->get_real_type()) {
+    } else {
+        std::unique_ptr<RuntimePredicateWrapper> wrapper;
+        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _pool, 
&wrapper));
+        RETURN_IF_ERROR(_wrapper->merge(wrapper.get()));
         update_runtime_filter_type_to_profile();
     }
     this->signal();
@@ -1718,11 +1689,8 @@ void 
IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t mer
     if (_wrapper->column_type() != wrapper->column_type()) {
         wrapper->_column_return_type = _wrapper->_column_return_type;
     }
-    auto origin_type = _wrapper->get_real_type();
     _wrapper = wrapper;
-    if (origin_type != _wrapper->get_real_type()) {
-        update_runtime_filter_type_to_profile();
-    }
+    update_runtime_filter_type_to_profile();
     this->signal();
 }
 
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index d853493889c..74b2580a4e6 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -362,7 +362,7 @@ protected:
     static Status _create_wrapper(const T* param, ObjectPool* pool,
                                   std::unique_ptr<RuntimePredicateWrapper>* 
wrapper);
 
-    void _set_push_down() { _is_push_down = true; }
+    void _set_push_down(bool push_down) { _is_push_down = push_down; }
 
     std::string _format_status() const;
 
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 7f34bf7f2c9..c9c1a996064 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -47,11 +47,8 @@ public:
         std::map<int, bool> has_in_filter;
 
         auto ignore_local_filter = [&](int filter_id) {
-            // Now pipeline x have bug in ignore, after fix the problem enable 
ignore logic in pipeline x
-            if (_need_local_merge) {
-                return Status::OK();
-            }
-            auto runtime_filter_mgr = state->local_runtime_filter_mgr();
+            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
+                                                        : 
state->local_runtime_filter_mgr();
 
             std::vector<IRuntimeFilter*> filters;
             RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 84e439c9032..119f5e42a60 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -303,8 +303,7 @@ Status ScanLocalState<Derived>::_normalize_predicate(
             auto impl = conjunct_expr_root->get_impl();
             // If impl is not null, which means this a conjuncts from runtime 
filter.
             auto cur_expr = impl ? impl.get() : conjunct_expr_root.get();
-            bool _is_runtime_filter_predicate =
-                    _rf_vexpr_set.find(conjunct_expr_root) != 
_rf_vexpr_set.end();
+            bool _is_runtime_filter_predicate = 
_rf_vexpr_set.contains(conjunct_expr_root);
             SlotDescriptor* slot = nullptr;
             ColumnValueRangeType* range = nullptr;
             vectorized::VScanNode::PushDownType pdt =
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 38db8cbe8ff..4c805b50582 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -112,11 +112,6 @@ public:
 
     void close_a_pipeline();
 
-    void set_merge_controller_handler(
-            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
-        _merge_controller_handler = handler;
-    }
-
     virtual void add_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
 
@@ -193,10 +188,6 @@ protected:
 
     std::shared_ptr<QueryContext> _query_ctx;
 
-    // This shared ptr is never used. It is just a reference to hold the 
object.
-    // There is a weak ptr in runtime filter manager to reference this object.
-    std::shared_ptr<RuntimeFilterMergeControllerEntity> 
_merge_controller_handler;
-
     MonotonicStopWatch _fragment_watcher;
     RuntimeProfile::Counter* _start_timer = nullptr;
     RuntimeProfile::Counter* _prepare_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 631a17d193a..56045118a94 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -126,7 +126,7 @@ void RuntimeFilterTimer::call_timeout() {
     }
     _call_timeout = true;
     if (_parent) {
-        _parent->sub_filters();
+        _parent->sub_filters(_filter_id);
     }
 }
 
@@ -137,7 +137,7 @@ void RuntimeFilterTimer::call_ready() {
     }
     _call_ready = true;
     if (_parent) {
-        _parent->sub_filters();
+        _parent->sub_filters(_filter_id);
     }
     _is_ready = true;
 }
@@ -146,40 +146,43 @@ void RuntimeFilterTimer::call_has_ready() {
     std::unique_lock<std::mutex> lc(_lock);
     DCHECK(!_call_timeout);
     if (!_call_ready) {
-        _parent->sub_filters();
+        _parent->sub_filters(_filter_id);
     }
 }
 
-void RuntimeFilterTimer::call_has_release() {
-    // When the use count is equal to 1, only the timer queue still holds 
ownership,
-    // so there is no need to take any action.
-}
-
 void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
+    const auto filter_id = runtime_filter->filter_id();
+    ;
     _filters++;
+    _filter_ready_map[filter_id] = false;
     int64_t registration_time = runtime_filter->registration_time();
     int32 wait_time_ms = runtime_filter->wait_time_ms();
     auto filter_timer = std::make_shared<RuntimeFilterTimer>(
-            registration_time, wait_time_ms,
+            filter_id, registration_time, wait_time_ms,
             
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
     runtime_filter->set_filter_timer(filter_timer);
     
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
 }
 
-void RuntimeFilterDependency::sub_filters() {
-    auto value = _filters.fetch_sub(1);
-    if (value == 1) {
-        _watcher.stop();
-        std::vector<PipelineXTask*> local_block_task {};
-        {
-            std::unique_lock<std::mutex> lc(_task_lock);
-            *_blocked_by_rf = false;
-            local_block_task.swap(_blocked_task);
+void RuntimeFilterDependency::sub_filters(int id) {
+    std::vector<PipelineXTask*> local_block_task {};
+    {
+        std::lock_guard<std::mutex> lk(_task_lock);
+        if (!_filter_ready_map[id]) {
+            _filter_ready_map[id] = true;
+            _filters--;
         }
-        for (auto* task : local_block_task) {
-            task->wake_up();
+        if (_filters == 0) {
+            _watcher.stop();
+            {
+                *_blocked_by_rf = false;
+                local_block_task.swap(_blocked_task);
+            }
         }
     }
+    for (auto* task : local_block_task) {
+        task->wake_up();
+    }
 }
 
 void LocalExchangeSharedState::sub_running_sink_operators() {
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index f518ef96d46..ccb919c7edf 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -203,9 +203,10 @@ public:
 class RuntimeFilterDependency;
 class RuntimeFilterTimer {
 public:
-    RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
+    RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t 
wait_time_ms,
                        std::shared_ptr<RuntimeFilterDependency> parent)
-            : _parent(std::move(parent)),
+            : _filter_id(filter_id),
+              _parent(std::move(parent)),
               _registration_time(registration_time),
               _wait_time_ms(wait_time_ms) {}
 
@@ -215,7 +216,9 @@ public:
 
     void call_has_ready();
 
-    void call_has_release();
+    // When the use count is equal to 1, only the timer queue still holds 
ownership,
+    // so there is no need to take any action.
+    void call_has_release() {};
 
     bool has_ready();
 
@@ -223,6 +226,7 @@ public:
     int32_t wait_time_ms() const { return _wait_time_ms; }
 
 private:
+    int _filter_id = -1;
     bool _call_ready {};
     bool _call_timeout {};
     std::shared_ptr<RuntimeFilterDependency> _parent;
@@ -303,7 +307,7 @@ public:
             : Dependency(id, node_id, name, query_ctx) {}
     Dependency* is_blocked_by(PipelineXTask* task) override;
     void add_filters(IRuntimeFilter* runtime_filter);
-    void sub_filters();
+    void sub_filters(int id);
     void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
         _blocked_by_rf = blocked_by_rf;
     }
@@ -312,6 +316,7 @@ public:
 
 protected:
     std::atomic_int _filters;
+    phmap::flat_hash_map<int, bool> _filter_ready_map;
     std::shared_ptr<std::atomic_bool> _blocked_by_rf;
 };
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index efca7c068b6..696fcfefba5 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -518,8 +518,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             filterparams->query_id.set_hi(_runtime_state->query_id().hi);
             filterparams->query_id.set_lo(_runtime_state->query_id().lo);
 
-            
filterparams->_fragment_instance_id.set_hi(fragment_instance_id.hi);
-            
filterparams->_fragment_instance_id.set_lo(fragment_instance_id.lo);
             filterparams->be_exec_version = _runtime_state->be_exec_version();
             filterparams->query_ctx = _query_ctx.get();
         }
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 20a78209a36..3c1e64cf6c9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -719,7 +719,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params,
     static_cast<void>(_runtimefilter_controller.add_entity(
             params.params, params.params.query_id, params.query_options, 
&handler,
             
RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
-    fragment_executor->set_merge_controller_handler(handler);
+    query_ctx->set_merge_controller_handler(handler);
     {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_instance_map.insert(
@@ -807,7 +807,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             static_cast<void>(_runtimefilter_controller.add_entity(
                     params.local_params[i], params.query_id, 
params.query_options, &handler,
                     
RuntimeFilterParamsContext::create(context->get_runtime_state())));
-            context->set_merge_controller_handler(handler);
+            query_ctx->set_merge_controller_handler(handler);
             const TUniqueId& fragment_instance_id = 
params.local_params[i].fragment_instance_id;
             {
                 std::lock_guard<std::mutex> lock(_lock);
@@ -887,7 +887,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             static_cast<void>(_runtimefilter_controller.add_entity(
                     local_params, params.query_id, params.query_options, 
&handler,
                     
RuntimeFilterParamsContext::create(context->get_runtime_state())));
-            context->set_merge_controller_handler(handler);
+            query_ctx->set_merge_controller_handler(handler);
             {
                 std::lock_guard<std::mutex> lock(_lock);
                 _pipeline_map.insert(std::make_pair(fragment_instance_id, 
context));
@@ -1395,38 +1395,24 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
 Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
                                  butil::IOBufAsZeroCopyInputStream* 
attach_data) {
     UniqueId queryid = request->query_id();
-    bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     bool opt_remote_rf = request->has_opt_remote_rf() && 
request->opt_remote_rf();
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
     RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
 
-    auto fragment_instance_id = filter_controller->instance_id();
-    TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
-    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-    if (is_pipeline) {
+    std::shared_ptr<QueryContext> query_ctx;
+    {
+        TUniqueId query_id;
+        query_id.__set_hi(queryid.hi);
+        query_id.__set_lo(queryid.lo);
         std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find(tfragment_instance_id);
-        if (iter == _pipeline_map.end()) {
-            VLOG_CRITICAL << "unknown fragment-id:" << fragment_instance_id;
-            return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
+        auto iter = _query_ctx_map.find(query_id);
+        if (iter == _query_ctx_map.end()) {
+            return Status::InvalidArgument("query-id: {}", 
queryid.to_string());
         }
 
         // hold reference to pip_context, or else runtime_state can be 
destroyed
         // when filter_controller->merge is still in progress
-        pip_context = iter->second;
-    } else {
-        std::unique_lock<std::mutex> lock(_lock);
-        auto iter = _fragment_instance_map.find(tfragment_instance_id);
-        if (iter == _fragment_instance_map.end()) {
-            VLOG_CRITICAL << "unknown fragment instance id:" << 
print_id(tfragment_instance_id);
-            return Status::InvalidArgument("fragment instance id: {}",
-                                           print_id(tfragment_instance_id));
-        }
-
-        // hold reference to fragment_executor, or else runtime_state can be 
destroyed
-        // when filter_controller->merge is still in progress
-        fragment_executor = iter->second;
+        query_ctx = iter->second;
     }
     auto merge_status = filter_controller->merge(request, attach_data, 
opt_remote_rf);
     DCHECK(merge_status.ok());
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index 41fa6c2f819..5529d1ba3b5 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -134,11 +134,6 @@ public:
 
     void set_need_wait_execution_trigger() { _need_wait_execution_trigger = 
true; }
 
-    void set_merge_controller_handler(
-            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
-        _merge_controller_handler = handler;
-    }
-
     std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; }
 
     TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
@@ -219,9 +214,6 @@ private:
     RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
 
     RuntimeProfile::Counter* _fragment_cpu_timer = nullptr;
-    // This shared ptr is never used. It is just a reference to hold the 
object.
-    // There is a weak ptr in runtime filter manager to reference this object.
-    std::shared_ptr<RuntimeFilterMergeControllerEntity> 
_merge_controller_handler;
 
     // If set the true, this plan fragment will be executed only after FE send 
execution start rpc.
     bool _need_wait_execution_trigger = false;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 3db91ba2824..d7b3813dcef 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -214,7 +214,11 @@ public:
 
     int64_t mem_limit() { return _bytes_limit; }
 
-public:
+    void set_merge_controller_handler(
+            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
+        _merge_controller_handler = handler;
+    }
+
     DescriptorTbl* desc_tbl = nullptr;
     bool set_rsc_info = false;
     std::string user;
@@ -282,6 +286,9 @@ private:
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
 
     std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
+    // This shared ptr is never used. It is just a reference to hold the 
object.
+    // There is a weak ptr in runtime filter manager to reference this object.
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> 
_merge_controller_handler;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 95f65c5fc32..81d5dc88d54 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -258,11 +258,10 @@ Status 
RuntimeFilterMergeControllerEntity::_init_with_desc(
     return Status::OK();
 }
 
-Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId 
fragment_instance_id,
+Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id,
                                                 const TRuntimeFilterParams& 
runtime_filter_params,
                                                 const TQueryOptions& 
query_options) {
     _query_id = query_id;
-    _fragment_instance_id = fragment_instance_id;
     _mem_tracker = 
std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity",
                                                 
ExecEnv::GetInstance()->experimental_mem_tracker());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -518,8 +517,6 @@ RuntimeFilterParamsContext* 
RuntimeFilterParamsContext::create(RuntimeState* sta
     params->query_id.set_hi(state->query_id().hi);
     params->query_id.set_lo(state->query_id().lo);
 
-    params->_fragment_instance_id.set_hi(state->fragment_instance_id().hi);
-    params->_fragment_instance_id.set_lo(state->fragment_instance_id().lo);
     params->be_exec_version = state->be_exec_version();
     params->query_ctx = state->get_query_ctx();
     return params;
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index c9b455bc107..19908166942 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -136,8 +136,7 @@ public:
             : _query_id(0, 0), _fragment_instance_id(0, 0), _state(state) {}
     ~RuntimeFilterMergeControllerEntity() = default;
 
-    Status init(UniqueId query_id, UniqueId fragment_instance_id,
-                const TRuntimeFilterParams& runtime_filter_params,
+    Status init(UniqueId query_id, const TRuntimeFilterParams& 
runtime_filter_params,
                 const TQueryOptions& query_options);
 
     // handle merge rpc
@@ -201,7 +200,6 @@ public:
 
         // TODO: why we need string, direct use UniqueId
         std::string query_id_str = query_id.to_string();
-        UniqueId fragment_instance_id = UniqueId(params.fragment_instance_id);
         uint32_t shard = _get_controller_shard_idx(query_id);
         std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
         auto iter = _filter_controller_map[shard].find(query_id_str);
@@ -214,8 +212,7 @@ public:
                     });
             _filter_controller_map[shard][query_id_str] = *handle;
             const TRuntimeFilterParams& filter_params = 
params.runtime_filter_params;
-            RETURN_IF_ERROR(handle->get()->init(query_id, 
fragment_instance_id, filter_params,
-                                                query_options));
+            RETURN_IF_ERROR(handle->get()->init(query_id, filter_params, 
query_options));
         } else {
             *handle = _filter_controller_map[shard][query_id_str].lock();
         }
@@ -254,8 +251,6 @@ private:
 // and the other is local, originating from RuntimeState.
 // In practice, we have already distinguished between them through 
UpdateRuntimeFilterParamsV2/V1.
 // RuntimeState/QueryContext is only used to store runtime_filter_wait_time_ms 
and enable_pipeline_exec...
-
-/// TODO: Consider adding checks for global/local.
 struct RuntimeFilterParamsContext {
     RuntimeFilterParamsContext() = default;
     static RuntimeFilterParamsContext* create(RuntimeState* state);
@@ -268,10 +263,8 @@ struct RuntimeFilterParamsContext {
     RuntimeFilterMgr* runtime_filter_mgr;
     ExecEnv* exec_env;
     PUniqueId query_id;
-    PUniqueId _fragment_instance_id;
     int be_exec_version;
     QueryContext* query_ctx;
     QueryContext* get_query_ctx() const { return query_ctx; }
-    PUniqueId fragment_instance_id() const { return _fragment_instance_id; }
 };
 } // namespace doris
diff --git a/be/src/service/backend_options.cpp 
b/be/src/service/backend_options.cpp
index c8325733368..a8c48fd710e 100644
--- a/be/src/service/backend_options.cpp
+++ b/be/src/service/backend_options.cpp
@@ -74,6 +74,10 @@ TBackend BackendOptions::get_local_backend() {
     return _backend;
 }
 
+void BackendOptions::set_backend_id(int64_t backend_id) {
+    _backend.__set_id(backend_id);
+}
+
 void BackendOptions::set_localhost(const std::string& host) {
     _s_localhost = host;
 }
diff --git a/be/src/service/backend_options.h b/be/src/service/backend_options.h
index 72293373883..8f504ba2ea7 100644
--- a/be/src/service/backend_options.h
+++ b/be/src/service/backend_options.h
@@ -34,6 +34,7 @@ public:
     static bool init();
     static const std::string& get_localhost();
     static TBackend get_local_backend();
+    static void set_backend_id(int64_t backend_id);
     static void set_localhost(const std::string& host);
     static bool is_bind_ipv6();
     static const char* get_service_bind_address();
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index f013b83e68d..b3962af87e3 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -56,7 +56,6 @@
 #include "common/config.h"
 #include "common/daemon.h"
 #include "common/logging.h"
-#include "common/phdr_cache.h"
 #include "common/signal_handler.h"
 #include "common/status.h"
 #include "io/cache/block/block_file_cache_factory.h"
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index e683c4f2be0..52caf84e361 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -37,12 +37,12 @@ Status RuntimeFilterConsumer::init(RuntimeState* state, 
bool need_local_merge) {
 }
 
 void RuntimeFilterConsumer::_init_profile(RuntimeProfile* profile) {
-    std::stringstream ss;
+    fmt::memory_buffer buffer;
     for (auto& rf_ctx : _runtime_filter_ctxs) {
         rf_ctx.runtime_filter->init_profile(profile);
-        ss << rf_ctx.runtime_filter->get_name() << ", ";
+        fmt::format_to(buffer, "{}, ", rf_ctx.runtime_filter->get_name());
     }
-    profile->add_info_string("RuntimeFilters: ", ss.str());
+    profile->add_info_string("RuntimeFilters: ", to_string(buffer));
 }
 
 Status RuntimeFilterConsumer::_register_runtime_filter(bool need_local_merge) {
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 02557644d5a..4ba8f924c00 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -472,8 +472,7 @@ Status VScanNode::_normalize_predicate(const VExprSPtr& 
conjunct_expr_root, VExp
             auto impl = conjunct_expr_root->get_impl();
             // If impl is not null, which means this a conjuncts from runtime 
filter.
             auto cur_expr = impl ? impl.get() : conjunct_expr_root.get();
-            bool _is_runtime_filter_predicate =
-                    _rf_vexpr_set.find(conjunct_expr_root) != 
_rf_vexpr_set.end();
+            bool _is_runtime_filter_predicate = 
_rf_vexpr_set.contains(conjunct_expr_root);
             SlotDescriptor* slot = nullptr;
             ColumnValueRangeType* range = nullptr;
             PushDownType pdt = PushDownType::UNACCEPTABLE;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to