This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cf360fff7b [refactor](closure) remove ref count closure using auto 
release closure (#26718)
8cf360fff7b is described below

commit 8cf360fff7b4eabed64a55b262c85a6444c237f8
Author: yiguolei <676222...@qq.com>
AuthorDate: Sun Nov 12 11:57:46 2023 +0800

    [refactor](closure) remove ref count closure using auto release closure 
(#26718)
    
    1. The closure should be managed by a unique ptr and released by brpc; it 
should not be held by our code. If it were held by our code, we would need to 
wait for brpc to finish during cancel or close.
    2. The closure should be exception safe: if any exception happens, it 
should not leak memory.
    3. By using a specific callback interface to be implemented by Doris's 
code, we can write any code, and Doris manages the callback's lifecycle.
    4. Use a weak ptr between the callback and the closure. If the callback is 
destructed before the closure's Run, it should not core dump.
---
 be/src/olap/delta_writer.cpp                  |   3 +-
 be/src/pipeline/exec/exchange_sink_buffer.cpp |  65 ++++---
 be/src/pipeline/exec/exchange_sink_buffer.h   |  41 +++--
 be/src/service/internal_service.cpp           |  44 +++--
 be/src/util/proto_util.h                      |  36 ++--
 be/src/util/ref_count_closure.h               |  37 +---
 be/src/vec/sink/vdata_stream_sender.cpp       |  55 +++---
 be/src/vec/sink/vdata_stream_sender.h         |  40 ++--
 be/src/vec/sink/writer/vtablet_writer.cpp     | 251 +++++++++++++-------------
 be/src/vec/sink/writer/vtablet_writer.h       |  50 +++--
 be/test/vec/runtime/vdata_stream_test.cpp     |   4 +
 11 files changed, 305 insertions(+), 321 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index c78f7814c05..68b97e49cb3 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -282,8 +282,9 @@ void 
DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
     
closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 
1000);
     closure->cntl_->ignore_eovercrowded();
     stub->request_slave_tablet_pull_rowset(closure->cntl_.get(), 
closure->request_.get(),
-                                           closure->response_.get(), 
closure.release());
+                                           closure->response_.get(), 
closure.get());
 
+    closure.release();
     pull_callback->join();
     if (pull_callback->cntl_->Failed()) {
         if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 29933fcdd15..46364528458 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -107,7 +107,7 @@ bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
             if (need_cancel && _instance_to_rpc_ctx.find(id) != 
_instance_to_rpc_ctx.end()) {
                 auto& rpc_ctx = _instance_to_rpc_ctx[id];
                 if (!rpc_ctx.is_cancelled) {
-                    brpc::StartCancel(rpc_ctx._closure->cntl.call_id());
+                    
brpc::StartCancel(rpc_ctx._send_callback->cntl_->call_id());
                     rpc_ctx.is_cancelled = true;
                 }
             }
@@ -245,21 +245,21 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         if (!request.exec_status.ok()) {
             
request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
         }
-        auto* closure = request.channel->get_closure(id, request.eos, nullptr);
+        auto send_callback = request.channel->get_send_callback(id, 
request.eos, nullptr);
 
-        _instance_to_rpc_ctx[id]._closure = closure;
+        _instance_to_rpc_ctx[id]._send_callback = send_callback;
         _instance_to_rpc_ctx[id].is_cancelled = false;
 
-        closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::exchange_sink_ignore_eovercrowded) {
-            closure->cntl.ignore_eovercrowded();
+            send_callback->cntl_->ignore_eovercrowded();
         }
-        closure->addFailedHandler(
+        send_callback->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
-        closure->start_rpc_time = GetCurrentTimeNanos();
-        closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                       const PTransmitDataResult& result,
-                                       const int64_t& start_rpc_time) {
+        send_callback->start_rpc_time = GetCurrentTimeNanos();
+        send_callback->addSuccessHandler([&](const InstanceLoId& id, const 
bool& eos,
+                                             const PTransmitDataResult& result,
+                                             const int64_t& start_rpc_time) {
             set_rpc_time(id, start_rpc_time, result.receive_time());
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
@@ -275,11 +275,17 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         });
         {
             
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+            auto send_remote_block_closure =
+                    AutoReleaseClosure<PTransmitDataParams,
+                                       
pipeline::ExchangeSendCallback<PTransmitDataResult>>::
+                            create_unique(brpc_request, send_callback);
             if (enable_http_send_block(*brpc_request)) {
-                RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), 
closure, *brpc_request,
-                                                    
request.channel->_brpc_dest_addr));
+                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
+                                                      
std::move(send_remote_block_closure),
+                                                      
request.channel->_brpc_dest_addr));
             } else {
-                transmit_block(*request.channel->_brpc_stub, closure, 
*brpc_request);
+                transmit_blockv2(*request.channel->_brpc_stub,
+                                 std::move(send_remote_block_closure));
             }
         }
         if (request.block) {
@@ -303,23 +309,24 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
             auto statistic = brpc_request->mutable_query_statistics();
             _statistics->to_pb(statistic);
         }
-        auto* closure = request.channel->get_closure(id, request.eos, 
request.block_holder);
+        auto send_callback =
+                request.channel->get_send_callback(id, request.eos, 
request.block_holder);
 
         ExchangeRpcContext rpc_ctx;
-        rpc_ctx._closure = closure;
+        rpc_ctx._send_callback = send_callback;
         rpc_ctx.is_cancelled = false;
         _instance_to_rpc_ctx[id] = rpc_ctx;
 
-        closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::exchange_sink_ignore_eovercrowded) {
-            closure->cntl.ignore_eovercrowded();
+            send_callback->cntl_->ignore_eovercrowded();
         }
