This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 741807bb22b [performance](move-memtable) only call _select_streams when necessary (#35576) (#37406) 741807bb22b is described below commit 741807bb22bf64ae3ca21486551e80938ffa57b5 Author: Kaijie Chen <c...@apache.org> AuthorDate: Wed Jul 10 22:20:23 2024 +0800 [performance](move-memtable) only call _select_streams when necessary (#35576) (#37406) cherry-pick #35576 --- be/src/vec/sink/delta_writer_v2_pool.cpp | 4 +++- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 12 ++++++++---- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +- .../fault_injection_p0/test_writer_v2_fault_injection.groovy | 4 ++-- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp index cfb2b5294c7..87c18194127 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.cpp +++ b/be/src/vec/sink/delta_writer_v2_pool.cpp @@ -37,7 +37,9 @@ std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create( return _map.at(tablet_id); } std::shared_ptr<DeltaWriterV2> writer = creator(); - _map[tablet_id] = writer; + if (writer != nullptr) { + _map[tablet_id] = writer; + } return writer; } diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index c1c6e1cfc86..a6abe14f4f1 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -417,17 +417,21 @@ Status VTabletWriterV2::write(Block& input_block) { // For each tablet, send its input_rows from block to delta writer for (const auto& [tablet_id, rows] : rows_for_tablet) { - Streams streams; - RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id, rows.index_id, streams)); - RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams)); + RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows)); } return Status::OK(); } Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id, - const Rows& rows, const Streams& streams) { + const Rows& rows) { auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { + Streams streams; + auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams); + if (!st.ok()) [[unlikely]] { + LOG(WARNING) << st << ", load_id=" << print_id(_load_id); + return std::unique_ptr<DeltaWriterV2>(nullptr); + } WriteRequest req { .tablet_id = tablet_id, .txn_id = _txn_id, diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 5a9890cdb49..ff31e1552dd 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -140,7 +140,7 @@ private: RowsForTablet& rows_for_tablet); Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id, - const Rows& rows, const Streams& streams); + const Rows& rows); Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, Streams& streams); diff --git a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy index e6e5758b2b3..eaf87127abc 100644 --- a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy @@ -88,11 +88,11 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") { // VTabletWriterV2 tablet_location is null load_with_injection("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", "unknown tablet location") // VTabletWriterV2 location is null - load_with_injection("VTabletWriterV2._select_streams.location_null", "unknown tablet location") + load_with_injection("VTabletWriterV2._select_streams.location_null", "failed to open DeltaWriter for tablet") // VTabletWriterV2 cancel 
load_with_injection("VTabletWriterV2.close.cancel", "load cancel") // DeltaWriterV2 stream_size is 0 load_with_injection("DeltaWriterV2.init.stream_size", "failed to find tablet schema") sql """ set enable_memtable_on_sink_node=false """ -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org