This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1c435c297c5 [Improvement](load) Do no block in group commit sink (#36612) 1c435c297c5 is described below commit 1c435c297c58a3447cb0497ef46471e4d874bb61 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Jun 24 09:29:21 2024 +0800 [Improvement](load) Do no block in group commit sink (#36612) The group commit sink operator creates an internal loading task before starting. Creating this task by RPC is currently a blocking step, which is not allowed on the pipeline engine. This PR makes this blocking step a dependency. --- .../exec/group_commit_block_sink_operator.cpp | 80 ++++++++++++-------- .../exec/group_commit_block_sink_operator.h | 5 +- be/src/runtime/group_commit_mgr.cpp | 85 +++++++++++----------- be/src/runtime/group_commit_mgr.h | 22 +++--- 4 files changed, 107 insertions(+), 85 deletions(-) diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 5de2e667d4e..3953eb63c4d 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -52,9 +52,28 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) { for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); } + _write_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "GroupCommitBlockSinkDependency", true); + + WARN_IF_ERROR(_initialize_load_queue(), ""); return Status::OK(); } +Status GroupCommitBlockSinkLocalState::_initialize_load_queue() { + auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>(); + TUniqueId load_id; + load_id.__set_hi(p._load_id.hi); + load_id.__set_lo(p._load_id.lo); + if (_state->exec_env()->wal_mgr()->is_running()) { + RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( + p._db_id, p._table_id, 
p._base_schema_version, load_id, _load_block_queue, + _state->be_exec_version(), _state->query_mem_tracker(), _write_dependency)); + return Status::OK(); + } else { + return Status::InternalError("be is stopping"); + } +} + Status GroupCommitBlockSinkLocalState::close(RuntimeState* state, Status close_status) { if (_closed) { return Status::OK(); @@ -79,8 +98,9 @@ Status GroupCommitBlockSinkLocalState::close(RuntimeState* state, Status close_s std::string GroupCommitBlockSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); - fmt::format_to(debug_string_buffer, ", _load_block_queue: ({})", - _load_block_queue ? _load_block_queue->debug_string() : "NULL"); + fmt::format_to(debug_string_buffer, ", _load_block_queue: ({}), _base_schema_version: {}", + _load_block_queue ? _load_block_queue->debug_string() : "NULL", + _parent->cast<GroupCommitBlockSinkOperatorX>()._base_schema_version); return fmt::to_string(debug_string_buffer); } @@ -164,37 +184,31 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state, TUniqueId load_id; load_id.__set_hi(p._load_id.hi); load_id.__set_lo(p._load_id.lo); - if (_load_block_queue == nullptr) { - if (_state->exec_env()->wal_mgr()->is_running()) { - RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( - p._db_id, p._table_id, p._base_schema_version, load_id, _load_block_queue, - _state->be_exec_version(), _state->query_mem_tracker())); - if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) { - size_t estimated_wal_bytes = - _calculate_estimated_wal_bytes(is_blocks_contain_all_load_data); - _group_commit_mode = - _load_block_queue->has_enough_wal_disk_space(estimated_wal_bytes) - ? 
TGroupCommitMode::ASYNC_MODE - : TGroupCommitMode::SYNC_MODE; - if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) { - LOG(INFO) << "Load id=" << print_id(_state->query_id()) - << ", use group commit label=" << _load_block_queue->label - << " will not write wal because wal disk space usage reach max " - "limit. Detail info: " - << _state->exec_env()->wal_mgr()->get_wal_dirs_info_string(); - } else { - _estimated_wal_bytes = estimated_wal_bytes; - } - } - if (_load_block_queue->wait_internal_group_commit_finish || - _group_commit_mode == TGroupCommitMode::SYNC_MODE) { - _load_block_queue->append_dependency(_finish_dependency); + if (_state->exec_env()->wal_mgr()->is_running()) { + if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) { + size_t estimated_wal_bytes = + _calculate_estimated_wal_bytes(is_blocks_contain_all_load_data); + _group_commit_mode = _load_block_queue->has_enough_wal_disk_space(estimated_wal_bytes) + ? TGroupCommitMode::ASYNC_MODE + : TGroupCommitMode::SYNC_MODE; + if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) { + LOG(INFO) << "Load id=" << print_id(_state->query_id()) + << ", use group commit label=" << _load_block_queue->label + << " will not write wal because wal disk space usage reach max " + "limit. 
Detail info: " + << _state->exec_env()->wal_mgr()->get_wal_dirs_info_string(); + } else { + _estimated_wal_bytes = estimated_wal_bytes; } - _state->set_import_label(_load_block_queue->label); - _state->set_wal_id(_load_block_queue->txn_id); - } else { - return Status::InternalError("be is stopping"); } + if (_load_block_queue->wait_internal_group_commit_finish || + _group_commit_mode == TGroupCommitMode::SYNC_MODE) { + _load_block_queue->append_dependency(_finish_dependency); + } + _state->set_import_label(_load_block_queue->label); + _state->set_wal_id(_load_block_queue->txn_id); + } else { + return Status::InternalError("be is stopping"); } for (auto it = _blocks.begin(); it != _blocks.end(); ++it) { RETURN_IF_ERROR(_load_block_queue->add_block( @@ -263,6 +277,10 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get()); + if (!local_state._load_block_queue) { + RETURN_IF_ERROR(local_state._initialize_load_queue()); + } + DCHECK(local_state._load_block_queue); Status status = Status::OK(); auto wind_up = [&]() -> Status { diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h index f26e65b97da..27e344deca6 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.h +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h @@ -46,6 +46,7 @@ public: Status close(RuntimeState* state, Status exec_status) override; Dependency* finishdependency() override { return _finish_dependency.get(); } + std::vector<Dependency*> dependencies() const override { return {_write_dependency.get()}; } std::string debug_string(int indentation_level) const override; private: @@ -54,12 +55,13 @@ private: Status _add_blocks(RuntimeState* state, bool is_blocks_contain_all_load_data); size_t 
_calculate_estimated_wal_bytes(bool is_blocks_contain_all_load_data); void _remove_estimated_wal_bytes(); + Status _initialize_load_queue(); vectorized::VExprContextSPtrs _output_vexpr_ctxs; std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor; - std::shared_ptr<LoadBlockQueue> _load_block_queue; + std::shared_ptr<LoadBlockQueue> _load_block_queue = nullptr; // used to calculate if meet the max filter ratio std::vector<std::shared_ptr<vectorized::Block>> _blocks; bool _is_block_appended = false; @@ -73,6 +75,7 @@ private: Bitmap _filter_bitmap; int64_t _table_id; std::shared_ptr<Dependency> _finish_dependency; + std::shared_ptr<Dependency> _write_dependency = nullptr; }; class GroupCommitBlockSinkOperatorX final diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index d21535d6351..d5e2651fd4d 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -250,46 +250,48 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) { Status GroupCommitTable::get_first_block_load_queue( int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, - std::shared_ptr<MemTrackerLimiter> mem_tracker) { + std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) { DCHECK(table_id == _table_id); - { - std::unique_lock l(_lock); - for (int i = 0; i < 3; i++) { - bool is_schema_version_match = true; - for (const auto& [_, inner_block_queue] : _load_block_queues) { - if (!inner_block_queue->need_commit()) { - if (base_schema_version == inner_block_queue->schema_version) { - if (inner_block_queue->add_load_id(load_id).ok()) { - load_block_queue = inner_block_queue; - return Status::OK(); - } - } else if (base_schema_version < inner_block_queue->schema_version) { - is_schema_version_match = false; + std::unique_lock l(_lock); + auto try_to_get_matched_queue = [&]() -> Status { + for 
(const auto& [_, inner_block_queue] : _load_block_queues) { + if (!inner_block_queue->need_commit()) { + if (base_schema_version == inner_block_queue->schema_version) { + if (inner_block_queue->add_load_id(load_id).ok()) { + load_block_queue = inner_block_queue; + return Status::OK(); } + } else { + return Status::DataQualityError<false>( + "schema version not match, maybe a schema change is in process. " + "Please " + "retry this load manually."); } } - if (!is_schema_version_match) { - return Status::DataQualityError<false>( - "schema version not match, maybe a schema change is in process. Please " - "retry this load manually."); - } - if (!_is_creating_plan_fragment) { - _is_creating_plan_fragment = true; - RETURN_IF_ERROR(_thread_pool->submit_func([this, be_exe_version, mem_tracker] { - auto st = _create_group_commit_load(be_exe_version, mem_tracker); - if (!st.ok()) { - LOG(WARNING) << "create group commit load error, st=" << st.to_string(); - std::unique_lock l(_lock); - _is_creating_plan_fragment = false; - _cv.notify_all(); - } - })); - } - _cv.wait_for(l, std::chrono::seconds(4)); } + return Status::InternalError<false>("can not get a block queue for table_id: " + + std::to_string(_table_id)); + }; + + if (try_to_get_matched_queue().ok()) { + return Status::OK(); } - return Status::InternalError<false>("can not get a block queue for table_id: " + - std::to_string(_table_id)); + if (!_is_creating_plan_fragment) { + _is_creating_plan_fragment = true; + dep->block(); + RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = dep] { + Defer defer {[&, dep = dep]() { + dep->set_ready(); + std::unique_lock l(_lock); + _is_creating_plan_fragment = false; + }}; + auto st = _create_group_commit_load(be_exe_version, mem_tracker); + if (!st.ok()) { + LOG(WARNING) << "create group commit load error, st=" << st.to_string(); + } + })); + } + return try_to_get_matched_queue(); } Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version, @@ -378,8 +380,6 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, be_exe_version)); } _load_block_queues.emplace(instance_id, load_block_queue); - _is_creating_plan_fragment = false; - _cv.notify_all(); } } st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, result.params, @@ -565,12 +565,10 @@ void GroupCommitMgr::stop() { LOG(INFO) << "GroupCommitMgr is stopped"; } -Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_id, - int64_t base_schema_version, - const UniqueId& load_id, - std::shared_ptr<LoadBlockQueue>& load_block_queue, - int be_exe_version, - std::shared_ptr<MemTrackerLimiter> mem_tracker) { +Status GroupCommitMgr::get_first_block_load_queue( + int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, + std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, + std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) { std::shared_ptr<GroupCommitTable> group_commit_table; { std::lock_guard wlock(_lock); @@ -582,7 +580,8 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i group_commit_table = _table_map[table_id]; } RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue( - table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker)); + table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker, + dep)); return Status::OK(); } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 65f9f09670c..679da81f75f 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -87,13 +87,14 @@ public: std::string debug_string() const { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, - "load_instance_id={}, label={}, txn_id={}, " - "wait_internal_group_commit_finish={}, data_size_condition={}, " - 
"group_commit_load_count={}, process_finish={}", - load_instance_id.to_string(), label, txn_id, - wait_internal_group_commit_finish, data_size_condition, - group_commit_load_count, process_finish.load()); + fmt::format_to( + debug_string_buffer, + "load_instance_id={}, label={}, txn_id={}, " + "wait_internal_group_commit_finish={}, data_size_condition={}, " + "group_commit_load_count={}, process_finish={}, _need_commit={}, schema_version={}", + load_instance_id.to_string(), label, txn_id, wait_internal_group_commit_finish, + data_size_condition, group_commit_load_count, process_finish.load(), _need_commit, + schema_version); return fmt::to_string(debug_string_buffer); } @@ -154,7 +155,8 @@ public: const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, - std::shared_ptr<MemTrackerLimiter> mem_tracker); + std::shared_ptr<MemTrackerLimiter> mem_tracker, + std::shared_ptr<pipeline::Dependency> dep); Status get_load_block_queue(const TUniqueId& instance_id, std::shared_ptr<LoadBlockQueue>& load_block_queue); @@ -178,7 +180,6 @@ private: int64_t _table_id; std::mutex _lock; - std::condition_variable _cv; // fragment_instance_id to load_block_queue std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues; bool _is_creating_plan_fragment = false; @@ -198,7 +199,8 @@ public: const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, - std::shared_ptr<MemTrackerLimiter> mem_tracker); + std::shared_ptr<MemTrackerLimiter> mem_tracker, + std::shared_ptr<pipeline::Dependency> dep); std::promise<Status> debug_promise; std::future<Status> debug_future = debug_promise.get_future(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org