-        closure->addFailedHandler(
+        send_callback->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
-        closure->start_rpc_time = GetCurrentTimeNanos();
-        closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                       const PTransmitDataResult& result,
-                                       const int64_t& start_rpc_time) {
+        send_callback->start_rpc_time = GetCurrentTimeNanos();
+        send_callback->addSuccessHandler([&](const InstanceLoId& id, const 
bool& eos,
+                                             const PTransmitDataResult& result,
+                                             const int64_t& start_rpc_time) {
             set_rpc_time(id, start_rpc_time, result.receive_time());
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
@@ -335,11 +342,17 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         });
         {
             
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+            auto send_remote_block_closure =
+                    AutoReleaseClosure<PTransmitDataParams,
+                                       
pipeline::ExchangeSendCallback<PTransmitDataResult>>::
+                            create_unique(brpc_request, send_callback);
             if (enable_http_send_block(*brpc_request)) {
-                RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), 
closure, *brpc_request,
-                                                    
request.channel->_brpc_dest_addr));
+                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
+                                                      
std::move(send_remote_block_closure),
+                                                      
request.channel->_brpc_dest_addr));
             } else {
-                transmit_block(*request.channel->_brpc_stub, closure, 
*brpc_request);
+                transmit_blockv2(*request.channel->_brpc_stub,
+                                 std::move(send_remote_block_closure));
             }
         }
         if (request.block_holder->get_block()) {
@@ -359,7 +372,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
 
 template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_construct_request(InstanceLoId id, PUniqueId 
finst_id) {
-    _instance_to_request[id] = std::make_unique<PTransmitDataParams>();
+    _instance_to_request[id] = std::make_shared<PTransmitDataParams>();
     _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
     _instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id);
 
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index c04de2a51f3..9111f553b27 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -36,6 +36,7 @@
 #include "runtime/query_statistics.h"
 #include "runtime/runtime_state.h"
 #include "service/backend_options.h"
+#include "util/ref_count_closure.h"
 
 namespace doris {
 class PTransmitDataParams;
@@ -107,10 +108,12 @@ struct BroadcastTransmitInfo {
     bool eos;
 };
 
-template <typename T>
-class SelfDeleteClosure : public google::protobuf::Closure {
+template <typename Response>
+class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
+    ENABLE_FACTORY_CREATOR(ExchangeSendCallback);
+
 public:
-    SelfDeleteClosure() = default;
+    ExchangeSendCallback() = default;
 
     void init(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* 
data) {
         _id = id;
@@ -118,32 +121,35 @@ public:
         _data = data;
     }
 
-    ~SelfDeleteClosure() override = default;
-    SelfDeleteClosure(const SelfDeleteClosure& other) = delete;
-    SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete;
+    ~ExchangeSendCallback() override = default;
+    ExchangeSendCallback(const ExchangeSendCallback& other) = delete;
+    ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = 
delete;
     void addFailedHandler(
             const std::function<void(const InstanceLoId&, const 
std::string&)>& fail_fn) {
         _fail_fn = fail_fn;
     }
-    void addSuccessHandler(const std::function<void(const InstanceLoId&, const 
bool&, const T&,
-                                                    const int64_t&)>& suc_fn) {
+    void addSuccessHandler(const std::function<void(const InstanceLoId&, const 
bool&,
+                                                    const Response&, const 
int64_t&)>& suc_fn) {
         _suc_fn = suc_fn;
     }
 
-    void Run() noexcept override {
+    void call() noexcept override {
         try {
             if (_data) {
                 _data->unref();
             }
-            if (cntl.Failed()) {
+            if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) {
                 std::string err = fmt::format(
                         "failed to send brpc when exchange, error={}, 
error_text={}, client: {}, "
                         "latency = {}",
-                        berror(cntl.ErrorCode()), cntl.ErrorText(), 
BackendOptions::get_localhost(),
-                        cntl.latency_us());
+                        
berror(::doris::DummyBrpcCallback<Response>::cntl_->ErrorCode()),
+                        
::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(),
+                        BackendOptions::get_localhost(),
+                        
::doris::DummyBrpcCallback<Response>::cntl_->latency_us());
                 _fail_fn(_id, err);
             } else {
-                _suc_fn(_id, _eos, result, start_rpc_time);
+                _suc_fn(_id, _eos, 
*(::doris::DummyBrpcCallback<Response>::response_),
+                        start_rpc_time);
             }
         } catch (const std::exception& exp) {
             LOG(FATAL) << "brpc callback error: " << exp.what();
@@ -151,21 +157,18 @@ public:
             LOG(FATAL) << "brpc callback error.";
         }
     }
-
-    brpc::Controller cntl;
-    T result;
     int64_t start_rpc_time;
 
 private:
     std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
-    std::function<void(const InstanceLoId&, const bool&, const T&, const 
int64_t&)> _suc_fn;
+    std::function<void(const InstanceLoId&, const bool&, const Response&, 
const int64_t&)> _suc_fn;
     InstanceLoId _id;
     bool _eos;
     vectorized::BroadcastPBlockHolder* _data;
 };
 
 struct ExchangeRpcContext {
-    SelfDeleteClosure<PTransmitDataResult>* _closure = nullptr;
+    std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback 
= nullptr;
     bool is_cancelled = false;
 };
 
@@ -208,7 +211,7 @@ private:
     // must init zero
     // TODO: make all flat_hash_map to a STRUT
     phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
-    phmap::flat_hash_map<InstanceLoId, std::unique_ptr<PTransmitDataParams>> 
_instance_to_request;
+    phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>> 
_instance_to_request;
     // One channel is corresponding to a downstream instance.
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
     // Number of busy channels;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 17f1c08da91..0c12e910c33 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1518,37 +1518,35 @@ void 
PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
         return;
     }
 
-    PTabletWriteSlaveDoneRequest request;
-    request.set_txn_id(txn_id);
-    request.set_tablet_id(tablet_id);
-    request.set_node_id(node_id);
-    request.set_is_succeed(is_succeed);
-    RefCountClosure<PTabletWriteSlaveDoneResult>* closure =
-            new RefCountClosure<PTabletWriteSlaveDoneResult>();
-    closure->ref();
-    closure->ref();
-    closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec 
* 1000);
-    closure->cntl.ignore_eovercrowded();
-    stub->response_slave_tablet_pull_rowset(&closure->cntl, &request, 
&closure->result, closure);
-
-    closure->join();
-    if (closure->cntl.Failed()) {
+    auto request = std::make_shared<PTabletWriteSlaveDoneRequest>();
+    request->set_txn_id(txn_id);
+    request->set_tablet_id(tablet_id);
+    request->set_node_id(node_id);
+    request->set_is_succeed(is_succeed);
+    auto pull_rowset_callback = 
DummyBrpcCallback<PTabletWriteSlaveDoneResult>::create_shared();
+    auto closure = AutoReleaseClosure<
+            PTabletWriteSlaveDoneRequest,
+            
DummyBrpcCallback<PTabletWriteSlaveDoneResult>>::create_unique(request,
+                                                                           
pull_rowset_callback);
+    
closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 
1000);
+    closure->cntl_->ignore_eovercrowded();
+    stub->response_slave_tablet_pull_rowset(closure->cntl_.get(), 
closure->request_.get(),
+                                            closure->response_.get(), 
closure.get());
+    closure.release();
+
+    pull_rowset_callback->join();
+    if (pull_rowset_callback->cntl_->Failed()) {
         if 
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(stub, 
remote_host,
                                                                              
brpc_port)) {
             ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
-                    closure->cntl.remote_side());
+                    closure->cntl_->remote_side());
         }
         LOG(WARNING) << "failed to response result of slave replica to master 
replica, error="
-                     << berror(closure->cntl.ErrorCode())
-                     << ", error_text=" << closure->cntl.ErrorText()
+                     << berror(pull_rowset_callback->cntl_->ErrorCode())
+                     << ", error_text=" << 
pull_rowset_callback->cntl_->ErrorText()
                      << ", master host: " << remote_host << ", tablet_id=" << 
