This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch rf-thrift-poc in repository https://gitbox.apache.org/repos/asf/doris.git
commit 309e2175e5ea83300f5f4ad23404b2662a0b75d4 Author: BiteTheDDDDt <x...@selectdb.com> AuthorDate: Wed Mar 26 18:36:11 2025 +0800 update --- be/src/pipeline/pipeline_fragment_context.cpp | 9 --------- be/src/runtime/fragment_mgr.cpp | 14 ++++++++++---- be/src/runtime/runtime_state.cpp | 4 ---- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 9e1f96d737a..e62addd95ad 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -300,15 +300,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state->set_total_load_streams(request.total_load_streams); _runtime_state->set_num_local_sink(request.num_local_sink); - const auto& local_params = request.local_params[0]; - if (local_params.__isset.runtime_filter_params) { - _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( - local_params.runtime_filter_params); - } - if (local_params.__isset.topn_filter_descs) { - _query_ctx->init_runtime_predicates(local_params.topn_filter_descs); - } - // init fragment_instance_ids const auto target_size = request.local_params.size(); _fragment_instance_ids.resize(target_size); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 063db6e9dcd..5f079cfa2c7 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -853,13 +853,19 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed", { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); }); - if (params.local_params[0].__isset.runtime_filter_params && - !params.local_params[0].runtime_filter_params.rid_to_runtime_filter.empty()) { + const auto& local_param = params.local_params[0]; + if (local_param.__isset.runtime_filter_params && + !local_param.runtime_filter_params.rid_to_runtime_filter.empty()) { auto handler = std::make_shared<RuntimeFilterMergeControllerEntity>( RuntimeFilterParamsContext::create(context->get_runtime_state())); - RETURN_IF_ERROR( - handler->init(params.query_id, params.local_params[0].runtime_filter_params)); + RETURN_IF_ERROR(handler->init(params.query_id, local_param.runtime_filter_params)); query_ctx->set_merge_controller_handler(handler); + + query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + local_param.runtime_filter_params); + } + if (local_param.__isset.topn_filter_descs) { + query_ctx->init_runtime_predicates(local_param.topn_filter_descs); } { diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 88a38d6356d..958d6c19122 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -88,10 +88,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, } #endif DCHECK(_query_mem_tracker != nullptr && _query_mem_tracker->label() != "Orphan"); - if (fragment_exec_params.__isset.runtime_filter_params) { - _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( - fragment_exec_params.runtime_filter_params); - } } RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org