This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new acd59d27ccf [pipeline](load) Not blocking in execution threads (#36291)
acd59d27ccf is described below

commit acd59d27ccffb02cc12d9bd02ab766e29bbad267
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Jun 18 18:54:35 2024 +0800

    [pipeline](load) Not blocking in execution threads (#36291)
    
    In group-commit loading tasks, we wait on a condition variable in
    execution threads before finishing. Blocking an execution thread like this is harmful to the pipeline engine.
---
 .../exec/group_commit_block_sink_operator.cpp      | 67 ++++++++++++++--------
 .../exec/group_commit_block_sink_operator.h        |  9 ++-
 be/src/runtime/group_commit_mgr.cpp                | 14 ++++-
 be/src/runtime/group_commit_mgr.h                  | 21 ++++++-
 4 files changed, 83 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 50b314a7dc1..99fd6ef20ab 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -64,40 +64,27 @@ Status GroupCommitBlockSinkLocalState::close(RuntimeState* state, Status close_s
     SCOPED_TIMER(_close_timer);
     RETURN_IF_ERROR(Base::close(state, close_status));
     RETURN_IF_ERROR(close_status);
-    int64_t total_rows = state->num_rows_load_total();
-    int64_t loaded_rows = state->num_rows_load_total();
-    state->set_num_rows_load_total(loaded_rows + 
state->num_rows_load_unselected() +
-                                   state->num_rows_load_filtered());
-    state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() 
+ total_rows -
-                                         loaded_rows);
-    auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
-    if (!_is_block_appended) {
-        // if not meet the max_filter_ratio, we should return error status 
directly
-        int64_t num_selected_rows =
-                state->num_rows_load_total() - 
state->num_rows_load_unselected();
-        if (num_selected_rows > 0 &&
-            (double)state->num_rows_load_filtered() / num_selected_rows > 
p._max_filter_ratio) {
-            return Status::DataQualityError("too many filtered rows");
-        }
-        RETURN_IF_ERROR(_add_blocks(state, true));
-    }
-    if (_load_block_queue) {
-        _remove_estimated_wal_bytes();
-        _load_block_queue->remove_load_id(p._load_id);
-    }
     // wait to wal
     auto st = Status::OK();
     if (_load_block_queue && 
(_load_block_queue->wait_internal_group_commit_finish ||
                               _group_commit_mode == 
TGroupCommitMode::SYNC_MODE)) {
         std::unique_lock l(_load_block_queue->mutex);
         if (!_load_block_queue->process_finish) {
-            _load_block_queue->internal_group_commit_finish_cv.wait(l);
+            return Status::InternalError("_load_block_queue is not finished!");
         }
         st = _load_block_queue->status;
     }
     return st;
 }
 
+std::string GroupCommitBlockSinkLocalState::debug_string(int 
indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
+    fmt::format_to(debug_string_buffer, ", _load_block_queue: ({})",
+                   _load_block_queue ? _load_block_queue->debug_string() : 
"NULL");
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state,
                                                   
std::shared_ptr<vectorized::Block> block) {
     if (block->rows() == 0) {
@@ -200,6 +187,10 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
                     _estimated_wal_bytes = estimated_wal_bytes;
                 }
             }
+            if (_load_block_queue->wait_internal_group_commit_finish ||
+                _group_commit_mode == TGroupCommitMode::SYNC_MODE) {
+                _load_block_queue->append_dependency(_finish_dependency);
+            }
             _state->set_import_label(_load_block_queue->label);
             _state->set_wal_id(_load_block_queue->txn_id);
         } else {
@@ -233,6 +224,7 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
 }
 
 Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) {
+    RETURN_IF_ERROR(Base::init(t_sink));
     DCHECK(t_sink.__isset.olap_table_sink);
     auto& table_sink = t_sink.olap_table_sink;
     _tuple_desc_id = table_sink.tuple_id;
@@ -274,10 +266,35 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
     SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get());
     Status status = Status::OK();
 
+    auto wind_up = [&]() -> Status {
+        if (eos) {
+            int64_t total_rows = state->num_rows_load_total();
+            int64_t loaded_rows = state->num_rows_load_total();
+            state->set_num_rows_load_total(loaded_rows + 
state->num_rows_load_unselected() +
+                                           state->num_rows_load_filtered());
+            
state->update_num_rows_load_filtered(local_state._block_convertor->num_filtered_rows()
 +
+                                                 total_rows - loaded_rows);
+            if (!local_state._is_block_appended) {
+                // if not meet the max_filter_ratio, we should return error 
status directly
+                int64_t num_selected_rows =
+                        state->num_rows_load_total() - 
state->num_rows_load_unselected();
+                if (num_selected_rows > 0 &&
+                    (double)state->num_rows_load_filtered() / 
num_selected_rows >
+                            _max_filter_ratio) {
+                    return Status::DataQualityError("too many filtered rows");
+                }
+                RETURN_IF_ERROR(local_state._add_blocks(state, true));
+            }
+            local_state._remove_estimated_wal_bytes();
+            local_state._load_block_queue->remove_load_id(_load_id);
+        }
+        return Status::OK();
+    };
+
     auto rows = input_block->rows();
     auto bytes = input_block->bytes();
     if (UNLIKELY(rows == 0)) {
-        return status;
+        return wind_up();
     }
 
     // update incrementally so that FE can get the progress.