tablet_id
                      << ", txn_id=" << txn_id;
     }
-
-    if (closure->unref()) {
-        delete closure;
-    }
-    closure = nullptr;
     VLOG_CRITICAL << "succeed to response the result of slave replica pull 
rowset to master "
                      "replica. master host: "
                   << remote_host << ". is_succeed=" << is_succeed << ", 
tablet_id=" << tablet_id
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 2b8b524da54..26779977642 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -38,9 +38,10 @@ constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31);
 
 // Embed column_values and brpc request serialization string in controller 
attachment.
 template <typename Params, typename Closure>
-Status request_embed_attachment_contain_block(Params* brpc_request, Closure* 
closure) {
+Status request_embed_attachment_contain_blockv2(Params* brpc_request,
+                                                std::unique_ptr<Closure>& 
closure) {
     auto block = brpc_request->block();
-    Status st = request_embed_attachment(brpc_request, block.column_values(), 
closure);
+    Status st = request_embed_attachmentv2(brpc_request, 
block.column_values(), closure);
     block.set_column_values("");
     return st;
 }
@@ -59,32 +60,37 @@ inline bool enable_http_send_block(const 
PTransmitDataParams& request) {
 }
 
 template <typename Closure>
-void transmit_block(PBackendService_Stub& stub, Closure* closure,
-                    const PTransmitDataParams& params) {
-    closure->cntl.http_request().Clear();
-    stub.transmit_block(&closure->cntl, &params, &closure->result, closure);
+void transmit_blockv2(PBackendService_Stub& stub, std::unique_ptr<Closure> 
closure) {
+    closure->cntl_->http_request().Clear();
+    stub.transmit_block(closure->cntl_.get(), closure->request_.get(), 
closure->response_.get(),
+                        closure.get());
+    closure.release();
 }
 
 template <typename Closure>
-Status transmit_block_http(ExecEnv* exec_env, Closure* closure, 
PTransmitDataParams& params,
-                           TNetworkAddress brpc_dest_addr) {
-    RETURN_IF_ERROR(request_embed_attachment_contain_block(&params, closure));
+Status transmit_block_httpv2(ExecEnv* exec_env, std::unique_ptr<Closure> 
closure,
+                             TNetworkAddress brpc_dest_addr) {
+    
RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(),
 closure));
 
     //format an ipv6 address
     std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, 
brpc_dest_addr.port);
 
     std::shared_ptr<PBackendService_Stub> brpc_http_stub =
             
exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, 
"http");
-    closure->cntl.http_request().uri() = brpc_url + 
"/PInternalServiceImpl/transmit_block_by_http";
-    closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
-    closure->cntl.http_request().set_content_type("application/json");
-    brpc_http_stub->transmit_block_by_http(&closure->cntl, nullptr, 
&closure->result, closure);
+    closure->cntl_->http_request().uri() =
+            brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
+    closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
+    closure->cntl_->http_request().set_content_type("application/json");
+    brpc_http_stub->transmit_block_by_http(closure->cntl_.get(), nullptr, 
closure->response_.get(),
+                                           closure.get());
+    closure.release();
 
     return Status::OK();
 }
 
 template <typename Params, typename Closure>
-Status request_embed_attachment(Params* brpc_request, const std::string& data, 
Closure* closure) {
+Status request_embed_attachmentv2(Params* brpc_request, const std::string& 
data,
+                                  std::unique_ptr<Closure>& closure) {
     butil::IOBuf attachment;
 
     // step1: serialize brpc_request to string, and append to attachment.
@@ -106,7 +112,7 @@ Status request_embed_attachment(Params* brpc_request, const 
std::string& data, C
                                          data_size);
     }
     // step3: attachment add to closure.
-    closure->cntl.request_attachment().swap(attachment);
+    closure->cntl_->request_attachment().swap(attachment);
     return Status::OK();
 }
 
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index d844a7fc820..14a136fcfd0 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -21,39 +21,11 @@
 
 #include <atomic>
 
