This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch rf-thrift-poc
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f31922bb2084c169ece69597f64ec3c354cb0876
Author: BiteTheDDDDt <x...@selectdb.com>
AuthorDate: Thu Mar 27 15:00:30 2025 +0800

    update
---
 be/src/http/action/http_stream.cpp                 |  4 +-
 be/src/http/action/stream_load.cpp                 |  7 ++--
 be/src/io/fs/multi_table_pipe.cpp                  | 19 ++++------
 be/src/io/fs/multi_table_pipe.h                    |  3 +-
 be/src/runtime/fragment_mgr.cpp                    | 44 ++++++++++++----------
 be/src/runtime/fragment_mgr.h                      |  5 ++-
 be/src/runtime/group_commit_mgr.cpp                |  6 ++-
 .../routine_load/routine_load_task_executor.cpp    |  3 +-
 .../runtime/stream_load/stream_load_executor.cpp   |  7 ++--
 be/src/runtime/stream_load/stream_load_executor.h  |  5 ++-
 be/src/service/internal_service.cpp                | 18 ++++++++-
 11 files changed, 73 insertions(+), 48 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp 
b/be/src/http/action/http_stream.cpp
index a7665716194..fb8be70e5a6 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -387,8 +387,8 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
         }
         ctx->put_result.pipeline_params.__set_content_length(content_length);
     }
-
-    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+    TPipelineFragmentParamsList mocked;
+    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, 
mocked);
 }
 
 void 
HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> 
ctx,
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index d91d85695a1..204f11e8f87 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -167,7 +167,8 @@ Status 
StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
     if (!ctx->use_streaming) {
         // we need to close file first, then execute_plan_fragment here
         ctx->body_sink.reset();
-        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx));
+        TPipelineFragmentParamsList mocked;
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, 
mocked));
     }
 
     // wait stream load finish
@@ -779,8 +780,8 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (!ctx->use_streaming) {
         return Status::OK();
     }
-
-    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+    TPipelineFragmentParamsList mocked;
+    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, 
mocked);
 }
 
 Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* 
file_path) {
diff --git a/be/src/io/fs/multi_table_pipe.cpp 
b/be/src/io/fs/multi_table_pipe.cpp
index 463f002596a..014afb0e8ee 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -214,7 +214,7 @@ Status MultiTablePipe::request_and_exec_plans() {
 
         if (_ctx->multi_table_put_result.__isset.params &&
             !_ctx->multi_table_put_result.__isset.pipeline_params) {
-            st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
+            return Status::Aborted("only support pipeline engine");
         } else if (!_ctx->multi_table_put_result.__isset.params &&
                    _ctx->multi_table_put_result.__isset.pipeline_params) {
             st = exec_plans(exec_env, 
_ctx->multi_table_put_result.pipeline_params);
@@ -229,8 +229,8 @@ Status MultiTablePipe::request_and_exec_plans() {
     return st;
 }
 
-template <typename ExecParam>
-Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> 
params) {
+Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
+                                  const std::vector<TPipelineFragmentParams>& 
params) {
     // put unplanned pipes into planned pipes and clear unplanned pipes
     for (auto& pair : _unplanned_tables) {
         _ctx->table_list.push_back(pair.first);
@@ -249,9 +249,10 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
         }
 
         _inflight_cnt++;
-
+        TPipelineFragmentParamsList mocked;
         RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
-                plan, QuerySource::ROUTINE_LOAD, [this, plan](RuntimeState* 
state, Status* status) {
+                plan, QuerySource::ROUTINE_LOAD,
+                [this, plan](RuntimeState* state, Status* status) {
                     DCHECK(state);
                     auto pair = _planned_tables.find(plan.table_name);
                     if (pair == _planned_tables.end()) {
@@ -292,7 +293,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
                     if (inflight_cnt == 1 && is_consume_finished()) {
                         _handle_consumer_finished();
                     }
-                }));
+                },
+                mocked));
     }
 
     return Status::OK();
@@ -344,10 +346,5 @@ void MultiTablePipe::_handle_consumer_finished() {
     _ctx->promise.set_value(_status); // when all done, finish the routine 
load task
 }
 
-template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
-                                           
std::vector<TExecPlanFragmentParams> params);
-template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
-                                           
std::vector<TPipelineFragmentParams> params);
-
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index f1d2e523652..eb63c63f56d 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -67,8 +67,7 @@ private:
     // [thread-unsafe] dispatch data to corresponding KafkaConsumerPipe
     Status dispatch(const std::string& table, const char* data, size_t size, 
AppendFunc cb);
 
