This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 7d4da7a [fix](rpc) fix BE crash in SendRpcResponse when high concurrency (#7413) 7d4da7a is described below commit 7d4da7af5cc55764e52e5701c87f370e5fd990cd Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Dec 16 20:27:24 2021 +0800 [fix](rpc) fix BE crash in SendRpcResponse when high concurrency (#7413) The response is accessed when done->Run is called in transmit_data(), give response a default value to avoid null pointers in high concurrency. --- be/src/service/internal_service.cpp | 8 ++++++-- be/src/util/proto_util.h | 12 +++++------- gensrc/proto/data.proto | 1 - gensrc/proto/internal_service.proto | 4 ++++ 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 90d1488..ee6cf7e 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -62,14 +62,18 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt << " node=" << request->node_id(); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl); - auto st = _exec_env->stream_mgr()->transmit_data(request, &done); + // The response is accessed when done->Run is called in transmit_data(), + // give response a default value to avoid null pointers in high concurrency. 
+ Status st; + st.to_protobuf(response->mutable_status()); + st = _exec_env->stream_mgr()->transmit_data(request, &done); if (!st.ok()) { LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg() << ", fragment_instance_id=" << print_id(request->finst_id()) << ", node=" << request->node_id(); } - st.to_protobuf(response->mutable_status()); if (done != nullptr) { + st.to_protobuf(response->mutable_status()); done->Run(); } } diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h index a9da2e1..9677a82 100644 --- a/be/src/util/proto_util.h +++ b/be/src/util/proto_util.h @@ -27,11 +27,11 @@ inline void request_row_batch_transfer_attachment(Params* brpc_request, Closure* if (brpc_request->has_row_batch() && config::transfer_data_by_brpc_attachment == true) { butil::IOBuf attachment; auto row_batch = brpc_request->mutable_row_batch(); - row_batch->set_transfer_by_attachment(true); attachment.append(row_batch->tuple_data()); row_batch->clear_tuple_data(); row_batch->set_tuple_data(""); closure->cntl.request_attachment().swap(attachment); + brpc_request->set_transfer_by_attachment(true); } } @@ -40,13 +40,11 @@ template <typename Params> inline void attachment_transfer_request_row_batch(const Params* brpc_request, brpc::Controller* cntl) { Params* req = const_cast<Params*>(brpc_request); - if (req->has_row_batch()) { + if (req->has_row_batch() && req->transfer_by_attachment()) { auto rb = req->mutable_row_batch(); - if (rb->transfer_by_attachment()) { - DCHECK(cntl->request_attachment().size() > 0); - const butil::IOBuf& io_buf = cntl->request_attachment(); - io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0); - } + DCHECK(cntl->request_attachment().size() > 0); + const butil::IOBuf& io_buf = cntl->request_attachment(); + io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0); } } diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index 547afff..6e4ae7d 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ 
-40,7 +40,6 @@ message PRowBatch { repeated int32 tuple_offsets = 3; required bytes tuple_data = 4; required bool is_compressed = 5; - optional bool transfer_by_attachment = 6 [default = false]; } message PColumn { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index b88b01d..ef43625 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -45,6 +45,8 @@ message PTransmitDataParams { optional PQueryStatistics query_statistics = 8; optional PBlock block = 9; + // transfer the RowBatch to the Controller Attachment + optional bool transfer_by_attachment = 10 [default = false]; }; message PTransmitDataResult { @@ -96,6 +98,8 @@ message PTabletWriterAddBatchRequest { repeated int64 partition_ids = 8; // the backend which send this request optional int64 backend_id = 9 [default = -1]; + // transfer the RowBatch to the Controller Attachment + optional bool transfer_by_attachment = 10 [default = false]; }; message PTabletWriterAddBatchResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org