This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 82bb3ed50f59eab8e47c0dac8f1c84cc22db549e
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Fri Feb 2 00:12:15 2024 +0800

    [Fix](group commit) Fix pre-allocated error handling for group commit async 
load and add regression test #30718
---
 be/src/olap/wal/wal_manager.cpp                    |  8 +-
 be/src/olap/wal/wal_manager.h                      |  2 +-
 be/src/runtime/group_commit_mgr.cpp                | 12 +++
 be/src/runtime/group_commit_mgr.h                  |  3 +
 be/src/vec/sink/group_commit_block_sink.cpp        | 16 ++++
 .../fault_injection_p0/group_commit_wal_msg.csv    |  5 ++
 ...oup_commit_async_wal_msg_fault_injection.groovy | 97 ++++++++++++++++++++++
 7 files changed, 138 insertions(+), 5 deletions(-)

diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 6d59c6df686..511751693c8 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -352,9 +352,9 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t 
table_id, int64_t wal_
     }
     table_ptr->add_wal(wal_id, wal);
 #ifndef BE_TEST
-    WARN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)),
+    WARN_IF_ERROR(update_wal_dir_limit(get_base_wal_path(wal)),
                   "Failed to update wal dir limit while add recover wal!");
-    WARN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)),
+    WARN_IF_ERROR(update_wal_dir_used(get_base_wal_path(wal)),
                   "Failed to update wal dir used while add recove wal!");
 #endif
     return Status::OK();
@@ -413,7 +413,7 @@ Status WalManager::get_wal_dir_available_size(const 
std::string& wal_dir, size_t
     return _wal_dirs_info->get_wal_dir_available_size(wal_dir, 
available_bytes);
 }
 
-std::string WalManager::_get_base_wal_path(const std::string& wal_path_str) {
+std::string WalManager::get_base_wal_path(const std::string& wal_path_str) {
     io::Path wal_path = wal_path_str;
     for (int i = 0; i < 3; ++i) {
         if (!wal_path.has_parent_path()) {
@@ -505,7 +505,7 @@ Status WalManager::delete_wal(int64_t table_id, int64_t 
wal_id, size_t block_que
         }
     }
     erase_wal_queue(table_id, wal_id);
-    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 
0,
+    RETURN_IF_ERROR(update_wal_dir_pre_allocated(get_base_wal_path(wal_path), 
0,
                                                  block_queue_pre_allocated));
     return Status::OK();
 }
diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h
index dfa2859cbe3..db75d991154 100644
--- a/be/src/olap/wal/wal_manager.h
+++ b/be/src/olap/wal/wal_manager.h
@@ -84,13 +84,13 @@ public:
                            std::shared_ptr<std::condition_variable>& cv);
     Status wait_replay_wal_finish(int64_t wal_id);
     Status notify_relay_wal(int64_t wal_id);
+    static std::string get_base_wal_path(const std::string& wal_path_str);
 
 private:
     // wal back pressure
     Status _init_wal_dirs_conf();
     Status _init_wal_dirs();
     Status _init_wal_dirs_info();
-    std::string _get_base_wal_path(const std::string& wal_path_str);
     Status _update_wal_dir_info_thread();
 
     // replay wal
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index a41aef2acf2..88a8150a60f 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -25,6 +25,7 @@
 #include "client_cache.h"
 #include "common/compiler_util.h"
 #include "common/config.h"
+#include "common/status.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "util/debug_points.h"
@@ -392,6 +393,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
     // status: exec_plan_fragment result
     // st: commit txn rpc status
     // result_status: commit txn result
+    DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
+                    { st = Status::InternalError(""); });
     if (status.ok() && st.ok() &&
         (result_status.ok() || 
result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
         if (!config::group_commit_wait_replay_wal_finish) {
@@ -405,6 +408,9 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
         std::string wal_path;
         RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
         RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, 
txn_id, wal_path));
+        RETURN_IF_ERROR(_exec_env->wal_mgr()->update_wal_dir_pre_allocated(
+                WalManager::get_base_wal_path(wal_path), 0,
+                load_block_queue->block_queue_pre_allocated()));
     }
     std::stringstream ss;
     ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id 
