This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5e25544fe9a [Improvement](load) Do no block in group commit scan operator (#36730) 5e25544fe9a is described below commit 5e25544fe9ac2b7165e97637a8bc446ca1f69ef2 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Jun 25 10:18:52 2024 +0800 [Improvement](load) Do no block in group commit scan operator (#36730) --- .../pipeline/exec/group_commit_scan_operator.cpp | 7 ++- be/src/pipeline/exec/group_commit_scan_operator.h | 6 ++ be/src/runtime/group_commit_mgr.cpp | 73 +++++++++++++--------- be/src/runtime/group_commit_mgr.h | 10 ++- 4 files changed, 60 insertions(+), 36 deletions(-) diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp b/be/src/pipeline/exec/group_commit_scan_operator.cpp index 5c3f7e84ee8..3e6ad62c5dc 100644 --- a/be/src/pipeline/exec/group_commit_scan_operator.cpp +++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp @@ -33,7 +33,8 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* b auto& local_state = get_local_state(state); bool find_node = false; while (!find_node && !*eos) { - RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos)); + RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos, + local_state._get_block_dependency)); } return Status::OK(); } @@ -42,8 +43,10 @@ Status GroupCommitLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(ScanLocalState<GroupCommitLocalState>::init(state, info)); SCOPED_TIMER(_init_timer); auto& p = _parent->cast<GroupCommitOperatorX>(); + _get_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "GroupCommitGetBlockDependency", true); return state->exec_env()->group_commit_mgr()->get_load_block_queue( - p._table_id, state->fragment_instance_id(), load_block_queue); + p._table_id, state->fragment_instance_id(), load_block_queue, _get_block_dependency); } 
Status GroupCommitLocalState::_process_conjuncts(RuntimeState* state) { diff --git a/be/src/pipeline/exec/group_commit_scan_operator.h b/be/src/pipeline/exec/group_commit_scan_operator.h index b4767d60543..46f50f37724 100644 --- a/be/src/pipeline/exec/group_commit_scan_operator.h +++ b/be/src/pipeline/exec/group_commit_scan_operator.h @@ -37,9 +37,15 @@ public: : ScanLocalState(state, parent) {} Status init(RuntimeState* state, LocalStateInfo& info) override; std::shared_ptr<LoadBlockQueue> load_block_queue; + std::vector<Dependency*> dependencies() const override { + return {_scan_dependency.get(), _get_block_dependency.get()}; + } private: + friend class GroupCommitOperatorX; Status _process_conjuncts(RuntimeState* state) override; + + std::shared_ptr<Dependency> _get_block_dependency = nullptr; }; class GroupCommitOperatorX final : public ScanOperatorX<GroupCommitLocalState> { diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 7a17fd88939..ab11b795ed5 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -100,12 +100,15 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, _need_commit = true; } } - _get_cond.notify_all(); + for (auto read_dep : _read_deps) { + read_dep->set_ready(); + } return Status::OK(); } Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* block, - bool* find_block, bool* eos) { + bool* find_block, bool* eos, + std::shared_ptr<pipeline::Dependency> get_block_dep) { *find_block = false; *eos = false; std::unique_lock l(mutex); @@ -116,34 +119,32 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* _need_commit = true; } } - while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() && - (!_need_commit || (_need_commit && !_load_ids_to_write_dep.empty()))) { - auto left_milliseconds = _group_commit_interval_ms; - auto duration = 
std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now() - _start_time) - .count(); - if (!_need_commit) { - left_milliseconds = _group_commit_interval_ms - duration; - if (left_milliseconds <= 0) { - _need_commit = true; - break; - } + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now() - _start_time) + .count(); + if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() && !_need_commit) { + if (_group_commit_interval_ms - duration <= 0) { + _need_commit = true; } else { - if (duration >= 10 * _group_commit_interval_ms) { - std::stringstream ss; - ss << "["; - for (auto& id : _load_ids_to_write_dep) { - ss << id.first.to_string() << ", "; - } - ss << "]"; - LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id - << ", label=" << label << ", instance_id=" << load_instance_id - << ", duration=" << duration << ", load_ids=" << ss.str() - << ", runtime_state=" << runtime_state; + get_block_dep->block(); + return Status::OK(); + } + } else if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() && + _need_commit && !_load_ids_to_write_dep.empty()) { + if (duration >= 10 * _group_commit_interval_ms) { + std::stringstream ss; + ss << "["; + for (auto& id : _load_ids_to_write_dep) { + ss << id.first.to_string() << ", "; } + ss << "]"; + LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id + << ", label=" << label << ", instance_id=" << load_instance_id + << ", duration=" << duration << ", load_ids=" << ss.str() + << ", runtime_state=" << runtime_state; } - _get_cond.wait_for(l, std::chrono::milliseconds( - std::min(left_milliseconds, static_cast<int64_t>(10000)))); + get_block_dep->block(); + return Status::OK(); } if (runtime_state->is_cancelled()) { auto st = runtime_state->cancel_reason(); @@ -194,7 +195,9 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) { if (_load_ids_to_write_dep.find(load_id) != 
_load_ids_to_write_dep.end()) { _load_ids_to_write_dep[load_id]->set_always_ready(); _load_ids_to_write_dep.erase(load_id); - _get_cond.notify_all(); + for (auto read_dep : _read_deps) { + read_dep->set_ready(); + } } } @@ -543,7 +546,8 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id, } Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id, - std::shared_ptr<LoadBlockQueue>& load_block_queue) { + std::shared_ptr<LoadBlockQueue>& load_block_queue, + std::shared_ptr<pipeline::Dependency> get_block_dep) { std::unique_lock l(_lock); auto it = _load_block_queues.find(instance_id); if (it == _load_block_queues.end()) { @@ -551,6 +555,7 @@ Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id, " not found"); } load_block_queue = it->second; + load_block_queue->append_read_dependency(get_block_dep); return Status::OK(); } @@ -594,7 +599,8 @@ Status GroupCommitMgr::get_first_block_load_queue( } Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, - std::shared_ptr<LoadBlockQueue>& load_block_queue) { + std::shared_ptr<LoadBlockQueue>& load_block_queue, + std::shared_ptr<pipeline::Dependency> get_block_dep) { std::shared_ptr<GroupCommitTable> group_commit_table; { std::lock_guard<std::mutex> l(_lock); @@ -605,7 +611,7 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i } group_commit_table = it->second; } - return group_commit_table->get_load_block_queue(instance_id, load_block_queue); + return group_commit_table->get_load_block_queue(instance_id, load_block_queue, get_block_dep); } Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, @@ -637,6 +643,11 @@ void LoadBlockQueue::append_dependency(std::shared_ptr<pipeline::Dependency> fin } } +void LoadBlockQueue::append_read_dependency(std::shared_ptr<pipeline::Dependency> read_dep) { + std::lock_guard<std::mutex> lock(mutex); + 
_read_deps.push_back(read_dep); +} + bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) { DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; }); auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr(); diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index f290d2aa6bb..702ebb9c746 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -72,7 +72,7 @@ public: Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block, bool write_wal, UniqueId& load_id); Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block, - bool* eos); + bool* eos, std::shared_ptr<pipeline::Dependency> get_block_dep); Status add_load_id(const UniqueId& load_id, const std::shared_ptr<pipeline::Dependency> put_block_dep); void remove_load_id(const UniqueId& load_id); @@ -85,6 +85,7 @@ public: Status close_wal(); bool has_enough_wal_disk_space(size_t estimated_wal_bytes); void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep); + void append_read_dependency(std::shared_ptr<pipeline::Dependency> read_dep); std::string debug_string() const { fmt::memory_buffer debug_string_buffer; @@ -120,6 +121,7 @@ private: // the set of load ids of all blocks in this queue std::map<UniqueId, std::shared_ptr<pipeline::Dependency>> _load_ids_to_write_dep; + std::vector<std::shared_ptr<pipeline::Dependency>> _read_deps; std::list<BlockData> _block_queue; // wal @@ -159,7 +161,8 @@ public: std::shared_ptr<pipeline::Dependency> create_plan_dep, std::shared_ptr<pipeline::Dependency> put_block_dep); Status get_load_block_queue(const TUniqueId& instance_id, - std::shared_ptr<LoadBlockQueue>& load_block_queue); + std::shared_ptr<LoadBlockQueue>& load_block_queue, + std::shared_ptr<pipeline::Dependency> get_block_dep); private: Status _create_group_commit_load(int be_exe_version, @@ -195,7 +198,8 @@ public: // used when init 
group_commit_scan_node Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, - std::shared_ptr<LoadBlockQueue>& load_block_queue); + std::shared_ptr<LoadBlockQueue>& load_block_queue, + std::shared_ptr<pipeline::Dependency> get_block_dep); Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org