github-actions[bot] commented on code in PR #26914:
URL: https://github.com/apache/doris/pull/26914#discussion_r1391273923


##########
be/src/vec/sink/load_stream_stub.cpp:
##########
@@ -190,19 +199,80 @@ Status LoadStreamStub::add_segment(int64_t partition_id, 
int64_t index_id, int64
 
 // CLOSE_LOAD
 Status LoadStreamStub::close_load(const std::vector<PTabletID>& 
tablets_to_commit) {
-    if (--_num_open > 0) {
+    {
+        std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+        _tablets_to_commit.insert(_tablets_to_commit.end(), 
tablets_to_commit.begin(),
+                                  tablets_to_commit.end());
+    }
+    if (--_use_cnt > 0) {
         return Status::OK();
     }
     PStreamHeader header;
     *header.mutable_load_id() = _load_id;
     header.set_src_id(_src_id);
     header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
-    for (const auto& tablet : tablets_to_commit) {
-        *header.add_tablets_to_commit() = tablet;
+    {
+        std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+        for (const auto& tablet : _tablets_to_commit) {
+            *header.add_tablets() = tablet;
+        }
+    }
+    return _encode_and_send(header);
+}
+
+// GET_SCHEMA
+Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
+    PStreamHeader header;
+    *header.mutable_load_id() = _load_id;
+    header.set_src_id(_src_id);
+    header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
+    std::ostringstream oss;
+    oss << "fetching tablet schema from stream " << _stream_id << ", load id: 
" << _load_id
+        << ", tablet id:";
+    for (const auto& tablet : tablets) {
+        *header.add_tablets() = tablet;
+        oss << " " << tablet.tablet_id();
     }
+    LOG(INFO) << oss.str();
     return _encode_and_send(header);
 }
 
+void LoadStreamStub::add_schema(const std::vector<PTabletSchemaWithIndex>& 
schemas) {
+    std::lock_guard<bthread::Mutex> lock(_mutex);
+    for (const auto& schema : schemas) {
+        auto tablet_schema = std::make_unique<TabletSchema>();
+        tablet_schema->init_from_pb(schema.tablet_schema());
+        _tablet_schema_for_index->emplace(schema.index_id(), 
std::move(tablet_schema));
+        _enable_unique_mow_for_index->emplace(schema.index_id(),
+                                              
schema.enable_unique_key_merge_on_write());
+    }
+    _schema_cv.notify_all();
+}
+
+Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
+                                       int64_t timeout_ms) {
+    if (_tablet_schema_for_index->contains(index_id)) {
+        return Status::OK();
+    }
+    PTabletID tablet;
+    tablet.set_partition_id(partition_id);
+    tablet.set_index_id(index_id);
+    tablet.set_tablet_id(tablet_id);
+    RETURN_IF_ERROR(get_schema({tablet}));
+
+    MonotonicStopWatch watch;
+    watch.start();
+    while (!_tablet_schema_for_index->contains(index_id) &&
+           watch.elapsed_time() / 1000 / 1000 < timeout_ms) {
+        static_cast<void>(wait_for_new_schema(100));

Review Comment:
   warning: 100 is a magic number; consider replacing it with a named constant [readability-magic-numbers]
   ```cpp
           static_cast<void>(wait_for_new_schema(100));
                                                 ^
   ```
   



##########
be/src/runtime/load_stream.cpp:
##########
@@ -360,6 +360,32 @@ void LoadStream::_report_result(StreamId stream, const 
Status& st,
     }
 }
 
+void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {

Review Comment:
   warning: method '_report_schema' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
   ```
   



##########
be/src/vec/sink/vtablet_sink_v2.cpp:
##########
@@ -91,7 +92,45 @@ static Status on_partitions_created(void* writer, 
TCreatePartitionResult* result
     return 
static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result);
 }
 
-void VOlapTableSinkV2::_init_row_distribution() {
+Status VOlapTableSinkV2::_incremental_open_streams(

Review Comment:
   warning: method '_incremental_open_streams' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status VOlapTableSinkV2::_incremental_open_streams(
   ```
   



##########
be/src/vec/sink/load_stream_stub.h:
##########
@@ -176,6 +183,21 @@
         return _handler.close_wait(timeout_ms);
     }
 
+    Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t 
tablet_id,
+                           int64_t timeout_ms = 60000);
+
+    Status wait_for_new_schema(int64_t timeout_ms) {

Review Comment:
   warning: method 'wait_for_new_schema' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status wait_for_new_schema(int64_t timeout_ms) {
   ```
   



##########
be/src/vec/sink/vtablet_sink_v2.cpp:
##########
@@ -208,39 +256,44 @@
     signal::set_signal_task_id(_load_id);
 
     _build_tablet_node_mapping();
-    RETURN_IF_ERROR(_open_streams(state->backend_id()));
-    _init_row_distribution();
+    RETURN_IF_ERROR(_open_streams(_backend_id));
+    RETURN_IF_ERROR(_init_row_distribution());
 
     return Status::OK();
 }
 
 Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
     for (auto& [dst_id, _] : _tablets_for_node) {
-        auto node_info = _nodes_info->find_node(dst_id);
-        if (node_info == nullptr) {
-            return Status::InternalError("Unknown node {} in tablet location", 
dst_id);
-        }
-        std::shared_ptr<Streams> streams;
-        streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-                _load_id, src_id, dst_id, _stream_per_node);
-        // get tablet schema from each backend only in the 1st stream
-        for (auto& stream : *streams | std::ranges::views::take(1)) {
-            const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
-            
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
-                                         *node_info, _txn_id, *_schema, 
tablets_for_schema,
-                                         _total_streams, 
_state->enable_profile()));
-        }
-        // for the rest streams, open without getting tablet schema
-        for (auto& stream : *streams | std::ranges::views::drop(1)) {
-            
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
-                                         *node_info, _txn_id, *_schema, {}, 
_total_streams,
-                                         _state->enable_profile()));
-        }
+        auto streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
+        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
         _streams_for_node[dst_id] = streams;
     }
     return Status::OK();
 }
 
+Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id,
+                                                  
::doris::stream_load::LoadStreams& streams) {
+    auto node_info = _nodes_info->find_node(dst_id);
+    if (node_info == nullptr) {
+        return Status::InternalError("Unknown node {} in tablet location", 
dst_id);
+    }
+    // get tablet schema from each backend only in the 1st stream
+    for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
+        const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
+        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
+                                     _txn_id, *_schema, tablets_for_schema, 
_total_streams,
+                                     _state->enable_profile()));
+    }
+    // for the rest streams, open without getting tablet schema
+    for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
+        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
+                                     _txn_id, *_schema, {}, _total_streams,
+                                     _state->enable_profile()));
+    }
+    return Status::OK();
+}
+

Review Comment:
   warning: method '_build_tablet_node_mapping' can be made static [readability-convert-member-functions-to-static]
   
   be/src/vec/sink/vtablet_sink_v2.h:133:
   ```diff
   -     void _build_tablet_node_mapping();
   +     static void _build_tablet_node_mapping();
   ```
   



##########
be/src/vec/sink/load_stream_stub.cpp:
##########
@@ -215,21 +285,22 @@
         buf.append(slice.get_data(), slice.get_size());
     }
     bool eos = header.opcode() == doris::PStreamHeader::CLOSE_LOAD;
-    return _send_with_buffer(buf, eos);
+    bool get_schema = header.opcode() == doris::PStreamHeader::GET_SCHEMA;
+    return _send_with_buffer(buf, eos || get_schema);
 }
 
-Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool eos) {
+Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) {

Review Comment:
   warning: method '_send_with_buffer' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) {
   ```
   



##########
be/src/vec/sink/vtablet_sink_v2.cpp:
##########
@@ -208,39 +256,44 @@
     signal::set_signal_task_id(_load_id);
 
     _build_tablet_node_mapping();
-    RETURN_IF_ERROR(_open_streams(state->backend_id()));
-    _init_row_distribution();
+    RETURN_IF_ERROR(_open_streams(_backend_id));
+    RETURN_IF_ERROR(_init_row_distribution());
 
     return Status::OK();
 }
 
 Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
     for (auto& [dst_id, _] : _tablets_for_node) {
-        auto node_info = _nodes_info->find_node(dst_id);
-        if (node_info == nullptr) {
-            return Status::InternalError("Unknown node {} in tablet location", 
dst_id);
-        }
-        std::shared_ptr<Streams> streams;
-        streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-                _load_id, src_id, dst_id, _stream_per_node);
-        // get tablet schema from each backend only in the 1st stream
-        for (auto& stream : *streams | std::ranges::views::take(1)) {
-            const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
-            
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
-                                         *node_info, _txn_id, *_schema, 
tablets_for_schema,
-                                         _total_streams, 
_state->enable_profile()));
-        }
-        // for the rest streams, open without getting tablet schema
-        for (auto& stream : *streams | std::ranges::views::drop(1)) {
-            
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
-                                         *node_info, _txn_id, *_schema, {}, 
_total_streams,
-                                         _state->enable_profile()));
-        }
+        auto streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
+        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
         _streams_for_node[dst_id] = streams;
     }
     return Status::OK();
 }
 
+Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id,
+                                                  
::doris::stream_load::LoadStreams& streams) {

Review Comment:
   warning: 'auto node_info' can be declared as 'const auto *node_info' [readability-qualified-auto]
   
   ```suggestion
       const auto *node_info = _nodes_info->find_node(dst_id);
   ```
   



##########
be/src/vec/sink/load_stream_stub.h:
##########
@@ -176,6 +183,21 @@ class LoadStreamStub {
         return _handler.close_wait(timeout_ms);
     }
 
+    Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t 
tablet_id,
+                           int64_t timeout_ms = 60000);

Review Comment:
   warning: 60000 is a magic number; consider replacing it with a named constant [readability-magic-numbers]
   ```cpp
                              int64_t timeout_ms = 60000);
                                                   ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to