-    template <typename ExecParam>
-    Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params);
+    Status exec_plans(ExecEnv* exec_env, const 
std::vector<TPipelineFragmentParams>& params);
 
     void _set_consume_finished() { _consume_finished.store(true, 
std::memory_order_release); }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 5f079cfa2c7..fcdee1adcad 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -609,7 +609,8 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params,
 }
 
 Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
-                                       const QuerySource query_source) {
+                                       const QuerySource query_source,
+                                       const TPipelineFragmentParamsList& 
parent) {
     if (params.txn_conf.need_txn) {
         std::shared_ptr<StreamLoadContext> stream_load_ctx =
                 std::make_shared<StreamLoadContext>(_exec_env);
@@ -638,10 +639,11 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
         RETURN_IF_ERROR(
                 _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, 
stream_load_ctx));
 
-        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
+        RETURN_IF_ERROR(
+                
_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx, 
parent));
         return Status::OK();
     } else {
-        return exec_plan_fragment(params, query_source, empty_function);
+        return exec_plan_fragment(params, query_source, empty_function, 
parent);
     }
 }
 
@@ -820,7 +822,8 @@ std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& 
query_id) {
 }
 
 Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
-                                       QuerySource query_source, const 
FinishCallback& cb) {
+                                       QuerySource query_source, const 
FinishCallback& cb,
+                                       const TPipelineFragmentParamsList& 
parent) {
     VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment 
params is "
              << apache::thrift::ThriftDebugString(params).c_str();
     // sometimes TExecPlanFragmentParams debug string is too long and glog
@@ -852,20 +855,20 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
 
     DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
                     { return 
Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
-
-    const auto& local_param = params.local_params[0];
-    if (local_param.__isset.runtime_filter_params &&
-        !local_param.runtime_filter_params.rid_to_runtime_filter.empty()) {
-        auto handler = std::make_shared<RuntimeFilterMergeControllerEntity>(
-                
RuntimeFilterParamsContext::create(context->get_runtime_state()));
-        RETURN_IF_ERROR(handler->init(params.query_id, 
local_param.runtime_filter_params));
-        query_ctx->set_merge_controller_handler(handler);
-
-        query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
-                local_param.runtime_filter_params);
-    }
-    if (local_param.__isset.topn_filter_descs) {
-        query_ctx->init_runtime_predicates(local_param.topn_filter_descs);
+    if (parent.__isset.runtime_filter_info) {
+        auto info = parent.runtime_filter_info;
+        if (info.__isset.runtime_filter_params &&
+            !info.runtime_filter_params.rid_to_runtime_filter.empty()) {
+            auto handler = 
std::make_shared<RuntimeFilterMergeControllerEntity>(
+                    
RuntimeFilterParamsContext::create(context->get_runtime_state()));
+            RETURN_IF_ERROR(handler->init(params.query_id, 
info.runtime_filter_params));
+            query_ctx->set_merge_controller_handler(handler);
+
+            
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(info.runtime_filter_params);
+        }
+        if (info.__isset.topn_filter_descs) {
+            query_ctx->init_runtime_predicates(info.topn_filter_descs);
+        }
     }
 
     {
@@ -1271,7 +1274,10 @@ Status FragmentMgr::exec_external_plan_fragment(const 
TScanOpenParams& params,
     exec_fragment_params.__set_query_options(query_options);
     VLOG_ROW << "external exec_plan_fragment params is "
              << 
apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
-    return exec_plan_fragment(exec_fragment_params, 
QuerySource::EXTERNAL_CONNECTOR);
+
+    // external_plan only support single table scan, so parent is empty
+    TPipelineFragmentParamsList parent;
+    return exec_plan_fragment(exec_fragment_params, 
QuerySource::EXTERNAL_CONNECTOR, parent);
 }
 
 Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 88c822628b1..eae79c70cc0 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -122,7 +122,8 @@ public:
     // execute one plan fragment
     Status exec_plan_fragment(const TExecPlanFragmentParams& params, const 
QuerySource query_type);
 
-    Status exec_plan_fragment(const TPipelineFragmentParams& params, const 
QuerySource query_type);
+    Status exec_plan_fragment(const TPipelineFragmentParams& params, const 
QuerySource query_type,
+                              const TPipelineFragmentParamsList& parent);
 
     void remove_pipeline_context(std::pair<TUniqueId, int> key);
 
@@ -131,7 +132,7 @@ public:
                               const FinishCallback& cb);
 
     Status exec_plan_fragment(const TPipelineFragmentParams& params, const 
QuerySource query_type,
-                              const FinishCallback& cb);
+                              const FinishCallback& cb, const 
TPipelineFragmentParamsList& parent);
 
     Status start_query_execution(const PExecPlanFragmentStartRequest* request);
 
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 87ad1c975f4..70d250deba1 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -588,8 +588,10 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t 
db_id, int64_t table_id,
                          << ", st=" << finish_st.to_string();
         }
     };
