github-actions[bot] commented on code in PR #26650: URL: https://github.com/apache/doris/pull/26650#discussion_r1387594172
########## be/src/vec/sink/vdata_stream_sender.h: ########## @@ -261,11 +261,7 @@ class Channel { _ch_cur_pb_block = &_ch_pb_block1; } - virtual ~Channel() { - if (_closure != nullptr && _closure->unref()) { - delete _closure; - } - } + virtual ~Channel() {} Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```suggestion virtual ~Channel() = default; ``` ########## be/src/service/internal_service.cpp: ########## @@ -1524,37 +1524,34 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote return; } - PTabletWriteSlaveDoneRequest request; - request.set_txn_id(txn_id); - request.set_tablet_id(tablet_id); - request.set_node_id(node_id); - request.set_is_succeed(is_succeed); - RefCountClosure<PTabletWriteSlaveDoneResult>* closure = - new RefCountClosure<PTabletWriteSlaveDoneResult>(); - closure->ref(); - closure->ref(); - closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); - closure->cntl.ignore_eovercrowded(); - stub->response_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure); - - closure->join(); - if (closure->cntl.Failed()) { + auto request = std::make_shared<PTabletWriteSlaveDoneRequest>(); + request->set_txn_id(txn_id); + request->set_tablet_id(tablet_id); + request->set_node_id(node_id); + request->set_is_succeed(is_succeed); + auto pull_rowset_callback = DummyBrpcCallback<PTabletWriteSlaveDoneResult>::create_shared(); + auto closure = AutoReleaseClosure< + PTabletWriteSlaveDoneRequest, + DummyBrpcCallback<PTabletWriteSlaveDoneResult>>::create_unique(request, + pull_rowset_callback); + closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); Review Comment: warning: 1000 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); ^ ``` ########## be/src/vec/sink/writer/vtablet_writer.cpp: ########## @@ -280,21 +280,7 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, i thread_context()->get_thread_id())); } -VNodeChannel::~VNodeChannel() { - for (auto& closure : _open_closures) { - if (closure != nullptr) { - if (closure->unref()) { - delete closure; - } - closure = nullptr; - } - } - if (_add_block_closure != nullptr) { - delete _add_block_closure; - _add_block_closure = nullptr; - } - static_cast<void>(_cur_add_block_request.release_id()); -} +VNodeChannel::~VNodeChannel() {} Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```suggestion VNodeChannel::~VNodeChannel() = default; ``` ########## be/src/vec/sink/writer/vtablet_writer.cpp: ########## @@ -357,46 +343,47 @@ void VNodeChannel::_open_internal(bool is_incremental) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - PTabletWriterOpenRequest request; - request.set_allocated_id(&_parent->_load_id); - request.set_index_id(_index_channel->_index_id); - request.set_txn_id(_parent->_txn_id); - request.set_allocated_schema(_parent->_schema->to_protobuf()); + auto request = std::make_shared<PTabletWriterOpenRequest>(); + request->set_allocated_id(&_parent->_load_id); + request->set_index_id(_index_channel->_index_id); + request->set_txn_id(_parent->_txn_id); + request->set_allocated_schema(_parent->_schema->to_protobuf()); std::set<int64_t> deduper; for (auto& tablet : _all_tablets) { if (deduper.contains(tablet.tablet_id)) { continue; } - auto ptablet = request.add_tablets(); + auto ptablet = request->add_tablets(); ptablet->set_partition_id(tablet.partition_id); ptablet->set_tablet_id(tablet.tablet_id); deduper.insert(tablet.tablet_id); } - request.set_num_senders(_parent->_num_senders); - request.set_need_gen_rollup(false); // Useless but it is a required field in pb - request.set_load_mem_limit(_parent->_load_mem_limit); - request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s); - request.set_is_high_priority(_parent->_is_high_priority); - request.set_sender_ip(BackendOptions::get_localhost()); - request.set_is_vectorized(true); - request.set_backend_id(_node_id); - request.set_enable_profile(_state->enable_profile()); - request.set_is_incremental(is_incremental); - - auto* open_closure = new RefCountClosure<PTabletWriterOpenResult> {}; - open_closure->ref(); - - open_closure->ref(); // This ref is for RPC's reference - open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); + request->set_num_senders(_parent->_num_senders); + request->set_need_gen_rollup(false); // Useless but it is a required field in pb + request->set_load_mem_limit(_parent->_load_mem_limit); + request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s); + request->set_is_high_priority(_parent->_is_high_priority); + request->set_sender_ip(BackendOptions::get_localhost()); + request->set_is_vectorized(true); + request->set_backend_id(_node_id); + request->set_enable_profile(_state->enable_profile()); + request->set_is_incremental(is_incremental); + + auto open_callback = DummyBrpcCallback<PTabletWriterOpenResult>::create_shared(); + auto open_closure = AutoReleaseClosure< + PTabletWriterOpenRequest, + DummyBrpcCallback<PTabletWriterOpenResult>>::create_unique(request, open_callback); + open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); Review Comment: warning: 1000 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); ^ ``` ########## be/src/vec/sink/writer/vtablet_writer.h: ########## @@ -120,26 +119,21 @@ struct AddBatchCounter { // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. -// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. +// Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted. // "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback, // then next send will start, and it will wait for the rpc callback to complete when it is destroyed. template <typename T> -class ReusableClosure final : public google::protobuf::Closure { -public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() override { - // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. - join(); - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - } +class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> { + ENABLE_FACTORY_CREATOR(WriteBlockCallback); - static ReusableClosure<T>* create() { return new ReusableClosure<T>(); } +public: + WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} Review Comment: warning: use '= default' to define a trivial default constructor [modernize-use-equals-default] ```suggestion WriteBlockCallback() : cid(INVALID_BTHREAD_ID) = default; ``` ########## be/src/vec/sink/writer/vtablet_writer.h: ########## @@ -120,26 +119,21 @@ // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. -// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. +// Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted. // "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback, // then next send will start, and it will wait for the rpc callback to complete when it is destroyed. template <typename T> -class ReusableClosure final : public google::protobuf::Closure { -public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() override { - // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. - join(); - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - } +class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> { + ENABLE_FACTORY_CREATOR(WriteBlockCallback); - static ReusableClosure<T>* create() { return new ReusableClosure<T>(); } +public: + WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} + virtual ~WriteBlockCallback() override {} Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ~WriteBlockCallback() override {} ``` ########## be/src/vec/sink/writer/vtablet_writer.h: ########## @@ -120,26 +119,21 @@ // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. -// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. +// Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted. // "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback, // then next send will start, and it will wait for the rpc callback to complete when it is destroyed. template <typename T> -class ReusableClosure final : public google::protobuf::Closure { -public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() override { - // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. - join(); - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - } +class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> { + ENABLE_FACTORY_CREATOR(WriteBlockCallback); - static ReusableClosure<T>* create() { return new ReusableClosure<T>(); } +public: + WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} + virtual ~WriteBlockCallback() override {} Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```suggestion virtual ~WriteBlockCallback() override = default; ``` ########## be/src/vec/sink/writer/vtablet_writer.h: ########## @@ -120,26 +119,21 @@ // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. -// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. +// Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted. // "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback, // then next send will start, and it will wait for the rpc callback to complete when it is destroyed. template <typename T> -class ReusableClosure final : public google::protobuf::Closure { -public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() override { - // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. - join(); - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - } +class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> { + ENABLE_FACTORY_CREATOR(WriteBlockCallback); - static ReusableClosure<T>* create() { return new ReusableClosure<T>(); } +public: + WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} + virtual ~WriteBlockCallback() override {} void addFailedHandler(const std::function<void(bool)>& fn) { failed_handler = fn; } void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { success_handler = fn; } - void join() { + virtual void join() override { Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion void join() override { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org