This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new ffc57c9ef44 [Bug](runtime-filter) fix brpc ctrl use after free (#37223) ffc57c9ef44 is described below commit ffc57c9ef44a5f7f199f4db1dfa15bb35349bb08 Author: Pxl <pxl...@qq.com> AuthorDate: Wed Jul 3 21:01:50 2024 +0800 [Bug](runtime-filter) fix brpc ctrl use after free (#37223) part of #35186 --- be/src/runtime/runtime_filter_mgr.cpp | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 010cb5a60e5..dfc55edc7c3 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -346,23 +346,22 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz for (auto addr : cnt_val->source_addrs) { std::shared_ptr<PBackendService_Stub> stub( ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr)); - AsyncRPCContext<PSyncFilterSizeRequest, PSyncFilterSizeResponse> ctx; - auto* pquery_id = ctx.request.mutable_query_id(); + + auto closure = AutoReleaseClosure<PSyncFilterSizeRequest, + DummyBrpcCallback<PSyncFilterSizeResponse>>:: + create_unique(std::make_shared<PSyncFilterSizeRequest>(), + DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared()); + + auto* pquery_id = closure->request_->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); - ctx.request.set_filter_id(filter_id); - ctx.request.set_filter_size(cnt_val->global_size); + closure->request_->set_filter_id(filter_id); + closure->request_->set_filter_size(cnt_val->global_size); - stub->sync_filter_size(&ctx.cntl, &ctx.request, &ctx.response, brpc::DoNothing()); - brpc::Join(ctx.cntl.call_id()); - if (auto status = Status::create(ctx.response.status()); !status) { - return status; - } - if (ctx.cntl.Failed()) { - ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(ctx.cntl.remote_side()); - return Status::InternalError(ctx.cntl.ErrorText()); - } + stub->sync_filter_size(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), brpc::DoNothing()); + closure.release(); } } return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org