This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new acd59d27ccf [pipeline](load) Not blocking in execution threads (#36291) acd59d27ccf is described below commit acd59d27ccffb02cc12d9bd02ab766e29bbad267 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Jun 18 18:54:35 2024 +0800 [pipeline](load) Not blocking in execution threads (#36291) In group-commit loading tasks, we wait on a condition variable in execution threads before finishing. This is harmful to the pipeline engine. --- .../exec/group_commit_block_sink_operator.cpp | 67 ++++++++++++++-------- .../exec/group_commit_block_sink_operator.h | 9 ++- be/src/runtime/group_commit_mgr.cpp | 14 ++++- be/src/runtime/group_commit_mgr.h | 21 ++++++- 4 files changed, 83 insertions(+), 28 deletions(-) diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 50b314a7dc1..99fd6ef20ab 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -64,40 +64,27 @@ Status GroupCommitBlockSinkLocalState::close(RuntimeState* state, Status close_s SCOPED_TIMER(_close_timer); RETURN_IF_ERROR(Base::close(state, close_status)); RETURN_IF_ERROR(close_status); - int64_t total_rows = state->num_rows_load_total(); - int64_t loaded_rows = state->num_rows_load_total(); - state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() + - state->num_rows_load_filtered()); - state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows - - loaded_rows); - auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>(); - if (!_is_block_appended) { - // if not meet the max_filter_ratio, we should return error status directly - int64_t num_selected_rows = - state->num_rows_load_total() - state->num_rows_load_unselected(); - if (num_selected_rows > 0 && - (double)state->num_rows_load_filtered() / num_selected_rows > 
p._max_filter_ratio) { - return Status::DataQualityError("too many filtered rows"); - } - RETURN_IF_ERROR(_add_blocks(state, true)); - } - if (_load_block_queue) { - _remove_estimated_wal_bytes(); - _load_block_queue->remove_load_id(p._load_id); - } // wait to wal auto st = Status::OK(); if (_load_block_queue && (_load_block_queue->wait_internal_group_commit_finish || _group_commit_mode == TGroupCommitMode::SYNC_MODE)) { std::unique_lock l(_load_block_queue->mutex); if (!_load_block_queue->process_finish) { - _load_block_queue->internal_group_commit_finish_cv.wait(l); + return Status::InternalError("_load_block_queue is not finished!"); } st = _load_block_queue->status; } return st; } +std::string GroupCommitBlockSinkLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); + fmt::format_to(debug_string_buffer, ", _load_block_queue: ({})", + _load_block_queue ? _load_block_queue->debug_string() : "NULL"); + return fmt::to_string(debug_string_buffer); +} + Status GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block) { if (block->rows() == 0) { @@ -200,6 +187,10 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state, _estimated_wal_bytes = estimated_wal_bytes; } } + if (_load_block_queue->wait_internal_group_commit_finish || + _group_commit_mode == TGroupCommitMode::SYNC_MODE) { + _load_block_queue->append_dependency(_finish_dependency); + } _state->set_import_label(_load_block_queue->label); _state->set_wal_id(_load_block_queue->txn_id); } else { @@ -233,6 +224,7 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state, } Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) { + RETURN_IF_ERROR(Base::init(t_sink)); DCHECK(t_sink.__isset.olap_table_sink); auto& table_sink = t_sink.olap_table_sink; _tuple_desc_id = table_sink.tuple_id; @@ -274,10 
+266,35 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get()); Status status = Status::OK(); + auto wind_up = [&]() -> Status { + if (eos) { + int64_t total_rows = state->num_rows_load_total(); + int64_t loaded_rows = state->num_rows_load_total(); + state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() + + state->num_rows_load_filtered()); + state->update_num_rows_load_filtered(local_state._block_convertor->num_filtered_rows() + + total_rows - loaded_rows); + if (!local_state._is_block_appended) { + // if not meet the max_filter_ratio, we should return error status directly + int64_t num_selected_rows = + state->num_rows_load_total() - state->num_rows_load_unselected(); + if (num_selected_rows > 0 && + (double)state->num_rows_load_filtered() / num_selected_rows > + _max_filter_ratio) { + return Status::DataQualityError("too many filtered rows"); + } + RETURN_IF_ERROR(local_state._add_blocks(state, true)); + } + local_state._remove_estimated_wal_bytes(); + local_state._load_block_queue->remove_load_id(_load_id); + } + return Status::OK(); + }; + auto rows = input_block->rows(); auto bytes = input_block->bytes(); if (UNLIKELY(rows == 0)) { - return status; + return wind_up(); } // update incrementally so that FE can get the progress. 
@@ -324,7 +341,9 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc block->swap(res_block.to_block()); } // add block into block queue - return local_state._add_block(state, block); + RETURN_IF_ERROR(local_state._add_block(state, block)); + + return wind_up(); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h index b65326725b6..f26e65b97da 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.h +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h @@ -34,13 +34,19 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi public: GroupCommitBlockSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : Base(parent, state), _filter_bitmap(1024) {} + : Base(parent, state), _filter_bitmap(1024) { + _finish_dependency = + std::make_shared<Dependency>(parent->operator_id(), parent->node_id(), + parent->get_name() + "_FINISH_DEPENDENCY", true); + } ~GroupCommitBlockSinkLocalState() override; Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; + Dependency* finishdependency() override { return _finish_dependency.get(); } + std::string debug_string(int indentation_level) const override; private: friend class GroupCommitBlockSinkOperatorX; @@ -66,6 +72,7 @@ private: TGroupCommitMode::type _group_commit_mode; Bitmap _filter_bitmap; int64_t _table_id; + std::shared_ptr<Dependency> _finish_dependency; }; class GroupCommitBlockSinkOperatorX final diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 06cf494c842..6e81e3ae4c3 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -26,6 +26,7 @@ #include "common/compiler_util.h" #include "common/config.h" #include "common/status.h" +#include "pipeline/dependency.h" #include "runtime/exec_env.h" 
#include "runtime/fragment_mgr.h" #include "util/debug_points.h" @@ -459,8 +460,10 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ { std::unique_lock l2(load_block_queue->mutex); load_block_queue->process_finish = true; + for (auto dep : load_block_queue->dependencies) { + dep->set_always_ready(); + } } - load_block_queue->internal_group_commit_finish_cv.notify_all(); } _load_block_queues.erase(instance_id); } @@ -616,6 +619,15 @@ Status LoadBlockQueue::close_wal() { return Status::OK(); } +void LoadBlockQueue::append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep) { + std::lock_guard<std::mutex> lock(mutex); + // If not finished, dependencies should be blocked. + if (!process_finish) { + finish_dep->block(); + dependencies.push_back(finish_dep); + } +} + bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) { DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; }); auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr(); diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index c41f6abd6fe..65f9f09670c 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -41,6 +41,10 @@ class ExecEnv; class TUniqueId; class RuntimeState; +namespace pipeline { +class Dependency; +} + struct BlockData { BlockData(const std::shared_ptr<vectorized::Block>& block) : block(block), block_bytes(block->bytes()) {}; @@ -79,6 +83,19 @@ public: int be_exe_version); Status close_wal(); bool has_enough_wal_disk_space(size_t estimated_wal_bytes); + void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep); + + std::string debug_string() const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, + "load_instance_id={}, label={}, txn_id={}, " + "wait_internal_group_commit_finish={}, data_size_condition={}, " + "group_commit_load_count={}, process_finish={}", + load_instance_id.to_string(), 
label, txn_id, + wait_internal_group_commit_finish, data_size_condition, + group_commit_load_count, process_finish.load()); + return fmt::to_string(debug_string_buffer); + } UniqueId load_instance_id; std::string label; @@ -92,9 +109,9 @@ public: // the execute status of this internal group commit std::mutex mutex; - std::condition_variable internal_group_commit_finish_cv; - bool process_finish = false; + std::atomic<bool> process_finish = false; Status status = Status::OK(); + std::vector<std::shared_ptr<pipeline::Dependency>> dependencies; private: void _cancel_without_lock(const Status& st); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org