This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e2fc231b7b8c90715019f20086f80d40c02b80d3
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Wed May 8 09:49:07 2024 +0800

    [refactor](move-memtable) simplify LoadStreamStub::open (#34488)
---
 be/src/vec/sink/load_stream_stub.cpp         |  5 ++---
 be/src/vec/sink/load_stream_stub.h           |  5 ++---
 be/src/vec/sink/writer/vtablet_writer_v2.cpp | 12 ++++++------
 3 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 78e1bc691cc..155ce2de349 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -141,8 +141,7 @@ LoadStreamStub::~LoadStreamStub() {
 }
 
 // open_load_stream
-Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
-                            BrpcClientCache<PBackendService_Stub>* 
client_cache,
+Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* 
client_cache,
                             const NodeInfo& node_info, int64_t txn_id,
                             const OlapTableSchemaParam& schema,
                             const std::vector<PTabletID>& tablets_for_schema, 
int total_streams,
@@ -157,7 +156,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> 
self,
     opt.max_buf_size = config::load_stream_max_buf_size;
     opt.idle_timeout_ms = idle_timeout_ms;
     opt.messages_in_batch = config::load_stream_messages_in_batch;
-    opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self);
+    opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, 
shared_from_this());
     brpc::Controller cntl;
     if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
         delete opt.handler;
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 8ef40b84145..1f0d2e459d3 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -104,7 +104,7 @@ private:
     std::weak_ptr<LoadStreamStub> _stub;
 };
 
-class LoadStreamStub {
+class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
     friend class LoadStreamReplyHandler;
 
 public:
@@ -125,8 +125,7 @@ public:
             ~LoadStreamStub();
 
     // open_load_stream
-    Status open(std::shared_ptr<LoadStreamStub> self,
-                BrpcClientCache<PBackendService_Stub>* client_cache, const 
NodeInfo& node_info,
+    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const 
NodeInfo& node_info,
                 int64_t txn_id, const OlapTableSchemaParam& schema,
                 const std::vector<PTabletID>& tablets_for_schema, int 
total_streams,
                 int64_t idle_timeout_ms, bool enable_profile);
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 21a87c150b8..ea7fed96d6d 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -277,15 +277,15 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t 
dst_id, Streams& stream
     // get tablet schema from each backend only in the 1st stream
     for (auto& stream : streams | std::ranges::views::take(1)) {
         const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
-        RETURN_IF_ERROR(stream->open(stream, 
_state->exec_env()->brpc_internal_client_cache(),
-                                     *node_info, _txn_id, *_schema, 
tablets_for_schema,
-                                     _total_streams, idle_timeout_ms, 
_state->enable_profile()));
+        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
+                                     _txn_id, *_schema, tablets_for_schema, 
_total_streams,
+                                     idle_timeout_ms, 
_state->enable_profile()));
     }
     // for the rest streams, open without getting tablet schema
     for (auto& stream : streams | std::ranges::views::drop(1)) {
-        RETURN_IF_ERROR(stream->open(stream, 
_state->exec_env()->brpc_internal_client_cache(),
-                                     *node_info, _txn_id, *_schema, {}, 
_total_streams,
-                                     idle_timeout_ms, 
_state->enable_profile()));
+        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
+                                     _txn_id, *_schema, {}, _total_streams, 
idle_timeout_ms,
+                                     _state->enable_profile()));
     }
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to