This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch opt_memtable_speed
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/opt_memtable_speed by this push:
     new c667f3d911 [perf](move-memtable) only connect to nodes with tablets (#23213)
c667f3d911 is described below

commit c667f3d9112216ede317af5080304bddc5b74ebb
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Sun Aug 20 09:55:54 2023 +0800

    [perf](move-memtable) only connect to nodes with tablets (#23213)
    
    * [perf](move-memtable) only connect to nodes with tablets
---
 be/src/runtime/load_stream.cpp      |  6 ++--
 be/src/runtime/load_stream.h        |  4 +--
 be/src/vec/sink/vtablet_sink_v2.cpp | 61 ++++++++++++++++---------------------
 be/src/vec/sink/vtablet_sink_v2.h   | 15 +++++++--
 gensrc/proto/internal_service.proto |  7 +----
 5 files changed, 46 insertions(+), 47 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index e53d460b92..ec82c3e949 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -189,7 +189,7 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data)
 
 Status IndexStream::close(std::vector<int64_t>* success_tablet_ids,
                         std::vector<int64_t>* failed_tablet_ids,
-                        std::vector<NeedCommitTabletInfo>* 
need_commit_tablet_info) {
+                        std::vector<PTabletWithPartition>* 
need_commit_tablet_info) {
     std::lock_guard lock_guard(_lock);
     SCOPED_TIMER(_close_wait_timer);
     // open all need commit tablets
@@ -249,7 +249,7 @@ Status LoadStream::init(const POpenStreamSinkRequest* request) {
 
 Status LoadStream::close(uint32_t sender_id, std::vector<int64_t>* 
success_tablet_ids,
                          std::vector<int64_t>* failed_tablet_ids,
-                         std::vector<NeedCommitTabletInfo>* 
need_commit_tablet_info) {
+                         std::vector<PTabletWithPartition>* 
need_commit_tablet_info) {
     if (sender_id >= _senders_status.size()) {
         LOG(WARNING) << "out of range sender id " << sender_id << "  num "
                      << _senders_status.size();
@@ -402,7 +402,7 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[]
             std::vector<int64_t> success_tablet_ids;
             std::vector<int64_t> failed_tablet_ids;
             auto& need_commit_tablet_info = hdr.need_commit_tablet_info();
-            std::vector<NeedCommitTabletInfo> 
info(need_commit_tablet_info.begin(), need_commit_tablet_info.end());
+            std::vector<PTabletWithPartition> 
info(need_commit_tablet_info.begin(), need_commit_tablet_info.end());
             auto st = close(hdr.sender_id(), &success_tablet_ids, 
&failed_tablet_ids,
                             &info);
             _report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 61a9fab34b..6df7782284 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -79,7 +79,7 @@ public:
 
     void flush(uint32_t sender_id);
     Status close(std::vector<int64_t>* success_tablet_ids, 
std::vector<int64_t>* failed_tablet_ids,
-               std::vector<NeedCommitTabletInfo>* need_commit_tablet_info);
+               std::vector<PTabletWithPartition>* need_commit_tablet_info);
 
 private:
     int64_t _id;
@@ -110,7 +110,7 @@ public:
 
     Status close(uint32_t sender_id, std::vector<int64_t>* success_tablet_ids,
                  std::vector<int64_t>* failed_tablet_ids,
-                 std::vector<NeedCommitTabletInfo>* need_commit_tablet_info);
+                 std::vector<PTabletWithPartition>* need_commit_tablet_info);
 
     // callbacks called by brpc
     int on_received_messages(StreamId id, butil::IOBuf* const messages[], 
size_t size) override;
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index a61d62749b..9a17383288 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -228,16 +228,21 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
     _stream_pool_for_node = std::make_shared<StreamPoolForNode>();
     _node_id_for_stream = std::make_shared<NodeIdForStream>();
     _delta_writer_for_tablet = std::make_shared<DeltaWriterForTablet>();
+    _build_tablet_node_mapping();
     RETURN_IF_ERROR(_init_stream_pools());
 
     return Status::OK();
 }
 
 Status VOlapTableSinkV2::_init_stream_pools() {
-    for (auto& [node_id, node_info] : _nodes_info->nodes_info()) {
+    for (auto& [node_id, _] : _tablets_for_node) {
+        auto node_info = _nodes_info->find_node(node_id);
+        if (node_info == nullptr) {
+            return Status::InternalError("Unknown node {} in tablet location", 
node_id);
+        }
         _stream_pool_for_node->insert({node_id, StreamPool {}});
         StreamPool& stream_pool = _stream_pool_for_node->at(node_id);
-        RETURN_IF_ERROR(_init_stream_pool(node_info, stream_pool));
+        RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool));
         for (auto stream : stream_pool) {
             _node_id_for_stream->insert({stream, node_id});
         }
@@ -276,26 +281,11 @@ Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, StreamPool
         request.set_allocated_schema(_schema->to_protobuf());
         if (i == 0) {
             // get tablet schema from each backend only in the 1st stream
-            for (const auto& partition : _vpartition->get_partitions()) {
-                for (const auto& index : partition->indexes) {
-                    if (_tablet_schema_for_index.contains(index.index_id)) {
-                        LOG(INFO) << "get_tablet_schema skipping index id " << 
index.index_id;
-                        // already getting tablet_schema for this index_id
-                        continue;
-                    }
-                    auto tablet_id = index.tablets[0];
-                    auto nodes = _location->find_tablet(tablet_id)->node_ids;
-                    if (std::find(nodes.begin(), nodes.end(), node_info.id) != 
nodes.end()) {
-                        auto req = request.add_tablets();
-                        req->set_tablet_id(tablet_id);
-                        req->set_index_id(index.index_id);
-                        // create an entry in the map to mark this index_id
-                        LOG(INFO) << "get_tablet_schema getting index id " << 
index.index_id;
-                        _tablet_schema_for_index[index.index_id];
-                    }
-                }
+            for (auto& tablet : _indexes_from_node[node_info.id]) {
+                auto req = request.add_tablets();
+                req->set_index_id(tablet.index_id);
+                req->set_tablet_id(tablet.tablet_id);
             }
-            _build_node_partition_tablet_mapping();
         }
         POpenStreamSinkResponse response;
         cntl.set_timeout_ms(config::open_stream_sink_timeout_ms);
@@ -323,14 +313,20 @@ Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, StreamPool
     return Status::OK();
 }
 
-void VOlapTableSinkV2::_build_node_partition_tablet_mapping() {
+void VOlapTableSinkV2::_build_tablet_node_mapping() {
+    std::unordered_set<int64_t> known_indexes;
     for (const auto& partition : _vpartition->get_partitions()) {
         for (const auto& index : partition->indexes) {
             for (const auto& tablet_id : index.tablets) {
                 auto nodes = _location->find_tablet(tablet_id)->node_ids;
                 for (auto& node : nodes) {
-                    
_node_partition_tablet_mapping[node][partition->id].insert(tablet_id);
+                    _tablets_for_node[node].emplace_back(partition->id, 
tablet_id);
+                }
+                if (known_indexes.contains(index.index_id)) [[likely]] {
+                    continue;
                 }
+                _indexes_from_node[nodes[0]].emplace_back(index.index_id, 
tablet_id);
+                known_indexes.insert(index.index_id);
             }
         }
     }
@@ -598,18 +594,15 @@ Status VOlapTableSinkV2::_close_load(brpc::StreamId stream) {
     header.set_sender_id(_sender_id);
     header.set_allocated_load_id(&_load_id);
     header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
-    auto node = _node_id_for_stream.get()->at(stream);
-    auto partition_tablet_mapping = _node_partition_tablet_mapping.at(node);
-    for (auto partition_id : _tablet_finder->partition_ids()) {
-        auto tablet_ids_it = partition_tablet_mapping.find(partition_id);
-        if (tablet_ids_it != partition_tablet_mapping.end()) {
-            for (auto& tablet_id : tablet_ids_it->second) {
-                NeedCommitTabletInfo* need_commit_tablet_info =
-                        header.add_need_commit_tablet_info();
-                need_commit_tablet_info->set_tablet_id(tablet_id);
-                need_commit_tablet_info->set_partition_id(partition_id);
-            }
+    auto node_id = _node_id_for_stream.get()->at(stream);
+    for (auto tablet : _tablets_for_node[node_id]) {
+        if (!_tablet_finder->partition_ids().contains(tablet.partition_id)) {
+            continue;
         }
+        PTabletWithPartition* need_commit_tablet_info =
+                header.add_need_commit_tablet_info();
+        need_commit_tablet_info->set_partition_id(tablet.partition_id);
+        need_commit_tablet_info->set_tablet_id(tablet.tablet_id);
     }
     size_t header_len = header.ByteSizeLong();
     buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h
index a87022163e..3982e9e6d3 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -87,6 +87,16 @@ using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>;
 using NodePartitionTabletMapping =
         std::unordered_map<int64_t, std::unordered_map<int64_t, 
std::unordered_set<int64_t>>>;
 
+struct TabletWithPartition {
+    int64_t partition_id;
+    int64_t tablet_id;
+};
+
+struct TabletWithIndex {
+    int64_t index_id;
+    int64_t tablet_id;
+};
+
 class StreamSinkHandler : public brpc::StreamInputHandler {
 public:
     StreamSinkHandler(VOlapTableSinkV2* sink) : _sink(sink) {}
@@ -141,7 +151,7 @@ private:
 
     Status _init_stream_pools();
 
-    void _build_node_partition_tablet_mapping();
+    void _build_tablet_node_mapping();
 
     void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
                                    const VOlapTablePartition* partition, 
uint32_t tablet_index,
@@ -221,7 +231,8 @@ private:
 
     std::unordered_set<int64_t> _opened_partitions;
 
-    NodePartitionTabletMapping _node_partition_tablet_mapping;
+    std::unordered_map<int64_t, std::vector<TabletWithPartition>> 
_tablets_for_node;
+    std::unordered_map<int64_t, std::vector<TabletWithIndex>> 
_indexes_from_node;
 
     std::shared_ptr<StreamPoolForNode> _stream_pool_for_node;
     std::shared_ptr<NodeIdForStream> _node_id_for_stream;
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 1cca9612d3..e302ac4e2c 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -733,11 +733,6 @@ message SegmentStatisticsPB {
     optional KeyBoundsPB key_bounds = 4;
 }
 
-message NeedCommitTabletInfo {
-    required int64 tablet_id= 1;
-    required int64 partition_id = 2;
-}
-
 message PStreamHeader {
     enum Opcode {
         APPEND_DATA = 1;
@@ -753,7 +748,7 @@ message PStreamHeader {
     optional bool segment_eos = 7;
     optional uint32 sender_id = 8;
     optional SegmentStatisticsPB segment_statistics = 9;
-    repeated NeedCommitTabletInfo need_commit_tablet_info = 10;
+    repeated PTabletWithPartition need_commit_tablet_info = 10;
 }
 
 service PBackendService {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to