This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch rf-thrift-poc
in repository https://gitbox.apache.org/repos/asf/doris.git
commit f31922bb2084c169ece69597f64ec3c354cb0876
Author: BiteTheDDDDt <x...@selectdb.com>
AuthorDate: Thu Mar 27 15:00:30 2025 +0800

    update
---
 be/src/http/action/http_stream.cpp                 |  4 +-
 be/src/http/action/stream_load.cpp                 |  7 ++--
 be/src/io/fs/multi_table_pipe.cpp                  | 19 ++++------
 be/src/io/fs/multi_table_pipe.h                    |  3 +-
 be/src/runtime/fragment_mgr.cpp                    | 44 ++++++++++++----------
 be/src/runtime/fragment_mgr.h                      |  5 ++-
 be/src/runtime/group_commit_mgr.cpp                |  6 ++-
 .../routine_load/routine_load_task_executor.cpp    |  3 +-
 .../runtime/stream_load/stream_load_executor.cpp   |  7 ++--
 be/src/runtime/stream_load/stream_load_executor.h  |  5 ++-
 be/src/service/internal_service.cpp                | 18 ++++++++-
 11 files changed, 73 insertions(+), 48 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index a7665716194..fb8be70e5a6 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -387,8 +387,8 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
         }
         ctx->put_result.pipeline_params.__set_content_length(content_length);
     }
-
-    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+    TPipelineFragmentParamsList mocked;
+    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked);
 }
 
 void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index d91d85695a1..204f11e8f87 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -167,7 +167,8 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
     if (!ctx->use_streaming) {
         // we need to close file first, then execute_plan_fragment here
         ctx->body_sink.reset();
-        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx));
+        TPipelineFragmentParamsList mocked;
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked));
     }
 
     // wait stream load finish
@@ -779,8 +780,8 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (!ctx->use_streaming) {
         return Status::OK();
     }
-
-    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+    TPipelineFragmentParamsList mocked;
+    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked);
 }
 
 Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path) {
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 463f002596a..014afb0e8ee 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -214,7 +214,7 @@ Status MultiTablePipe::request_and_exec_plans() {
 
     if (_ctx->multi_table_put_result.__isset.params &&
         !_ctx->multi_table_put_result.__isset.pipeline_params) {
-        st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
+        return Status::Aborted("only support pipeline engine");
     } else if (!_ctx->multi_table_put_result.__isset.params &&
                _ctx->multi_table_put_result.__isset.pipeline_params) {
         st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
@@ -229,8 +229,8 @@ Status MultiTablePipe::request_and_exec_plans() {
     return st;
 }
 
-template <typename ExecParam>
-Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
+Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
+                                  const std::vector<TPipelineFragmentParams>& params) {
     // put unplanned pipes into planned pipes and clear unplanned pipes
     for (auto& pair : _unplanned_tables) {
         _ctx->table_list.push_back(pair.first);
@@ -249,9 +249,10 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
         }
 
         _inflight_cnt++;
-
+        TPipelineFragmentParamsList mocked;
         RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
-                plan, QuerySource::ROUTINE_LOAD, [this, plan](RuntimeState* state, Status* status) {
+                plan, QuerySource::ROUTINE_LOAD,
+                [this, plan](RuntimeState* state, Status* status) {
                     DCHECK(state);
                     auto pair = _planned_tables.find(plan.table_name);
                     if (pair == _planned_tables.end()) {
@@ -292,7 +293,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
                     if (inflight_cnt == 1 && is_consume_finished()) {
                         _handle_consumer_finished();
                     }
-                }));
+                },
+                mocked));
     }
 
     return Status::OK();
@@ -344,10 +346,5 @@ void MultiTablePipe::_handle_consumer_finished() {
     _ctx->promise.set_value(_status); // when all done, finish the routine load task
 }
 
