This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 08dedc8e3bf [Improvement](load) Do not block in group commit sink 
(#36717)
08dedc8e3bf is described below

commit 08dedc8e3bf17920cbf5d52ee64b943c58f2dd85
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Jun 24 12:19:07 2024 +0800

    [Improvement](load) Do not block in group commit sink (#36717)
    
    Do not rely on a condition variable in group commit sink
---
 .../exec/group_commit_block_sink_operator.cpp      |  23 ++---
 .../exec/group_commit_block_sink_operator.h        |   7 +-
 be/src/runtime/group_commit_mgr.cpp                | 102 +++++++++++----------
 be/src/runtime/group_commit_mgr.h                  |  14 +--
 4 files changed, 78 insertions(+), 68 deletions(-)

diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp 
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 3953eb63c4d..402354d6f24 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -52,22 +52,21 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* 
state) {
     for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
     }
-    _write_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
-                                                  
"GroupCommitBlockSinkDependency", true);
-
+    _create_plan_dependency = 
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                                        
"CreateGroupCommitPlanDependency", true);
+    _put_block_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                      
"GroupCommitPutBlockDependency", true);
     WARN_IF_ERROR(_initialize_load_queue(), "");
     return Status::OK();
 }
 
 Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
     auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
-    TUniqueId load_id;
-    load_id.__set_hi(p._load_id.hi);
-    load_id.__set_lo(p._load_id.lo);
     if (_state->exec_env()->wal_mgr()->is_running()) {
         
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
-                p._db_id, p._table_id, p._base_schema_version, load_id, 
_load_block_queue,
-                _state->be_exec_version(), _state->query_mem_tracker(), 
_write_dependency));
+                p._db_id, p._table_id, p._base_schema_version, p._load_id, 
_load_block_queue,
+                _state->be_exec_version(), _state->query_mem_tracker(), 
_create_plan_dependency,
+                _put_block_dependency));
         return Status::OK();
     } else {
         return Status::InternalError("be is stopping");
@@ -138,7 +137,8 @@ Status 
GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state,
             RETURN_IF_ERROR(_add_blocks(state, false));
         }
         RETURN_IF_ERROR(_load_block_queue->add_block(
-                state, output_block, _group_commit_mode == 
TGroupCommitMode::ASYNC_MODE));
+                state, output_block, _group_commit_mode == 
TGroupCommitMode::ASYNC_MODE,
+                _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id));
     }
     return Status::OK();
 }
@@ -181,9 +181,6 @@ Status 
GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
                                                    bool 
is_blocks_contain_all_load_data) {
     DCHECK(_is_block_appended == false);
     auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
-    TUniqueId load_id;
-    load_id.__set_hi(p._load_id.hi);
-    load_id.__set_lo(p._load_id.lo);
     if (_state->exec_env()->wal_mgr()->is_running()) {
         if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
             size_t estimated_wal_bytes =
@@ -212,7 +209,7 @@ Status 
GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
     }
     for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
         RETURN_IF_ERROR(_load_block_queue->add_block(
-                state, *it, _group_commit_mode == 
TGroupCommitMode::ASYNC_MODE));
+                state, *it, _group_commit_mode == 
TGroupCommitMode::ASYNC_MODE, p._load_id));
     }
     _is_block_appended = true;
     _blocks.clear();
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h 
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index 27e344deca6..caf7017d050 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -46,7 +46,9 @@ public:
 
     Status close(RuntimeState* state, Status exec_status) override;
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
-    std::vector<Dependency*> dependencies() const override { return 
{_write_dependency.get()}; }
+    std::vector<Dependency*> dependencies() const override {
+        return {_create_plan_dependency.get(), _put_block_dependency.get()};
+    }
     std::string debug_string(int indentation_level) const override;
 
 private:
@@ -75,7 +77,8 @@ private:
     Bitmap _filter_bitmap;
     int64_t _table_id;
     std::shared_ptr<Dependency> _finish_dependency;
