This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c435c297c5 [Improvement](load) Do not block in group commit sink 
(#36612)
1c435c297c5 is described below

commit 1c435c297c58a3447cb0497ef46471e4d874bb61
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Jun 24 09:29:21 2024 +0800

    [Improvement](load) Do not block in group commit sink (#36612)
    
    The group commit sink operator creates an internal loading task before
    it starts. Creating this task currently requires a blocking RPC, which is
    not allowed in the pipeline engine.
    This PR turns that blocking step into a dependency.
---
 .../exec/group_commit_block_sink_operator.cpp      | 80 ++++++++++++--------
 .../exec/group_commit_block_sink_operator.h        |  5 +-
 be/src/runtime/group_commit_mgr.cpp                | 85 +++++++++++-----------
 be/src/runtime/group_commit_mgr.h                  | 22 +++---
 4 files changed, 107 insertions(+), 85 deletions(-)

diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp 
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 5de2e667d4e..3953eb63c4d 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -52,9 +52,28 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* 
state) {
     for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
     }
+    _write_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                  
"GroupCommitBlockSinkDependency", true);
+
+    WARN_IF_ERROR(_initialize_load_queue(), "");
     return Status::OK();
 }
 
+Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
+    auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
+    TUniqueId load_id;
+    load_id.__set_hi(p._load_id.hi);
+    load_id.__set_lo(p._load_id.lo);
+    if (_state->exec_env()->wal_mgr()->is_running()) {
+        
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
+                p._db_id, p._table_id, p._base_schema_version, load_id, 
_load_block_queue,
+                _state->be_exec_version(), _state->query_mem_tracker(), 
_write_dependency));
+        return Status::OK();
+    } else {
+        return Status::InternalError("be is stopping");
+    }
+}
+
 Status GroupCommitBlockSinkLocalState::close(RuntimeState* state, Status 
close_status) {
     if (_closed) {
         return Status::OK();
@@ -79,8 +98,9 @@ Status GroupCommitBlockSinkLocalState::close(RuntimeState* 
state, Status close_s
 std::string GroupCommitBlockSinkLocalState::debug_string(int 
indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
-    fmt::format_to(debug_string_buffer, ", _load_block_queue: ({})",
-                   _load_block_queue ? _load_block_queue->debug_string() : 
"NULL");
+    fmt::format_to(debug_string_buffer, ", _load_block_queue: ({}), 
_base_schema_version: {}",
+                   _load_block_queue ? _load_block_queue->debug_string() : 
"NULL",
+                   
_parent->cast<GroupCommitBlockSinkOperatorX>()._base_schema_version);
     return fmt::to_string(debug_string_buffer);
 }
 
@@ -164,37 +184,31 @@ Status 
GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
     TUniqueId load_id;
     load_id.__set_hi(p._load_id.hi);
     load_id.__set_lo(p._load_id.lo);
-    if (_load_block_queue == nullptr) {
-        if (_state->exec_env()->wal_mgr()->is_running()) {
-            
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
-                    p._db_id, p._table_id, p._base_schema_version, load_id, 
_load_block_queue,
-                    _state->be_exec_version(), _state->query_mem_tracker()));
-            if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
-                size_t estimated_wal_bytes =
-                        
_calculate_estimated_wal_bytes(is_blocks_contain_all_load_data);
-                _group_commit_mode =
-                        
_load_block_queue->has_enough_wal_disk_space(estimated_wal_bytes)
-                                ? TGroupCommitMode::ASYNC_MODE
-                                : TGroupCommitMode::SYNC_MODE;
-                if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
-                    LOG(INFO) << "Load id=" << print_id(_state->query_id())
-                              << ", use group commit label=" << 
_load_block_queue->label
-                              << " will not write wal because wal disk space 
usage reach max "
-                                 "limit. Detail info: "
-                              << 
_state->exec_env()->wal_mgr()->get_wal_dirs_info_string();
-                } else {
-                    _estimated_wal_bytes = estimated_wal_bytes;
-                }
-            }
-            if (_load_block_queue->wait_internal_group_commit_finish ||
-                _group_commit_mode == TGroupCommitMode::SYNC_MODE) {
-                _load_block_queue->append_dependency(_finish_dependency);
+    if (_state->exec_env()->wal_mgr()->is_running()) {
+        if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
+            size_t estimated_wal_bytes =
+                    
_calculate_estimated_wal_bytes(is_blocks_contain_all_load_data);
+            _group_commit_mode = 
_load_block_queue->has_enough_wal_disk_space(estimated_wal_bytes)
+                                         ? TGroupCommitMode::ASYNC_MODE
+                                         : TGroupCommitMode::SYNC_MODE;
+            if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
+                LOG(INFO) << "Load id=" << print_id(_state->query_id())
+                          << ", use group commit label=" << 
_load_block_queue->label
+                          << " will not write wal because wal disk space usage 
reach max "
+                             "limit. Detail info: "
+                          << 
_state->exec_env()->wal_mgr()->get_wal_dirs_info_string();
+            } else {
+                _estimated_wal_bytes = estimated_wal_bytes;
             }
-            _state->set_import_label(_load_block_queue->label);
-            _state->set_wal_id(_load_block_queue->txn_id);
-        } else {
-            return Status::InternalError("be is stopping");
         }
+        if (_load_block_queue->wait_internal_group_commit_finish ||
+            _group_commit_mode == TGroupCommitMode::SYNC_MODE) {
+            _load_block_queue->append_dependency(_finish_dependency);
+        }
+        _state->set_import_label(_load_block_queue->label);
+        _state->set_wal_id(_load_block_queue->txn_id);
+    } else {
+        return Status::InternalError("be is stopping");
     }
     for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
         RETURN_IF_ERROR(_load_block_queue->add_block(
@@ -263,6 +277,10 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* 
state, vectorized::Bloc
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
     SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get());
+    if (!local_state._load_block_queue) {
+        RETURN_IF_ERROR(local_state._initialize_load_queue());
+    }
+    DCHECK(local_state._load_block_queue);
     Status status = Status::OK();
 
     auto wind_up = [&]() -> Status {
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h 
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index f26e65b97da..27e344deca6 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -46,6 +46,7 @@ public:
 
     Status close(RuntimeState* state, Status exec_status) override;
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
+    std::vector<Dependency*> dependencies() const override { return 
{_write_dependency.get()}; }
     std::string debug_string(int indentation_level) const override;
 
 private:
@@ -54,12 +55,13 @@ private:
     Status _add_blocks(RuntimeState* state, bool 
is_blocks_contain_all_load_data);
     size_t _calculate_estimated_wal_bytes(bool 
is_blocks_contain_all_load_data);
     void _remove_estimated_wal_bytes();
+    Status _initialize_load_queue();
 
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 
     std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor;
 
-    std::shared_ptr<LoadBlockQueue> _load_block_queue;
+    std::shared_ptr<LoadBlockQueue> _load_block_queue = nullptr;
     // used to calculate if meet the max filter ratio
     std::vector<std::shared_ptr<vectorized::Block>> _blocks;
     bool _is_block_appended = false;
@@ -73,6 +75,7 @@ private:
     Bitmap _filter_bitmap;
     int64_t _table_id;
     std::shared_ptr<Dependency> _finish_dependency;
+    std::shared_ptr<Dependency> _write_dependency = nullptr;
 };
 
 class GroupCommitBlockSinkOperatorX final
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index d21535d6351..d5e2651fd4d 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -250,46 +250,48 @@ void LoadBlockQueue::_cancel_without_lock(const Status& 
st) {
 Status GroupCommitTable::get_first_block_load_queue(
         int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
-        std::shared_ptr<MemTrackerLimiter> mem_tracker) {
+        std::shared_ptr<MemTrackerLimiter> mem_tracker, 
std::shared_ptr<pipeline::Dependency> dep) {
     DCHECK(table_id == _table_id);
-    {
-        std::unique_lock l(_lock);
-        for (int i = 0; i < 3; i++) {
-            bool is_schema_version_match = true;
-            for (const auto& [_, inner_block_queue] : _load_block_queues) {
-                if (!inner_block_queue->need_commit()) {
-                    if (base_schema_version == 
inner_block_queue->schema_version) {
-                        if (inner_block_queue->add_load_id(load_id).ok()) {
-                            load_block_queue = inner_block_queue;
-                            return Status::OK();
-                        }
-                    } else if (base_schema_version < 
inner_block_queue->schema_version) {
-                        is_schema_version_match = false;
+    std::unique_lock l(_lock);
+    auto try_to_get_matched_queue = [&]() -> Status {
+        for (const auto& [_, inner_block_queue] : _load_block_queues) {
+            if (!inner_block_queue->need_commit()) {
+                if (base_schema_version == inner_block_queue->schema_version) {
+                    if (inner_block_queue->add_load_id(load_id).ok()) {
+                        load_block_queue = inner_block_queue;
+                        return Status::OK();
                     }
+                } else {
+                    return Status::DataQualityError<false>(
+                            "schema version not match, maybe a schema change 
is in process. "
+                            "Please "
+                            "retry this load manually.");
                 }
             }
-            if (!is_schema_version_match) {
-                return Status::DataQualityError<false>(
-                        "schema version not match, maybe a schema change is in 
process. Please "
-                        "retry this load manually.");
-            }
-            if (!_is_creating_plan_fragment) {
-                _is_creating_plan_fragment = true;
-                RETURN_IF_ERROR(_thread_pool->submit_func([this, 
be_exe_version, mem_tracker] {
-                    auto st = _create_group_commit_load(be_exe_version, 
mem_tracker);
-                    if (!st.ok()) {
-                        LOG(WARNING) << "create group commit load error, st=" 
<< st.to_string();
-                        std::unique_lock l(_lock);
-                        _is_creating_plan_fragment = false;
-                        _cv.notify_all();
-                    }
-                }));
-            }
-            _cv.wait_for(l, std::chrono::seconds(4));
         }
+        return Status::InternalError<false>("can not get a block queue for 
table_id: " +
+                                            std::to_string(_table_id));
+    };
+
+    if (try_to_get_matched_queue().ok()) {
+        return Status::OK();
     }
-    return Status::InternalError<false>("can not get a block queue for 
table_id: " +
-                                        std::to_string(_table_id));
+    if (!_is_creating_plan_fragment) {
+        _is_creating_plan_fragment = true;
+        dep->block();
+        RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, 
mem_tracker, dep = dep] {
+            Defer defer {[&, dep = dep]() {
+                dep->set_ready();
+                std::unique_lock l(_lock);
+                _is_creating_plan_fragment = false;
+            }};
+            auto st = _create_group_commit_load(be_exe_version, mem_tracker);
+            if (!st.ok()) {
+                LOG(WARNING) << "create group commit load error, st=" << 
st.to_string();
+            }
+        }));
+    }
+    return try_to_get_matched_queue();
 }
 
 Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
@@ -378,8 +380,6 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
                         be_exe_version));
             }
             _load_block_queues.emplace(instance_id, load_block_queue);
-            _is_creating_plan_fragment = false;
-            _cv.notify_all();
         }
     }
     st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, 
