This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 606223ab62c Revert "[refactor](pipeline) simplify runtime state ctor (#25995)" (#26029)
606223ab62c is described below

commit 606223ab62c2408b17fdb019ff824977cd1d14a7
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Oct 27 18:15:30 2023 +0800

    Revert "[refactor](pipeline) simplify runtime state ctor (#25995)" (#26029)
    
    This reverts commit a01922cdc55e2b3a63d9a9aafb38ac5ed64c6dd3.
---
 be/src/pipeline/pipeline_fragment_context.cpp       |  9 +++------
 .../pipeline_x/pipeline_x_fragment_context.cpp      |  9 +++------
 be/src/runtime/runtime_state.cpp                    | 21 +++++++++++----------
 be/src/runtime/runtime_state.h                      |  8 +++-----
 4 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 38e293367bf..686ff0fe0ad 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -221,12 +221,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
              local_params.backend_num);
 
     // 1. init _runtime_state
-    _runtime_state = RuntimeState::create_unique(
-            local_params.fragment_instance_id, request.query_id, request.fragment_id,
-            request.query_options, _query_ctx->query_globals, _exec_env);
-    if (local_params.__isset.runtime_filter_params) {
-        _runtime_state->set_runtime_filter_params(local_params.runtime_filter_params);
-    }
+    _runtime_state = RuntimeState::create_unique(local_params, request.query_id,
+                                                 request.fragment_id, request.query_options,
+                                                 _query_ctx->query_globals, _exec_env);
     _runtime_state->set_query_ctx(_query_ctx.get());
     _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
     _runtime_state->set_tracer(std::move(tracer));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 832d86128d2..5ca5829e1a8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -390,12 +390,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
     for (size_t i = 0; i < target_size; i++) {
         const auto& local_params = request.local_params[i];
 
-        _runtime_states[i] = RuntimeState::create_unique(
-                local_params.fragment_instance_id, request.query_id, request.fragment_id,
-                request.query_options, _query_ctx->query_globals, _exec_env);
-        if (local_params.__isset.runtime_filter_params) {
-            _runtime_states[i]->set_runtime_filter_params(local_params.runtime_filter_params);
-        }
+        _runtime_states[i] = RuntimeState::create_unique(local_params, request.query_id,
+                                                         request.fragment_id, request.query_options,
+                                                         _query_ctx->query_globals, _exec_env);
         _runtime_states[i]->set_query_ctx(_query_ctx.get());
         _runtime_states[i]->set_query_mem_tracker(_query_ctx->query_mem_tracker);
         _runtime_states[i]->set_tracer(_runtime_state->get_tracer());
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index d86c0cbb01d..7a24b86621b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -101,10 +101,11 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
     DCHECK(status.ok());
 }
 
-RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id,
-                           int32_t fragment_id, const TQueryOptions& query_options,
-                           const TQueryGlobals& query_globals, ExecEnv* exec_env)
-        : _profile("Fragment " + print_id(instance_id)),
+RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
+                           const TUniqueId& query_id, int32_t fragment_id,
+                           const TQueryOptions& query_options, const TQueryGlobals& query_globals,
+                           ExecEnv* exec_env)
+        : _profile("Fragment " + print_id(pipeline_params.fragment_instance_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
           _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)),
@@ -124,7 +125,12 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_
           _normal_row_number(0),
           _error_row_number(0),
           _error_log_file(nullptr) {
-    DCHECK(init(instance_id, query_options, query_globals, exec_env).ok());
+    if (pipeline_params.__isset.runtime_filter_params) {
+        _runtime_filter_mgr->set_runtime_filter_params(pipeline_params.runtime_filter_params);
+    }
+    Status status =
+            init(pipeline_params.fragment_instance_id, query_options, query_globals, exec_env);
+    DCHECK(status.ok());
 }
 
 RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
@@ -267,11 +273,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
     return Status::OK();
 }
 
-void RuntimeState::set_runtime_filter_params(
-        const TRuntimeFilterParams& runtime_filter_params) const {
-    _runtime_filter_mgr->set_runtime_filter_params(runtime_filter_params);
-}
-
 void RuntimeState::init_mem_trackers(const TUniqueId& id, const std::string& name) {
     _query_mem_tracker = std::make_shared<MemTrackerLimiter>(
             MemTrackerLimiter::Type::EXPERIMENTAL, fmt::format("{}#Id={}", name, print_id(id)));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f33b4c4febe..29ef581947c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -72,9 +72,9 @@ public:
                  const TQueryOptions& query_options, const TQueryGlobals& query_globals,
                  ExecEnv* exec_env);
 
-    RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id, int32 fragment_id,
-                 const TQueryOptions& query_options, const TQueryGlobals& query_globals,
-                 ExecEnv* exec_env);
+    RuntimeState(const TPipelineInstanceParams& pipeline_params, const TUniqueId& query_id,
+                 int32 fragment_id, const TQueryOptions& query_options,
+                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
 
     // Used by pipelineX. This runtime state is only used for setup.
     RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options,
@@ -93,8 +93,6 @@ public:
     Status init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
 
-    void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params) const;
-
     // for ut and non-query.
     void set_exec_env(ExecEnv* exec_env) { _exec_env = exec_env; }
     void init_mem_trackers(const TUniqueId& id = TUniqueId(), const std::string& name = "unknown");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to