This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7bcf173178d branch-3.0: [fix](load) fix dead lock when write memtable failed #49170 (#49230) 7bcf173178d is described below commit 7bcf173178d938bbfef86323837dafb901150a30 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Mar 20 10:53:03 2025 +0800 branch-3.0: [fix](load) fix dead lock when write memtable failed #49170 (#49230) Cherry-picked from #49170 Co-authored-by: Xin Liao <liao...@selectdb.com> --- be/src/olap/memtable_memory_limiter.cpp | 18 ++++- be/src/olap/memtable_writer.cpp | 6 +- .../test_memtable_write_failed.groovy | 90 ++++++++++++++++++++++ 3 files changed, 110 insertions(+), 4 deletions(-) diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 1cb6c0c8e2d..1f59f8fc341 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -103,6 +103,12 @@ bool MemTableMemoryLimiter::_load_usage_low() { } int64_t MemTableMemoryLimiter::_need_flush() { + DBUG_EXECUTE_IF("MemTableMemoryLimiter._need_flush.random_flush", { + if (rand() % 100 < (100 * dp->param("percent", 0.5))) { + LOG(INFO) << "debug memtable need flush return 1"; + return 1; + } + }); int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit; int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark(); int64_t limit3 = _process_used_mem_more_than_soft_mem_limit(); @@ -113,9 +119,15 @@ int64_t MemTableMemoryLimiter::_need_flush() { void MemTableMemoryLimiter::handle_memtable_flush() { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); - if (!_soft_limit_reached() || _load_usage_low()) { - return; - } + do { + DBUG_EXECUTE_IF("MemTableMemoryLimiter._handle_memtable_flush.limit_reached", { + LOG(INFO) << "debug memtable limit reached"; + break; + }); + if (!_soft_limit_reached() || _load_usage_low()) { + return; + } + } while (false); MonotonicStopWatch timer; timer.start(); std::unique_lock<std::mutex> l(_lock); diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 3dc88ae668f..56312073c57 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -124,8 +124,12 @@ Status MemTableWriter::write(const vectorized::Block* block, // 2. However, memory pressure might trigger a flush operation on this failed memtable // 3. By resetting here, we ensure the failed memtable won't be included in any subsequent flush, // thus preventing potential crashes + DBUG_EXECUTE_IF("MemTableWriter.write.random_insert_error", { + if (rand() % 100 < (100 * dp->param("percent", 0.3))) { + st = Status::InternalError<false>("write memtable random failed for debug"); + } + }); if (!st.ok()) [[unlikely]] { - std::lock_guard<SpinLock> l(_mem_table_ptr_lock); _reset_mem_table(); return st; } diff --git a/regression-test/suites/fault_injection_p0/test_memtable_write_failed.groovy b/regression-test/suites/fault_injection_p0/test_memtable_write_failed.groovy new file mode 100644 index 00000000000..5664c022616 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_memtable_write_failed.groovy @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_memtable_write_failed", "nonConcurrent") { + GetDebugPoint().clearDebugPointsForAllBEs() + def testTable = "test_memtable_write_failed" + sql """ DROP TABLE IF EXISTS ${testTable}""" + + sql """ + CREATE TABLE IF NOT EXISTS `${testTable}` ( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `value` int(11) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ) + """ + + def run_test = {thread_num, rows, iters -> + def threads = [] + (1..thread_num).each { id1 -> + threads.add(Thread.start { + (1..iters).each { id2 -> + try { + sql """insert into ${testTable}(value) select number from numbers("number" = "${rows}");""" + String content = "" + (1..4096).each { + content += "${it},${it}\n" + } + content += content + streamLoad { + table "${testTable}" + set 'column_separator', ',' + inputStream new ByteArrayInputStream(content.getBytes()) + time 30000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + if (json.Status.equalsIgnoreCase("success")) { + assertEquals(8192, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } else { + assertTrue(json.Message.contains("write memtable random failed for debug")) + } + } + } + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("write memtable random failed for debug")) + } + } + }) + } + threads.each { thread -> thread.join() } + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("MemTableWriter.write.random_insert_error") + GetDebugPoint().enableDebugPointForAllBEs("MemTableMemoryLimiter._handle_memtable_flush.limit_reached") + GetDebugPoint().enableDebugPointForAllBEs("MemTableMemoryLimiter._need_flush.random_flush") + run_test(5, 10000, 10) + } catch (Exception e){ + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("write memtable random failed for debug")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("MemTableWriter.write.random_insert_error") + GetDebugPoint().disableDebugPointForAllBEs("MemTableMemoryLimiter._handle_memtable_flush.limit_reached") + GetDebugPoint().disableDebugPointForAllBEs("MemTableMemoryLimiter._need_flush.random_flush") + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org