-template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
-                                           std::vector<TExecPlanFragmentParams> params);
-template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
-                                           std::vector<TPipelineFragmentParams> params);
-
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index f1d2e523652..eb63c63f56d 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -67,8 +67,7 @@ private:
     // [thread-unsafe] dispatch data to corresponding KafkaConsumerPipe
    Status dispatch(const std::string& table, const char* data, size_t size, AppendFunc cb);
 
-    template <typename ExecParam>
-    Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params);
+    Status exec_plans(ExecEnv* exec_env, const std::vector<TPipelineFragmentParams>& params);
 
     void _set_consume_finished() { _consume_finished.store(true, std::memory_order_release); }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 5f079cfa2c7..fcdee1adcad 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -609,7 +609,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
 }
 
 Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
-                                       const QuerySource query_source) {
+                                       const QuerySource query_source,
+                                       const TPipelineFragmentParamsList& parent) {
     if (params.txn_conf.need_txn) {
         std::shared_ptr<StreamLoadContext> stream_load_ctx =
                 std::make_shared<StreamLoadContext>(_exec_env);
@@ -638,10 +639,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
         RETURN_IF_ERROR(
                 _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
-        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
+        RETURN_IF_ERROR(
+                _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx, parent));
         return Status::OK();
     } else {
-        return exec_plan_fragment(params, query_source, empty_function);
+        return exec_plan_fragment(params, query_source, empty_function, parent);
     }
 }
 
@@ -820,7 +822,8 @@ std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
 }
 
 Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
-                                       QuerySource query_source, const FinishCallback& cb) {
+                                       QuerySource query_source, const FinishCallback& cb,
+                                       const TPipelineFragmentParamsList& parent) {
     VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment params is "
              << apache::thrift::ThriftDebugString(params).c_str();
     // sometimes TExecPlanFragmentParams debug string is too long and glog
@@ -852,20 +855,20 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
     DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
                     { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
-
-    const auto& local_param = params.local_params[0];
-    if (local_param.__isset.runtime_filter_params &&
-        !local_param.runtime_filter_params.rid_to_runtime_filter.empty()) {
-        auto handler = std::make_shared<RuntimeFilterMergeControllerEntity>(
-                RuntimeFilterParamsContext::create(context->get_runtime_state()));
-        RETURN_IF_ERROR(handler->init(params.query_id, local_param.runtime_filter_params));
-        query_ctx->set_merge_controller_handler(handler);
-
-        query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
-                local_param.runtime_filter_params);
-    }
-    if (local_param.__isset.topn_filter_descs) {
-        query_ctx->init_runtime_predicates(local_param.topn_filter_descs);
+    if (parent.__isset.runtime_filter_info) {
+        auto info = parent.runtime_filter_info;
+        if (info.__isset.runtime_filter_params &&
+            !info.runtime_filter_params.rid_to_runtime_filter.empty()) {
+            auto handler = std::make_shared<RuntimeFilterMergeControllerEntity>(
+                    RuntimeFilterParamsContext::create(context->get_runtime_state()));
+            RETURN_IF_ERROR(handler->init(params.query_id, info.runtime_filter_params));
+            query_ctx->set_merge_controller_handler(handler);
+
+            query_ctx->runtime_filter_mgr()->set_runtime_filter_params(info.runtime_filter_params);
+        }
+        if (info.__isset.topn_filter_descs) {
+            query_ctx->init_runtime_predicates(info.topn_filter_descs);
+        }
     }
 
     {
@@ -1271,7 +1274,10 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
     exec_fragment_params.__set_query_options(query_options);
     VLOG_ROW << "external exec_plan_fragment params is "
             << apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
-    return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR);
+
+    // external_plan only support single table scan, so parent is empty
+    TPipelineFragmentParamsList parent;
+    return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR, parent);
 }
 
 Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 88c822628b1..eae79c70cc0 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -122,7 +122,8 @@ public:
 
     // execute one plan fragment
     Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type);