<< ", label=" << label
@@ -419,6 +425,12 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
         ss << ", rows=" << state->num_rows_load_success();
     }
     LOG(INFO) << ss.str();
+    
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
 {
+        std ::string msg = _exec_env->wal_mgr()->get_wal_dirs_info_string();
+        LOG(INFO) << "debug promise set: " << msg;
+        ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
+                Status ::InternalError(msg));
+    };);
     return st;
 }
 
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index 49bdd5f87a7..6467ca38561 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -22,6 +22,7 @@
 #include <atomic>
 #include <condition_variable>
 #include <cstdint>
+#include <future>
 #include <memory>
 #include <mutex>
 #include <shared_mutex>
@@ -170,6 +171,8 @@ public:
                                       const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                       int be_exe_version);
+    std::promise<Status> debug_promise;
+    std::future<Status> debug_future = debug_promise.get_future();
 
 private:
     ExecEnv* _exec_env = nullptr;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp 
b/be/src/vec/sink/group_commit_block_sink.cpp
index cc1abaafcf2..f10becd056b 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -19,12 +19,14 @@
 
 #include <gen_cpp/DataSinks_types.h>
 
+#include <future>
 #include <shared_mutex>
 
 #include "common/exception.h"
 #include "runtime/exec_env.h"
 #include "runtime/group_commit_mgr.h"
 #include "runtime/runtime_state.h"
+#include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/sink/vtablet_finder.h"
@@ -260,6 +262,20 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* 
state,
     }
     _is_block_appended = true;
     _blocks.clear();
+    
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
 {
+        if (_load_block_queue) {
+            _load_block_queue->remove_load_id(_load_id);
+        }
+        if (ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
+                    std ::chrono ::seconds(60)) == std ::future_status 
::ready) {
+            auto st = 
ExecEnv::GetInstance()->group_commit_mgr()->debug_future.get();
+            ExecEnv::GetInstance()->group_commit_mgr()->debug_promise = 
std::promise<Status>();
+            ExecEnv::GetInstance()->group_commit_mgr()->debug_future =
+                    
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.get_future();
+            LOG(INFO) << "debug future output: " << st.to_string();
+            RETURN_IF_ERROR(st);
+        }
+    });
     return Status::OK();
 }
 
diff --git a/regression-test/data/fault_injection_p0/group_commit_wal_msg.csv 
b/regression-test/data/fault_injection_p0/group_commit_wal_msg.csv
new file mode 100644
index 00000000000..6ab7bd6bcdf
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/group_commit_wal_msg.csv
@@ -0,0 +1,5 @@
+1,1
+2,2
+3,3
+4,4
+5,5
\ No newline at end of file
diff --git 
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
new file mode 100644
index 00000000000..0ad2d8b8f27
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
+
+
+    def tableName = "wal_test"
+ 
+    // case 1: commit succeeds; the debug point makes the BE return its WAL dir info (expects 'pre allocated 0 Bytes') as the load error
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k`) 
+        BUCKETS 5 
+        properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    def exception = false;
+        try {
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', ','
+                set 'group_commit', 'async_mode'
+                unset 'label'
+                file 'group_commit_wal_msg.csv'
+                time 10000 
+            }
+            assertFalse(true);
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('pre allocated 0 Bytes'))
+            exception = true;
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+            assertTrue(exception)
+        }
+
+    // case 2: commit is forced to fail (err_status debug point) so the WAL is handed to recovery; WAL dir info must still show 'pre allocated 0 Bytes'
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k`) 
+        BUCKETS 5 
+        properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    exception = false;
+        try {
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', ','
+                set 'group_commit', 'async_mode'
+                unset 'label'
+                file 'group_commit_wal_msg.csv'
+                time 10000 
+            }
+            assertFalse(true);
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('pre allocated 0 Bytes'))
+            exception = true;
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+            
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
+            assertTrue(exception)
+        }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to