github-actions[bot] commented on code in PR #26718:
URL: https://github.com/apache/doris/pull/26718#discussion_r1388074816


##########
be/src/vec/sink/writer/vtablet_writer.cpp:
##########
@@ -357,46 +344,47 @@
 
 void VNodeChannel::_open_internal(bool is_incremental) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    PTabletWriterOpenRequest request;
-    request.set_allocated_id(&_parent->_load_id);
-    request.set_index_id(_index_channel->_index_id);
-    request.set_txn_id(_parent->_txn_id);
-    request.set_allocated_schema(_parent->_schema->to_protobuf());
+    auto request = std::make_shared<PTabletWriterOpenRequest>();
+    request->set_allocated_id(&_parent->_load_id);
+    request->set_index_id(_index_channel->_index_id);
+    request->set_txn_id(_parent->_txn_id);
+    request->set_allocated_schema(_parent->_schema->to_protobuf());
     std::set<int64_t> deduper;
     for (auto& tablet : _all_tablets) {
         if (deduper.contains(tablet.tablet_id)) {
             continue;
         }
-        auto ptablet = request.add_tablets();
+        auto ptablet = request->add_tablets();
         ptablet->set_partition_id(tablet.partition_id);
         ptablet->set_tablet_id(tablet.tablet_id);
         deduper.insert(tablet.tablet_id);
     }
-    request.set_num_senders(_parent->_num_senders);
-    request.set_need_gen_rollup(false); // Useless but it is a required field 
in pb
-    request.set_load_mem_limit(_parent->_load_mem_limit);
-    request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
-    request.set_is_high_priority(_parent->_is_high_priority);
-    request.set_sender_ip(BackendOptions::get_localhost());
-    request.set_is_vectorized(true);
-    request.set_backend_id(_node_id);
-    request.set_enable_profile(_state->enable_profile());
-    request.set_is_incremental(is_incremental);
-
-    auto* open_closure = new RefCountClosure<PTabletWriterOpenResult> {};
-    open_closure->ref();
-
-    open_closure->ref(); // This ref is for RPC's reference
-    
open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 
1000);
+    request->set_num_senders(_parent->_num_senders);
+    request->set_need_gen_rollup(false); // Useless but it is a required field 
in pb
+    request->set_load_mem_limit(_parent->_load_mem_limit);
+    request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
+    request->set_is_high_priority(_parent->_is_high_priority);
+    request->set_sender_ip(BackendOptions::get_localhost());
+    request->set_is_vectorized(true);
+    request->set_backend_id(_node_id);
+    request->set_enable_profile(_state->enable_profile());
+    request->set_is_incremental(is_incremental);
+
+    auto open_callback = 
DummyBrpcCallback<PTabletWriterOpenResult>::create_shared();
+    auto open_closure = AutoReleaseClosure<
+            PTabletWriterOpenRequest,
+            
DummyBrpcCallback<PTabletWriterOpenResult>>::create_unique(request, 
open_callback);
+    
open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec 
* 1000);

Review Comment:
   warning: 1000 is a magic number; consider replacing it with a named constant [readability-magic-numbers]
   ```cpp
       open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000);
                                                                                         ^
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -1524,37 +1524,34 @@ void 
PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
         return;
     }
 
-    PTabletWriteSlaveDoneRequest request;
-    request.set_txn_id(txn_id);
-    request.set_tablet_id(tablet_id);
-    request.set_node_id(node_id);
-    request.set_is_succeed(is_succeed);
-    RefCountClosure<PTabletWriteSlaveDoneResult>* closure =
-            new RefCountClosure<PTabletWriteSlaveDoneResult>();
-    closure->ref();
-    closure->ref();
-    closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec 
* 1000);
-    closure->cntl.ignore_eovercrowded();
-    stub->response_slave_tablet_pull_rowset(&closure->cntl, &request, 
&closure->result, closure);
-
-    closure->join();
-    if (closure->cntl.Failed()) {
+    auto request = std::make_shared<PTabletWriteSlaveDoneRequest>();
+    request->set_txn_id(txn_id);
+    request->set_tablet_id(tablet_id);
+    request->set_node_id(node_id);
+    request->set_is_succeed(is_succeed);
+    auto pull_rowset_callback = 
DummyBrpcCallback<PTabletWriteSlaveDoneResult>::create_shared();
+    auto closure = AutoReleaseClosure<
+            PTabletWriteSlaveDoneRequest,
+            
DummyBrpcCallback<PTabletWriteSlaveDoneResult>>::create_unique(request,
+                                                                           
pull_rowset_callback);
+    
closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 
1000);

Review Comment:
   warning: 1000 is a magic number; consider replacing it with a named constant [readability-magic-numbers]
   ```cpp
       closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
                                                                                     ^
   ```
   



##########
be/src/vec/sink/writer/vtablet_writer.h:
##########
@@ -120,26 +119,21 @@
 
 // It's very error-prone to guarantee the handler capture vars' & this 
closure's destruct sequence.
 // So using create() to get the closure pointer is recommended. We can delete 
the closure ptr before the capture vars destruction.
-// Delete this point is safe, don't worry about RPC callback will run after 
ReusableClosure deleted.
+// Delete this point is safe, don't worry about RPC callback will run after 
WriteBlockCallback deleted.
 // "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, 
`clear_in_flight` after rpc failure or callback,
 // then next send will start, and it will wait for the rpc callback to 
complete when it is destroyed.
 template <typename T>
-class ReusableClosure final : public google::protobuf::Closure {
-public:
-    ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
-    ~ReusableClosure() override {
-        // shouldn't delete when Run() is calling or going to be called, wait 
for current Run() done.
-        join();
-        SCOPED_TRACK_MEMORY_TO_UNKNOWN();
-        cntl.Reset();
-    }
+class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
+    ENABLE_FACTORY_CREATOR(WriteBlockCallback);
 
-    static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
+public:
+    WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
+    virtual ~WriteBlockCallback() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~WriteBlockCallback() override = default;
   ```
   



##########
be/src/vec/sink/writer/vtablet_writer.cpp:
##########
@@ -275,26 +275,13 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, 
IndexChannel* index_channel, i
           _index_channel(index_channel),
           _node_id(node_id),
           _is_incremental(is_incremental) {
+    _cur_add_block_request = std::make_shared<PTabletWriterAddBlockRequest>();
     _node_channel_tracker = std::make_shared<MemTracker>(fmt::format(
             "NodeChannel:indexID={}:threadId={}", 
std::to_string(_index_channel->_index_id),
             thread_context()->get_thread_id()));
 }
 
-VNodeChannel::~VNodeChannel() {
-    for (auto& closure : _open_closures) {
-        if (closure != nullptr) {
-            if (closure->unref()) {
-                delete closure;
-            }
-            closure = nullptr;
-        }
-    }
-    if (_add_block_closure != nullptr) {
-        delete _add_block_closure;
-        _add_block_closure = nullptr;
-    }
-    static_cast<void>(_cur_add_block_request.release_id());
-}
+VNodeChannel::~VNodeChannel() {}

Review Comment:
   warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
   
   ```suggestion
   VNodeChannel::~VNodeChannel() = default;
   ```
   



##########
be/src/vec/sink/writer/vtablet_writer.h:
##########
@@ -120,26 +119,21 @@ struct AddBatchCounter {
 
 // It's very error-prone to guarantee the handler capture vars' & this 
closure's destruct sequence.
 // So using create() to get the closure pointer is recommended. We can delete 
the closure ptr before the capture vars destruction.
-// Delete this point is safe, don't worry about RPC callback will run after 
ReusableClosure deleted.
+// Delete this point is safe, don't worry about RPC callback will run after 
WriteBlockCallback deleted.
 // "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, 
`clear_in_flight` after rpc failure or callback,
 // then next send will start, and it will wait for the rpc callback to 
complete when it is destroyed.
 template <typename T>
-class ReusableClosure final : public google::protobuf::Closure {
-public:
-    ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
-    ~ReusableClosure() override {
-        // shouldn't delete when Run() is calling or going to be called, wait 
for current Run() done.
-        join();
-        SCOPED_TRACK_MEMORY_TO_UNKNOWN();
-        cntl.Reset();
-    }
+class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
+    ENABLE_FACTORY_CREATOR(WriteBlockCallback);
 
-    static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
+public:
+    WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}

Review Comment:
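   warning: use '= default' to define a trivial default constructor [modernize-use-equals-default]
   
   Note: a defaulted constructor cannot keep a member-initializer list, so
   `WriteBlockCallback() : cid(INVALID_BTHREAD_ID) = default;` does not compile.
   Before applying the suggestion below, move the initialization to a default
   member initializer on the declaration of `cid` (i.e. `cid = INVALID_BTHREAD_ID`);
   otherwise keep the constructor as written.
   
   ```suggestion
       WriteBlockCallback() = default;
   ```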
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to