-    std::shared_ptr<Dependency> _write_dependency = nullptr;
+    std::shared_ptr<Dependency> _create_plan_dependency = nullptr;
+    std::shared_ptr<Dependency> _put_block_dependency = nullptr;
 };
 
 class GroupCommitBlockSinkOperatorX final
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index d5e2651fd4d..7a17fd88939 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -35,28 +35,14 @@
 namespace doris {
 
 Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
-                                 std::shared_ptr<vectorized::Block> block, 
bool write_wal) {
+                                 std::shared_ptr<vectorized::Block> block, 
bool write_wal,
+                                 UniqueId& load_id) {
     std::unique_lock l(mutex);
     RETURN_IF_ERROR(status);
     auto start = std::chrono::steady_clock::now();
     DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", {
         start = std::chrono::steady_clock::now() - 
std::chrono::milliseconds(120000);
     });
-    while (!runtime_state->is_cancelled() && status.ok() &&
-           _all_block_queues_bytes->load(std::memory_order_relaxed) >=
-                   config::group_commit_queue_mem_limit) {
-        _put_cond.wait_for(l,
-                           
std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
-        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
-                std::chrono::steady_clock::now() - start);
-        if (duration.count() > LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIMEOUT) 
{
-            return Status::TimedOut<false>(
-                    "Wal memory back pressure wait too much time! Load block 
queue txn id: {}, "
-                    "label: {}, instance id: {}, consumed memory: {}",
-                    txn_id, label, load_instance_id.to_string(),
-                    _all_block_queues_bytes->load(std::memory_order_relaxed));
-        }
-    }
     if (UNLIKELY(runtime_state->is_cancelled())) {
         return runtime_state->cancel_reason();
     }
@@ -69,8 +55,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
             _all_block_queues_bytes->fetch_add(block->bytes(), 
std::memory_order_relaxed);
             std::stringstream ss;
             ss << "[";
-            for (const auto& id : _load_ids) {
-                ss << id.to_string() << ", ";
+            for (const auto& id : _load_ids_to_write_dep) {
+                ss << id.first.to_string() << ", ";
             }
             ss << "]";
             VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
@@ -92,6 +78,12 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                 return st;
             }
         }
+        if (!runtime_state->is_cancelled() && status.ok() &&
+            _all_block_queues_bytes->load(std::memory_order_relaxed) >=
+                    config::group_commit_queue_mem_limit) {
+            DCHECK(_load_ids_to_write_dep.find(load_id) != 
_load_ids_to_write_dep.end());
+            _load_ids_to_write_dep[load_id]->block();
+        }
     }
     if (!_need_commit) {
         if (_data_bytes >= _group_commit_data_bytes) {
@@ -125,7 +117,7 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
         }
     }
     while (!runtime_state->is_cancelled() && status.ok() && 
_block_queue.empty() &&
-           (!_need_commit || (_need_commit && !_load_ids.empty()))) {
+           (!_need_commit || (_need_commit && 
!_load_ids_to_write_dep.empty()))) {
         auto left_milliseconds = _group_commit_interval_ms;
         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                                 std::chrono::steady_clock::now() - _start_time)