result.params,
@@ -565,12 +565,10 @@ void GroupCommitMgr::stop() {
     LOG(INFO) << "GroupCommitMgr is stopped";
 }
 
-Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t 
table_id,
-                                                  int64_t base_schema_version,
-                                                  const UniqueId& load_id,
-                                                  
std::shared_ptr<LoadBlockQueue>& load_block_queue,
-                                                  int be_exe_version,
-                                                  
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
+Status GroupCommitMgr::get_first_block_load_queue(
+        int64_t db_id, int64_t table_id, int64_t base_schema_version, const 
UniqueId& load_id,
+        std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
+        std::shared_ptr<MemTrackerLimiter> mem_tracker, 
std::shared_ptr<pipeline::Dependency> dep) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
     {
         std::lock_guard wlock(_lock);
@@ -582,7 +580,8 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t 
db_id, int64_t table_i
         group_commit_table = _table_map[table_id];
     }
     RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
-            table_id, base_schema_version, load_id, load_block_queue, 
be_exe_version, mem_tracker));
+            table_id, base_schema_version, load_id, load_block_queue, 
be_exe_version, mem_tracker,
+            dep));
     return Status::OK();
 }
 
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index 65f9f09670c..679da81f75f 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -87,13 +87,14 @@ public:
 
     std::string debug_string() const {
         fmt::memory_buffer debug_string_buffer;
-        fmt::format_to(debug_string_buffer,
-                       "load_instance_id={}, label={}, txn_id={}, "
-                       "wait_internal_group_commit_finish={}, 
data_size_condition={}, "
-                       "group_commit_load_count={}, process_finish={}",
-                       load_instance_id.to_string(), label, txn_id,
-                       wait_internal_group_commit_finish, data_size_condition,
-                       group_commit_load_count, process_finish.load());
+        fmt::format_to(
+                debug_string_buffer,
+                "load_instance_id={}, label={}, txn_id={}, "
+                "wait_internal_group_commit_finish={}, data_size_condition={}, 
"
+                "group_commit_load_count={}, process_finish={}, 
_need_commit={}, schema_version={}",
+                load_instance_id.to_string(), label, txn_id, 
wait_internal_group_commit_finish,
+                data_size_condition, group_commit_load_count, 
process_finish.load(), _need_commit,
+                schema_version);
         return fmt::to_string(debug_string_buffer);
     }
 
@@ -154,7 +155,8 @@ public:
                                       const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                       int be_exe_version,
-                                      std::shared_ptr<MemTrackerLimiter> 
mem_tracker);
+                                      std::shared_ptr<MemTrackerLimiter> 
mem_tracker,
+                                      std::shared_ptr<pipeline::Dependency> 
dep);
     Status get_load_block_queue(const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
 
@@ -178,7 +180,6 @@ private:
     int64_t _table_id;
 
     std::mutex _lock;
-    std::condition_variable _cv;
     // fragment_instance_id to load_block_queue
     std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> 
_load_block_queues;
     bool _is_creating_plan_fragment = false;
@@ -198,7 +199,8 @@ public:
                                       const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                       int be_exe_version,
-                                      std::shared_ptr<MemTrackerLimiter> 
mem_tracker);
+                                      std::shared_ptr<MemTrackerLimiter> 
mem_tracker,
+                                      std::shared_ptr<pipeline::Dependency> 
dep);
     std::promise<Status> debug_promise;
     std::future<Status> debug_future = debug_promise.get_future();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to