This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f62a4462dd [Enhancement](wal) Add wal space back pressure (#26483)
5f62a4462dd is described below

commit 5f62a4462dd670d83cd09349a31125c21686fc71
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Thu Nov 9 12:29:05 2023 +0800

    [Enhancement](wal) Add wal space back pressure (#26483)
---
 be/src/common/config.cpp                |  7 +++++--
 be/src/common/config.h                  |  3 +++
 be/src/olap/wal_manager.cpp             | 24 +++++++++++++++++++++++-
 be/src/olap/wal_manager.h               |  4 ++++
 be/src/olap/wal_writer.cpp              | 20 +++++++++++++++++++-
 be/src/olap/wal_writer.h                | 13 ++++++++++++-
 be/test/olap/wal_manager_test.cpp       |  4 +++-
 be/test/olap/wal_reader_writer_test.cpp |  5 ++++-
 8 files changed, 73 insertions(+), 7 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 29af4d5059a..0133a78ecf9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1118,8 +1118,11 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
 // Dir of default timezone files
 DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
 
-// Max size(bytes) of group commit queues, used for mem back pressure.
-DEFINE_Int32(group_commit_max_queue_size, "65536");
+// Max size (bytes) of group commit queues, used for memory back pressure, default 64MB.
+DEFINE_Int32(group_commit_max_queue_size, "67108864");
+
+// Max total size (bytes) of WAL files on disk, used for disk space back pressure, default 64MB.
+DEFINE_Int64(wal_max_disk_size, "67108864");
 
 // Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
 DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f0fdf58558c..d9276aef9a5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1195,6 +1195,9 @@ DECLARE_String(default_tzfiles_path);
 // Max size(bytes) of group commit queues, used for mem back pressure.
 DECLARE_Int32(group_commit_max_queue_size);
 
+// Max total size (bytes) of WAL files on disk; WAL appends block while it is exceeded.
+DECLARE_Int64(wal_max_disk_size);
+
 // Ingest binlog work pool size
 DECLARE_Int32(ingest_binlog_work_pool_size);
 
diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp
index cf84c4a9661..921f7da8e38 100644
--- a/be/src/olap/wal_manager.cpp
+++ b/be/src/olap/wal_manager.cpp
@@ -19,10 +19,15 @@
 
 #include <thrift/protocol/TDebugProtocol.h>
 
+#include <atomic>
 #include <chrono>
+#include <cstdint>
 #include <filesystem>
+#include <memory>
+#include <utility>
 
 #include "io/fs/local_file_system.h"
+#include "olap/wal_writer.h"
 #include "runtime/client_cache.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/plan_fragment_executor.h"
@@ -35,11 +40,13 @@ namespace doris {
 WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
         : _exec_env(exec_env), _stop_background_threads_latch(1) {
     doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs);
+    _all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
 }
 
 WalManager::~WalManager() {
     LOG(INFO) << "WalManager is destoried";
 }
+
 void WalManager::stop() {
     _stop = true;
     _stop_background_threads_latch.count_down();
@@ -117,8 +124,12 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
         RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path));
     }
     LOG(INFO) << "create wal " << wal_path;
-    wal_writer = std::make_shared<WalWriter>(wal_path);
+    wal_writer = std::make_shared<WalWriter>(wal_path, _all_wal_disk_bytes);
     RETURN_IF_ERROR(wal_writer->init());
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+        _wal_id_to_writer_map.emplace(wal_id, wal_writer);
+    }
     return Status::OK();
 }
 