-    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type);
+    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
+                              const TPipelineFragmentParamsList& parent);
 
     void remove_pipeline_context(std::pair<TUniqueId, int> key);
 
@@ -131,7 +132,7 @@ public:
                               const FinishCallback& cb);
 
     Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
-                              const FinishCallback& cb);
+                              const FinishCallback& cb, const TPipelineFragmentParamsList& parent);
 
     Status start_query_execution(const PExecPlanFragmentStartRequest* request);
 
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 87ad1c975f4..70d250deba1 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -588,8 +588,10 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id,
                          << ", st=" << finish_st.to_string();
         }
     };
-    return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params,
-                                                         QuerySource::GROUP_COMMIT_LOAD, finish_cb);
+
+    TPipelineFragmentParamsList mocked;
+    return _exec_env->fragment_mgr()->exec_plan_fragment(
+            pipeline_params, QuerySource::GROUP_COMMIT_LOAD, finish_cb, mocked);
 }
 
 Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d24146a4499..582283b7858 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -396,7 +396,8 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     // only for normal load, single-stream-multi-table load will be planned during consuming
 #ifndef BE_TEST
     // execute plan fragment, async
-    HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx),
+    TPipelineFragmentParamsList mocked;
+    HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked),
                  "failed to execute plan fragment");
 #else
     // only for test
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 48682a21677..cdea553d896 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -65,7 +65,8 @@ bvar::LatencyRecorder g_stream_load_begin_txn_latency("stream_load", "begin_txn"
 bvar::LatencyRecorder g_stream_load_precommit_txn_latency("stream_load", "precommit_txn");
 bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", "commit_txn");
 
-Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx) {
+Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx,
+                                                 const TPipelineFragmentParamsList& parent) {
     // submit this params
 #ifndef BE_TEST
     ctx->start_write_data_nanos = MonotonicNanos();
@@ -146,8 +147,8 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
         st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params,
                                                            QuerySource::STREAM_LOAD, exec_fragment);
     } else {
-        st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params,
-                                                           QuerySource::STREAM_LOAD, exec_fragment);
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(
+                ctx->put_result.pipeline_params, QuerySource::STREAM_LOAD, exec_fragment, parent);
     }
 
     if (!st.ok()) {
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h
index 1364bbbf31b..3472ae5a200 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <gen_cpp/PaloInternalService_types.h>
+
 #include <memory>
 
 #include "common/factory_creator.h"
@@ -49,7 +51,8 @@ public:
 
     virtual void rollback_txn(StreamLoadContext* ctx);
 
-    Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx);
+    Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx,
+                                 const TPipelineFragmentParamsList& parent);
 
 protected:
     // collect the load statistics from context and set them to stat
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 86c42b7c4b7..c863afe057d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -579,13 +579,27 @@ Status PInternalService::_exec_plan_fragment_impl(
     }
     MonotonicStopWatch timer;
     timer.start();
+
+    // work for old version frontend
+    if (!t_request.__isset.runtime_filter_info) {
+        TRuntimeFilterInfo runtime_filter_info;
+        auto local_param = fragment_list[0].local_params[0];
+        if (local_param.__isset.runtime_filter_params) {
+            runtime_filter_info.__set_runtime_filter_params(local_param.runtime_filter_params);
+        }
+        if (local_param.__isset.topn_filter_descs) {
+            runtime_filter_info.__set_topn_filter_descs(local_param.topn_filter_descs);
+        }
+        t_request.__set_runtime_filter_info(runtime_filter_info);
+    }
+
     for (const TPipelineFragmentParams& fragment : fragment_list) {
         if (cb) {
             RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
-                    fragment, QuerySource::INTERNAL_FRONTEND, cb));
+                    fragment, QuerySource::INTERNAL_FRONTEND, cb, t_request));
         } else {
             RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
-                    fragment, QuerySource::INTERNAL_FRONTEND));
+                    fragment, QuerySource::INTERNAL_FRONTEND, t_request));
        }
     }
     timer.stop();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org