This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 741807bb22b [performance](move-memtable) only call _select_streams 
when necessary (#35576) (#37406)
741807bb22b is described below

commit 741807bb22bf64ae3ca21486551e80938ffa57b5
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Wed Jul 10 22:20:23 2024 +0800

    [performance](move-memtable) only call _select_streams when necessary 
(#35576) (#37406)
    
    cherry-pick #35576
---
 be/src/vec/sink/delta_writer_v2_pool.cpp                     |  4 +++-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp                 | 12 ++++++++----
 be/src/vec/sink/writer/vtablet_writer_v2.h                   |  2 +-
 .../fault_injection_p0/test_writer_v2_fault_injection.groovy |  4 ++--
 4 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp 
b/be/src/vec/sink/delta_writer_v2_pool.cpp
index cfb2b5294c7..87c18194127 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -37,7 +37,9 @@ std::shared_ptr<DeltaWriterV2> 
DeltaWriterV2Map::get_or_create(
         return _map.at(tablet_id);
     }
     std::shared_ptr<DeltaWriterV2> writer = creator();
-    _map[tablet_id] = writer;
+    if (writer != nullptr) {
+        _map[tablet_id] = writer;
+    }
     return writer;
 }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index c1c6e1cfc86..a6abe14f4f1 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -417,17 +417,21 @@ Status VTabletWriterV2::write(Block& input_block) {
 
     // For each tablet, send its input_rows from block to delta writer
     for (const auto& [tablet_id, rows] : rows_for_tablet) {
-        Streams streams;
-        RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id, 
rows.index_id, streams));
-        RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
+        RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows));
     }
 
     return Status::OK();
 }
 
 Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> 
block, int64_t tablet_id,
-                                        const Rows& rows, const Streams& 
streams) {
+                                        const Rows& rows) {
     auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, 
[&]() {
+        Streams streams;
+        auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id, 
streams);
+        if (!st.ok()) [[unlikely]] {
+            LOG(WARNING) << st << ", load_id=" << print_id(_load_id);
+            return std::unique_ptr<DeltaWriterV2>(nullptr);
+        }
         WriteRequest req {
                 .tablet_id = tablet_id,
                 .txn_id = _txn_id,
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 5a9890cdb49..ff31e1552dd 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -140,7 +140,7 @@ private:
                                    RowsForTablet& rows_for_tablet);
 
     Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t 
tablet_id,
-                           const Rows& rows, const Streams& streams);
+                           const Rows& rows);
 
     Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t 
index_id,
                            Streams& streams);
diff --git 
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
index e6e5758b2b3..eaf87127abc 100644
--- 
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
@@ -88,11 +88,11 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
     // VTabletWriterV2 tablet_location is null
     
load_with_injection("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null",
 "unknown tablet location")
     // VTabletWriterV2 location is null
-    load_with_injection("VTabletWriterV2._select_streams.location_null", 
"unknown tablet location")
+    load_with_injection("VTabletWriterV2._select_streams.location_null", 
"failed to open DeltaWriter for tablet")
     // VTabletWriterV2 cancel
     load_with_injection("VTabletWriterV2.close.cancel", "load cancel")
     // DeltaWriterV2 stream_size is 0
     load_with_injection("DeltaWriterV2.init.stream_size", "failed to find 
tablet schema")
 
     sql """ set enable_memtable_on_sink_node=false """
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to