-    return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params,
-                                                         
QuerySource::GROUP_COMMIT_LOAD, finish_cb);
+
+    TPipelineFragmentParamsList mocked;
+    return _exec_env->fragment_mgr()->exec_plan_fragment(
+            pipeline_params, QuerySource::GROUP_COMMIT_LOAD, finish_cb, 
mocked);
 }
 
 Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d24146a4499..582283b7858 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -396,7 +396,8 @@ void 
RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
         // only for normal load, single-stream-multi-table load will be 
planned during consuming
 #ifndef BE_TEST
         // execute plan fragment, async
-        
HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx),
+        TPipelineFragmentParamsList mocked;
+        
HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, 
mocked),
                      "failed to execute plan fragment");
 #else
         // only for test
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 48682a21677..cdea553d896 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -65,7 +65,8 @@ bvar::LatencyRecorder 
g_stream_load_begin_txn_latency("stream_load", "begin_txn"
 bvar::LatencyRecorder g_stream_load_precommit_txn_latency("stream_load", 
"precommit_txn");
 bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", 
"commit_txn");
 
-Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> 
ctx) {
+Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> 
ctx,
+                                                 const 
TPipelineFragmentParamsList& parent) {
 // submit this params
 #ifndef BE_TEST
     ctx->start_write_data_nanos = MonotonicNanos();
@@ -146,8 +147,8 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
         st = 
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params,
                                                            
QuerySource::STREAM_LOAD, exec_fragment);
     } else {
-        st = 
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params,
-                                                           
QuerySource::STREAM_LOAD, exec_fragment);
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(
+                ctx->put_result.pipeline_params, QuerySource::STREAM_LOAD, 
exec_fragment, parent);
     }
 
     if (!st.ok()) {
diff --git a/be/src/runtime/stream_load/stream_load_executor.h 
b/be/src/runtime/stream_load/stream_load_executor.h
index 1364bbbf31b..3472ae5a200 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <gen_cpp/PaloInternalService_types.h>
+
 #include <memory>
 
 #include "common/factory_creator.h"
@@ -49,7 +51,8 @@ public:
 
     virtual void rollback_txn(StreamLoadContext* ctx);
 
-    Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx);
+    Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx,
+                                 const TPipelineFragmentParamsList& parent);
 
 protected:
     // collect the load statistics from context and set them to stat
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 86c42b7c4b7..c863afe057d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -579,13 +579,27 @@ Status PInternalService::_exec_plan_fragment_impl(
         }
         MonotonicStopWatch timer;
         timer.start();
+
+        // work for old version frontend
+        if (!t_request.__isset.runtime_filter_info) {
+            TRuntimeFilterInfo runtime_filter_info;
+            auto local_param = fragment_list[0].local_params[0];
+            if (local_param.__isset.runtime_filter_params) {
+                
runtime_filter_info.__set_runtime_filter_params(local_param.runtime_filter_params);
+            }
+            if (local_param.__isset.topn_filter_descs) {
+                
runtime_filter_info.__set_topn_filter_descs(local_param.topn_filter_descs);
+            }
+            t_request.__set_runtime_filter_info(runtime_filter_info);
+        }
+
         for (const TPipelineFragmentParams& fragment : fragment_list) {
             if (cb) {
                 RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
-                        fragment, QuerySource::INTERNAL_FRONTEND, cb));
+                        fragment, QuerySource::INTERNAL_FRONTEND, cb, 
t_request));
             } else {
                 RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
-                        fragment, QuerySource::INTERNAL_FRONTEND));
+                        fragment, QuerySource::INTERNAL_FRONTEND, t_request));
             }
         }
         timer.stop();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to