This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch opt_memtable_speed
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/opt_memtable_speed by this push: new c667f3d911 [perf](move-memtable) only connect to nodes with tablets (#23213) c667f3d911 is described below commit c667f3d9112216ede317af5080304bddc5b74ebb Author: Kaijie Chen <c...@apache.org> AuthorDate: Sun Aug 20 09:55:54 2023 +0800 [perf](move-memtable) only connect to nodes with tablets (#23213) * [perf](move-memtable) only connect to nodes with tablets --- be/src/runtime/load_stream.cpp | 6 ++-- be/src/runtime/load_stream.h | 4 +-- be/src/vec/sink/vtablet_sink_v2.cpp | 61 ++++++++++++++++--------------------- be/src/vec/sink/vtablet_sink_v2.h | 15 +++++++-- gensrc/proto/internal_service.proto | 7 +---- 5 files changed, 46 insertions(+), 47 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index e53d460b92..ec82c3e949 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -189,7 +189,7 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) Status IndexStream::close(std::vector<int64_t>* success_tablet_ids, std::vector<int64_t>* failed_tablet_ids, - std::vector<NeedCommitTabletInfo>* need_commit_tablet_info) { + std::vector<PTabletWithPartition>* need_commit_tablet_info) { std::lock_guard lock_guard(_lock); SCOPED_TIMER(_close_wait_timer); // open all need commit tablets @@ -249,7 +249,7 @@ Status LoadStream::init(const POpenStreamSinkRequest* request) { Status LoadStream::close(uint32_t sender_id, std::vector<int64_t>* success_tablet_ids, std::vector<int64_t>* failed_tablet_ids, - std::vector<NeedCommitTabletInfo>* need_commit_tablet_info) { + std::vector<PTabletWithPartition>* need_commit_tablet_info) { if (sender_id >= _senders_status.size()) { LOG(WARNING) << "out of range sender id " << sender_id << " num " << _senders_status.size(); @@ -402,7 +402,7 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[] std::vector<int64_t> success_tablet_ids; 
std::vector<int64_t> failed_tablet_ids; auto& need_commit_tablet_info = hdr.need_commit_tablet_info(); - std::vector<NeedCommitTabletInfo> info(need_commit_tablet_info.begin(), need_commit_tablet_info.end()); + std::vector<PTabletWithPartition> info(need_commit_tablet_info.begin(), need_commit_tablet_info.end()); auto st = close(hdr.sender_id(), &success_tablet_ids, &failed_tablet_ids, &info); _report_result(id, st, &success_tablet_ids, &failed_tablet_ids); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 61a9fab34b..6df7782284 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -79,7 +79,7 @@ public: void flush(uint32_t sender_id); Status close(std::vector<int64_t>* success_tablet_ids, std::vector<int64_t>* failed_tablet_ids, - std::vector<NeedCommitTabletInfo>* need_commit_tablet_info); + std::vector<PTabletWithPartition>* need_commit_tablet_info); private: int64_t _id; @@ -110,7 +110,7 @@ public: Status close(uint32_t sender_id, std::vector<int64_t>* success_tablet_ids, std::vector<int64_t>* failed_tablet_ids, - std::vector<NeedCommitTabletInfo>* need_commit_tablet_info); + std::vector<PTabletWithPartition>* need_commit_tablet_info); // callbacks called by brpc int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override; diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index a61d62749b..9a17383288 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -228,16 +228,21 @@ Status VOlapTableSinkV2::open(RuntimeState* state) { _stream_pool_for_node = std::make_shared<StreamPoolForNode>(); _node_id_for_stream = std::make_shared<NodeIdForStream>(); _delta_writer_for_tablet = std::make_shared<DeltaWriterForTablet>(); + _build_tablet_node_mapping(); RETURN_IF_ERROR(_init_stream_pools()); return Status::OK(); } Status VOlapTableSinkV2::_init_stream_pools() { - for (auto& [node_id, node_info] : 
_nodes_info->nodes_info()) { + for (auto& [node_id, _] : _tablets_for_node) { + auto node_info = _nodes_info->find_node(node_id); + if (node_info == nullptr) { + return Status::InternalError("Unknown node {} in tablet location", node_id); + } _stream_pool_for_node->insert({node_id, StreamPool {}}); StreamPool& stream_pool = _stream_pool_for_node->at(node_id); - RETURN_IF_ERROR(_init_stream_pool(node_info, stream_pool)); + RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool)); for (auto stream : stream_pool) { _node_id_for_stream->insert({stream, node_id}); } @@ -276,26 +281,11 @@ Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, StreamPool request.set_allocated_schema(_schema->to_protobuf()); if (i == 0) { // get tablet schema from each backend only in the 1st stream - for (const auto& partition : _vpartition->get_partitions()) { - for (const auto& index : partition->indexes) { - if (_tablet_schema_for_index.contains(index.index_id)) { - LOG(INFO) << "get_tablet_schema skipping index id " << index.index_id; - // already getting tablet_schema for this index_id - continue; - } - auto tablet_id = index.tablets[0]; - auto nodes = _location->find_tablet(tablet_id)->node_ids; - if (std::find(nodes.begin(), nodes.end(), node_info.id) != nodes.end()) { - auto req = request.add_tablets(); - req->set_tablet_id(tablet_id); - req->set_index_id(index.index_id); - // create an entry in the map to mark this index_id - LOG(INFO) << "get_tablet_schema getting index id " << index.index_id; - _tablet_schema_for_index[index.index_id]; - } - } + for (auto& tablet : _indexes_from_node[node_info.id]) { + auto req = request.add_tablets(); + req->set_index_id(tablet.index_id); + req->set_tablet_id(tablet.tablet_id); } - _build_node_partition_tablet_mapping(); } POpenStreamSinkResponse response; cntl.set_timeout_ms(config::open_stream_sink_timeout_ms); @@ -323,14 +313,20 @@ Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, StreamPool return 
Status::OK(); } -void VOlapTableSinkV2::_build_node_partition_tablet_mapping() { +void VOlapTableSinkV2::_build_tablet_node_mapping() { + std::unordered_set<int64_t> known_indexes; for (const auto& partition : _vpartition->get_partitions()) { for (const auto& index : partition->indexes) { for (const auto& tablet_id : index.tablets) { auto nodes = _location->find_tablet(tablet_id)->node_ids; for (auto& node : nodes) { - _node_partition_tablet_mapping[node][partition->id].insert(tablet_id); + _tablets_for_node[node].emplace_back(partition->id, tablet_id); + } + if (known_indexes.contains(index.index_id)) [[likely]] { + continue; } + _indexes_from_node[nodes[0]].emplace_back(index.index_id, tablet_id); + known_indexes.insert(index.index_id); } } } @@ -598,18 +594,15 @@ Status VOlapTableSinkV2::_close_load(brpc::StreamId stream) { header.set_sender_id(_sender_id); header.set_allocated_load_id(&_load_id); header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); - auto node = _node_id_for_stream.get()->at(stream); - auto partition_tablet_mapping = _node_partition_tablet_mapping.at(node); - for (auto partition_id : _tablet_finder->partition_ids()) { - auto tablet_ids_it = partition_tablet_mapping.find(partition_id); - if (tablet_ids_it != partition_tablet_mapping.end()) { - for (auto& tablet_id : tablet_ids_it->second) { - NeedCommitTabletInfo* need_commit_tablet_info = - header.add_need_commit_tablet_info(); - need_commit_tablet_info->set_tablet_id(tablet_id); - need_commit_tablet_info->set_partition_id(partition_id); - } + auto node_id = _node_id_for_stream.get()->at(stream); + for (auto tablet : _tablets_for_node[node_id]) { + if (!_tablet_finder->partition_ids().contains(tablet.partition_id)) { + continue; } + PTabletWithPartition* need_commit_tablet_info = + header.add_need_commit_tablet_info(); + need_commit_tablet_info->set_partition_id(tablet.partition_id); + need_commit_tablet_info->set_tablet_id(tablet.tablet_id); } size_t header_len = header.ByteSizeLong(); 
buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len)); diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index a87022163e..3982e9e6d3 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -87,6 +87,16 @@ using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>; using NodePartitionTabletMapping = std::unordered_map<int64_t, std::unordered_map<int64_t, std::unordered_set<int64_t>>>; +struct TabletWithPartition { + int64_t partition_id; + int64_t tablet_id; +}; + +struct TabletWithIndex { + int64_t index_id; + int64_t tablet_id; +}; + class StreamSinkHandler : public brpc::StreamInputHandler { public: StreamSinkHandler(VOlapTableSinkV2* sink) : _sink(sink) {} @@ -141,7 +151,7 @@ private: Status _init_stream_pools(); - void _build_node_partition_tablet_mapping(); + void _build_tablet_node_mapping(); void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet, const VOlapTablePartition* partition, uint32_t tablet_index, @@ -221,7 +231,8 @@ private: std::unordered_set<int64_t> _opened_partitions; - NodePartitionTabletMapping _node_partition_tablet_mapping; + std::unordered_map<int64_t, std::vector<TabletWithPartition>> _tablets_for_node; + std::unordered_map<int64_t, std::vector<TabletWithIndex>> _indexes_from_node; std::shared_ptr<StreamPoolForNode> _stream_pool_for_node; std::shared_ptr<NodeIdForStream> _node_id_for_stream; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 1cca9612d3..e302ac4e2c 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -733,11 +733,6 @@ message SegmentStatisticsPB { optional KeyBoundsPB key_bounds = 4; } -message NeedCommitTabletInfo { - required int64 tablet_id= 1; - required int64 partition_id = 2; -} - message PStreamHeader { enum Opcode { APPEND_DATA = 1; @@ -753,7 +748,7 @@ message PStreamHeader { optional bool segment_eos = 7; optional uint32 sender_id = 
8; optional SegmentStatisticsPB segment_statistics = 9; - repeated NeedCommitTabletInfo need_commit_tablet_info = 10; + repeated PTabletWithPartition need_commit_tablet_info = 10; } service PBackendService { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org