This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e25544fe9a [Improvement](load) Do not block in group commit scan 
operator (#36730)
5e25544fe9a is described below

commit 5e25544fe9ac2b7165e97637a8bc446ca1f69ef2
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Jun 25 10:18:52 2024 +0800

    [Improvement](load) Do not block in group commit scan operator (#36730)
---
 .../pipeline/exec/group_commit_scan_operator.cpp   |  7 ++-
 be/src/pipeline/exec/group_commit_scan_operator.h  |  6 ++
 be/src/runtime/group_commit_mgr.cpp                | 73 +++++++++++++---------
 be/src/runtime/group_commit_mgr.h                  | 10 ++-
 4 files changed, 60 insertions(+), 36 deletions(-)

diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp 
b/be/src/pipeline/exec/group_commit_scan_operator.cpp
index 5c3f7e84ee8..3e6ad62c5dc 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp
@@ -33,7 +33,8 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, 
vectorized::Block* b
     auto& local_state = get_local_state(state);
     bool find_node = false;
     while (!find_node && !*eos) {
-        RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, 
&find_node, eos));
+        RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, 
&find_node, eos,
+                                                                
local_state._get_block_dependency));
     }
     return Status::OK();
 }
@@ -42,8 +43,10 @@ Status GroupCommitLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(ScanLocalState<GroupCommitLocalState>::init(state, info));
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<GroupCommitOperatorX>();
+    _get_block_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                      
"GroupCommitGetBlockDependency", true);
     return state->exec_env()->group_commit_mgr()->get_load_block_queue(
-            p._table_id, state->fragment_instance_id(), load_block_queue);
+            p._table_id, state->fragment_instance_id(), load_block_queue, 
_get_block_dependency);
 }
 
 Status GroupCommitLocalState::_process_conjuncts(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.h 
b/be/src/pipeline/exec/group_commit_scan_operator.h
index b4767d60543..46f50f37724 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.h
+++ b/be/src/pipeline/exec/group_commit_scan_operator.h
@@ -37,9 +37,15 @@ public:
             : ScanLocalState(state, parent) {}
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     std::shared_ptr<LoadBlockQueue> load_block_queue;
+    std::vector<Dependency*> dependencies() const override {
+        return {_scan_dependency.get(), _get_block_dependency.get()};
+    }
 
 private:
+    friend class GroupCommitOperatorX;
     Status _process_conjuncts(RuntimeState* state) override;
+
+    std::shared_ptr<Dependency> _get_block_dependency = nullptr;
 };
 
 class GroupCommitOperatorX final : public ScanOperatorX<GroupCommitLocalState> 
{
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 7a17fd88939..ab11b795ed5 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -100,12 +100,15 @@ Status LoadBlockQueue::add_block(RuntimeState* 
runtime_state,
             _need_commit = true;
         }
     }
-    _get_cond.notify_all();
+    for (auto read_dep : _read_deps) {
+        read_dep->set_ready();
+    }
     return Status::OK();
 }
 
 Status LoadBlockQueue::get_block(RuntimeState* runtime_state, 
vectorized::Block* block,
-                                 bool* find_block, bool* eos) {
+                                 bool* find_block, bool* eos,
+                                 std::shared_ptr<pipeline::Dependency> 
get_block_dep) {
     *find_block = false;
     *eos = false;
     std::unique_lock l(mutex);
@@ -116,34 +119,32 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
             _need_commit = true;
         }
     }
-    while (!runtime_state->is_cancelled() && status.ok() && 
_block_queue.empty() &&
-           (!_need_commit || (_need_commit && 
!_load_ids_to_write_dep.empty()))) {
-        auto left_milliseconds = _group_commit_interval_ms;
-        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
-                                std::chrono::steady_clock::now() - _start_time)
-                                .count();
-        if (!_need_commit) {
-            left_milliseconds = _group_commit_interval_ms - duration;
-            if (left_milliseconds <= 0) {
-                _need_commit = true;
-                break;
-            }
+    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+                            std::chrono::steady_clock::now() - _start_time)
+                            .count();
+    if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() 
&& !_need_commit) {
+        if (_group_commit_interval_ms - duration <= 0) {
+            _need_commit = true;
         } else {
-            if (duration >= 10 * _group_commit_interval_ms) {
-                std::stringstream ss;
-                ss << "[";
-                for (auto& id : _load_ids_to_write_dep) {
-                    ss << id.first.to_string() << ", ";
-                }
-                ss << "]";
-                LOG(INFO) << "find one group_commit need to commit, txn_id=" 
<< txn_id
-                          << ", label=" << label << ", instance_id=" << 
load_instance_id
-                          << ", duration=" << duration << ", load_ids=" << 
ss.str()
-                          << ", runtime_state=" << runtime_state;
+            get_block_dep->block();
+            return Status::OK();
+        }
+    } else if (!runtime_state->is_cancelled() && status.ok() && 
_block_queue.empty() &&
+               _need_commit && !_load_ids_to_write_dep.empty()) {
+        if (duration >= 10 * _group_commit_interval_ms) {
+            std::stringstream ss;
+            ss << "[";
+            for (auto& id : _load_ids_to_write_dep) {
+                ss << id.first.to_string() << ", ";
             }
+            ss << "]";
+            LOG(INFO) << "find one group_commit need to commit, txn_id=" << 
txn_id
+                      << ", label=" << label << ", instance_id=" << 
load_instance_id
+                      << ", duration=" << duration << ", load_ids=" << ss.str()
+                      << ", runtime_state=" << runtime_state;
         }
-        _get_cond.wait_for(l, std::chrono::milliseconds(
-                                      std::min(left_milliseconds, 
static_cast<int64_t>(10000))));
+        get_block_dep->block();
+        return Status::OK();
     }
     if (runtime_state->is_cancelled()) {
         auto st = runtime_state->cancel_reason();
@@ -194,7 +195,9 @@ void LoadBlockQueue::remove_load_id(const UniqueId& 
load_id) {
     if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) {
         _load_ids_to_write_dep[load_id]->set_always_ready();
         _load_ids_to_write_dep.erase(load_id);
-        _get_cond.notify_all();
+        for (auto read_dep : _read_deps) {
+            read_dep->set_ready();
+        }
     }
 }
 
@@ -543,7 +546,8 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, 
int64_t table_id,
 }
 
 Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