-#include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "service/brpc.h"
 
 namespace doris {
 
-template <typename T>
-class RefCountClosure : public google::protobuf::Closure {
-public:
-    RefCountClosure() : _refs(0) {}
-    ~RefCountClosure() override = default;
-
-    void ref() { _refs.fetch_add(1); }
-
-    // If unref() returns true, this object should be delete
-    bool unref() { return _refs.fetch_sub(1) == 1; }
-
-    void Run() override {
-        SCOPED_TRACK_MEMORY_TO_UNKNOWN();
-        if (unref()) {
-            delete this;
-        }
-    }
-
-    void join() { brpc::Join(cntl.call_id()); }
-
-    brpc::Controller cntl;
-    T result;
-
-private:
-    std::atomic<int> _refs;
-};
-
 template <typename Response>
 class DummyBrpcCallback {
     ENABLE_FACTORY_CREATOR(DummyBrpcCallback);
@@ -65,9 +37,11 @@ public:
         response_ = std::make_shared<Response>();
     }
 
-    void call() {}
+    virtual ~DummyBrpcCallback() = default;
+
+    virtual void call() {}
 
-    void join() { brpc::Join(cntl_->call_id()); }
+    virtual void join() { brpc::Join(cntl_->call_id()); }
 
     // controller has to be the same lifecycle with the closure, because brpc 
may use
     // it in any stage of the rpc.
@@ -102,7 +76,7 @@ class AutoReleaseClosure : public google::protobuf::Closure {
 
 public:
     AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> 
callback)
-            : callback_(callback) {
+            : request_(req), callback_(callback) {
         this->cntl_ = callback->cntl_;
         this->response_ = callback->response_;
     }
@@ -111,7 +85,6 @@ public:
 
     //  Will delete itself
     void Run() override {
-        SCOPED_TRACK_MEMORY_TO_UNKNOWN();
         Defer defer {[&]() { delete this; }};
         // If lock failed, it means the callback object is deconstructed, then 
no need
         // to deal with the callback any more.
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index f5397e831d3..c25f4a9ea61 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -57,19 +57,19 @@ Status Channel<Parent>::init(RuntimeState* state) {
                         ", maybe version is not compatible.";
         return Status::InternalError("no brpc destination");
     }
-
+    _brpc_request = std::make_shared<PTransmitDataParams>();
     // initialize brpc request
-    _brpc_request.mutable_finst_id()->set_hi(_fragment_instance_id.hi);
-    _brpc_request.mutable_finst_id()->set_lo(_fragment_instance_id.lo);
-    _finst_id = _brpc_request.finst_id();
+    _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi);
+    _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo);
+    _finst_id = _brpc_request->finst_id();
 
-    _brpc_request.mutable_query_id()->set_hi(state->query_id().hi);
-    _brpc_request.mutable_query_id()->set_lo(state->query_id().lo);
-    _query_id = _brpc_request.query_id();
+    _brpc_request->mutable_query_id()->set_hi(state->query_id().hi);
+    _brpc_request->mutable_query_id()->set_lo(state->query_id().lo);
+    _query_id = _brpc_request->query_id();
 
-    _brpc_request.set_node_id(_dest_node_id);
-    _brpc_request.set_sender_id(_parent->sender_id());
-    _brpc_request.set_be_number(_be_number);
+    _brpc_request->set_node_id(_dest_node_id);
+    _brpc_request->set_sender_id(_parent->sender_id());
+    _brpc_request->set_be_number(_be_number);
 
     _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
 
@@ -174,49 +174,50 @@ Status Channel<Parent>::send_remote_block(PBlock* block, 
bool eos, Status exec_s
     }
     SCOPED_TIMER(_parent->brpc_send_timer());
 
-    if (_closure == nullptr) {
-        _closure = new RefCountClosure<PTransmitDataResult>();
-        _closure->ref();
+    if (_send_remote_block_callback == nullptr) {
+        _send_remote_block_callback = 
DummyBrpcCallback<PTransmitDataResult>::create_shared();
     } else {
         RETURN_IF_ERROR(_wait_last_brpc());
         SCOPED_TRACK_MEMORY_TO_UNKNOWN();
-        _closure->cntl.Reset();
+        _send_remote_block_callback->cntl_->Reset();
     }
     VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << 
print_id(_fragment_instance_id)
              << " dest_node=" << _dest_node_id << " to_host=" << 
_brpc_dest_addr.hostname
              << " _packet_seq=" << _packet_seq << " row_desc=" << 
_row_desc.debug_string();
     if (_is_transfer_chain && (_send_query_statistics_with_every_batch || 
eos)) {
-        auto statistic = _brpc_request.mutable_query_statistics();
+        auto statistic = _brpc_request->mutable_query_statistics();
         _parent->query_statistics()->to_pb(statistic);
     }
 
-    _brpc_request.set_eos(eos);
+    _brpc_request->set_eos(eos);
     if (!exec_status.ok()) {
-        exec_status.to_protobuf(_brpc_request.mutable_exec_status());
+        exec_status.to_protobuf(_brpc_request->mutable_exec_status());
     }
     if (block != nullptr) {
-        _brpc_request.set_allocated_block(block);
+        _brpc_request->set_allocated_block(block);
     }
-    _brpc_request.set_packet_seq(_packet_seq++);
+    _brpc_request->set_packet_seq(_packet_seq++);
 
-    _closure->ref();
-    _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
+    _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms);
     if (config::exchange_sink_ignore_eovercrowded) {
-        _closure->cntl.ignore_eovercrowded();
+        _send_remote_block_callback->cntl_->ignore_eovercrowded();
     }
 
     {
         
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
-        if (enable_http_send_block(_brpc_request)) {
-            RETURN_IF_ERROR(transmit_block_http(_state->exec_env(), _closure, 
_brpc_request,
-                                                _brpc_dest_addr));
+        auto send_remote_block_closure =
+                AutoReleaseClosure<PTransmitDataParams, 
DummyBrpcCallback<PTransmitDataResult>>::
+                        create_unique(_brpc_request, 
_send_remote_block_callback);
+        if (enable_http_send_block(*_brpc_request)) {
+            RETURN_IF_ERROR(transmit_block_httpv2(
+                    _state->exec_env(), std::move(send_remote_block_closure), 
_brpc_dest_addr));
         } else {
-            transmit_block(*_brpc_stub, _closure, _brpc_request);
+            transmit_blockv2(*_brpc_stub, 
std::move(send_remote_block_closure));
         }
     }
 
     if (block != nullptr) {
-        static_cast<void>(_brpc_request.release_block());
+        static_cast<void>(_brpc_request->release_block());
     }
     return Status::OK();
 }
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 10116be0334..bcc050e25bd 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -261,11 +261,7 @@ public:
         _ch_cur_pb_block = &_ch_pb_block1;
     }
 
-    virtual ~Channel() {
-        if (_closure != nullptr && _closure->unref()) {
-            delete _closure;
-        }
-    }
+    virtual ~Channel() = default;
 
     // Initialize channel.
     // Returns OK if successful, error indication otherwise.
@@ -337,22 +333,22 @@ protected:
 
     Status _wait_last_brpc() {
         SCOPED_TIMER(_parent->brpc_wait_timer());
-        if (_closure == nullptr) {
+        if (_send_remote_block_callback == nullptr) {
             return Status::OK();
         }
-        auto cntl = &_closure->cntl;
-        auto call_id = _closure->cntl.call_id();
-        brpc::Join(call_id);
-        _receiver_status = Status::create(_closure->result.status());
-        if (cntl->Failed()) {
+        _send_remote_block_callback->join();
+        if (_send_remote_block_callback->cntl_->Failed()) {
             std::string err = fmt::format(
                     "failed to send brpc batch, error={}, error_text={}, 
client: {}, "
                     "latency = {}",
-                    berror(cntl->ErrorCode()), cntl->ErrorText(), 
BackendOptions::get_localhost(),
-                    cntl->latency_us());
+                    berror(_send_remote_block_callback->cntl_->ErrorCode()),
+                    _send_remote_block_callback->cntl_->ErrorText(),
+                    BackendOptions::get_localhost(),
+                    _send_remote_block_callback->cntl_->latency_us());
             LOG(WARNING) << err;
             return Status::RpcError(err);
         }
+        _receiver_status = 
Status::create(_send_remote_block_callback->response_->status());
         return _receiver_status;
     }
 
@@ -380,9 +376,9 @@ protected:
     PUniqueId _finst_id;
     PUniqueId _query_id;
     PBlock _pb_block;
-    PTransmitDataParams _brpc_request;
+    std::shared_ptr<PTransmitDataParams> _brpc_request;
     std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
-    RefCountClosure<PTransmitDataResult>* _closure = nullptr;
+    std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>> 
_send_remote_block_callback = nullptr;
     Status _receiver_status;
     int32_t _brpc_timeout_ms = 500;
     // whether the dest can be treated as query statistics transfer chain.
@@ -540,15 +536,15 @@ public:
         _buffer->register_sink(Channel<Parent>::_fragment_instance_id);
     }
 
