This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new cec69f4cbd3 [Enhancement](wal) Add timout for wal memory back pressure (#29178)
cec69f4cbd3 is described below

commit cec69f4cbd3be1d4ce67faa820e3b4190b670a3c
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Tue Jan 2 11:02:17 2024 +0800

    [Enhancement](wal) Add timout for wal memory back pressure (#29178)
---
 be/src/common/config.cpp                    |  2 +-
 be/src/common/config.h                      |  2 +-
 be/src/runtime/group_commit_mgr.cpp         | 22 +++++++++++++++++++---
 be/src/runtime/group_commit_mgr.h           |  6 +++++-
 be/src/vec/sink/group_commit_block_sink.cpp | 11 ++++++-----
 be/src/vec/sink/group_commit_block_sink.h   |  2 +-
 6 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e94b4c6dadb..9b30ae145c4 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1121,7 +1121,7 @@ DEFINE_Int32(group_commit_insert_threads, "10");
 DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
 DEFINE_Bool(wait_internal_group_commit_finish, "false");
 // Max size(bytes) of group commit queues, used for mem back pressure, defult 64M.
-DEFINE_Int32(group_commit_max_queue_size, "67108864");
+DEFINE_Int32(group_commit_queue_mem_limit, "67108864");
 // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
 // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
 DEFINE_String(group_commit_wal_max_disk_limit, "10%");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2d257b49510..c340abe05f3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1187,7 +1187,7 @@ DECLARE_mInt32(group_commit_insert_threads);
 DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
 DECLARE_Bool(wait_internal_group_commit_finish);
 // Max size(bytes) of group commit queues, used for mem back pressure.
-DECLARE_Int32(group_commit_max_queue_size);
+DECLARE_Int32(group_commit_queue_mem_limit);
 // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
 // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
 DECLARE_mString(group_commit_wal_max_disk_limit);
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 00b3a559fbf..b965efadd70 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -21,6 +21,7 @@
 #include <glog/logging.h>

 #include <atomic>
+#include <chrono>
 #include <cstddef>
 #include <cstdint>
 #include <memory>
@@ -40,14 +41,29 @@

 namespace doris {

-Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool write_wal) {
+Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
+                                 std::shared_ptr<vectorized::Block> block, bool write_wal) {
     std::unique_lock l(mutex);
     RETURN_IF_ERROR(status);
-    while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
-           config::group_commit_max_queue_size) {
+    auto start = std::chrono::steady_clock::now();
+    while (!runtime_state->is_cancelled() && status.ok() &&
+           _all_block_queues_bytes->load(std::memory_order_relaxed) >
+                   config::group_commit_queue_mem_limit) {
         _put_cond.wait_for(
                 l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
+        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+                std::chrono::steady_clock::now() - start);
+        if (duration.count() > LoadBlockQueue::WAL_MEM_BACK_PRESSURE_TIME_OUT) {
+            return Status::TimedOut(
+                    "Wal memory back pressure wait too much time! Load block queue txn id: {}, "
+                    "label: {}, instance id: {}",
+                    txn_id, label, load_instance_id.to_string());
+        }
     }
+    if (runtime_state->is_cancelled()) {
+        return Status::Cancelled(runtime_state->cancel_reason());
+    }
+    RETURN_IF_ERROR(status);
     if (block->rows() > 0) {
         _block_queue.push_back(block);
         if (write_wal) {
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index b0553b44876..03c917f8fcc 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -55,7 +55,8 @@ public:
               _all_block_queues_bytes(all_block_queues_bytes),
               _group_commit_interval_ms(group_commit_interval_ms) {};

-    Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
+    Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
+                     bool write_wal);
     Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
                      bool* eos);
     Status add_load_id(const UniqueId& load_id);
@@ -68,7 +69,10 @@ public:
     bool has_enough_wal_disk_space(const std::vector<std::shared_ptr<vectorized::Block>>& blocks,
                                    const TUniqueId& load_id, bool is_blocks_contain_all_load_data);

+    // 1s
     static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
+    // 120s
+    static constexpr size_t WAL_MEM_BACK_PRESSURE_TIME_OUT = 120000;
     UniqueId load_instance_id;
     std::string label;
     int64_t txn_id;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 56869665878..277b9859bd5 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -114,7 +114,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
             (double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) {
             return Status::DataQualityError("too many filtered rows");
         }
-        RETURN_IF_ERROR(_add_blocks(true));
+        RETURN_IF_ERROR(_add_blocks(state, true));
     }
     if (_load_block_queue) {
         _load_block_queue->remove_load_id(_load_id);
@@ -220,15 +220,16 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
         _blocks.emplace_back(output_block);
     } else {
         if (!_is_block_appended) {
-            RETURN_IF_ERROR(_add_blocks(false));
+            RETURN_IF_ERROR(_add_blocks(state, false));
         }
         RETURN_IF_ERROR(_load_block_queue->add_block(
-                output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
+                state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
     }
     return Status::OK();
 }

-Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data) {
+Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
+                                         bool is_blocks_contain_all_load_data) {
     DCHECK(_is_block_appended == false);
     TUniqueId load_id;
     load_id.__set_hi(_load_id.hi);
@@ -257,7 +258,7 @@ Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data) {
     }
     for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
         RETURN_IF_ERROR(_load_block_queue->add_block(
-                *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
+                state, *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
     }
     _is_block_appended = true;
     _blocks.clear();
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index 84ffebf8fe1..3db4bdd31f8 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -47,7 +47,7 @@ public:

 private:
     Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
-    Status _add_blocks(bool is_blocks_contain_all_load_data);
+    Status _add_blocks(RuntimeState* state, bool is_blocks_contain_all_load_data);

     vectorized::VExprContextSPtrs _output_vexpr_ctxs;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org