This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7bcf173178d branch-3.0: [fix](load) fix dead lock when write memtable failed #49170 (#49230)
7bcf173178d is described below

commit 7bcf173178d938bbfef86323837dafb901150a30
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 20 10:53:03 2025 +0800

    branch-3.0: [fix](load) fix dead lock when write memtable failed #49170 (#49230)
    
    Cherry-picked from #49170
    
    Co-authored-by: Xin Liao <liao...@selectdb.com>
---
 be/src/olap/memtable_memory_limiter.cpp            | 18 ++++-
 be/src/olap/memtable_writer.cpp                    |  6 +-
 .../test_memtable_write_failed.groovy              | 90 ++++++++++++++++++++++
 3 files changed, 110 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index 1cb6c0c8e2d..1f59f8fc341 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -103,6 +103,12 @@ bool MemTableMemoryLimiter::_load_usage_low() {
 }
 
 int64_t MemTableMemoryLimiter::_need_flush() {
+    DBUG_EXECUTE_IF("MemTableMemoryLimiter._need_flush.random_flush", {
+        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+            LOG(INFO) << "debug memtable need flush return 1";
+            return 1;
+        }
+    });
     int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit;
     int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark();
     int64_t limit3 = _process_used_mem_more_than_soft_mem_limit();
@@ -113,9 +119,15 @@ int64_t MemTableMemoryLimiter::_need_flush() {
 void MemTableMemoryLimiter::handle_memtable_flush() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
-    if (!_soft_limit_reached() || _load_usage_low()) {
-        return;
-    }
+    do {
+        DBUG_EXECUTE_IF("MemTableMemoryLimiter._handle_memtable_flush.limit_reached", {
+            LOG(INFO) << "debug memtable limit reached";
+            break;
+        });
+        if (!_soft_limit_reached() || _load_usage_low()) {
+            return;
+        }
+    } while (false);
     MonotonicStopWatch timer;
     timer.start();
     std::unique_lock<std::mutex> l(_lock);
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 3dc88ae668f..56312073c57 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -124,8 +124,12 @@ Status MemTableWriter::write(const vectorized::Block* block,
     // 2. However, memory pressure might trigger a flush operation on this failed memtable
     // 3. By resetting here, we ensure the failed memtable won't be included in any subsequent flush,
     //    thus preventing potential crashes
+    DBUG_EXECUTE_IF("MemTableWriter.write.random_insert_error", {
+        if (rand() % 100 < (100 * dp->param("percent", 0.3))) {
+            st = Status::InternalError<false>("write memtable random failed for debug");
+        }
+    });
     if (!st.ok()) [[unlikely]] {
-        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
         _reset_mem_table();
         return st;
     }
diff --git a/regression-test/suites/fault_injection_p0/test_memtable_write_failed.groovy b/regression-test/suites/fault_injection_p0/test_memtable_write_failed.groovy
new file mode 100644
index 00000000000..5664c022616
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_memtable_write_failed.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_memtable_write_failed", "nonConcurrent") {
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    def testTable = "test_memtable_write_failed"
+    sql """ DROP TABLE IF EXISTS ${testTable}"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS `${testTable}` (
+          `id` BIGINT NOT NULL AUTO_INCREMENT,
+          `value` int(11) NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+    
+    def run_test = {thread_num, rows, iters -> 
+        def threads = []
+        (1..thread_num).each { id1 -> 
+            threads.add(Thread.start {
+                (1..iters).each { id2 -> 
+                    try {
+                         sql """insert into ${testTable}(value) select number from numbers("number" = "${rows}");"""
+                         String content = ""
+                         (1..4096).each {
+                             content += "${it},${it}\n"
+                         }
+                         content += content
+                         streamLoad {
+                             table "${testTable}"
+                             set 'column_separator', ','
+                             inputStream new ByteArrayInputStream(content.getBytes())
+                             time 30000 // limit inflight 30s
+
+                             check { result, exception, startTime, endTime ->
+                                 if (exception != null) {
+                                     throw exception
+                                 }
+                                 def json = parseJson(result)
+                                 if (json.Status.equalsIgnoreCase("success")) {
+                                     assertEquals(8192, json.NumberTotalRows)
+                                     assertEquals(0, json.NumberFilteredRows)
+                                 } else {
+                                     assertTrue(json.Message.contains("write memtable random failed for debug"))
+                                 }
+                             }
+                         }
+                    } catch (Exception e) {
+                         logger.info(e.getMessage())
+                         assertTrue(e.getMessage().contains("write memtable random failed for debug"))
+                    }
+                }
+            })
+        }
+        threads.each { thread -> thread.join() }
+    }
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("MemTableWriter.write.random_insert_error")
+        GetDebugPoint().enableDebugPointForAllBEs("MemTableMemoryLimiter._handle_memtable_flush.limit_reached")
+        GetDebugPoint().enableDebugPointForAllBEs("MemTableMemoryLimiter._need_flush.random_flush")
+        run_test(5, 10000, 10)
+    } catch (Exception e){
+        logger.info(e.getMessage())
+        assertTrue(e.getMessage().contains("write memtable random failed for debug"))
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("MemTableWriter.write.random_insert_error")
+        GetDebugPoint().disableDebugPointForAllBEs("MemTableMemoryLimiter._handle_memtable_flush.limit_reached")
+        GetDebugPoint().disableDebugPointForAllBEs("MemTableMemoryLimiter._need_flush.random_flush")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to