-                                              std::shared_ptr<LoadBlockQueue>& 
load_block_queue) {
+                                              std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
+                                              
std::shared_ptr<pipeline::Dependency> get_block_dep) {
     std::unique_lock l(_lock);
     auto it = _load_block_queues.find(instance_id);
     if (it == _load_block_queues.end()) {
@@ -551,6 +555,7 @@ Status GroupCommitTable::get_load_block_queue(const 
TUniqueId& instance_id,
                                      " not found");
     }
     load_block_queue = it->second;
+    load_block_queue->append_read_dependency(get_block_dep);
     return Status::OK();
 }
 
@@ -594,7 +599,8 @@ Status GroupCommitMgr::get_first_block_load_queue(
 }
 
 Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& 
instance_id,
-                                            std::shared_ptr<LoadBlockQueue>& 
load_block_queue) {
+                                            std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
+                                            
std::shared_ptr<pipeline::Dependency> get_block_dep) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
     {
         std::lock_guard<std::mutex> l(_lock);
@@ -605,7 +611,7 @@ Status GroupCommitMgr::get_load_block_queue(int64_t 
table_id, const TUniqueId& i
         }
         group_commit_table = it->second;
     }
-    return group_commit_table->get_load_block_queue(instance_id, 
load_block_queue);
+    return group_commit_table->get_load_block_queue(instance_id, 
load_block_queue, get_block_dep);
 }
 
 Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
@@ -637,6 +643,11 @@ void 
LoadBlockQueue::append_dependency(std::shared_ptr<pipeline::Dependency> fin
     }
 }
 
+void 
LoadBlockQueue::append_read_dependency(std::shared_ptr<pipeline::Dependency> 
read_dep) {
+    std::lock_guard<std::mutex> lock(mutex);
+    _read_deps.push_back(read_dep);
+}
+
 bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) {
     DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { 
return false; });
     auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index f290d2aa6bb..702ebb9c746 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -72,7 +72,7 @@ public:
     Status add_block(RuntimeState* runtime_state, 
std::shared_ptr<vectorized::Block> block,
                      bool write_wal, UniqueId& load_id);
     Status get_block(RuntimeState* runtime_state, vectorized::Block* block, 
bool* find_block,
-                     bool* eos);
+                     bool* eos, std::shared_ptr<pipeline::Dependency> 
get_block_dep);
     Status add_load_id(const UniqueId& load_id,
                        const std::shared_ptr<pipeline::Dependency> 
put_block_dep);
     void remove_load_id(const UniqueId& load_id);
@@ -85,6 +85,7 @@ public:
     Status close_wal();
     bool has_enough_wal_disk_space(size_t estimated_wal_bytes);
     void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep);
+    void append_read_dependency(std::shared_ptr<pipeline::Dependency> 
read_dep);
 
     std::string debug_string() const {
         fmt::memory_buffer debug_string_buffer;
@@ -120,6 +121,7 @@ private:
 
     // the set of load ids of all blocks in this queue
     std::map<UniqueId, std::shared_ptr<pipeline::Dependency>> 
_load_ids_to_write_dep;
+    std::vector<std::shared_ptr<pipeline::Dependency>> _read_deps;
     std::list<BlockData> _block_queue;
 
     // wal
@@ -159,7 +161,8 @@ public:
                                       std::shared_ptr<pipeline::Dependency> 
create_plan_dep,
                                       std::shared_ptr<pipeline::Dependency> 
put_block_dep);
     Status get_load_block_queue(const TUniqueId& instance_id,
-                                std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
+                                std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
+                                std::shared_ptr<pipeline::Dependency> 
get_block_dep);
 
 private:
     Status _create_group_commit_load(int be_exe_version,
@@ -195,7 +198,8 @@ public:
 
     // used when init group_commit_scan_node
     Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
-                                std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
+                                std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
+                                std::shared_ptr<pipeline::Dependency> 
get_block_dep);
     Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t 
base_schema_version,
                                       const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to