This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d4da7a  [fix](rpc) fix BE crash in SendRpcResponse when high 
concurrency (#7413)
7d4da7a is described below

commit 7d4da7af5cc55764e52e5701c87f370e5fd990cd
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Thu Dec 16 20:27:24 2021 +0800

    [fix](rpc) fix BE crash in SendRpcResponse when high concurrency (#7413)
    
    The response is accessed when done->Run is called in transmit_data(),
    give response a default value to avoid null pointers in high concurrency.
---
 be/src/service/internal_service.cpp |  8 ++++++--
 be/src/util/proto_util.h            | 12 +++++-------
 gensrc/proto/data.proto             |  1 -
 gensrc/proto/internal_service.proto |  4 ++++
 4 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 90d1488..ee6cf7e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -62,14 +62,18 @@ void 
PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
              << " node=" << request->node_id();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl);
-    auto st = _exec_env->stream_mgr()->transmit_data(request, &done);
+    // The response is accessed when done->Run is called in transmit_data(),
+    // give response a default value to avoid null pointers in high 
concurrency.
+    Status st;
+    st.to_protobuf(response->mutable_status());
+    st = _exec_env->stream_mgr()->transmit_data(request, &done);
     if (!st.ok()) {
         LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg()
                      << ", fragment_instance_id=" << 
print_id(request->finst_id())
                      << ", node=" << request->node_id();
     }
-    st.to_protobuf(response->mutable_status());
     if (done != nullptr) {
+        st.to_protobuf(response->mutable_status());
         done->Run();
     }
 }
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index a9da2e1..9677a82 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -27,11 +27,11 @@ inline void request_row_batch_transfer_attachment(Params* 
brpc_request, Closure*
     if (brpc_request->has_row_batch() && 
config::transfer_data_by_brpc_attachment == true) {
         butil::IOBuf attachment;
         auto row_batch = brpc_request->mutable_row_batch();
-        row_batch->set_transfer_by_attachment(true);
         attachment.append(row_batch->tuple_data());
         row_batch->clear_tuple_data();
         row_batch->set_tuple_data("");
         closure->cntl.request_attachment().swap(attachment);
+        brpc_request->set_transfer_by_attachment(true);
     }
 }
 
@@ -40,13 +40,11 @@ template <typename Params>
 inline void attachment_transfer_request_row_batch(const Params* brpc_request,
                                                   brpc::Controller* cntl) {
     Params* req = const_cast<Params*>(brpc_request);
-    if (req->has_row_batch()) {
+    if (req->has_row_batch() && req->transfer_by_attachment()) {
         auto rb = req->mutable_row_batch();
-        if (rb->transfer_by_attachment()) {
-            DCHECK(cntl->request_attachment().size() > 0);
-            const butil::IOBuf& io_buf = cntl->request_attachment();
-            io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0);
-        }
+        DCHECK(cntl->request_attachment().size() > 0);
+        const butil::IOBuf& io_buf = cntl->request_attachment();
+        io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0);
     }
 }
 
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index 547afff..6e4ae7d 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -40,7 +40,6 @@ message PRowBatch {
     repeated int32 tuple_offsets = 3;
     required bytes tuple_data = 4;
     required bool is_compressed = 5;
-    optional bool transfer_by_attachment = 6 [default = false];
 }
 
 message PColumn {
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index b88b01d..ef43625 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -45,6 +45,8 @@ message PTransmitDataParams {
     optional PQueryStatistics query_statistics = 8;
 
     optional PBlock block = 9;
+    // transfer the RowBatch to the Controller Attachment
+    optional bool transfer_by_attachment = 10 [default = false];
 };
 
 message PTransmitDataResult {
@@ -96,6 +98,8 @@ message PTabletWriterAddBatchRequest {
     repeated int64 partition_ids = 8;
     // the backend which send this request
     optional int64 backend_id = 9 [default = -1];
+    // transfer the RowBatch to the Controller Attachment
+    optional bool transfer_by_attachment = 10 [default = false];
 };
 
 message PTabletWriterAddBatchResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to