@@ -140,8 +132,8 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
             if (duration >= 10 * _group_commit_interval_ms) {
                 std::stringstream ss;
                 ss << "[";
-                for (auto& id : _load_ids) {
-                    ss << id.to_string() << ", ";
+                for (auto& id : _load_ids_to_write_dep) {
+                    ss << id.first.to_string() << ", ";
                 }
                 ss << "]";
                 LOG(INFO) << "find one group_commit need to commit, txn_id=" 
<< txn_id
@@ -167,8 +159,8 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
         _all_block_queues_bytes->fetch_sub(block_data.block_bytes, 
std::memory_order_relaxed);
         std::stringstream ss;
         ss << "[";
-        for (const auto& id : _load_ids) {
-            ss << id.to_string() << ", ";
+        for (const auto& id : _load_ids_to_write_dep) {
+            ss << id.first.to_string() << ", ";
         }
         ss << "]";
         VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
@@ -183,30 +175,37 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
                    << ", the block is " << block->dump_data() << ", the block 
column size is "
                    << block->columns_bytes();
     }
-    if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
+    if (_block_queue.empty() && _need_commit && 
_load_ids_to_write_dep.empty()) {
         *eos = true;
     } else {
         *eos = false;
     }
-    _put_cond.notify_all();
+    if (_all_block_queues_bytes->load(std::memory_order_relaxed) <
+        config::group_commit_queue_mem_limit) {
+        for (auto& id : _load_ids_to_write_dep) {
+            id.second->set_ready();
+        }
+    }
     return Status::OK();
 }
 
 void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
     std::unique_lock l(mutex);
-    if (_load_ids.find(load_id) != _load_ids.end()) {
-        _load_ids.erase(load_id);
+    if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) {
+        _load_ids_to_write_dep[load_id]->set_always_ready();
+        _load_ids_to_write_dep.erase(load_id);
         _get_cond.notify_all();
     }
 }
 
-Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
+Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
+                                   const std::shared_ptr<pipeline::Dependency> 
put_block_dep) {
     std::unique_lock l(mutex);
     if (_need_commit) {
         return Status::InternalError<false>("block queue is set need commit, 
id=" +
                                             load_instance_id.to_string());
     }
-    _load_ids.emplace(load_id);
+    _load_ids_to_write_dep[load_id] = put_block_dep;
     group_commit_load_count.fetch_add(1);
     return Status::OK();
 }
@@ -228,8 +227,8 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) 
{
         _all_block_queues_bytes->fetch_sub(block_data.block_bytes, 
std::memory_order_relaxed);
         std::stringstream ss;
         ss << "[";
-        for (const auto& id : _load_ids) {
-            ss << id.to_string() << ", ";
+        for (const auto& id : _load_ids_to_write_dep) {
+            ss << id.first.to_string() << ", ";
         }
         ss << "]";
         VLOG_DEBUG << "[Group Commit Debug] 
(LoadBlockQueue::_cancel_without_block). "
@@ -245,20 +244,26 @@ void LoadBlockQueue::_cancel_without_lock(const Status& 
st) {
                    << block_data.block->columns_bytes();
         _block_queue.pop_front();
     }
+    for (auto& id : _load_ids_to_write_dep) {
+        id.second->set_always_ready();
+    }
 }
 
 Status GroupCommitTable::get_first_block_load_queue(
         int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
-        std::shared_ptr<MemTrackerLimiter> mem_tracker, 
std::shared_ptr<pipeline::Dependency> dep) {
+        std::shared_ptr<MemTrackerLimiter> mem_tracker,
+        std::shared_ptr<pipeline::Dependency> create_plan_dep,
+        std::shared_ptr<pipeline::Dependency> put_block_dep) {
     DCHECK(table_id == _table_id);
     std::unique_lock l(_lock);
     auto try_to_get_matched_queue = [&]() -> Status {
         for (const auto& [_, inner_block_queue] : _load_block_queues) {
             if (!inner_block_queue->need_commit()) {
                 if (base_schema_version == inner_block_queue->schema_version) {
-                    if (inner_block_queue->add_load_id(load_id).ok()) {
+                    if (inner_block_queue->add_load_id(load_id, 
put_block_dep).ok()) {
                         load_block_queue = inner_block_queue;
+
                         return Status::OK();
                     }
                 } else {
@@ -278,18 +283,19 @@ Status GroupCommitTable::get_first_block_load_queue(
     }
     if (!_is_creating_plan_fragment) {
         _is_creating_plan_fragment = true;
-        dep->block();
-        RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, 
mem_tracker, dep = dep] {
-            Defer defer {[&, dep = dep]() {
-                dep->set_ready();
-                std::unique_lock l(_lock);
-                _is_creating_plan_fragment = false;
-            }};
-            auto st = _create_group_commit_load(be_exe_version, mem_tracker);
-            if (!st.ok()) {
-                LOG(WARNING) << "create group commit load error, st=" << 
st.to_string();
-            }
-        }));
+        create_plan_dep->block();
+        RETURN_IF_ERROR(
+                _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep 
= create_plan_dep] {
+                    Defer defer {[&, dep = dep]() {
+                        dep->set_ready();
+                        std::unique_lock l(_lock);
+                        _is_creating_plan_fragment = false;
+                    }};
+                    auto st = _create_group_commit_load(be_exe_version, 
mem_tracker);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "create group commit load error, st=" 
<< st.to_string();
+                    }
+                }));
     }
     return try_to_get_matched_queue();
 }
