github-actions[bot] commented on code in PR #26718: URL: https://github.com/apache/doris/pull/26718#discussion_r1388074816
########## be/src/vec/sink/writer/vtablet_writer.cpp: ########## @@ -357,46 +344,47 @@ void VNodeChannel::_open_internal(bool is_incremental) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - PTabletWriterOpenRequest request; - request.set_allocated_id(&_parent->_load_id); - request.set_index_id(_index_channel->_index_id); - request.set_txn_id(_parent->_txn_id); - request.set_allocated_schema(_parent->_schema->to_protobuf()); + auto request = std::make_shared<PTabletWriterOpenRequest>(); + request->set_allocated_id(&_parent->_load_id); + request->set_index_id(_index_channel->_index_id); + request->set_txn_id(_parent->_txn_id); + request->set_allocated_schema(_parent->_schema->to_protobuf()); std::set<int64_t> deduper; for (auto& tablet : _all_tablets) { if (deduper.contains(tablet.tablet_id)) { continue; } - auto ptablet = request.add_tablets(); + auto ptablet = request->add_tablets(); ptablet->set_partition_id(tablet.partition_id); ptablet->set_tablet_id(tablet.tablet_id); deduper.insert(tablet.tablet_id); } - request.set_num_senders(_parent->_num_senders); - request.set_need_gen_rollup(false); // Useless but it is a required field in pb - request.set_load_mem_limit(_parent->_load_mem_limit); - request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s); - request.set_is_high_priority(_parent->_is_high_priority); - request.set_sender_ip(BackendOptions::get_localhost()); - request.set_is_vectorized(true); - request.set_backend_id(_node_id); - request.set_enable_profile(_state->enable_profile()); - request.set_is_incremental(is_incremental); - - auto* open_closure = new RefCountClosure<PTabletWriterOpenResult> {}; - open_closure->ref(); - - open_closure->ref(); // This ref is for RPC's reference - open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); + request->set_num_senders(_parent->_num_senders); + request->set_need_gen_rollup(false); // Useless but it is a required field in pb + 
request->set_load_mem_limit(_parent->_load_mem_limit); + request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s); + request->set_is_high_priority(_parent->_is_high_priority); + request->set_sender_ip(BackendOptions::get_localhost()); + request->set_is_vectorized(true); + request->set_backend_id(_node_id); + request->set_enable_profile(_state->enable_profile()); + request->set_is_incremental(is_incremental); + + auto open_callback = DummyBrpcCallback<PTabletWriterOpenResult>::create_shared(); + auto open_closure = AutoReleaseClosure< + PTabletWriterOpenRequest, + DummyBrpcCallback<PTabletWriterOpenResult>>::create_unique(request, open_callback); + open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); Review Comment: warning: 1000 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); ^ ``` ########## be/src/service/internal_service.cpp: ########## @@ -1524,37 +1524,34 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote return; } - PTabletWriteSlaveDoneRequest request; - request.set_txn_id(txn_id); - request.set_tablet_id(tablet_id); - request.set_node_id(node_id); - request.set_is_succeed(is_succeed); - RefCountClosure<PTabletWriteSlaveDoneResult>* closure = - new RefCountClosure<PTabletWriteSlaveDoneResult>(); - closure->ref(); - closure->ref(); - closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); - closure->cntl.ignore_eovercrowded(); - stub->response_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure); - - closure->join(); - if (closure->cntl.Failed()) { + auto request = std::make_shared<PTabletWriteSlaveDoneRequest>(); + request->set_txn_id(txn_id); + request->set_tablet_id(tablet_id); + request->set_node_id(node_id); + request->set_is_succeed(is_succeed); + auto pull_rowset_callback = 
DummyBrpcCallback<PTabletWriteSlaveDoneResult>::create_shared(); + auto closure = AutoReleaseClosure< + PTabletWriteSlaveDoneRequest, + DummyBrpcCallback<PTabletWriteSlaveDoneResult>>::create_unique(request, + pull_rowset_callback); + closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); Review Comment: warning: 1000 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); ^ ``` ########## be/src/vec/sink/writer/vtablet_writer.h: ########## @@ -120,26 +119,21 @@ // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. -// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. +// Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted. // "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback, // then next send will start, and it will wait for the rpc callback to complete when it is destroyed. template <typename T> -class ReusableClosure final : public google::protobuf::Closure { -public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() override { - // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. 
- join(); - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - } +class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> { + ENABLE_FACTORY_CREATOR(WriteBlockCallback); - static ReusableClosure<T>* create() { return new ReusableClosure<T>(); } +public: + WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} + virtual ~WriteBlockCallback() override = default; Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ~WriteBlockCallback() override = default; ``` ########## be/src/vec/sink/writer/vtablet_writer.cpp: ########## @@ -275,26 +275,13 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, i _index_channel(index_channel), _node_id(node_id), _is_incremental(is_incremental) { + _cur_add_block_request = std::make_shared<PTabletWriterAddBlockRequest>(); _node_channel_tracker = std::make_shared<MemTracker>(fmt::format( "NodeChannel:indexID={}:threadId={}", std::to_string(_index_channel->_index_id), thread_context()->get_thread_id())); } -VNodeChannel::~VNodeChannel() { - for (auto& closure : _open_closures) { - if (closure != nullptr) { - if (closure->unref()) { - delete closure; - } - closure = nullptr; - } - } - if (_add_block_closure != nullptr) { - delete _add_block_closure; - _add_block_closure = nullptr; - } - static_cast<void>(_cur_add_block_request.release_id()); -} +VNodeChannel::~VNodeChannel() {} Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```suggestion VNodeChannel::~VNodeChannel() = default; ``` ########## be/src/vec/sink/writer/vtablet_writer.h: ########## @@ -120,26 +119,21 @@ struct AddBatchCounter { // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. 
-// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. +// Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted. // "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback, // then next send will start, and it will wait for the rpc callback to complete when it is destroyed. template <typename T> -class ReusableClosure final : public google::protobuf::Closure { -public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() override { - // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. - join(); - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - } +class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> { + ENABLE_FACTORY_CREATOR(WriteBlockCallback); - static ReusableClosure<T>* create() { return new ReusableClosure<T>(); } +public: + WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} Review Comment: warning: use '= default' to define a trivial default constructor [modernize-use-equals-default] ```suggestion WriteBlockCallback() = default; ``` Note: a defaulted constructor cannot have a member-initializer list, so applying this suggestion also requires giving `cid` a default member initializer of `INVALID_BTHREAD_ID` at its declaration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org