@@ -241,6 +252,17 @@ size_t WalManager::get_wal_table_size(const std::string& table_id) {
 Status WalManager::delete_wal(int64_t wal_id) {
     {
         std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+        if (auto it = _wal_id_to_writer_map.find(wal_id); it != _wal_id_to_writer_map.end()) {
+            // fetch_sub is the whole atomic update; store()-ing its (old) result would undo it.
+            _all_wal_disk_bytes->fetch_sub(it->second->disk_bytes(), std::memory_order_relaxed);
+            _wal_id_to_writer_map.erase(it);
+            for (const auto& entry : _wal_id_to_writer_map) {
+                entry.second->cv.notify_all(); // blocked writers wait on their own cv
+            }
+        }
+        if (_wal_id_to_writer_map.empty()) {
+            CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0);
+        }
         std::string wal_path = _wal_path_map[wal_id];
         RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
         LOG(INFO) << "delete file=" << wal_path;
diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h
index f5b49f6ddaf..d8fa4a70527 100644
--- a/be/src/olap/wal_manager.h
+++ b/be/src/olap/wal_manager.h
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
+
 #include "common/config.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/FrontendService_types.h"
@@ -56,6 +58,8 @@ private:
     std::vector<std::string> _wal_dirs;
     std::shared_mutex _wal_lock;
     std::unordered_map<int64_t, std::string> _wal_path_map;
+    std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map;
+    std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes; // passed to every WalWriter
     bool _stop = false;
 };
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp
index 7cd427453bb..085c0f0e31f 100644
--- a/be/src/olap/wal_writer.cpp
+++ b/be/src/olap/wal_writer.cpp
@@ -17,6 +17,9 @@
 
 #include "olap/wal_writer.h"
 
+#include <atomic>
+
+#include "common/config.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
 #include "io/fs/path.h"
@@ -25,7 +28,12 @@
 
 namespace doris {
 
-WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
+WalWriter::WalWriter(const std::string& file_name,
+                     const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes)
+        : _file_name(file_name),
+          _count(0),
+          _disk_bytes(0),
+          _all_wal_disk_bytes(all_wal_disk_bytes) {}
 
 WalWriter::~WalWriter() {}
 
@@ -44,6 +52,12 @@ Status WalWriter::finalize() {
 }
 
 Status WalWriter::append_blocks(const PBlockArray& blocks) {
+    {
+        std::unique_lock l(_mutex);
+        while (_all_wal_disk_bytes->load() > static_cast<size_t>(config::wal_max_disk_size)) {
+            cv.wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
+        }
+    }
     size_t total_size = 0;
     for (const auto& block : blocks) {
         total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE;
@@ -62,6 +76,10 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
         offset += CHECKSUM_SIZE;
     }
     DCHECK(offset == total_size);
+    _disk_bytes += total_size;
+    // fetch_add is the whole atomic update; store()-ing its result (the old
+    // total) afterwards would silently undo the increment.
+    _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed);
     // write rows
     RETURN_IF_ERROR(_file_writer->append({row_binary, offset}));
     _count++;
diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h
index 12fd84f258f..9fd0a396788 100644
--- a/be/src/olap/wal_writer.h
+++ b/be/src/olap/wal_writer.h
@@ -17,9 +17,13 @@
 
 #pragma once
 
+#include <atomic>
+#include <memory>
+
 #include "common/status.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "io/fs/file_reader_writer_fwd.h"
+#include "util/lock.h"
 
 namespace doris {
 
@@ -27,23 +31,30 @@ using PBlockArray = std::vector<PBlock*>;
 
 class WalWriter {
 public:
-    explicit WalWriter(const std::string& file_name);
+    explicit WalWriter(const std::string& file_name,
+                       const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes);
     ~WalWriter();
 
     Status init();
     Status finalize();
 
     Status append_blocks(const PBlockArray& blocks);
+    size_t disk_bytes() const { return _disk_bytes; };
 
     std::string file_name() { return _file_name; };
     static const int64_t LENGTH_SIZE = 8;
     static const int64_t CHECKSUM_SIZE = 4;
+    doris::ConditionVariable cv; // append_blocks() waits here while WAL disk usage is too high
 
 private:
+    static constexpr size_t MAX_WAL_WRITE_WAIT_TIME = 1000;
     std::string _file_name;
     io::FileWriterPtr _file_writer;
     int64_t _count;
     int64_t _batch;
+    size_t _disk_bytes;
+    std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
+    doris::Mutex _mutex;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/test/olap/wal_manager_test.cpp b/be/test/olap/wal_manager_test.cpp
index fbb5fdbf03f..ec387680a63 100644
--- a/be/test/olap/wal_manager_test.cpp
+++ b/be/test/olap/wal_manager_test.cpp
@@ -78,7 +78,9 @@ public:
     void prepare() { static_cast<void>(io::global_local_filesystem()->create_directory(wal_dir)); }
 
     void createWal(const std::string& wal_path) {
-        auto wal_writer = WalWriter(wal_path);
+        std::shared_ptr<std::atomic_size_t> all_wal_disk_bytes =
+                std::make_shared<std::atomic_size_t>(0);
+        auto wal_writer = WalWriter(wal_path, all_wal_disk_bytes);
         static_cast<void>(wal_writer.init());
         static_cast<void>(wal_writer.finalize());
     }
diff --git a/be/test/olap/wal_reader_writer_test.cpp b/be/test/olap/wal_reader_writer_test.cpp
index 71c822013a2..09460477e38 100644
--- a/be/test/olap/wal_reader_writer_test.cpp
+++ b/be/test/olap/wal_reader_writer_test.cpp
@@ -17,6 +17,7 @@
 #include <gtest/gtest.h>
 
 #include <filesystem>
+#include <memory>
 
 #include "agent/be_exec_version_manager.h"
 #include "common/object_pool.h"
@@ -89,7 +90,9 @@ void generate_block(PBlock& pblock, int row_index) {
 
 TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
     std::string file_name = _s_test_data_path + "/abcd123.txt";
-    auto wal_writer = WalWriter(file_name);
+    std::shared_ptr<std::atomic_size_t> all_wal_disk_bytes =
+            std::make_shared<std::atomic_size_t>(0);
+    auto wal_writer = WalWriter(file_name, all_wal_disk_bytes);
     static_cast<void>(wal_writer.init());
     size_t file_len = 0;
     int64_t file_size = -1;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to