This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b013f8006d [enhancement](multi-table) enable mullti table routine load on pipeline engine (#21729) b013f8006d is described below commit b013f8006d74c8dc159c9e10a0ade6e27a16519a Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Fri Jul 14 12:16:32 2023 +0800 [enhancement](multi-table) enable mullti table routine load on pipeline engine (#21729) --- be/src/io/file_factory.cpp | 27 +++++--- be/src/io/file_factory.h | 2 +- be/src/io/fs/multi_table_pipe.cpp | 85 +++++++++++++++++++------ be/src/io/fs/multi_table_pipe.h | 3 + be/src/io/fs/stream_load_pipe.cpp | 7 ++ be/src/io/fs/stream_load_pipe.h | 3 + be/src/pipeline/pipeline_fragment_context.cpp | 6 +- be/src/runtime/runtime_state.cpp | 6 +- be/src/runtime/runtime_state.h | 9 ++- be/src/vec/exec/format/csv/csv_reader.cpp | 3 +- be/src/vec/exec/format/json/new_json_reader.cpp | 3 +- 11 files changed, 114 insertions(+), 40 deletions(-) diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index adc02a0aa2..d46d2c5b4c 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -144,7 +144,7 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr // file scan node/stream load pipe Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader, - const TUniqueId& fragment_instance_id) { + RuntimeState* runtime_state) { auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id); if (!stream_load_ctx) { return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); @@ -152,15 +152,26 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS *file_reader = stream_load_ctx->pipe; - if (file_reader->get() != nullptr) { - auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader); - if (multi_table_pipe != nullptr) { - *file_reader = multi_table_pipe->getPipe(fragment_instance_id); - LOG(INFO) << "create pipe reader for fragment instance: " << fragment_instance_id - << " pipe: " << (*file_reader).get(); - } + if (file_reader->get() == nullptr) { + return Status::OK(); } + auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader); + if (multi_table_pipe == nullptr || runtime_state == nullptr) { + return Status::OK(); + } + + TUniqueId pipe_id; + if (runtime_state->enable_pipeline_exec()) { + pipe_id = io::StreamLoadPipe::calculate_pipe_id(runtime_state->query_id(), + runtime_state->fragment_id()); + } else { + pipe_id = runtime_state->fragment_instance_id(); + } + *file_reader = multi_table_pipe->getPipe(pipe_id); + LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id + << " pipe: " << (*file_reader).get(); + return Status::OK(); } diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 42589df531..a311b8d58b 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -66,7 +66,7 @@ public: // Create FileReader for stream load pipe static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader, - const TUniqueId& fragment_instance_id); + RuntimeState* runtime_state); static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd, const io::FileReaderOptions& reader_options, diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 853baa4f12..12de3fa8d4 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -20,11 +20,15 @@ #include <gen_cpp/FrontendService.h> #include <gen_cpp/FrontendService_types.h> #include <gen_cpp/HeartbeatService_types.h> +#include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/Types_types.h> #include <thrift/protocol/TDebugProtocol.h> +#include <type_traits> + #include "common/status.h" #include "runtime/client_cache.h" +#include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/runtime_state.h" #include "runtime/stream_load/new_load_stream_mgr.h" @@ -130,7 +134,9 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size #ifndef BE_TEST Status MultiTablePipe::request_and_exec_plans() { - if (_unplanned_pipes.empty()) return Status::OK(); + if (_unplanned_pipes.empty()) { + return Status::OK(); + } // get list of table names in unplanned pipes std::vector<std::string> tables; @@ -175,24 +181,52 @@ Status MultiTablePipe::request_and_exec_plans() { return plan_status; } + Status st; + if (_ctx->multi_table_put_result.__isset.params && + !_ctx->multi_table_put_result.__isset.pipeline_params) { + st = exec_plans(exec_env, _ctx->multi_table_put_result.params); + } else if (!_ctx->multi_table_put_result.__isset.params && + _ctx->multi_table_put_result.__isset.pipeline_params) { + st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params); + } else { + return Status::Aborted("too many or too few params are set in multi_table_put_result."); + } + + return st; +} + +template <typename ExecParam> +Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) { // put unplanned pipes into planned pipes and clear unplanned pipes for (auto& pipe : _unplanned_pipes) { _ctx->table_list.push_back(pipe.first); _planned_pipes.emplace(pipe.first, pipe.second); } LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}", - _unplanned_pipes.size(), _planned_pipes.size(), - _ctx->multi_table_put_result.params.size()); + _unplanned_pipes.size(), _planned_pipes.size(), params.size()); _unplanned_pipes.clear(); - _inflight_plan_cnt += _ctx->multi_table_put_result.params.size(); - for (auto& plan : _ctx->multi_table_put_result.params) { - // TODO: use pipeline in the future (currently is buggy for load) - DCHECK_EQ(plan.__isset.table_name, true); - DCHECK(_planned_pipes.find(plan.table_name) != _planned_pipes.end()); - putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]); - LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id - << " table=" << plan.table_name; + _inflight_plan_cnt += params.size(); + for (auto& plan : params) { + if (!plan.__isset.table_name || + _planned_pipes.find(plan.table_name) == _planned_pipes.end()) { + return Status::Aborted("Missing vital param: table_name"); + } + + if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) { + putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]); + LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id + << " table=" << plan.table_name; + } else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) { + auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id); + putPipe(pipe_id, _planned_pipes[plan.table_name]); + LOG(INFO) << "pipe_id=" << pipe_id << "table=" << plan.table_name; + } else { + LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or " + "`TPipelineFragmentParams`, will crash"; + CHECK(false); + } + exec_env->fragment_mgr()->exec_plan_fragment(plan, [this](RuntimeState* state, Status* status) { { @@ -243,6 +277,7 @@ Status MultiTablePipe::request_and_exec_plans() { return Status::OK(); } + #else Status MultiTablePipe::request_and_exec_plans() { // put unplanned pipes into planned pipes @@ -254,36 +289,46 @@ Status MultiTablePipe::request_and_exec_plans() { _unplanned_pipes.clear(); return Status::OK(); } + +template <typename ExecParam> +Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) { + return Status::OK(); +} + #endif -Status MultiTablePipe::putPipe(const TUniqueId& fragment_instance_id, - std::shared_ptr<io::StreamLoadPipe> pipe) { +Status MultiTablePipe::putPipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe) { std::lock_guard<std::mutex> l(_pipe_map_lock); - auto it = _pipe_map.find(fragment_instance_id); + auto it = _pipe_map.find(pipe_id); if (it != std::end(_pipe_map)) { return Status::InternalError("id already exist"); } - _pipe_map.emplace(fragment_instance_id, pipe); + _pipe_map.emplace(pipe_id, pipe); return Status::OK(); } -std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& fragment_instance_id) { +std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& pipe_id) { std::lock_guard<std::mutex> l(_pipe_map_lock); - auto it = _pipe_map.find(fragment_instance_id); + auto it = _pipe_map.find(pipe_id); if (it == std::end(_pipe_map)) { return std::shared_ptr<io::StreamLoadPipe>(nullptr); } return it->second; } -void MultiTablePipe::removePipe(const TUniqueId& fragment_instance_id) { +void MultiTablePipe::removePipe(const TUniqueId& pipe_id) { std::lock_guard<std::mutex> l(_pipe_map_lock); - auto it = _pipe_map.find(fragment_instance_id); + auto it = _pipe_map.find(pipe_id); if (it != std::end(_pipe_map)) { _pipe_map.erase(it); - VLOG_NOTICE << "remove stream load pipe: " << fragment_instance_id; + VLOG_NOTICE << "remove stream load pipe: " << pipe_id; } } +template Status MultiTablePipe::exec_plans(ExecEnv* exec_env, + std::vector<TExecPlanFragmentParams> params); +template Status MultiTablePipe::exec_plans(ExecEnv* exec_env, + std::vector<TPipelineFragmentParams> params); + } // namespace io } // namespace doris \ No newline at end of file diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h index b225ea3061..fd30722256 100644 --- a/be/src/io/fs/multi_table_pipe.h +++ b/be/src/io/fs/multi_table_pipe.h @@ -68,6 +68,9 @@ private: // [thread-unsafe] dispatch data to corresponding KafkaConsumerPipe Status dispatch(const std::string& table, const char* data, size_t size, AppendFunc cb); + template <typename ExecParam> + Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params); + private: std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes; std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes; diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index d79cfd028d..23a4e3c604 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -250,5 +250,12 @@ void StreamLoadPipe::cancel(const std::string& reason) { _put_cond.notify_all(); } +TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id) { + TUniqueId pipe_id; + pipe_id.lo = query_id.lo + fragment_id; + pipe_id.hi = query_id.hi; + return pipe_id; +} + } // namespace io } // namespace doris diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h index 848175ce9a..a184cf9e78 100644 --- a/be/src/io/fs/stream_load_pipe.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -78,6 +78,9 @@ public: size_t get_queue_size() { return _buf_queue.size(); } + // used for pipeline load, which use TUniqueId(lo: query_id.lo + fragment_id, hi: query_id.hi) as pipe_id + static TUniqueId calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id); + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 7e85b19205..48316a5008 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -217,9 +217,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re .tag("pthread_id", (uintptr_t)pthread_self()); // 1. init _runtime_state - _runtime_state = - RuntimeState::create_unique(local_params, request.query_id, request.query_options, - _query_ctx->query_globals, _exec_env); + _runtime_state = RuntimeState::create_unique(local_params, request.query_id, + request.fragment_id, request.query_options, + _query_ctx->query_globals, _exec_env); _runtime_state->set_query_ctx(_query_ctx.get()); _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); _runtime_state->set_tracer(std::move(tracer)); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 66bc4a5743..28de6c752e 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -99,8 +99,9 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, } RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params, - const TUniqueId& query_id, const TQueryOptions& query_options, - const TQueryGlobals& query_globals, ExecEnv* exec_env) + const TUniqueId& query_id, int32_t fragment_id, + const TQueryOptions& query_options, const TQueryGlobals& query_globals, + ExecEnv* exec_env) : _profile("Fragment " + print_id(pipeline_params.fragment_instance_id)), _load_channel_profile("<unnamed>"), _obj_pool(new ObjectPool()), @@ -108,6 +109,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params, _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), _query_id(query_id), + _fragment_id(fragment_id), _is_cancelled(false), _per_fragment_instance_idx(0), _num_rows_load_total(0), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index d1b90579e2..89543e492b 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -39,6 +39,7 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/factory_creator.h" #include "common/status.h" +#include "gutil/integral_types.h" #include "util/runtime_profile.h" #include "util/telemetry/telemetry.h" @@ -66,8 +67,8 @@ public: ExecEnv* exec_env); RuntimeState(const TPipelineInstanceParams& pipeline_params, const TUniqueId& query_id, - const TQueryOptions& query_options, const TQueryGlobals& query_globals, - ExecEnv* exec_env); + int32 fragment_id, const TQueryOptions& query_options, + const TQueryGlobals& query_globals, ExecEnv* exec_env); // RuntimeState for executing expr in fe-support. RuntimeState(const TQueryGlobals& query_globals); @@ -115,6 +116,8 @@ public: const std::string& user() const { return _user; } const TUniqueId& query_id() const { return _query_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } + // should only be called in pipeline engine + int32_t fragment_id() const { return _fragment_id; } ExecEnv* exec_env() { return _exec_env; } std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const; @@ -460,6 +463,8 @@ private: cctz::time_zone _timezone_obj; TUniqueId _query_id; + // fragment id for each TPipelineFragmentParams + int32_t _fragment_id; TUniqueId _fragment_instance_id; TQueryOptions _query_options; ExecEnv* _exec_env = nullptr; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 7390d9103a..9da29ed6dc 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -166,8 +166,7 @@ Status CsvReader::init_reader(bool is_load) { _file_description.start_offset = start_offset; if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, - _state->fragment_instance_id())); + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state)); } else { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index f6eabaa7cd..7d17f650c6 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -378,8 +378,7 @@ Status NewJsonReader::_open_file_reader() { _file_description.start_offset = start_offset; if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, - _state->fragment_instance_id())); + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state)); } else { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org