This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new fcfff0d5919 [fix](routine-load) fix be core when partial table load failed #34712 (#35621)
fcfff0d5919 is described below

commit fcfff0d59196d40ba27723ce1295e96247b1ecce
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Thu May 30 12:59:36 2024 +0800

    [fix](routine-load) fix be core when partial table load failed #34712 (#35621)
---
 be/src/io/fs/multi_table_pipe.cpp                  |  2 +-
 .../routine_load/routine_load_task_executor.cpp    | 34 ++++++++++++++++++----
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 97f88161bc9..35fa8b02cf9 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -222,7 +222,6 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
                              _unplanned_pipes.size(), _planned_pipes.size(), 
params.size());
     _unplanned_pipes.clear();
 
-    _inflight_cnt += params.size();
     for (auto& plan : params) {
         if (!plan.__isset.table_name ||
             _planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
@@ -243,6 +242,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
             CHECK(false);
         }
 
+        _inflight_cnt++;
         exec_env->fragment_mgr()->exec_plan_fragment(plan, 
[this](RuntimeState* state,
                                                                   Status* 
status) {
             {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 7226dcfa484..c61b81cdfc8 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -284,6 +284,20 @@ void 
RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
         }                                                                  \
     } while (false);
 
+#define HANDLE_MULTI_TABLE_ERROR(stmt, err_msg)                            \
+    do {                                                                   \
+        Status _status_ = (stmt);                                          \
+        if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \
+            err_handler(ctx, _status_, err_msg);                           \
+            cb(ctx);                                                       \
+            _status_ = ctx->future.get();                                  \
+            if (!_status_.ok()) {                                          \
+                LOG(ERROR) << "failed to get future, " << ctx->brief();    \
+            }                                                              \
+            return;                                                        \
+        }                                                                  \
+    } while (false);
+
     LOG(INFO) << "begin to execute routine load task: " << ctx->brief();
 
     // create data consumer group
@@ -338,17 +352,27 @@ void 
RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
             std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
 
-    // start to consume, this may block a while
-    HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
-
     if (ctx->is_multi_table) {
+        Status st;
         // plan the rest of unplanned data
         auto multi_table_pipe = 
std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
-        HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
-                     "multi tables task executes plan error");
+        // start to consume, this may block a while
+        st = consumer_grp->start_all(ctx, kafka_pipe);
+        if (!st.ok()) {
+            multi_table_pipe->handle_consume_finished();
+            HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
+        }
+        st = multi_table_pipe->request_and_exec_plans();
+        if (!st.ok()) {
+            multi_table_pipe->handle_consume_finished();
+            HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan 
error");
+        }
         // need memory order
         multi_table_pipe->handle_consume_finished();
         HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
+    } else {
+        // start to consume, this may block a while
+        HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming 
failed");
     }
 
     // wait for all consumers finished


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to