-    pipeline::SelfDeleteClosure<PTransmitDataResult>* get_closure(
+    std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> 
get_send_callback(
             InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* 
data) {
-        if (!_closure) {
-            _closure.reset(new 
pipeline::SelfDeleteClosure<PTransmitDataResult>());
+        if (!_send_callback) {
+            _send_callback = 
pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
         } else {
-            _closure->cntl.Reset();
+            _send_callback->cntl_->Reset();
         }
-        _closure->init(id, eos, data);
-        return _closure.get();
+        _send_callback->init(id, eos, data);
+        return _send_callback;
     }
 
     std::shared_ptr<pipeline::LocalExchangeChannelDependency> 
get_local_channel_dependency() {
@@ -564,7 +560,7 @@ private:
 
     pipeline::ExchangeSinkBuffer<Parent>* _buffer = nullptr;
     bool _eos_send = false;
-    std::unique_ptr<pipeline::SelfDeleteClosure<PTransmitDataResult>> _closure 
= nullptr;
+    std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> 
_send_callback = nullptr;
     std::unique_ptr<PBlock> _pblock;
 };
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 75e813b1347..f3270c4b9ff 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -273,26 +273,13 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, 
IndexChannel* index_channel, i
           _index_channel(index_channel),
           _node_id(node_id),
           _is_incremental(is_incremental) {
+    _cur_add_block_request = std::make_shared<PTabletWriterAddBlockRequest>();
     _node_channel_tracker = std::make_shared<MemTracker>(fmt::format(
             "NodeChannel:indexID={}:threadId={}", 
std::to_string(_index_channel->_index_id),
             thread_context()->get_thread_id()));
 }
 
-VNodeChannel::~VNodeChannel() {
-    for (auto& closure : _open_closures) {
-        if (closure != nullptr) {
-            if (closure->unref()) {
-                delete closure;
-            }
-            closure = nullptr;
-        }
-    }
-    if (_add_block_closure != nullptr) {
-        delete _add_block_closure;
-        _add_block_closure = nullptr;
-    }
-    static_cast<void>(_cur_add_block_request.release_id());
-}
+VNodeChannel::~VNodeChannel() = default;
 
 void VNodeChannel::clear_all_blocks() {
     std::lock_guard<std::mutex> lg(_pending_batches_lock);
@@ -334,13 +321,13 @@ Status VNodeChannel::init(RuntimeState* state) {
     _timeout_watch.start();
 
     // Initialize _cur_add_block_request
-    if (!_cur_add_block_request.has_id()) {
-        _cur_add_block_request.set_allocated_id(&_parent->_load_id);
+    if (!_cur_add_block_request->has_id()) {
+        *(_cur_add_block_request->mutable_id()) = _parent->_load_id;
     }
-    _cur_add_block_request.set_index_id(_index_channel->_index_id);
-    _cur_add_block_request.set_sender_id(_parent->_sender_id);
-    _cur_add_block_request.set_backend_id(_node_id);
-    _cur_add_block_request.set_eos(false);
+    _cur_add_block_request->set_index_id(_index_channel->_index_id);
+    _cur_add_block_request->set_sender_id(_parent->_sender_id);
+    _cur_add_block_request->set_backend_id(_node_id);
+    _cur_add_block_request->set_eos(false);
 
     _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, 
_node_id);
     // The node channel will send _batch_size rows of data each rpc. When the
@@ -355,46 +342,48 @@ Status VNodeChannel::init(RuntimeState* state) {
 
 void VNodeChannel::_open_internal(bool is_incremental) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    PTabletWriterOpenRequest request;
-    request.set_allocated_id(&_parent->_load_id);
-    request.set_index_id(_index_channel->_index_id);
-    request.set_txn_id(_parent->_txn_id);
-    request.set_allocated_schema(_parent->_schema->to_protobuf());
+    auto request = std::make_shared<PTabletWriterOpenRequest>();
+    request->set_allocated_id(&_parent->_load_id);
+    request->set_index_id(_index_channel->_index_id);
+    request->set_txn_id(_parent->_txn_id);
+    request->set_allocated_schema(_parent->_schema->to_protobuf());
     std::set<int64_t> deduper;
     for (auto& tablet : _all_tablets) {
         if (deduper.contains(tablet.tablet_id)) {
             continue;
         }
-        auto ptablet = request.add_tablets();
+        auto ptablet = request->add_tablets();
         ptablet->set_partition_id(tablet.partition_id);
         ptablet->set_tablet_id(tablet.tablet_id);
         deduper.insert(tablet.tablet_id);
     }
-    request.set_num_senders(_parent->_num_senders);
-    request.set_need_gen_rollup(false); // Useless but it is a required field 
in pb
-    request.set_load_mem_limit(_parent->_load_mem_limit);
-    request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
-    request.set_is_high_priority(_parent->_is_high_priority);
-    request.set_sender_ip(BackendOptions::get_localhost());
-    request.set_is_vectorized(true);
-    request.set_backend_id(_node_id);
-    request.set_enable_profile(_state->enable_profile());
-    request.set_is_incremental(is_incremental);
-
-    auto* open_closure = new RefCountClosure<PTabletWriterOpenResult> {};
-    open_closure->ref();
-
-    open_closure->ref(); // This ref is for RPC's reference
-    
open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 
1000);
+    request->set_num_senders(_parent->_num_senders);
+    request->set_need_gen_rollup(false); // Useless but it is a required field 
in pb
+    request->set_load_mem_limit(_parent->_load_mem_limit);
+    request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
+    request->set_is_high_priority(_parent->_is_high_priority);
+    request->set_sender_ip(BackendOptions::get_localhost());
+    request->set_is_vectorized(true);
+    request->set_backend_id(_node_id);
+    request->set_enable_profile(_state->enable_profile());
+    request->set_is_incremental(is_incremental);
+
+    auto open_callback = 
DummyBrpcCallback<PTabletWriterOpenResult>::create_shared();
+    auto open_closure = AutoReleaseClosure<
+            PTabletWriterOpenRequest,
+            
DummyBrpcCallback<PTabletWriterOpenResult>>::create_unique(request, 
open_callback);
+    
open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec 
* 1000);
     if (config::tablet_writer_ignore_eovercrowded) {
-        open_closure->cntl.ignore_eovercrowded();
+        open_callback->cntl_->ignore_eovercrowded();
     }
     // the real transmission here. the corresponding BE's load mgr will open 
load channel for it.
-    _stub->tablet_writer_open(&open_closure->cntl, &request, 
&open_closure->result, open_closure);
-    _open_closures.push_back(open_closure);
+    _stub->tablet_writer_open(open_closure->cntl_.get(), 
open_closure->request_.get(),
+                              open_closure->response_.get(), 
open_closure.get());
+    open_closure.release();
+    _open_callbacks.push_back(open_callback);
 
-    static_cast<void>(request.release_id());
-    static_cast<void>(request.release_schema());
+    static_cast<void>(request->release_id());
+    static_cast<void>(request->release_schema());
 }
 
 void VNodeChannel::open() {
@@ -407,36 +396,28 @@ void VNodeChannel::incremental_open() {
 
 Status VNodeChannel::open_wait() {
     Status status;
-    for (auto& open_closure : _open_closures) {
+    for (auto& open_callback : _open_callbacks) {
         // because of incremental open, we will wait multi times. so skip the 
closures which have been checked and set to nullptr in previous rounds
-        if (open_closure == nullptr) {
+        if (open_callback == nullptr) {
             continue;
         }
 
-        open_closure->join();
+        open_callback->join();
         SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-        if (open_closure->cntl.Failed()) {
+        if (open_callback->cntl_->Failed()) {
             if 
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
                         _stub, _node_info.host, _node_info.brpc_port)) {
                 ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
-                        open_closure->cntl.remote_side());
+                        open_callback->cntl_->remote_side());
             }
             _cancelled = true;
-            auto error_code = open_closure->cntl.ErrorCode();
-            auto error_text = open_closure->cntl.ErrorText();
-            if (open_closure->unref()) {
-                delete open_closure;
-            }
-            open_closure = nullptr;
+            auto error_code = open_callback->cntl_->ErrorCode();
+            auto error_text = open_callback->cntl_->ErrorText();
             return Status::InternalError(
                     "failed to open tablet writer, error={}, error_text={}, 
info={}",
                     berror(error_code), error_text, channel_info());
         }
-        status = Status::create(open_closure->result.status());
-        if (open_closure->unref()) {
-            delete open_closure;
-        }
-        open_closure = nullptr;
+        status = Status::create(open_callback->response_->status());
 
         if (!status.ok()) {
             _cancelled = true;
@@ -445,11 +426,11 @@ Status VNodeChannel::open_wait() {
     }
 
     // add block closure
-    _add_block_closure = 
ReusableClosure<PTabletWriterAddBlockResult>::create();
-    _add_block_closure->addFailedHandler(
+    _send_block_callback = 
WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared();
+    _send_block_callback->addFailedHandler(
             [this](bool is_last_rpc) { 
_add_block_failed_callback(is_last_rpc); });
 
-    _add_block_closure->addSuccessHandler(
+    _send_block_callback->addSuccessHandler(
             [this](const PTabletWriterAddBlockResult& result, bool 
is_last_rpc) {
                 _add_block_success_callback(result, is_last_rpc);
             });
@@ -501,12 +482,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload,
         for (auto column : block->get_columns()) {
             columns.push_back(std::move(*column).mutate());
         }
-        *_cur_add_block_request.mutable_tablet_ids() = {tablets.begin(), 
tablets.end()};
-        _cur_add_block_request.set_is_single_tablet_block(true);
+        *_cur_add_block_request->mutable_tablet_ids() = {tablets.begin(), 
tablets.end()};
+        _cur_add_block_request->set_is_single_tablet_block(true);
     } else {
         block->append_to_block_by_selector(_cur_mutable_block.get(), 
*(payload->first));
         for (auto tablet_id : payload->second) {
-            _cur_add_block_request.add_tablet_ids(tablet_id);
+            _cur_add_block_request->add_tablet_ids(tablet_id);
         }
     }
 
@@ -517,9 +498,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload,
             std::lock_guard<std::mutex> l(_pending_batches_lock);
             // To simplify the add_row logic, postpone adding block into req 
until the time of sending req
             _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
-            _cur_add_block_request.set_eos(
+            _cur_add_block_request->set_eos(
                     false); // for multi-add, only when marking close we set 
it eos.
-            _pending_blocks.emplace(std::move(_cur_mutable_block), 
_cur_add_block_request);
+            // Copy the request to a tmp request to add to the pending block queue
+            auto tmp_add_block_request = 
std::make_shared<PTabletWriterAddBlockRequest>();
+            *tmp_add_block_request = *_cur_add_block_request;
+            _pending_blocks.emplace(std::move(_cur_mutable_block), 
tmp_add_block_request);
             _pending_batches_num++;
             VLOG_DEBUG << "VTabletWriter:" << _parent << " VNodeChannel:" << 
this
                        << " pending_batches_bytes:" << _pending_batches_bytes
@@ -527,7 +511,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload,
                        << " loadinfo:" << _load_info;
         }
         _cur_mutable_block = 
vectorized::MutableBlock::create_unique(block->clone_empty());
-        _cur_add_block_request.clear_tablet_ids();
+        _cur_add_block_request->clear_tablet_ids();
     }
 
     return Status::OK();
@@ -540,7 +524,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* 
state,
     }
 
     // set closure for sending block.
-    if (!_add_block_closure->try_set_in_flight()) {
+    if (!_send_block_callback->try_set_in_flight()) {
         // There is packet in flight, skip.
         return _send_finished ? 0 : 1;
     }
@@ -551,12 +535,12 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* 
state,
         if (!s.ok()) {
             _cancel_with_msg("submit send_batch task to send_batch_thread_pool 
failed");
             // sending finished. clear in flight
-            _add_block_closure->clear_in_flight();
+            _send_block_callback->clear_in_flight();
         }
         // in_flight is cleared in closure::Run
     } else {
         // sending finished. clear in flight
-        _add_block_closure->clear_in_flight();
+        _send_block_callback->clear_in_flight();
     }
     return _send_finished ? 0 : 1;
 }
@@ -607,20 +591,21 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
     auto request = std::move(send_block.second); // doesn't need to be saved 
in heap
 
     // tablet_ids has already set when add row
-    request.set_packet_seq(_next_packet_seq);
+    request->set_packet_seq(_next_packet_seq);
     auto block = mutable_block->to_block();
-    CHECK(block.rows() == request.tablet_ids_size())
-            << "block rows: " << block.rows() << ", tablet_ids_size: " << 
request.tablet_ids_size();
+    CHECK(block.rows() == request->tablet_ids_size())
+            << "block rows: " << block.rows()
+            << ", tablet_ids_size: " << request->tablet_ids_size();
     if (block.rows() > 0) {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        Status st = block.serialize(state->be_exec_version(), 
request.mutable_block(),
+        Status st = block.serialize(state->be_exec_version(), 
request->mutable_block(),
                                     &uncompressed_bytes, &compressed_bytes,
                                     
state->fragement_transmission_compression_type(),
                                     _parent->_transfer_large_data_by_brpc);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), st.to_string()));
-            _add_block_closure->clear_in_flight();
+            _send_block_callback->clear_in_flight();
             return;
         }
         if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
@@ -632,29 +617,29 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
 
     int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / 
NANOS_PER_MILLIS;
     if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
-        if (remain_ms <= 0 && !request.eos()) {
+        if (remain_ms <= 0 && !request->eos()) {
             cancel(fmt::format("{}, err: timeout", channel_info()));
-            _add_block_closure->clear_in_flight();
+            _send_block_callback->clear_in_flight();
             return;
         } else {
             remain_ms = config::min_load_rpc_timeout_ms;
         }
     }
 
-    _add_block_closure->reset();
-    _add_block_closure->cntl.set_timeout_ms(remain_ms);
+    _send_block_callback->reset();
+    _send_block_callback->cntl_->set_timeout_ms(remain_ms);
     if (config::tablet_writer_ignore_eovercrowded) {
-        _add_block_closure->cntl.ignore_eovercrowded();
+        _send_block_callback->cntl_->ignore_eovercrowded();
     }
 
-    if (request.eos()) {
+    if (request->eos()) {
         for (auto pid : _parent->_tablet_finder->partition_ids()) {
-            request.add_partition_ids(pid);
+            request->add_partition_ids(pid);
         }
 
-        request.set_write_single_replica(false);
+        request->set_write_single_replica(false);
         if (_parent->_write_single_replica) {
-            request.set_write_single_replica(true);
+            request->set_write_single_replica(true);
             for (std::unordered_map<int64_t, std::vector<int64_t>>::iterator 
iter =
                          _slave_tablet_nodes.begin();
                  iter != _slave_tablet_nodes.end(); iter++) {
@@ -670,24 +655,27 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
                     pnode->set_host(node->host);
                     pnode->set_async_internal_port(node->brpc_port);
                 }
-                request.mutable_slave_tablet_nodes()->insert({iter->first, 
slave_tablet_nodes});
+                request->mutable_slave_tablet_nodes()->insert({iter->first, 
slave_tablet_nodes});
             }
         }
 
-        // eos request must be the last request. it's a signal makeing 
callback function to set _add_batch_finished true.
-        _add_block_closure->end_mark();
+        // eos request must be the last request. it's a signal making 
callback function to set _add_batch_finished true.
+        _send_block_callback->end_mark();
         _send_finished = true;
         CHECK(_pending_batches_num == 0) << _pending_batches_num;
     }
 
-    if (_parent->_transfer_large_data_by_brpc && request.has_block() &&
-        request.block().has_column_values() && request.ByteSizeLong() > 
MIN_HTTP_BRPC_SIZE) {
-        Status st = request_embed_attachment_contain_block<
-                PTabletWriterAddBlockRequest, 
ReusableClosure<PTabletWriterAddBlockResult>>(
-                &request, _add_block_closure);
+    auto send_block_closure = AutoReleaseClosure<
+            PTabletWriterAddBlockRequest,
+            
WriteBlockCallback<PTabletWriterAddBlockResult>>::create_unique(request,
+                                                                            
_send_block_callback);
+    if (_parent->_transfer_large_data_by_brpc && request->has_block() &&
+        request->block().has_column_values() && request->ByteSizeLong() > 
MIN_HTTP_BRPC_SIZE) {
+        Status st = 
request_embed_attachment_contain_blockv2(send_block_closure->request_.get(),
+                                                             
send_block_closure);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), st.to_string()));
-            _add_block_closure->clear_in_flight();
+            _send_block_callback->clear_in_flight();
             return;
         }
 
@@ -696,23 +684,26 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
         std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
                 
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
                                                                                
           "http");
