This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 606223ab62c Revert "[refactor](pipeline) simplify runtime state ctor (#25995)" (#26029) 606223ab62c is described below commit 606223ab62c2408b17fdb019ff824977cd1d14a7 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Oct 27 18:15:30 2023 +0800 Revert "[refactor](pipeline) simplify runtime state ctor (#25995)" (#26029) This reverts commit a01922cdc55e2b3a63d9a9aafb38ac5ed64c6dd3. --- be/src/pipeline/pipeline_fragment_context.cpp | 9 +++------ .../pipeline_x/pipeline_x_fragment_context.cpp | 9 +++------ be/src/runtime/runtime_state.cpp | 21 +++++++++++---------- be/src/runtime/runtime_state.h | 8 +++----- 4 files changed, 20 insertions(+), 27 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 38e293367bf..686ff0fe0ad 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -221,12 +221,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re local_params.backend_num); // 1. init _runtime_state - _runtime_state = RuntimeState::create_unique( - local_params.fragment_instance_id, request.query_id, request.fragment_id, - request.query_options, _query_ctx->query_globals, _exec_env); - if (local_params.__isset.runtime_filter_params) { - _runtime_state->set_runtime_filter_params(local_params.runtime_filter_params); - } + _runtime_state = RuntimeState::create_unique(local_params, request.query_id, + request.fragment_id, request.query_options, + _query_ctx->query_globals, _exec_env); _runtime_state->set_query_ctx(_query_ctx.get()); _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); _runtime_state->set_tracer(std::move(tracer)); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 832d86128d2..5ca5829e1a8 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -390,12 +390,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( for (size_t i = 0; i < target_size; i++) { const auto& local_params = request.local_params[i]; - _runtime_states[i] = RuntimeState::create_unique( - local_params.fragment_instance_id, request.query_id, request.fragment_id, - request.query_options, _query_ctx->query_globals, _exec_env); - if (local_params.__isset.runtime_filter_params) { - _runtime_states[i]->set_runtime_filter_params(local_params.runtime_filter_params); - } + _runtime_states[i] = RuntimeState::create_unique(local_params, request.query_id, + request.fragment_id, request.query_options, + _query_ctx->query_globals, _exec_env); _runtime_states[i]->set_query_ctx(_query_ctx.get()); _runtime_states[i]->set_query_mem_tracker(_query_ctx->query_mem_tracker); _runtime_states[i]->set_tracer(_runtime_state->get_tracer()); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index d86c0cbb01d..7a24b86621b 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -101,10 +101,11 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, DCHECK(status.ok()); } -RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id, - int32_t fragment_id, const TQueryOptions& query_options, - const TQueryGlobals& query_globals, ExecEnv* exec_env) - : _profile("Fragment " + print_id(instance_id)), +RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params, + const TUniqueId& query_id, int32_t fragment_id, + const TQueryOptions& query_options, const TQueryGlobals& query_globals, + ExecEnv* exec_env) + : _profile("Fragment " + print_id(pipeline_params.fragment_instance_id)), _load_channel_profile("<unnamed>"), _obj_pool(new ObjectPool()), _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)), @@ -124,7 +125,12 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_ _normal_row_number(0), _error_row_number(0), _error_log_file(nullptr) { - DCHECK(init(instance_id, query_options, query_globals, exec_env).ok()); + if (pipeline_params.__isset.runtime_filter_params) { + _runtime_filter_mgr->set_runtime_filter_params(pipeline_params.runtime_filter_params); + } + Status status = + init(pipeline_params.fragment_instance_id, query_options, query_globals, exec_env); + DCHECK(status.ok()); } RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, @@ -267,11 +273,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt return Status::OK(); } -void RuntimeState::set_runtime_filter_params( - const TRuntimeFilterParams& runtime_filter_params) const { - _runtime_filter_mgr->set_runtime_filter_params(runtime_filter_params); -} - void RuntimeState::init_mem_trackers(const TUniqueId& id, const std::string& name) { _query_mem_tracker = std::make_shared<MemTrackerLimiter>( MemTrackerLimiter::Type::EXPERIMENTAL, fmt::format("{}#Id={}", name, print_id(id))); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f33b4c4febe..29ef581947c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -72,9 +72,9 @@ public: const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env); - RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id, int32 fragment_id, - const TQueryOptions& query_options, const TQueryGlobals& query_globals, - ExecEnv* exec_env); + RuntimeState(const TPipelineInstanceParams& pipeline_params, const TUniqueId& query_id, + int32 fragment_id, const TQueryOptions& query_options, + const TQueryGlobals& query_globals, ExecEnv* exec_env); // Used by pipelineX. This runtime state is only used for setup. RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, @@ -93,8 +93,6 @@ public: Status init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env); - void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params) const; - // for ut and non-query. void set_exec_env(ExecEnv* exec_env) { _exec_env = exec_env; } void init_mem_trackers(const TUniqueId& id = TUniqueId(), const std::string& name = "unknown"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org