This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 25b1bc76c0a [fix](scan) Fix incorrect query results due to data race of compaction and parallel scanners building (#40552) 25b1bc76c0a is described below commit 25b1bc76c0adfee4442083902b8008b703261210 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Wed Sep 11 21:59:31 2024 +0800 [fix](scan) Fix incorrect query results due to data race of compaction and parallel scanners building (#40552) ## Proposed changes Capture rowset splits and delete predicates atomically in `ParallelScannerBuilder::_load` as a single read source. In this way, we could prevent reading stale rowsets with the delete predicates eliminated by (base) compaction. --- be/src/olap/parallel_scanner_builder.cpp | 66 +++++++++++++++----------------- be/src/olap/parallel_scanner_builder.h | 6 ++- 2 files changed, 35 insertions(+), 37 deletions(-) diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index 10bd61cd8d5..33e2762aa44 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -17,9 +17,12 @@ #include "parallel_scanner_builder.h" +#include <cstddef> + #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet_hotspot.h" #include "cloud/config.h" +#include "common/status.h" #include "olap/rowset/beta_rowset.h" #include "pipeline/exec/olap_scan_operator.h" #include "vec/exec/scan/new_olap_scanner.h" @@ -42,35 +45,28 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>& DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); for (auto&& [tablet, version] : _tablets) { - DCHECK(_all_rowsets.contains(tablet->tablet_id())); - auto& rowsets = _all_rowsets[tablet->tablet_id()]; - - TabletReader::ReadSource reade_source_with_delete_info; + DCHECK(_all_read_sources.contains(tablet->tablet_id())); + auto& entire_read_source = _all_read_sources[tablet->tablet_id()]; if 
(config::is_cloud_mode()) { // FIXME(plat1ko): Avoid pointer cast ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); } - if (!_state->skip_delete_predicate()) { - RETURN_IF_ERROR(tablet->capture_rs_readers( - {0, version}, &reade_source_with_delete_info.rs_splits, false)); - reade_source_with_delete_info.fill_delete_predicates(); - } - - TabletReader::ReadSource read_source; - + // `rs_splits` in `entire read source` will be devided into several partitial read sources + // to build several parallel scanners, based on segment rows number. All the partitial read sources + // share the same delete predicates from their corresponding entire read source. + TabletReader::ReadSource partitial_read_source; int64_t rows_collected = 0; - for (auto& rowset : rowsets) { - auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset); - RowsetReaderSharedPtr reader; - RETURN_IF_ERROR(beta_rowset->create_reader(&reader)); - const auto rowset_id = beta_rowset->rowset_id(); + for (auto& rs_split : entire_read_source.rs_splits) { + auto reader = rs_split.rs_reader; + auto rowset = reader->rowset(); + const auto rowset_id = rowset->rowset_id(); DCHECK(_segment_cache_handles.contains(rowset_id)); auto& segment_cache_handle = _segment_cache_handles[rowset_id]; - if (beta_rowset->num_rows() == 0) { + if (rowset->num_rows() == 0) { continue; } @@ -110,14 +106,14 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>& DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first, split.segment_row_ranges.size()); - read_source.rs_splits.emplace_back(std::move(split)); + partitial_read_source.rs_splits.emplace_back(std::move(split)); scanners.emplace_back( _build_scanner(tablet, version, _key_ranges, - {std::move(read_source.rs_splits), - reade_source_with_delete_info.delete_predicates})); + {std::move(partitial_read_source.rs_splits), + entire_read_source.delete_predicates})); - read_source = 
TabletReader::ReadSource(); + partitial_read_source = {}; split = RowSetSplits(reader->clone()); row_ranges = RowRanges(); @@ -141,25 +137,24 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>& DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first); DCHECK_EQ(split.segment_row_ranges.size(), split.segment_offsets.second - split.segment_offsets.first); - read_source.rs_splits.emplace_back(std::move(split)); + partitial_read_source.rs_splits.emplace_back(std::move(split)); } } // end `for (auto& rowset : rowsets)` DCHECK_LE(rows_collected, _rows_per_scanner); if (rows_collected > 0) { - DCHECK_GT(read_source.rs_splits.size(), 0); + DCHECK_GT(partitial_read_source.rs_splits.size(), 0); #ifndef NDEBUG - for (auto& split : read_source.rs_splits) { + for (auto& split : partitial_read_source.rs_splits) { DCHECK(split.rs_reader != nullptr); DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second); DCHECK_EQ(split.segment_row_ranges.size(), split.segment_offsets.second - split.segment_offsets.first); } #endif - scanners.emplace_back( - _build_scanner(tablet, version, _key_ranges, - {std::move(read_source.rs_splits), - reade_source_with_delete_info.delete_predicates})); + scanners.emplace_back(_build_scanner(tablet, version, _key_ranges, + {std::move(partitial_read_source.rs_splits), + entire_read_source.delete_predicates})); } } @@ -173,16 +168,17 @@ Status ParallelScannerBuilder::_load() { _total_rows = 0; for (auto&& [tablet, version] : _tablets) { const auto tablet_id = tablet->tablet_id(); - auto& rowsets = _all_rowsets[tablet_id]; - { - std::shared_lock read_lock(tablet->get_header_lock()); - RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0, version}, &rowsets)); + auto& read_source = _all_read_sources[tablet_id]; + RETURN_IF_ERROR(tablet->capture_rs_readers({0, version}, &read_source.rs_splits, false)); + if (!_state->skip_delete_predicate()) { + read_source.fill_delete_predicates(); } - bool 
enable_segment_cache = _state->query_options().__isset.enable_segment_cache ? _state->query_options().enable_segment_cache : true; - for (auto& rowset : rowsets) { + + for (auto& rs_split : read_source.rs_splits) { + auto rowset = rs_split.rs_reader->rowset(); RETURN_IF_ERROR(rowset->load()); const auto rowset_id = rowset->rowset_id(); auto& segment_cache_handle = _segment_cache_handles[rowset_id]; diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h index eb25e183df2..934d769ed59 100644 --- a/be/src/olap/parallel_scanner_builder.h +++ b/be/src/olap/parallel_scanner_builder.h @@ -19,8 +19,10 @@ #include <memory> #include <string> +#include <unordered_map> #include <utility> +#include "olap/rowset/rowset_fwd.h" #include "olap/rowset/segment_v2/row_ranges.h" #include "olap/segment_loader.h" #include "olap/tablet.h" @@ -90,7 +92,7 @@ private: bool _is_preaggregation; std::vector<TabletWithVersion> _tablets; std::vector<OlapScanRange*> _key_ranges; - std::unordered_map<int64_t, std::vector<RowsetSharedPtr>> _all_rowsets; + std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources; }; -} // namespace doris \ No newline at end of file +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org