-        _add_block_closure->cntl.http_request().uri() =
+        _send_block_callback->cntl_->http_request().uri() =
                 brpc_url + 
"/PInternalServiceImpl/tablet_writer_add_block_by_http";
-        
_add_block_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
-        
_add_block_closure->cntl.http_request().set_content_type("application/json");
+        
_send_block_callback->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
+        
_send_block_callback->cntl_->http_request().set_content_type("application/json");
 
         {
             
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
-            
_brpc_http_stub->tablet_writer_add_block_by_http(&_add_block_closure->cntl, 
nullptr,
-                                                             
&_add_block_closure->result,
-                                                             
_add_block_closure);
+            _brpc_http_stub->tablet_writer_add_block_by_http(
+                    send_block_closure->cntl_.get(), nullptr, 
send_block_closure->response_.get(),
+                    send_block_closure.get());
+            send_block_closure.release();
         }
     } else {
-        _add_block_closure->cntl.http_request().Clear();
+        _send_block_callback->cntl_->http_request().Clear();
         {
             
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
-            _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
-                                           &_add_block_closure->result, 
_add_block_closure);
+            _stub->tablet_writer_add_block(
+                    send_block_closure->cntl_.get(), 
send_block_closure->request_.get(),
+                    send_block_closure->response_.get(), 
send_block_closure.get());
+            send_block_closure.release();
         }
     }
 
