This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new eea36767912 [fix](group commit) fix group commit insert rpc may stuck (#39391) (#39458)
eea36767912 is described below

commit eea36767912b8174962df8465c431e0a017c067e
Author: meiyi <myime...@gmail.com>
AuthorDate: Fri Aug 16 13:19:00 2024 +0800

    [fix](group commit) fix group commit insert rpc may stuck (#39391) (#39458)
    
    Cherry-picked from https://github.com/apache/doris/pull/39391
---
 be/src/runtime/fragment_mgr.cpp                    |  2 ++
 be/src/runtime/group_commit_mgr.cpp                |  2 ++
 be/src/service/internal_service.cpp                | 23 ++++++++++++++++++---
 .../group_commit/test_group_commit_error.groovy    | 24 ++++++++++++++++++++++
 4 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8ceeff10481..c61bb82df75 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -620,6 +620,8 @@ void FragmentMgr::remove_pipeline_context(
 template <typename Params>
 Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, 
bool pipeline,
                                    std::shared_ptr<QueryContext>& query_ctx) {
+    DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed",
+                    { return 
Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
     if (params.is_simplified_param) {
         // Get common components from _query_ctx_map
         std::lock_guard<std::mutex> lock(_lock);
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index d97b268fc27..a5bf52d2ca7 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -35,6 +35,8 @@ namespace doris {
 
 Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                                  std::shared_ptr<vectorized::Block> block, 
bool write_wal) {
+    DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed",
+                    { return 
Status::InternalError("LoadBlockQueue.add_block.failed"); });
     std::unique_lock l(mutex);
     RETURN_IF_ERROR(status);
     auto start = std::chrono::steady_clock::now();
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 8ab2b06a805..013af8e8030 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2229,7 +2229,10 @@ void 
PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
     TUniqueId load_id;
     load_id.__set_hi(request->load_id().hi());
     load_id.__set_lo(request->load_id().lo());
-    bool ret = _light_work_pool.try_offer([this, request, response, done, 
load_id]() {
+    std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
+    std::shared_ptr<bool> is_done = std::make_shared<bool>(false);
+    bool ret = _light_work_pool.try_offer([this, request, response, done, 
load_id, lock,
+                                           is_done]() {
         brpc::ClosureGuard closure_guard(done);
         std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
         auto pipe = std::make_shared<io::StreamLoadPipe>(
@@ -2243,7 +2246,13 @@ void 
PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
                         request->exec_plan_fragment_request().request(),
                         request->exec_plan_fragment_request().version(),
                         request->exec_plan_fragment_request().compact(),
-                        [&, response, done, load_id](RuntimeState* state, 
Status* status) {
+                        [&, response, done, load_id, lock, 
is_done](RuntimeState* state,
+                                                                    Status* 
status) {
+                            std::lock_guard<std::mutex> lock1(*lock);
+                            if (*is_done) {
+                                return;
+                            }
+                            *is_done = true;
                             brpc::ClosureGuard cb_closure_guard(done);
                             response->set_label(state->import_label());
                             response->set_txn_id(state->wal_id());
@@ -2262,11 +2271,19 @@ void 
PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
                 st = Status::Error(ErrorCode::INTERNAL_ERROR,
                                    "_exec_plan_fragment_impl meet unknown 
error");
             }
-            closure_guard.release();
             if (!st.ok()) {
                 LOG(WARNING) << "exec plan fragment failed, load_id=" << 
print_id(load_id)
                              << ", errmsg=" << st;
+                std::lock_guard<std::mutex> lock1(*lock);
+                if (*is_done) {
+                    closure_guard.release();
+                } else {
+                    *is_done = true;
+                    st.to_protobuf(response->mutable_status());
+                    _exec_env->new_load_stream_mgr()->remove(load_id);
+                }
             } else {
+                closure_guard.release();
                 for (int i = 0; i < request->data().size(); ++i) {
                     std::unique_ptr<PDataRow> row(new PDataRow());
                     row->CopyFrom(request->data(i));
diff --git 
a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
index 749da71117b..c7f30dfea36 100644
--- 
a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
+++ 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
@@ -52,4 +52,28 @@ suite("test_group_commit_error", "nonConcurrent") {
     } finally {
         GetDebugPoint().clearDebugPointsForAllBEs()
     }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr._get_query_ctx.failed")
+        sql """ set group_commit = async_mode """
+        sql """ set enable_nereids_planner = false """
+        sql """ insert into ${tableName} values (3, 3) """
+        assertTrue(false)
+    } catch (Exception e) {
+        logger.info("failed: " + e.getMessage())
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.failed")
+        sql """ set group_commit = async_mode """
+        sql """ set enable_nereids_planner = false """
+        sql """ insert into ${tableName} values (4, 4) """
+        assertTrue(false)
+    } catch (Exception e) {
+        logger.info("failed: " + e.getMessage())
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to