@@ -324,7 +341,9 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
         block->swap(res_block.to_block());
     }
     // add block into block queue
-    return local_state._add_block(state, block);
+    RETURN_IF_ERROR(local_state._add_block(state, block));
+
+    return wind_up();
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index b65326725b6..f26e65b97da 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -34,13 +34,19 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi
 
 public:
     GroupCommitBlockSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state)
-            : Base(parent, state), _filter_bitmap(1024) {}
+            : Base(parent, state), _filter_bitmap(1024) {
+        _finish_dependency =
+                std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                             parent->get_name() + 
"_FINISH_DEPENDENCY", true);
+    }
 
     ~GroupCommitBlockSinkLocalState() override;
 
     Status open(RuntimeState* state) override;
 
     Status close(RuntimeState* state, Status exec_status) override;
+    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
+    std::string debug_string(int indentation_level) const override;
 
 private:
     friend class GroupCommitBlockSinkOperatorX;
@@ -66,6 +72,7 @@ private:
     TGroupCommitMode::type _group_commit_mode;
     Bitmap _filter_bitmap;
     int64_t _table_id;
+    std::shared_ptr<Dependency> _finish_dependency;
 };
 
 class GroupCommitBlockSinkOperatorX final
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 06cf494c842..6e81e3ae4c3 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -26,6 +26,7 @@
 #include "common/compiler_util.h"
 #include "common/config.h"
 #include "common/status.h"
+#include "pipeline/dependency.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "util/debug_points.h"
@@ -459,8 +460,10 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
             {
                 std::unique_lock l2(load_block_queue->mutex);
                 load_block_queue->process_finish = true;
+                for (auto dep : load_block_queue->dependencies) {
+                    dep->set_always_ready();
+                }
             }
-            load_block_queue->internal_group_commit_finish_cv.notify_all();
         }
         _load_block_queues.erase(instance_id);
     }
@@ -616,6 +619,15 @@ Status LoadBlockQueue::close_wal() {
     return Status::OK();
 }
 
+void LoadBlockQueue::append_dependency(std::shared_ptr<pipeline::Dependency> 
finish_dep) {
+    std::lock_guard<std::mutex> lock(mutex);
+    // If not finished, dependencies should be blocked.
+    if (!process_finish) {
+        finish_dep->block();
+        dependencies.push_back(finish_dep);
+    }
+}
+
 bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) {
     DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { 
return false; });
     auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index c41f6abd6fe..65f9f09670c 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -41,6 +41,10 @@ class ExecEnv;
 class TUniqueId;
 class RuntimeState;
 
+namespace pipeline {
+class Dependency;
+}
+
 struct BlockData {
     BlockData(const std::shared_ptr<vectorized::Block>& block)
             : block(block), block_bytes(block->bytes()) {};
@@ -79,6 +83,19 @@ public:
                       int be_exe_version);
     Status close_wal();
     bool has_enough_wal_disk_space(size_t estimated_wal_bytes);
+    void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep);
+
+    std::string debug_string() const {
+        fmt::memory_buffer debug_string_buffer;
+        fmt::format_to(debug_string_buffer,
+                       "load_instance_id={}, label={}, txn_id={}, "
+                       "wait_internal_group_commit_finish={}, 
data_size_condition={}, "
+                       "group_commit_load_count={}, process_finish={}",
+                       load_instance_id.to_string(), label, txn_id,
+                       wait_internal_group_commit_finish, data_size_condition,
+                       group_commit_load_count, process_finish.load());
+        return fmt::to_string(debug_string_buffer);
+    }
 
     UniqueId load_instance_id;
     std::string label;
@@ -92,9 +109,9 @@ public:
 
     // the execute status of this internal group commit
     std::mutex mutex;
-    std::condition_variable internal_group_commit_finish_cv;
-    bool process_finish = false;
+    std::atomic<bool> process_finish = false;
     Status status = Status::OK();
+    std::vector<std::shared_ptr<pipeline::Dependency>> dependencies;
 
 private:
     void _cancel_without_lock(const Status& st);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to