This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 08dedc8e3bf [Improvement](load) Do not block in group commit sink (#36717) 08dedc8e3bf is described below commit 08dedc8e3bf17920cbf5d52ee64b943c58f2dd85 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Jun 24 12:19:07 2024 +0800 [Improvement](load) Do not block in group commit sink (#36717) Do not rely on a condition variable in group commit sink --- .../exec/group_commit_block_sink_operator.cpp | 23 ++--- .../exec/group_commit_block_sink_operator.h | 7 +- be/src/runtime/group_commit_mgr.cpp | 102 +++++++++++---------- be/src/runtime/group_commit_mgr.h | 14 +-- 4 files changed, 78 insertions(+), 68 deletions(-) diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 3953eb63c4d..402354d6f24 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -52,22 +52,21 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) { for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); } - _write_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "GroupCommitBlockSinkDependency", true); - + _create_plan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "CreateGroupCommitPlanDependency", true); + _put_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "GroupCommitPutBlockDependency", true); WARN_IF_ERROR(_initialize_load_queue(), ""); return Status::OK(); } Status GroupCommitBlockSinkLocalState::_initialize_load_queue() { auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>(); - TUniqueId load_id; - load_id.__set_hi(p._load_id.hi); - load_id.__set_lo(p._load_id.lo); if (_state->exec_env()->wal_mgr()->is_running()) { 
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( - p._db_id, p._table_id, p._base_schema_version, load_id, _load_block_queue, - _state->be_exec_version(), _state->query_mem_tracker(), _write_dependency)); + p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue, + _state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency, + _put_block_dependency)); return Status::OK(); } else { return Status::InternalError("be is stopping"); @@ -138,7 +137,8 @@ Status GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state, RETURN_IF_ERROR(_add_blocks(state, false)); } RETURN_IF_ERROR(_load_block_queue->add_block( - state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE)); + state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE, + _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id)); } return Status::OK(); } @@ -181,9 +181,6 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state, bool is_blocks_contain_all_load_data) { DCHECK(_is_block_appended == false); auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>(); - TUniqueId load_id; - load_id.__set_hi(p._load_id.hi); - load_id.__set_lo(p._load_id.lo); if (_state->exec_env()->wal_mgr()->is_running()) { if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) { size_t estimated_wal_bytes = @@ -212,7 +209,7 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state, } for (auto it = _blocks.begin(); it != _blocks.end(); ++it) { RETURN_IF_ERROR(_load_block_queue->add_block( - state, *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE)); + state, *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE, p._load_id)); } _is_block_appended = true; _blocks.clear(); diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h index 27e344deca6..caf7017d050 100644 --- 
a/be/src/pipeline/exec/group_commit_block_sink_operator.h +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h @@ -46,7 +46,9 @@ public: Status close(RuntimeState* state, Status exec_status) override; Dependency* finishdependency() override { return _finish_dependency.get(); } - std::vector<Dependency*> dependencies() const override { return {_write_dependency.get()}; } + std::vector<Dependency*> dependencies() const override { + return {_create_plan_dependency.get(), _put_block_dependency.get()}; + } std::string debug_string(int indentation_level) const override; private: @@ -75,7 +77,8 @@ private: Bitmap _filter_bitmap; int64_t _table_id; std::shared_ptr<Dependency> _finish_dependency; - std::shared_ptr<Dependency> _write_dependency = nullptr; + std::shared_ptr<Dependency> _create_plan_dependency = nullptr; + std::shared_ptr<Dependency> _put_block_dependency = nullptr; }; class GroupCommitBlockSinkOperatorX final diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index d5e2651fd4d..7a17fd88939 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -35,28 +35,14 @@ namespace doris { Status LoadBlockQueue::add_block(RuntimeState* runtime_state, - std::shared_ptr<vectorized::Block> block, bool write_wal) { + std::shared_ptr<vectorized::Block> block, bool write_wal, + UniqueId& load_id) { std::unique_lock l(mutex); RETURN_IF_ERROR(status); auto start = std::chrono::steady_clock::now(); DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", { start = std::chrono::steady_clock::now() - std::chrono::milliseconds(120000); }); - while (!runtime_state->is_cancelled() && status.ok() && - _all_block_queues_bytes->load(std::memory_order_relaxed) >= - config::group_commit_queue_mem_limit) { - _put_cond.wait_for(l, - std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME)); - auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( - 
std::chrono::steady_clock::now() - start); - if (duration.count() > LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIMEOUT) { - return Status::TimedOut<false>( - "Wal memory back pressure wait too much time! Load block queue txn id: {}, " - "label: {}, instance id: {}, consumed memory: {}", - txn_id, label, load_instance_id.to_string(), - _all_block_queues_bytes->load(std::memory_order_relaxed)); - } - } if (UNLIKELY(runtime_state->is_cancelled())) { return runtime_state->cancel_reason(); } @@ -69,8 +55,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); std::stringstream ss; ss << "["; - for (const auto& id : _load_ids) { - ss << id.to_string() << ", "; + for (const auto& id : _load_ids_to_write_dep) { + ss << id.first.to_string() << ", "; } ss << "]"; VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). " @@ -92,6 +78,12 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, return st; } } + if (!runtime_state->is_cancelled() && status.ok() && + _all_block_queues_bytes->load(std::memory_order_relaxed) >= + config::group_commit_queue_mem_limit) { + DCHECK(_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()); + _load_ids_to_write_dep[load_id]->block(); + } } if (!_need_commit) { if (_data_bytes >= _group_commit_data_bytes) { @@ -125,7 +117,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* } } while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() && - (!_need_commit || (_need_commit && !_load_ids.empty()))) { + (!_need_commit || (_need_commit && !_load_ids_to_write_dep.empty()))) { auto left_milliseconds = _group_commit_interval_ms; auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::steady_clock::now() - _start_time) @@ -140,8 +132,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* if (duration >= 10 * 
_group_commit_interval_ms) { std::stringstream ss; ss << "["; - for (auto& id : _load_ids) { - ss << id.to_string() << ", "; + for (auto& id : _load_ids_to_write_dep) { + ss << id.first.to_string() << ", "; } ss << "]"; LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id @@ -167,8 +159,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed); std::stringstream ss; ss << "["; - for (const auto& id : _load_ids) { - ss << id.to_string() << ", "; + for (const auto& id : _load_ids_to_write_dep) { + ss << id.first.to_string() << ", "; } ss << "]"; VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). " @@ -183,30 +175,37 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* << ", the block is " << block->dump_data() << ", the block column size is " << block->columns_bytes(); } - if (_block_queue.empty() && _need_commit && _load_ids.empty()) { + if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) { *eos = true; } else { *eos = false; } - _put_cond.notify_all(); + if (_all_block_queues_bytes->load(std::memory_order_relaxed) < + config::group_commit_queue_mem_limit) { + for (auto& id : _load_ids_to_write_dep) { + id.second->set_ready(); + } + } return Status::OK(); } void LoadBlockQueue::remove_load_id(const UniqueId& load_id) { std::unique_lock l(mutex); - if (_load_ids.find(load_id) != _load_ids.end()) { - _load_ids.erase(load_id); + if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) { + _load_ids_to_write_dep[load_id]->set_always_ready(); + _load_ids_to_write_dep.erase(load_id); _get_cond.notify_all(); } } -Status LoadBlockQueue::add_load_id(const UniqueId& load_id) { +Status LoadBlockQueue::add_load_id(const UniqueId& load_id, + const std::shared_ptr<pipeline::Dependency> put_block_dep) { std::unique_lock l(mutex); if (_need_commit) { return 
Status::InternalError<false>("block queue is set need commit, id=" + load_instance_id.to_string()); } - _load_ids.emplace(load_id); + _load_ids_to_write_dep[load_id] = put_block_dep; group_commit_load_count.fetch_add(1); return Status::OK(); } @@ -228,8 +227,8 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) { _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed); std::stringstream ss; ss << "["; - for (const auto& id : _load_ids) { - ss << id.to_string() << ", "; + for (const auto& id : _load_ids_to_write_dep) { + ss << id.first.to_string() << ", "; } ss << "]"; VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). " @@ -245,20 +244,26 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) { << block_data.block->columns_bytes(); _block_queue.pop_front(); } + for (auto& id : _load_ids_to_write_dep) { + id.second->set_always_ready(); + } } Status GroupCommitTable::get_first_block_load_queue( int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, - std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) { + std::shared_ptr<MemTrackerLimiter> mem_tracker, + std::shared_ptr<pipeline::Dependency> create_plan_dep, + std::shared_ptr<pipeline::Dependency> put_block_dep) { DCHECK(table_id == _table_id); std::unique_lock l(_lock); auto try_to_get_matched_queue = [&]() -> Status { for (const auto& [_, inner_block_queue] : _load_block_queues) { if (!inner_block_queue->need_commit()) { if (base_schema_version == inner_block_queue->schema_version) { - if (inner_block_queue->add_load_id(load_id).ok()) { + if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) { load_block_queue = inner_block_queue; + return Status::OK(); } } else { @@ -278,18 +283,19 @@ Status GroupCommitTable::get_first_block_load_queue( } if (!_is_creating_plan_fragment) { _is_creating_plan_fragment = 
true; - dep->block(); - RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = dep] { - Defer defer {[&, dep = dep]() { - dep->set_ready(); - std::unique_lock l(_lock); - _is_creating_plan_fragment = false; - }}; - auto st = _create_group_commit_load(be_exe_version, mem_tracker); - if (!st.ok()) { - LOG(WARNING) << "create group commit load error, st=" << st.to_string(); - } - })); + create_plan_dep->block(); + RETURN_IF_ERROR( + _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] { + Defer defer {[&, dep = dep]() { + dep->set_ready(); + std::unique_lock l(_lock); + _is_creating_plan_fragment = false; + }}; + auto st = _create_group_commit_load(be_exe_version, mem_tracker); + if (!st.ok()) { + LOG(WARNING) << "create group commit load error, st=" << st.to_string(); + } + })); } return try_to_get_matched_queue(); } @@ -568,7 +574,9 @@ void GroupCommitMgr::stop() { Status GroupCommitMgr::get_first_block_load_queue( int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, - std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) { + std::shared_ptr<MemTrackerLimiter> mem_tracker, + std::shared_ptr<pipeline::Dependency> create_plan_dep, + std::shared_ptr<pipeline::Dependency> put_block_dep) { std::shared_ptr<GroupCommitTable> group_commit_table; { std::lock_guard wlock(_lock); @@ -581,7 +589,7 @@ Status GroupCommitMgr::get_first_block_load_queue( } RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue( table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker, - dep)); + create_plan_dep, put_block_dep)); return Status::OK(); } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 679da81f75f..f290d2aa6bb 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -70,10 +70,11 @@ 
public: _all_block_queues_bytes(all_block_queues_bytes) {}; Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block, - bool write_wal); + bool write_wal, UniqueId& load_id); Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block, bool* eos); - Status add_load_id(const UniqueId& load_id); + Status add_load_id(const UniqueId& load_id, + const std::shared_ptr<pipeline::Dependency> put_block_dep); void remove_load_id(const UniqueId& load_id); void cancel(const Status& st); bool need_commit() { return _need_commit; } @@ -118,7 +119,7 @@ private: void _cancel_without_lock(const Status& st); // the set of load ids of all blocks in this queue - std::set<UniqueId> _load_ids; + std::map<UniqueId, std::shared_ptr<pipeline::Dependency>> _load_ids_to_write_dep; std::list<BlockData> _block_queue; // wal @@ -136,7 +137,6 @@ private: // memory back pressure, memory consumption of all tables' load block queues std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes; - std::condition_variable _put_cond; std::condition_variable _get_cond; static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000; // 1s static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s @@ -156,7 +156,8 @@ public: std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, - std::shared_ptr<pipeline::Dependency> dep); + std::shared_ptr<pipeline::Dependency> create_plan_dep, + std::shared_ptr<pipeline::Dependency> put_block_dep); Status get_load_block_queue(const TUniqueId& instance_id, std::shared_ptr<LoadBlockQueue>& load_block_queue); @@ -200,7 +201,8 @@ public: std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, - std::shared_ptr<pipeline::Dependency> dep); + std::shared_ptr<pipeline::Dependency> create_plan_dep, + std::shared_ptr<pipeline::Dependency> put_block_dep); std::promise<Status> 
debug_promise; std::future<Status> debug_future = debug_promise.get_future(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org