This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6ef9ed08aadcf263c4467ea87bccddfa81696a78
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Wed Jan 17 13:57:53 2024 +0800

    [fix](multi-table-load) fix multi table load can not finish (#29957)
---
 be/src/io/fs/multi_table_pipe.cpp                  | 32 ++++++++++++----------
 be/src/io/fs/multi_table_pipe.h                    | 19 ++++++++++---
 .../routine_load/routine_load_task_executor.cpp    |  2 +-
 3 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 36976a23972..916f8151739 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -209,7 +209,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
                              _unplanned_pipes.size(), _planned_pipes.size(), 
params.size());
     _unplanned_pipes.clear();
 
-    _inflight_plan_cnt += params.size();
+    _inflight_cnt += params.size();
     for (auto& plan : params) {
         if (!plan.__isset.table_name ||
             _planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
@@ -263,20 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
                         _status = *status;
                     }
 
-                    --_inflight_plan_cnt;
-                    if (_inflight_plan_cnt == 0 && is_consume_finished()) {
-                        _ctx->number_total_rows = _number_total_rows;
-                        _ctx->number_loaded_rows = _number_loaded_rows;
-                        _ctx->number_filtered_rows = _number_filtered_rows;
-                        _ctx->number_unselected_rows = _number_unselected_rows;
-                        _ctx->commit_infos = _tablet_commit_infos;
-                        LOG(INFO) << "all plan for multi-table load complete. 
number_total_rows="
-                                  << _ctx->number_total_rows
-                                  << " number_loaded_rows=" << 
_ctx->number_loaded_rows
-                                  << " number_filtered_rows=" << 
_ctx->number_filtered_rows
-                                  << " number_unselected_rows=" << 
_ctx->number_unselected_rows;
-                        _ctx->promise.set_value(
-                                _status); // when all done, finish the routine 
load task
+                    auto inflight_cnt = _inflight_cnt.fetch_sub(1);
+                    if (inflight_cnt == 1 && is_consume_finished()) {
+                        _handle_consumer_finished();
                     }
                 }));
     }
@@ -303,6 +292,19 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
 
 #endif
 
+// Completes the multi-table routine load task. It copies the row counters and
+// tablet commit infos accumulated across all plan fragments into the owning
+// StreamLoadContext, logs a summary, and fulfils the context's promise with
+// _status (the first error recorded by a plan fragment, if any).
+//
+// Invoked by whichever caller's _inflight_cnt.fetch_sub(1) returns 1, i.e. the
+// decrement that brings the counter to zero. That is either the last finishing
+// plan fragment's callback or handle_consume_finished(). fetch_sub is atomic, so
+// only one decrement can observe 1, provided the counter is never raised again
+// after it has reached zero.
+void MultiTablePipe::_handle_consumer_finished() {
+    _ctx->number_total_rows = _number_total_rows;
+    _ctx->number_loaded_rows = _number_loaded_rows;
+    _ctx->number_filtered_rows = _number_filtered_rows;
+    _ctx->number_unselected_rows = _number_unselected_rows;
+    // NOTE(review): _tablet_commit_infos is copied without taking
+    // _tablet_commit_infos_lock. Presumably every writer has finished once the
+    // in-flight count is zero -- confirm.
+    _ctx->commit_infos = _tablet_commit_infos;
+    LOG(INFO) << "all plan for multi-table load complete. number_total_rows="
+              << _ctx->number_total_rows << " number_loaded_rows=" << _ctx->number_loaded_rows
+              << " number_filtered_rows=" << _ctx->number_filtered_rows
+              << " number_unselected_rows=" << _ctx->number_unselected_rows;
+    _ctx->promise.set_value(_status); // when all done, finish the routine load task
+}
+
 Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id,
                                 std::shared_ptr<io::StreamLoadPipe> pipe) {
     std::lock_guard<std::mutex> l(_pipe_map_lock);
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index 694794638af..36f6ec68b17 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -46,7 +46,13 @@ public:
     // request and execute plans for unplanned pipes
     Status request_and_exec_plans();
 
-    void set_consume_finished() { _consume_finished.store(true, 
std::memory_order_release); }
+    // Signals that consumption for this task is done (called, e.g., from
+    // RoutineLoadTaskExecutor::exec_task). It marks consumption finished, then
+    // releases the consumer's own share of _inflight_cnt, which is initialised
+    // to 1 for it. If no plan fragment is still in flight, this call completes
+    // the load via _handle_consumer_finished(); otherwise the last finishing
+    // fragment does.
+    // The flag is set before the decrement. A fragment whose decrement brings
+    // the count to zero therefore also sees is_consume_finished() == true.
+    // NOTE(review): if request_and_exec_plans() fails, the caller may return
+    // (via HANDLE_ERROR) before reaching this method. exec_plans may also return
+    // early after adding params.size() to the count. Either way the count may
+    // never reach zero -- confirm the error path completes ctx->promise elsewhere.
+    void handle_consume_finished() {
+        _set_consume_finished();
+        auto inflight_cnt = _inflight_cnt.fetch_sub(1);
+        if (inflight_cnt == 1) {
+            _handle_consumer_finished();
+        }
+    }
 
     bool is_consume_finished() { return 
_consume_finished.load(std::memory_order_acquire); }
 
@@ -71,25 +77,30 @@ private:
     template <typename ExecParam>
     Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params);
 
+    // Sets the consume-finished flag. The release store pairs with the acquire
+    // load in is_consume_finished().
+    void _set_consume_finished() { _consume_finished.store(true, std::memory_order_release); }
+
+    void _handle_consumer_finished();
+
 private:
     std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> 
_planned_pipes;
     std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> 
_unplanned_pipes;
     std::atomic<uint64_t> _unplanned_row_cnt {0}; // trigger plan request when 
exceed threshold
-    std::atomic<uint64_t> _inflight_plan_cnt {0}; // how many plan fragment 
are executing?
+    // inflight count, when it is zero, means consume and all plans is finished
+    std::atomic<uint64_t> _inflight_cnt {1};
     std::atomic<bool> _consume_finished {false};
     // note: Use raw pointer here to avoid cycle reference with 
StreamLoadContext.
     // Life cycle of MultiTablePipe is under control of StreamLoadContext, 
which means StreamLoadContext is created
     // before NultiTablePipe and released after it. It is safe to use raw 
pointer here.
     StreamLoadContext* _ctx = nullptr;
     Status _status; // save the first error status of all executing plan 
fragment
-#ifndef BE_TEST
+
     std::mutex _tablet_commit_infos_lock;
     std::vector<TTabletCommitInfo> _tablet_commit_infos; // collect from each 
plan fragment
     std::atomic<int64_t> _number_total_rows {0};
     std::atomic<int64_t> _number_loaded_rows {0};
     std::atomic<int64_t> _number_filtered_rows {0};
     std::atomic<int64_t> _number_unselected_rows {0};
-#endif
+
     std::mutex _pipe_map_lock;
     std::unordered_map<TUniqueId /*instance id*/, 
std::shared_ptr<io::StreamLoadPipe>> _pipe_map;
 
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index dc6b855cd5a..d22f2bb4a8c 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -350,7 +350,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
         HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
                      "multi tables task executes plan error");
         // need memory order
-        multi_table_pipe->set_consume_finished();
+        multi_table_pipe->handle_consume_finished();
         HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to