@@ -568,7 +574,9 @@ void GroupCommitMgr::stop() {
 Status GroupCommitMgr::get_first_block_load_queue(
         int64_t db_id, int64_t table_id, int64_t base_schema_version, const 
UniqueId& load_id,
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
-        std::shared_ptr<MemTrackerLimiter> mem_tracker, 
std::shared_ptr<pipeline::Dependency> dep) {
+        std::shared_ptr<MemTrackerLimiter> mem_tracker,
+        std::shared_ptr<pipeline::Dependency> create_plan_dep,
+        std::shared_ptr<pipeline::Dependency> put_block_dep) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
     {
         std::lock_guard wlock(_lock);
@@ -581,7 +589,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
     }
     RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
             table_id, base_schema_version, load_id, load_block_queue, 
be_exe_version, mem_tracker,
-            dep));
+            create_plan_dep, put_block_dep));
     return Status::OK();
 }
 
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index 679da81f75f..f290d2aa6bb 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -70,10 +70,11 @@ public:
               _all_block_queues_bytes(all_block_queues_bytes) {};
 
     Status add_block(RuntimeState* runtime_state, 
std::shared_ptr<vectorized::Block> block,
-                     bool write_wal);
+                     bool write_wal, UniqueId& load_id);
     Status get_block(RuntimeState* runtime_state, vectorized::Block* block, 
bool* find_block,
                      bool* eos);
-    Status add_load_id(const UniqueId& load_id);
+    Status add_load_id(const UniqueId& load_id,
+                       const std::shared_ptr<pipeline::Dependency> 
put_block_dep);
     void remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
     bool need_commit() { return _need_commit; }
@@ -118,7 +119,7 @@ private:
     void _cancel_without_lock(const Status& st);
 
     // the set of load ids of all blocks in this queue
-    std::set<UniqueId> _load_ids;
+    std::map<UniqueId, std::shared_ptr<pipeline::Dependency>> 
_load_ids_to_write_dep;
     std::list<BlockData> _block_queue;
 
     // wal
@@ -136,7 +137,6 @@ private:
 
     // memory back pressure, memory consumption of all tables' load block 
queues
     std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
-    std::condition_variable _put_cond;
     std::condition_variable _get_cond;
     static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000;      // 1s
     static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s
@@ -156,7 +156,8 @@ public:
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> 
mem_tracker,
-                                      std::shared_ptr<pipeline::Dependency> 
dep);
+                                      std::shared_ptr<pipeline::Dependency> 
create_plan_dep,
+                                      std::shared_ptr<pipeline::Dependency> 
put_block_dep);
     Status get_load_block_queue(const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
 
@@ -200,7 +201,8 @@ public:
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> 
mem_tracker,
-                                      std::shared_ptr<pipeline::Dependency> 
dep);
+                                      std::shared_ptr<pipeline::Dependency> 
create_plan_dep,
+                                      std::shared_ptr<pipeline::Dependency> 
put_block_dep);
     std::promise<Status> debug_promise;
     std::future<Status> debug_future = debug_promise.get_future();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to