This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 25b1bc76c0a [fix](scan) Fix incorrect query results due to data race 
of compaction and parallel scanners building (#40552)
25b1bc76c0a is described below

commit 25b1bc76c0adfee4442083902b8008b703261210
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Wed Sep 11 21:59:31 2024 +0800

    [fix](scan) Fix incorrect query results due to data race of compaction and 
parallel scanners building (#40552)
    
    ## Proposed changes
    
    Capture rowset splits and delete predicates atomically in
    `ParallelScannerBuilder::_load` as a single read source.
    
    This prevents scanners from reading stale rowsets together with a set of
    delete predicates that (base) compaction has already eliminated.
---
 be/src/olap/parallel_scanner_builder.cpp | 66 +++++++++++++++-----------------
 be/src/olap/parallel_scanner_builder.h   |  6 ++-
 2 files changed, 35 insertions(+), 37 deletions(-)

diff --git a/be/src/olap/parallel_scanner_builder.cpp 
b/be/src/olap/parallel_scanner_builder.cpp
index 10bd61cd8d5..33e2762aa44 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -17,9 +17,12 @@
 
 #include "parallel_scanner_builder.h"
 
+#include <cstddef>
+
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet_hotspot.h"
 #include "cloud/config.h"
+#include "common/status.h"
 #include "olap/rowset/beta_rowset.h"
 #include "pipeline/exec/olap_scan_operator.h"
 #include "vec/exec/scan/new_olap_scanner.h"
@@ -42,35 +45,28 @@ Status 
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
     DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
 
     for (auto&& [tablet, version] : _tablets) {
-        DCHECK(_all_rowsets.contains(tablet->tablet_id()));
-        auto& rowsets = _all_rowsets[tablet->tablet_id()];
-
-        TabletReader::ReadSource reade_source_with_delete_info;
+        DCHECK(_all_read_sources.contains(tablet->tablet_id()));
+        auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
 
         if (config::is_cloud_mode()) {
             // FIXME(plat1ko): Avoid pointer cast
             
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
         }
 
-        if (!_state->skip_delete_predicate()) {
-            RETURN_IF_ERROR(tablet->capture_rs_readers(
-                    {0, version}, &reade_source_with_delete_info.rs_splits, 
false));
-            reade_source_with_delete_info.fill_delete_predicates();
-        }
-
-        TabletReader::ReadSource read_source;
-
+        // `rs_splits` in `entire read source` will be devided into several 
partitial read sources
+        // to build several parallel scanners, based on segment rows number. 
All the partitial read sources
+        // share the same delete predicates from their corresponding entire 
read source.
+        TabletReader::ReadSource partitial_read_source;
         int64_t rows_collected = 0;
-        for (auto& rowset : rowsets) {
-            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
-            RowsetReaderSharedPtr reader;
-            RETURN_IF_ERROR(beta_rowset->create_reader(&reader));
-            const auto rowset_id = beta_rowset->rowset_id();
+        for (auto& rs_split : entire_read_source.rs_splits) {
+            auto reader = rs_split.rs_reader;
+            auto rowset = reader->rowset();
+            const auto rowset_id = rowset->rowset_id();
 
             DCHECK(_segment_cache_handles.contains(rowset_id));
             auto& segment_cache_handle = _segment_cache_handles[rowset_id];
 
-            if (beta_rowset->num_rows() == 0) {
+            if (rowset->num_rows() == 0) {
                 continue;
             }
 
@@ -110,14 +106,14 @@ Status 
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
                         DCHECK_EQ(split.segment_offsets.second - 
split.segment_offsets.first,
                                   split.segment_row_ranges.size());
 
-                        read_source.rs_splits.emplace_back(std::move(split));
+                        
partitial_read_source.rs_splits.emplace_back(std::move(split));
 
                         scanners.emplace_back(
                                 _build_scanner(tablet, version, _key_ranges,
-                                               
{std::move(read_source.rs_splits),
-                                                
reade_source_with_delete_info.delete_predicates}));
+                                               
{std::move(partitial_read_source.rs_splits),
+                                                
entire_read_source.delete_predicates}));
 
-                        read_source = TabletReader::ReadSource();
+                        partitial_read_source = {};
                         split = RowSetSplits(reader->clone());
                         row_ranges = RowRanges();
 
@@ -141,25 +137,24 @@ Status 
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
                 DCHECK_GT(split.segment_offsets.second, 
split.segment_offsets.first);
                 DCHECK_EQ(split.segment_row_ranges.size(),
                           split.segment_offsets.second - 
split.segment_offsets.first);
-                read_source.rs_splits.emplace_back(std::move(split));
+                partitial_read_source.rs_splits.emplace_back(std::move(split));
             }
         } // end `for (auto& rowset : rowsets)`
 
         DCHECK_LE(rows_collected, _rows_per_scanner);
         if (rows_collected > 0) {
-            DCHECK_GT(read_source.rs_splits.size(), 0);
+            DCHECK_GT(partitial_read_source.rs_splits.size(), 0);
 #ifndef NDEBUG
-            for (auto& split : read_source.rs_splits) {
+            for (auto& split : partitial_read_source.rs_splits) {
                 DCHECK(split.rs_reader != nullptr);
                 DCHECK_LT(split.segment_offsets.first, 
split.segment_offsets.second);
                 DCHECK_EQ(split.segment_row_ranges.size(),
                           split.segment_offsets.second - 
split.segment_offsets.first);
             }
 #endif
-            scanners.emplace_back(
-                    _build_scanner(tablet, version, _key_ranges,
-                                   {std::move(read_source.rs_splits),
-                                    
reade_source_with_delete_info.delete_predicates}));
+            scanners.emplace_back(_build_scanner(tablet, version, _key_ranges,
+                                                 
{std::move(partitial_read_source.rs_splits),
+                                                  
entire_read_source.delete_predicates}));
         }
     }
 
@@ -173,16 +168,17 @@ Status ParallelScannerBuilder::_load() {
     _total_rows = 0;
     for (auto&& [tablet, version] : _tablets) {
         const auto tablet_id = tablet->tablet_id();
-        auto& rowsets = _all_rowsets[tablet_id];
-        {
-            std::shared_lock read_lock(tablet->get_header_lock());
-            RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0, 
version}, &rowsets));
+        auto& read_source = _all_read_sources[tablet_id];
+        RETURN_IF_ERROR(tablet->capture_rs_readers({0, version}, 
&read_source.rs_splits, false));
+        if (!_state->skip_delete_predicate()) {
+            read_source.fill_delete_predicates();
         }
-
         bool enable_segment_cache = 
_state->query_options().__isset.enable_segment_cache
                                             ? 
_state->query_options().enable_segment_cache
                                             : true;
-        for (auto& rowset : rowsets) {
+
+        for (auto& rs_split : read_source.rs_splits) {
+            auto rowset = rs_split.rs_reader->rowset();
             RETURN_IF_ERROR(rowset->load());
             const auto rowset_id = rowset->rowset_id();
             auto& segment_cache_handle = _segment_cache_handles[rowset_id];
diff --git a/be/src/olap/parallel_scanner_builder.h 
b/be/src/olap/parallel_scanner_builder.h
index eb25e183df2..934d769ed59 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -19,8 +19,10 @@
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <utility>
 
+#include "olap/rowset/rowset_fwd.h"
 #include "olap/rowset/segment_v2/row_ranges.h"
 #include "olap/segment_loader.h"
 #include "olap/tablet.h"
@@ -90,7 +92,7 @@ private:
     bool _is_preaggregation;
     std::vector<TabletWithVersion> _tablets;
     std::vector<OlapScanRange*> _key_ranges;
-    std::unordered_map<int64_t, std::vector<RowsetSharedPtr>> _all_rowsets;
+    std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources;
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to