@@ -805,11 +796,11 @@ void VNodeChannel::_add_block_failed_callback(bool 
is_last_rpc) {
     }
     SCOPED_ATTACH_TASK(_state);
     // If rpc failed, mark all tablets on this node channel as failed
-    _index_channel->mark_as_failed(
-            this,
-            fmt::format("rpc failed, error coed:{}, error text:{}",
-                        _add_block_closure->cntl.ErrorCode(), 
_add_block_closure->cntl.ErrorText()),
-            -1);
+    _index_channel->mark_as_failed(this,
fmt::format("rpc failed, error code:{}, 
error text:{}",
+                                               
_send_block_callback->cntl_->ErrorCode(),
+                                               
_send_block_callback->cntl_->ErrorText()),
+                                   -1);
     Status st = _index_channel->check_intolerable_failure();
     if (!st.ok()) {
         _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), 
st.to_string()));
@@ -835,24 +826,28 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
     // But do we need brpc::StartCancel(call_id)?
     _cancel_with_msg(cancel_msg);
 
-    PTabletWriterCancelRequest request;
-    request.set_allocated_id(&_parent->_load_id);
-    request.set_index_id(_index_channel->_index_id);
-    request.set_sender_id(_parent->_sender_id);
+    auto request = std::make_shared<PTabletWriterCancelRequest>();
+    request->set_allocated_id(&_parent->_load_id);
+    request->set_index_id(_index_channel->_index_id);
+    request->set_sender_id(_parent->_sender_id);
 
-    auto closure = new RefCountClosure<PTabletWriterCancelResult>();
+    auto cancel_callback = 
DummyBrpcCallback<PTabletWriterCancelResult>::create_shared();
+    auto closure = AutoReleaseClosure<
+            PTabletWriterCancelRequest,
+            
DummyBrpcCallback<PTabletWriterCancelResult>>::create_unique(request, 
cancel_callback);
 
-    closure->ref();
     int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / 
NANOS_PER_MILLIS;
     if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
         remain_ms = config::min_load_rpc_timeout_ms;
     }
-    closure->cntl.set_timeout_ms(remain_ms);
+    cancel_callback->cntl_->set_timeout_ms(remain_ms);
     if (config::tablet_writer_ignore_eovercrowded) {
-        closure->cntl.ignore_eovercrowded();
+        closure->cntl_->ignore_eovercrowded();
     }
