This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6ef9ed08aadcf263c4467ea87bccddfa81696a78 Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Wed Jan 17 13:57:53 2024 +0800 [fix](multi-table-load) fix multi table load can not finish (#29957) --- be/src/io/fs/multi_table_pipe.cpp | 32 ++++++++++++---------- be/src/io/fs/multi_table_pipe.h | 19 ++++++++++--- .../routine_load/routine_load_task_executor.cpp | 2 +- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 36976a23972..916f8151739 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -209,7 +209,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para _unplanned_pipes.size(), _planned_pipes.size(), params.size()); _unplanned_pipes.clear(); - _inflight_plan_cnt += params.size(); + _inflight_cnt += params.size(); for (auto& plan : params) { if (!plan.__isset.table_name || _planned_pipes.find(plan.table_name) == _planned_pipes.end()) { @@ -263,20 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para _status = *status; } - --_inflight_plan_cnt; - if (_inflight_plan_cnt == 0 && is_consume_finished()) { - _ctx->number_total_rows = _number_total_rows; - _ctx->number_loaded_rows = _number_loaded_rows; - _ctx->number_filtered_rows = _number_filtered_rows; - _ctx->number_unselected_rows = _number_unselected_rows; - _ctx->commit_infos = _tablet_commit_infos; - LOG(INFO) << "all plan for multi-table load complete. 
number_total_rows=" - << _ctx->number_total_rows - << " number_loaded_rows=" << _ctx->number_loaded_rows - << " number_filtered_rows=" << _ctx->number_filtered_rows - << " number_unselected_rows=" << _ctx->number_unselected_rows; - _ctx->promise.set_value( - _status); // when all done, finish the routine load task + auto inflight_cnt = _inflight_cnt.fetch_sub(1); + if (inflight_cnt == 1 && is_consume_finished()) { + _handle_consumer_finished(); } })); } @@ -303,6 +292,19 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para #endif +void MultiTablePipe::_handle_consumer_finished() { + _ctx->number_total_rows = _number_total_rows; + _ctx->number_loaded_rows = _number_loaded_rows; + _ctx->number_filtered_rows = _number_filtered_rows; + _ctx->number_unselected_rows = _number_unselected_rows; + _ctx->commit_infos = _tablet_commit_infos; + LOG(INFO) << "all plan for multi-table load complete. number_total_rows=" + << _ctx->number_total_rows << " number_loaded_rows=" << _ctx->number_loaded_rows + << " number_filtered_rows=" << _ctx->number_filtered_rows + << " number_unselected_rows=" << _ctx->number_unselected_rows; + _ctx->promise.set_value(_status); // when all done, finish the routine load task +} + Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe) { std::lock_guard<std::mutex> l(_pipe_map_lock); diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h index 694794638af..36f6ec68b17 100644 --- a/be/src/io/fs/multi_table_pipe.h +++ b/be/src/io/fs/multi_table_pipe.h @@ -46,7 +46,13 @@ public: // request and execute plans for unplanned pipes Status request_and_exec_plans(); - void set_consume_finished() { _consume_finished.store(true, std::memory_order_release); } + void handle_consume_finished() { + _set_consume_finished(); + auto inflight_cnt = _inflight_cnt.fetch_sub(1); + if (inflight_cnt == 1) { + _handle_consumer_finished(); + } + } bool is_consume_finished() { 
return _consume_finished.load(std::memory_order_acquire); } @@ -71,25 +77,30 @@ private: template <typename ExecParam> Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params); + void _set_consume_finished() { _consume_finished.store(true, std::memory_order_release); } + + void _handle_consumer_finished(); + private: std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes; std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes; std::atomic<uint64_t> _unplanned_row_cnt {0}; // trigger plan request when exceed threshold - std::atomic<uint64_t> _inflight_plan_cnt {0}; // how many plan fragment are executing? + // inflight count, when it is zero, means consume and all plans is finished + std::atomic<uint64_t> _inflight_cnt {1}; std::atomic<bool> _consume_finished {false}; // note: Use raw pointer here to avoid cycle reference with StreamLoadContext. // Life cycle of MultiTablePipe is under control of StreamLoadContext, which means StreamLoadContext is created // before NultiTablePipe and released after it. It is safe to use raw pointer here. 
StreamLoadContext* _ctx = nullptr; Status _status; // save the first error status of all executing plan fragment -#ifndef BE_TEST + std::mutex _tablet_commit_infos_lock; std::vector<TTabletCommitInfo> _tablet_commit_infos; // collect from each plan fragment std::atomic<int64_t> _number_total_rows {0}; std::atomic<int64_t> _number_loaded_rows {0}; std::atomic<int64_t> _number_filtered_rows {0}; std::atomic<int64_t> _number_unselected_rows {0}; -#endif + std::mutex _pipe_map_lock; std::unordered_map<TUniqueId /*instance id*/, std::shared_ptr<io::StreamLoadPipe>> _pipe_map; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index dc6b855cd5a..d22f2bb4a8c 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -350,7 +350,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx, HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(), "multi tables task executes plan error"); // need memory order - multi_table_pipe->set_consume_finished(); + multi_table_pipe->handle_consume_finished(); HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org