This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 90d92e99b8d [fix](group commit) fix group commit insert rpc may stuck (#39391) 90d92e99b8d is described below commit 90d92e99b8daa5a19d36d0b0c1590877b90e2ff9 Author: meiyi <myime...@gmail.com> AuthorDate: Thu Aug 15 19:49:03 2024 +0800 [fix](group commit) fix group commit insert rpc may stuck (#39391) 1. When exec_plan_fragment fails and the PipelineFragmentContext is not constructed, the group_commit_insert rpc gets stuck. 2. The `LoadBlockQueue.add_block.back_pressure_time_out` debug point does not work. --- be/src/runtime/fragment_mgr.cpp | 2 ++ be/src/runtime/group_commit_mgr.cpp | 6 ++---- be/src/service/internal_service.cpp | 23 ++++++++++++++++++--- .../group_commit/test_group_commit_error.groovy | 24 ++++++++++++++++++++++ 4 files changed, 48 insertions(+), 7 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 16305a8e915..8df1fc84f86 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -579,6 +579,8 @@ std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock( template <typename Params> Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, std::shared_ptr<QueryContext>& query_ctx) { + DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", + { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); }); if (params.is_simplified_param) { // Get common components from _query_ctx_map std::lock_guard<std::mutex> lock(_lock); diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 0384b502e0a..2383f25afc8 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -38,12 +38,10 @@ namespace doris { Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block, bool write_wal, UniqueId& load_id) { + 
DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed", + { return Status::InternalError("LoadBlockQueue.add_block.failed"); }); std::unique_lock l(mutex); RETURN_IF_ERROR(status); - auto start = std::chrono::steady_clock::now(); - DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", { - start = std::chrono::steady_clock::now() - std::chrono::milliseconds(120000); - }); if (UNLIKELY(runtime_state->is_cancelled())) { return runtime_state->cancel_reason(); } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index d8cd7356ddb..2492d2a846b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -2030,7 +2030,10 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont TUniqueId load_id; load_id.__set_hi(request->load_id().hi()); load_id.__set_lo(request->load_id().lo()); - bool ret = _light_work_pool.try_offer([this, request, response, done, load_id]() { + std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>(); + std::shared_ptr<bool> is_done = std::make_shared<bool>(false); + bool ret = _light_work_pool.try_offer([this, request, response, done, load_id, lock, + is_done]() { brpc::ClosureGuard closure_guard(done); std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); auto pipe = std::make_shared<io::StreamLoadPipe>( @@ -2044,7 +2047,13 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont request->exec_plan_fragment_request().request(), request->exec_plan_fragment_request().version(), request->exec_plan_fragment_request().compact(), - [&, response, done, load_id](RuntimeState* state, Status* status) { + [&, response, done, load_id, lock, is_done](RuntimeState* state, + Status* status) { + std::lock_guard<std::mutex> lock1(*lock); + if (*is_done) { + return; + } + *is_done = true; brpc::ClosureGuard cb_closure_guard(done); response->set_label(state->import_label()); 
response->set_txn_id(state->wal_id()); @@ -2063,11 +2072,19 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont st = Status::Error(ErrorCode::INTERNAL_ERROR, "_exec_plan_fragment_impl meet unknown error"); } - closure_guard.release(); if (!st.ok()) { LOG(WARNING) << "exec plan fragment failed, load_id=" << print_id(load_id) << ", errmsg=" << st; + std::lock_guard<std::mutex> lock1(*lock); + if (*is_done) { + closure_guard.release(); + } else { + *is_done = true; + st.to_protobuf(response->mutable_status()); + _exec_env->new_load_stream_mgr()->remove(load_id); + } } else { + closure_guard.release(); for (int i = 0; i < request->data().size(); ++i) { std::unique_ptr<PDataRow> row(new PDataRow()); row->CopyFrom(request->data(i)); diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy index 1416a86e5e9..4589b38cafc 100644 --- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy +++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy @@ -52,4 +52,28 @@ suite("test_group_commit_error", "nonConcurrent") { } finally { GetDebugPoint().clearDebugPointsForAllBEs() } + + try { + GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr._get_query_ctx.failed") + sql """ set group_commit = async_mode """ + sql """ set enable_nereids_planner = false """ + sql """ insert into ${tableName} values (3, 3) """ + assertTrue(false) + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.failed") + sql """ set group_commit = async_mode """ + sql """ set enable_nereids_planner = false """ + sql """ insert into ${tableName} values (4, 4) """ + assertTrue(false) + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + } finally { 
+ GetDebugPoint().clearDebugPointsForAllBEs() + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org