This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e2fc231b7b8c90715019f20086f80d40c02b80d3 Author: Kaijie Chen <c...@apache.org> AuthorDate: Wed May 8 09:49:07 2024 +0800 [refactor](move-memtable) simplify LoadStreamStub::open (#34488) --- be/src/vec/sink/load_stream_stub.cpp | 5 ++--- be/src/vec/sink/load_stream_stub.h | 5 ++--- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 12 ++++++------ 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 78e1bc691cc..155ce2de349 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -141,8 +141,7 @@ LoadStreamStub::~LoadStreamStub() { } // open_load_stream -Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self, - BrpcClientCache<PBackendService_Stub>* client_cache, +Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector<PTabletID>& tablets_for_schema, int total_streams, @@ -157,7 +156,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self, opt.max_buf_size = config::load_stream_max_buf_size; opt.idle_timeout_ms = idle_timeout_ms; opt.messages_in_batch = config::load_stream_messages_in_batch; - opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self); + opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, shared_from_this()); brpc::Controller cntl; if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) { delete opt.handler; diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 8ef40b84145..1f0d2e459d3 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -104,7 +104,7 @@ private: std::weak_ptr<LoadStreamStub> _stub; }; -class LoadStreamStub { +class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> { friend class LoadStreamReplyHandler; public: @@ -125,8 +125,7 @@ public: 
~LoadStreamStub(); // open_load_stream - Status open(std::shared_ptr<LoadStreamStub> self, - BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, + Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector<PTabletID>& tablets_for_schema, int total_streams, int64_t idle_timeout_ms, bool enable_profile); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 21a87c150b8..ea7fed96d6d 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -277,15 +277,15 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream // get tablet schema from each backend only in the 1st stream for (auto& stream : streams | std::ranges::views::take(1)) { const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id]; - RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), - *node_info, _txn_id, *_schema, tablets_for_schema, - _total_streams, idle_timeout_ms, _state->enable_profile())); + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + _txn_id, *_schema, tablets_for_schema, _total_streams, + idle_timeout_ms, _state->enable_profile())); } // for the rest streams, open without getting tablet schema for (auto& stream : streams | std::ranges::views::drop(1)) { - RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), - *node_info, _txn_id, *_schema, {}, _total_streams, - idle_timeout_ms, _state->enable_profile())); + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + _txn_id, *_schema, {}, _total_streams, idle_timeout_ms, + _state->enable_profile())); } return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org