-    _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, 
closure);
-    static_cast<void>(request.release_id());
+    _stub->tablet_writer_cancel(closure->cntl_.get(), closure->request_.get(),
+                                closure->response_.get(), closure.get());
+    closure.release();
+    static_cast<void>(request->release_id());
 }
 
 bool VNodeChannel::is_send_data_rpc_done() const {
@@ -912,17 +907,19 @@ void VNodeChannel::mark_close() {
         return;
     }
 
-    _cur_add_block_request.set_eos(true);
+    _cur_add_block_request->set_eos(true);
     {
         std::lock_guard<std::mutex> l(_pending_batches_lock);
         if (!_cur_mutable_block) [[unlikely]] {
             // add a dummy block
             _cur_mutable_block = vectorized::MutableBlock::create_unique();
         }
+        auto tmp_add_block_request =
+                
std::make_shared<PTabletWriterAddBlockRequest>(*_cur_add_block_request);
         // when prepare to close, add block to queue so that 
try_send_pending_block thread will send it.
-        _pending_blocks.emplace(std::move(_cur_mutable_block), 
_cur_add_block_request);
+        _pending_blocks.emplace(std::move(_cur_mutable_block), 
tmp_add_block_request);
         _pending_batches_num++;
-        DCHECK(_pending_blocks.back().second.eos());
+        DCHECK(_pending_blocks.back().second->eos());
         _close_time_ms = UnixMillis();
         LOG(INFO) << channel_info()
                   << " mark closed, left pending batch size: " << 
_pending_blocks.size();
@@ -1391,7 +1388,7 @@ Status VTabletWriter::close(Status exec_status) {
     SCOPED_TIMER(_close_timer);
     SCOPED_TIMER(_profile->total_time_counter());
 
-    // will make the last batch of request. close_wait will wait this finished.
+    // will make the last batch of request. close_wait will wait this 
finished.
     static_cast<void>(try_close(_state, exec_status));
 
     // If _close_status is not ok, all nodes have been canceled in try_close.
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 7fee45be371..ea998a0f0b4 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -65,6 +65,7 @@
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
 #include "util/countdown_latch.h"
+#include "util/ref_count_closure.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
 #include "util/stopwatch.hpp"
@@ -88,8 +89,6 @@ class TExpr;
 class Thread;
 class ThreadPoolToken;
 class TupleDescriptor;
-template <typename T>
-class RefCountClosure;
 
 namespace vectorized {
 
@@ -120,26 +119,21 @@ struct AddBatchCounter {
 
 // It's very error-prone to guarantee the handler capture vars' & this 
closure's destruct sequence.
 // So using create() to get the closure pointer is recommended. We can delete 
the closure ptr before the capture vars destruction.
-// Delete this point is safe, don't worry about RPC callback will run after 
ReusableClosure deleted.
+// Deleting this pointer is safe; don't worry that the RPC callback will run after 
WriteBlockCallback is deleted.
 // "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, 
`clear_in_flight` after rpc failure or callback,
 // then next send will start, and it will wait for the rpc callback to 
complete when it is destroyed.
 template <typename T>
-class ReusableClosure final : public google::protobuf::Closure {
-public:
-    ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
-    ~ReusableClosure() override {
-        // shouldn't delete when Run() is calling or going to be called, wait 
for current Run() done.
-        join();
-        SCOPED_TRACK_MEMORY_TO_UNKNOWN();
-        cntl.Reset();
-    }
+class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
+    ENABLE_FACTORY_CREATOR(WriteBlockCallback);
 
-    static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
+public:
+    WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
+    ~WriteBlockCallback() override = default;
 
     void addFailedHandler(const std::function<void(bool)>& fn) { 
failed_handler = fn; }
     void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { 
success_handler = fn; }
 
-    void join() {
+    void join() override {
         // We rely on in_flight to assure one rpc is running,
         // while cid is not reliable due to memory order.
         // in_flight is written before getting callid,
@@ -159,8 +153,8 @@ public:
     // plz follow this order: reset() -> set_in_flight() -> send brpc batch
     void reset() {
         SCOPED_TRACK_MEMORY_TO_UNKNOWN();
-        cntl.Reset();
-        cid = cntl.call_id();
+        ::doris::DummyBrpcCallback<T>::cntl_->Reset();
+        cid = ::doris::DummyBrpcCallback<T>::cntl_->call_id();
     }
 
     // if _packet_in_flight == false, set it to true. Return true.
@@ -179,21 +173,19 @@ public:
         _is_last_rpc = true;
     }
 
-    void Run() override {
+    void call() override {
         DCHECK(_packet_in_flight);
-        if (cntl.Failed()) {
-            LOG(WARNING) << "failed to send brpc batch, error=" << 
berror(cntl.ErrorCode())
-                         << ", error_text=" << cntl.ErrorText();
+        if (::doris::DummyBrpcCallback<T>::cntl_->Failed()) {
+            LOG(WARNING) << "failed to send brpc batch, error="
+                         << 
berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode())
+                         << ", error_text=" << 
::doris::DummyBrpcCallback<T>::cntl_->ErrorText();
             failed_handler(_is_last_rpc);
         } else {
-            success_handler(result, _is_last_rpc);
+            success_handler(*(::doris::DummyBrpcCallback<T>::response_), 
_is_last_rpc);
         }
         clear_in_flight();
     }
 
-    brpc::Controller cntl;
-    T result;
-
 private:
     brpc::CallId cid;
     std::atomic<bool> _packet_in_flight {false};
@@ -381,7 +373,7 @@ protected:
 
     std::shared_ptr<PBackendService_Stub> _stub = nullptr;
     // because we have incremantal open, we should keep one relative closure 
for one request. it's similarly for adding block.
-    std::vector<RefCountClosure<PTabletWriterOpenResult>*> _open_closures;
+    std::vector<std::shared_ptr<DummyBrpcCallback<PTabletWriterOpenResult>>> 
_open_callbacks;
 
     std::vector<TTabletWithPartition> _all_tablets;
     // map from tablet_id to node_id where slave replicas locate in
@@ -413,12 +405,12 @@ protected:
 
     // build a _cur_mutable_block and push into _pending_blocks. when not 
building, this block is empty.
     std::unique_ptr<vectorized::MutableBlock> _cur_mutable_block;
-    PTabletWriterAddBlockRequest _cur_add_block_request;
+    std::shared_ptr<PTabletWriterAddBlockRequest> _cur_add_block_request;
 
-    using AddBlockReq =
-            std::pair<std::unique_ptr<vectorized::MutableBlock>, 
PTabletWriterAddBlockRequest>;
+    using AddBlockReq = std::pair<std::unique_ptr<vectorized::MutableBlock>,
+                                  
std::shared_ptr<PTabletWriterAddBlockRequest>>;
     std::queue<AddBlockReq> _pending_blocks;
-    ReusableClosure<PTabletWriterAddBlockResult>* _add_block_closure = nullptr;
+    std::shared_ptr<WriteBlockCallback<PTabletWriterAddBlockResult>> 
_send_block_callback = nullptr;
 
     bool _is_incremental;
 };
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp 
b/be/test/vec/runtime/vdata_stream_test.cpp
index 80adee73749..c1ce87fe56d 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -84,6 +84,10 @@ public:
                          << ", fragment_instance_id=" << 
print_id(request->finst_id())
                          << ", node=" << request->node_id();
         }
+        if (done != nullptr) {
+            st.to_protobuf(response->mutable_status());
+            done->Run();
+        }
     }
 
 private:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to