This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cec69f4cbd3 [Enhancement](wal) Add timout for wal memory back pressure (#29178)
cec69f4cbd3 is described below

commit cec69f4cbd3be1d4ce67faa820e3b4190b670a3c
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Tue Jan 2 11:02:17 2024 +0800

    [Enhancement](wal) Add timout for wal memory back pressure (#29178)
---
 be/src/common/config.cpp                    |  2 +-
 be/src/common/config.h                      |  2 +-
 be/src/runtime/group_commit_mgr.cpp         | 22 +++++++++++++++++++---
 be/src/runtime/group_commit_mgr.h           |  6 +++++-
 be/src/vec/sink/group_commit_block_sink.cpp | 11 ++++++-----
 be/src/vec/sink/group_commit_block_sink.h   |  2 +-
 6 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e94b4c6dadb..9b30ae145c4 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1121,7 +1121,7 @@ DEFINE_Int32(group_commit_insert_threads, "10");
 DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
 DEFINE_Bool(wait_internal_group_commit_finish, "false");
 // Max size(bytes) of group commit queues, used for mem back pressure, defult 64M.
-DEFINE_Int32(group_commit_max_queue_size, "67108864");
+DEFINE_Int32(group_commit_queue_mem_limit, "67108864");
 // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
 // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
 DEFINE_String(group_commit_wal_max_disk_limit, "10%");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2d257b49510..c340abe05f3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1187,7 +1187,7 @@ DECLARE_mInt32(group_commit_insert_threads);
 DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
 DECLARE_Bool(wait_internal_group_commit_finish);
 // Max size(bytes) of group commit queues, used for mem back pressure.
-DECLARE_Int32(group_commit_max_queue_size);
+DECLARE_Int32(group_commit_queue_mem_limit);
 // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
 // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
 DECLARE_mString(group_commit_wal_max_disk_limit);
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 00b3a559fbf..b965efadd70 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -21,6 +21,7 @@
 #include <glog/logging.h>
 
 #include <atomic>
+#include <chrono>
 #include <cstddef>
 #include <cstdint>
 #include <memory>
@@ -40,14 +41,29 @@
 
 namespace doris {
 
-Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool write_wal) {
+Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
+                                 std::shared_ptr<vectorized::Block> block, bool write_wal) {
     std::unique_lock l(mutex);
     RETURN_IF_ERROR(status);
-    while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
-           config::group_commit_max_queue_size) {
+    auto start = std::chrono::steady_clock::now();
+    while (!runtime_state->is_cancelled() && status.ok() &&
+           _all_block_queues_bytes->load(std::memory_order_relaxed) >
+                   config::group_commit_queue_mem_limit) {
         _put_cond.wait_for(
                 l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
+        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+                std::chrono::steady_clock::now() - start);
+        if (duration.count() > LoadBlockQueue::WAL_MEM_BACK_PRESSURE_TIME_OUT) {
+            return Status::TimedOut(
+                    "Wal memory back pressure wait too much time! Load block queue txn id: {}, "
+                    "label: {}, instance id: {}",
+                    txn_id, label, load_instance_id.to_string());
+        }
     }
+    if (runtime_state->is_cancelled()) {
+        return Status::Cancelled(runtime_state->cancel_reason());
+    }
+    RETURN_IF_ERROR(status);
     if (block->rows() > 0) {
         _block_queue.push_back(block);
         if (write_wal) {
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index b0553b44876..03c917f8fcc 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -55,7 +55,8 @@ public:
               _all_block_queues_bytes(all_block_queues_bytes),
               _group_commit_interval_ms(group_commit_interval_ms) {};
 
-    Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
+    Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
+                     bool write_wal);
     Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
                      bool* eos);
     Status add_load_id(const UniqueId& load_id);
@@ -68,7 +69,10 @@ public:
     bool has_enough_wal_disk_space(const std::vector<std::shared_ptr<vectorized::Block>>& blocks,
                                    const TUniqueId& load_id, bool is_blocks_contain_all_load_data);
 
+    // 1s
     static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
+    // 120s
+    static constexpr size_t WAL_MEM_BACK_PRESSURE_TIME_OUT = 120000;
     UniqueId load_instance_id;
     std::string label;
     int64_t txn_id;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 56869665878..277b9859bd5 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -114,7 +114,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
             (double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) {
             return Status::DataQualityError("too many filtered rows");
         }
-        RETURN_IF_ERROR(_add_blocks(true));
+        RETURN_IF_ERROR(_add_blocks(state, true));
     }
     if (_load_block_queue) {
         _load_block_queue->remove_load_id(_load_id);
@@ -220,15 +220,16 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
         _blocks.emplace_back(output_block);
     } else {
         if (!_is_block_appended) {
-            RETURN_IF_ERROR(_add_blocks(false));
+            RETURN_IF_ERROR(_add_blocks(state, false));
         }
         RETURN_IF_ERROR(_load_block_queue->add_block(
-                output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
+                state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
     }
     return Status::OK();
 }
 
-Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data) {
+Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
+                                         bool is_blocks_contain_all_load_data) {
     DCHECK(_is_block_appended == false);
     TUniqueId load_id;
     load_id.__set_hi(_load_id.hi);
@@ -257,7 +258,7 @@ Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data) {
     }
     for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
         RETURN_IF_ERROR(_load_block_queue->add_block(
-                *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
+                state, *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
     }
     _is_block_appended = true;
     _blocks.clear();
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index 84ffebf8fe1..3db4bdd31f8 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -47,7 +47,7 @@ public:
 
 private:
     Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
-    Status _add_blocks(bool is_blocks_contain_all_load_data);
+    Status _add_blocks(RuntimeState* state, bool is_blocks_contain_all_load_data);
 
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to