This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new b153d7e30b7 branch-4.0: [enhance](memtable) support adaptive memtable
write buffer size #56948 (#57023)
b153d7e30b7 is described below
commit b153d7e30b7f6e957269ac103ce2a8a42a2457b6
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 16 17:51:06 2025 +0800
branch-4.0: [enhance](memtable) support adaptive memtable write buffer size
#56948 (#57023)
Cherry-picked from #56948
Co-authored-by: hui lai <[email protected]>
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/olap/memtable.cpp | 21 +++++++++-
be/src/olap/memtable.h | 6 +++
be/src/olap/memtable_writer.cpp | 23 +++++++---
be/src/olap/memtable_writer.h | 2 +
.../memtable/test_memtable_too_many_rows.groovy | 49 ++++++++++++++++++++++
7 files changed, 97 insertions(+), 6 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5e7dec44248..c2ce46bb06e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -687,6 +687,7 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "209715200");
+DEFINE_mBool(enable_adaptive_write_buffer_size, "true");
// max buffer size used in memtable for the aggregated table, default 100MB
DEFINE_mInt64(write_buffer_size_for_agg, "104857600");
DEFINE_mInt64(min_write_buffer_size_for_partial_update, "1048576");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index af81e4ff273..df83ae46909 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -711,6 +711,7 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
// max write buffer size before flush, default 200MB
DECLARE_mInt64(write_buffer_size);
+DECLARE_mBool(enable_adaptive_write_buffer_size);
// max buffer size used in memtable for the aggregated table, default 100MB
DECLARE_mInt64(write_buffer_size_for_agg);
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 1dbe2e8020b..e5c05c3a1d6 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -83,6 +83,7 @@ MemTable::MemTable(int64_t tablet_id,
std::shared_ptr<TabletSchema> tablet_schem
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_row_in_blocks =
std::make_unique<DorisVector<std::shared_ptr<RowInBlock>>>();
+ _load_mem_limit = MemInfo::mem_limit() *
config::load_process_max_memory_limit_percent / 100;
}
void MemTable::_init_columns_offset_by_slot_descs(const
std::vector<SlotDescriptor*>* slot_descs,
@@ -652,7 +653,7 @@ void MemTable::shrink_memtable_by_agg() {
bool MemTable::need_flush() const {
DBUG_EXECUTE_IF("MemTable.need_flush", { return true; });
- auto max_size = config::write_buffer_size;
+ auto max_size = _adaptive_write_buffer_size();
if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
auto update_columns_size = _num_columns;
auto min_buffer_size =
config::min_write_buffer_size_for_partial_update;
@@ -662,6 +663,24 @@ bool MemTable::need_flush() const {
return memory_usage() >= max_size;
}
+int64_t MemTable::_adaptive_write_buffer_size() const { // Flush threshold for need_flush(): config::write_buffer_size scaled by 1x, 2x or 4x.
+ if (!config::enable_adaptive_write_buffer_size) [[unlikely]] { // adaptive sizing switched off
+ return config::write_buffer_size; // use the configured size unscaled
+ }
+ const int64_t current_load_mem_value = MemoryProfile::load_current_usage(); // presumably the memory currently held by loads - confirm against MemoryProfile
+ int64_t factor = 4; // kept when neither threshold below is exceeded
+ // Multiplier chosen from current load memory usage relative to _load_mem_limit:
+ // above 80 % -> 1x buffer (no upper bound is checked, so usage over 100 % also lands here)
+ // above 50 %, up to 80 % -> 2x buffer
+ // 50 % or below -> 4x buffer
+ if (current_load_mem_value > (_load_mem_limit * 4) / 5) { // > 80 % of the limit
+ factor = 1;
+ } else if (current_load_mem_value > _load_mem_limit / 2) { // > 50 % of the limit
+ factor = 2;
+ }
+ return config::write_buffer_size * factor; // config value is re-read on every call
+}
+
bool MemTable::need_agg() const {
if (_keys_type == KeysType::AGG_KEYS) {
auto max_size = _last_agg_pos + config::write_buffer_size_for_agg;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 5cd70e812a9..47426e354d0 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -20,6 +20,7 @@
#include <stddef.h>
#include <stdint.h>
+#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
@@ -204,6 +205,8 @@ public:
void update_mem_type(MemType memtype) { _mem_type = memtype; }
+ int64_t raw_rows() { return _stat.raw_rows.load(); } // Returns the current value of _stat.raw_rows, read through load().
+
private:
// for vectorized
template <bool has_skip_bitmap_col>
@@ -213,6 +216,8 @@ private:
// Used to wrapped by to_block to do exception handle logic
Status _to_block(std::unique_ptr<vectorized::Block>* res);
+ int64_t _adaptive_write_buffer_size() const;
+
private:
std::atomic<MemType> _mem_type;
int64_t _tablet_id;
@@ -231,6 +236,7 @@ private:
// In this way, we can make MemTable::memory_usage() more accurate, and eventually
// reduce the number of segment files that are generated by the current load
vectorized::Arena _arena;
+ int64_t _load_mem_limit = -1;
void _init_columns_offset_by_slot_descs(const
std::vector<SlotDescriptor*>* slot_descs,
const TupleDescriptor* tuple_desc);
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 82349d2e0ee..95ad9c192aa 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -103,6 +103,14 @@ Status MemTableWriter::write(const vectorized::Block*
block,
_req.tablet_id,
_req.load_id.hi(), _req.load_id.lo());
}
+ // Flush and reset the memtable if adding these rows would push its raw row count past the int32_t maximum.
+ int64_t raw_rows = _mem_table->raw_rows();
+ DBUG_EXECUTE_IF("MemTableWriter.too_many_raws",
+ { raw_rows = std::numeric_limits<int32_t>::max(); });
+ if (raw_rows + row_idxs.size() > std::numeric_limits<int32_t>::max()) {
+ RETURN_IF_ERROR(_flush_memtable());
+ }
+
_total_received_rows += row_idxs.size();
auto st = _mem_table->insert(block, row_idxs);
@@ -127,16 +135,21 @@ Status MemTableWriter::write(const vectorized::Block*
block,
_mem_table->shrink_memtable_by_agg();
}
if (UNLIKELY(_mem_table->need_flush())) {
- auto s = _flush_memtable_async();
- _reset_mem_table();
- if (UNLIKELY(!s.ok())) {
- return s;
- }
+ RETURN_IF_ERROR(_flush_memtable());
}
return Status::OK();
}
+Status MemTableWriter::_flush_memtable() { // Submits the current memtable via _flush_memtable_async(), then calls _reset_mem_table().
+ auto s = _flush_memtable_async(); // status is inspected only after the reset below
+ _reset_mem_table(); // runs unconditionally, even when the submit above failed
+ if (UNLIKELY(!s.ok())) {
+ return s; // propagate the submit error to the caller
+ }
+ return Status::OK();
+}
+
Status MemTableWriter::_flush_memtable_async() {
DCHECK(_flush_token != nullptr);
std::shared_ptr<MemTable> memtable;
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index aa1fd4025ed..7465e79a452 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -25,6 +25,7 @@
#include <cstdint>
#include <memory>
#include <mutex>
+#include <random>
#include <vector>
#include "common/status.h"
@@ -109,6 +110,7 @@ public:
}
private:
+ Status _flush_memtable();
// push a full memtable to flush executor
Status _flush_memtable_async();
diff --git
a/regression-test/suites/fault_injection_p0/memtable/test_memtable_too_many_rows.groovy
b/regression-test/suites/fault_injection_p0/memtable/test_memtable_too_many_rows.groovy
new file mode 100644
index 00000000000..38516864727
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/memtable/test_memtable_too_many_rows.groovy
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_memtable_too_many_rows", "nonConcurrent") { // Inserts one row while the BE debug point "MemTableWriter.too_many_raws" is enabled.
+ GetDebugPoint().clearDebugPointsForAllBEs() // start with no debug points active
+ def testTable = "test_memtable_too_many_rows"
+ sql """ DROP TABLE IF EXISTS ${testTable}"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS `${testTable}` (
+ `id` BIGINT NOT NULL,
+ `value` int(11) NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+
+ def debugPoint = "MemTableWriter.too_many_raws" // must match the DBUG_EXECUTE_IF label in MemTableWriter::write
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(debugPoint)
+ sql "insert into ${testTable} values(1,1)"
+ def res = sql "select * from ${testTable}"
+ logger.info("res: " + res.size())
+ assertTrue(res.size() == 1) // the single inserted row must be readable
+ } catch (Exception e){
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains("write memtable too many rows fail")) // an exception with any other message fails the test
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(debugPoint) // always switch the debug point back off
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]