This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new fcfff0d5919 [fix](routine-load) fix be core when partial table load failed #34712 (#35621) fcfff0d5919 is described below commit fcfff0d59196d40ba27723ce1295e96247b1ecce Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Thu May 30 12:59:36 2024 +0800 [fix](routine-load) fix be core when partial table load failed #34712 (#35621) --- be/src/io/fs/multi_table_pipe.cpp | 2 +- .../routine_load/routine_load_task_executor.cpp | 34 ++++++++++++++++++---- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 97f88161bc9..35fa8b02cf9 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -222,7 +222,6 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para _unplanned_pipes.size(), _planned_pipes.size(), params.size()); _unplanned_pipes.clear(); - _inflight_cnt += params.size(); for (auto& plan : params) { if (!plan.__isset.table_name || _planned_pipes.find(plan.table_name) == _planned_pipes.end()) { @@ -243,6 +242,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para CHECK(false); } + _inflight_cnt++; exec_env->fragment_mgr()->exec_plan_fragment(plan, [this](RuntimeState* state, Status* status) { { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 7226dcfa484..c61b81cdfc8 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -284,6 +284,20 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx, } \ } while (false); +#define HANDLE_MULTI_TABLE_ERROR(stmt, err_msg) \ + do { \ + Status _status_ = (stmt); \ + if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \ + 
err_handler(ctx, _status_, err_msg); \ + cb(ctx); \ + _status_ = ctx->future.get(); \ + if (!_status_.ok()) { \ + LOG(ERROR) << "failed to get future, " << ctx->brief(); \ + } \ + return; \ + } \ + } while (false); + LOG(INFO) << "begin to execute routine load task: " << ctx->brief(); // create data consumer group @@ -338,17 +352,27 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx, std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe = std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink); - // start to consume, this may block a while - HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed"); - if (ctx->is_multi_table) { + Status st; // plan the rest of unplanned data auto multi_table_pipe = std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink); - HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(), - "multi tables task executes plan error"); + // start to consume, this may block a while + st = consumer_grp->start_all(ctx, kafka_pipe); + if (!st.ok()) { + multi_table_pipe->handle_consume_finished(); + HANDLE_MULTI_TABLE_ERROR(st, "consuming failed"); + } + st = multi_table_pipe->request_and_exec_plans(); + if (!st.ok()) { + multi_table_pipe->handle_consume_finished(); + HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan error"); + } // need memory order multi_table_pipe->handle_consume_finished(); HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed"); + } else { + // start to consume, this may block a while + HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed"); } // wait for all consumers finished --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org