This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 3abb222064d [fix](group commit) Fix test_group_commit_async_wal_msg_fault_injection case (#35313) (#38911)
3abb222064d is described below

commit 3abb222064d9eaa1adcce5ab0331e245ba5e8410
Author: meiyi <myime...@gmail.com>
AuthorDate: Tue Aug 6 17:57:22 2024 +0800

    [fix](group commit) Fix test_group_commit_async_wal_msg_fault_injection case (#35313) (#38911)
    
    pick https://github.com/apache/doris/pull/35313
---
 be/src/runtime/group_commit_mgr.cpp                | 10 +++++----
 be/src/vec/sink/group_commit_block_sink.cpp        | 26 ++++++++++++----------
 ...oup_commit_async_wal_msg_fault_injection.groovy | 12 +++++-----
 3 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index b6b4c5d646d..fe53ce7e6cb 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -505,10 +505,12 @@ Status 
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
     }
     LOG(INFO) << ss.str();
     
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
 {
-        std ::string msg = _exec_env->wal_mgr()->get_wal_dirs_info_string();
-        LOG(INFO) << "debug promise set: " << msg;
-        ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
-                Status ::InternalError(msg));
+        if (dp->param<int64_t>("table_id", -1) == table_id) {
+            std ::string msg = 
_exec_env->wal_mgr()->get_wal_dirs_info_string();
+            LOG(INFO) << "table_id" << std::to_string(table_id) << " set debug 
promise: " << msg;
+            
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
+                    Status ::InternalError(msg));
+        }
     };);
     return st;
 }
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 3fb36922415..40e4b97a6e8 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -287,18 +287,20 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* 
state,
     _is_block_appended = true;
     _blocks.clear();
     
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
 {
-        if (_load_block_queue) {
-            _remove_estimated_wal_bytes();
-            _load_block_queue->remove_load_id(_load_id);
-        }
-        if (ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
-                    std ::chrono ::seconds(60)) == std ::future_status 
::ready) {
-            auto st = 
ExecEnv::GetInstance()->group_commit_mgr()->debug_future.get();
-            ExecEnv::GetInstance()->group_commit_mgr()->debug_promise = 
std::promise<Status>();
-            ExecEnv::GetInstance()->group_commit_mgr()->debug_future =
-                    
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.get_future();
-            LOG(INFO) << "debug future output: " << st.to_string();
-            RETURN_IF_ERROR(st);
+        if (dp->param<int64_t>("table_id", -1) == _table_id) {
+            if (_load_block_queue) {
+                _remove_estimated_wal_bytes();
+                _load_block_queue->remove_load_id(_load_id);
+            }
+            if 
(ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
+                        std ::chrono ::seconds(60)) == std ::future_status 
::ready) {
+                auto st = 
ExecEnv::GetInstance()->group_commit_mgr()->debug_future.get();
+                ExecEnv::GetInstance()->group_commit_mgr()->debug_promise = 
std::promise<Status>();
+                ExecEnv::GetInstance()->group_commit_mgr()->debug_future =
+                        
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.get_future();
+                LOG(INFO) << "debug future output: " << st.to_string();
+                RETURN_IF_ERROR(st);
+            }
         }
     });
     return Status::OK();
diff --git a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
index bb064305130..c9e22504b1b 100644
--- 
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
@@ -19,8 +19,7 @@ import org.awaitility.Awaitility
 import static java.util.concurrent.TimeUnit.SECONDS
 
 suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
-
-
+    def dbName = "regression_test_fault_injection_p0"
     def tableName = "wal_test"
 
     def getRowCount = { expectedRowCount ->
@@ -47,10 +46,11 @@ 
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
         """
 
     GetDebugPoint().clearDebugPointsForAllBEs()
+    def tableId = getTableId(dbName, tableName)
 
     def exception = false;
         try {
-            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
 [table_id:"${tableId}"])
             streamLoad {
                 table "${tableName}"
                 set 'column_separator', ','
@@ -83,10 +83,11 @@ 
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
         """
 
     GetDebugPoint().clearDebugPointsForAllBEs()
+    tableId = getTableId(dbName, tableName)
 
     exception = false;
         try {
-            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
 [table_id:"${tableId}"])
             
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st")
             streamLoad {
                 table "${tableName}"
@@ -121,10 +122,11 @@ 
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
         """
 
     GetDebugPoint().clearDebugPointsForAllBEs()
+    tableId = getTableId(dbName, tableName)
 
     exception = false;
         try {
-            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
 [table_id:"${tableId}"])
             
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
             streamLoad {
                 table "${tableName}"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to