This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new eea36767912 [fix](group commit) fix group commit insert rpc may stuck (#39391) (#39458)
eea36767912 is described below

commit eea36767912b8174962df8465c431e0a017c067e
Author: meiyi <myime...@gmail.com>
AuthorDate: Fri Aug 16 13:19:00 2024 +0800

    [fix](group commit) fix group commit insert rpc may stuck (#39391) (#39458)

    pick https://github.com/apache/doris/pull/39391
---
 be/src/runtime/fragment_mgr.cpp                    |  2 ++
 be/src/runtime/group_commit_mgr.cpp                |  2 ++
 be/src/service/internal_service.cpp                | 23 ++++++++++++++++++---
 .../group_commit/test_group_commit_error.groovy    | 24 ++++++++++++++++++++++
 4 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8ceeff10481..c61bb82df75 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -620,6 +620,8 @@ void FragmentMgr::remove_pipeline_context(
 template <typename Params>
 Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
                                    std::shared_ptr<QueryContext>& query_ctx) {
+    DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed",
+                    { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
     if (params.is_simplified_param) {
         // Get common components from _query_ctx_map
         std::lock_guard<std::mutex> lock(_lock);
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index d97b268fc27..a5bf52d2ca7 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -35,6 +35,8 @@ namespace doris {
 
 Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                                  std::shared_ptr<vectorized::Block> block, bool write_wal) {
+    DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed",
+                    { return Status::InternalError("LoadBlockQueue.add_block.failed"); });
     std::unique_lock l(mutex);
     RETURN_IF_ERROR(status);
     auto start = std::chrono::steady_clock::now();
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 8ab2b06a805..013af8e8030 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2229,7 +2229,10 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
     TUniqueId load_id;
     load_id.__set_hi(request->load_id().hi());
     load_id.__set_lo(request->load_id().lo());
-    bool ret = _light_work_pool.try_offer([this, request, response, done, load_id]() {
+    std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
+    std::shared_ptr<bool> is_done = std::make_shared<bool>(false);
+    bool ret = _light_work_pool.try_offer([this, request, response, done, load_id, lock,
+                                           is_done]() {
         brpc::ClosureGuard closure_guard(done);
         std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
         auto pipe = std::make_shared<io::StreamLoadPipe>(
@@ -2243,7 +2246,13 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
                     request->exec_plan_fragment_request().request(),
                     request->exec_plan_fragment_request().version(),
                     request->exec_plan_fragment_request().compact(),
-                    [&, response, done, load_id](RuntimeState* state, Status* status) {
+                    [&, response, done, load_id, lock, is_done](RuntimeState* state,
+                                                                Status* status) {
+                        std::lock_guard<std::mutex> lock1(*lock);
+                        if (*is_done) {
+                            return;
+                        }
+                        *is_done = true;
                         brpc::ClosureGuard cb_closure_guard(done);
                         response->set_label(state->import_label());
                         response->set_txn_id(state->wal_id());
@@ -2262,11 +2271,19 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
             st = Status::Error(ErrorCode::INTERNAL_ERROR,
                                "_exec_plan_fragment_impl meet unknown error");
         }
-        closure_guard.release();
         if (!st.ok()) {
             LOG(WARNING) << "exec plan fragment failed, load_id=" << print_id(load_id)
                          << ", errmsg=" << st;
+            std::lock_guard<std::mutex> lock1(*lock);
+            if (*is_done) {
+                closure_guard.release();
+            } else {
+                *is_done = true;
+                st.to_protobuf(response->mutable_status());
+                _exec_env->new_load_stream_mgr()->remove(load_id);
+            }
         } else {
+            closure_guard.release();
             for (int i = 0; i < request->data().size(); ++i) {
                 std::unique_ptr<PDataRow> row(new PDataRow());
                 row->CopyFrom(request->data(i));
diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
index 749da71117b..c7f30dfea36 100644
--- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
+++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
@@ -52,4 +52,28 @@ suite("test_group_commit_error", "nonConcurrent") {
     } finally {
         GetDebugPoint().clearDebugPointsForAllBEs()
     }
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr._get_query_ctx.failed")
+        sql """ set group_commit = async_mode """
+        sql """ set enable_nereids_planner = false """
+        sql """ insert into ${tableName} values (3, 3) """
+        assertTrue(false)
+    } catch (Exception e) {
+        logger.info("failed: " + e.getMessage())
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.failed")
+        sql """ set group_commit = async_mode """
+        sql """ set enable_nereids_planner = false """
+        sql """ insert into ${tableName} values (4, 4) """
+        assertTrue(false)
+    } catch (Exception e) {
+        logger.info("failed: " + e.getMessage())
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org