This is an automated email from the ASF dual-hosted git repository. ashingau pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dd65cc1d145 [opt](MergedIO) no need to merge large columns (#27315) dd65cc1d145 is described below commit dd65cc1d1450771578d0411ffd9c8ed4fa83e90d Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Thu Nov 23 19:15:47 2023 +0800 [opt](MergedIO) no need to merge large columns (#27315) 1. Fix a profile bug of `MergeRangeFileReader`, and add a profile `ApplyBytes` to show the total bytes of ranges. 2. There's no need to merge large columns, because `MergeRangeFileReader` will increase the copy time. --- be/src/io/fs/buffered_reader.cpp | 10 +++++----- be/src/io/fs/buffered_reader.h | 18 +++++++++++++----- be/src/vec/exec/format/orc/vorc_reader.cpp | 12 ++++++++++-- be/test/io/fs/buffered_reader_test.cpp | 6 +++--- 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index caa80616d35..e7f2cc95334 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -54,7 +54,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b Status st = _reader->read_at(offset, result, bytes_read, io_ctx); _statistics.merged_io++; _statistics.request_bytes += *bytes_read; - _statistics.read_bytes += *bytes_read; + _statistics.merged_bytes += *bytes_read; return st; } if (offset + result.size > _random_access_ranges[range_index].end_offset) { @@ -68,10 +68,10 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b if (cached_data.contains(offset)) { // has cached data in box _read_in_box(cached_data, offset, result, &has_read); + _statistics.request_bytes += has_read; if (has_read == result.size) { // all data is read in cache *bytes_read = has_read; - _statistics.request_bytes += has_read; return Status::OK(); } } else if (!cached_data.empty()) { @@ -91,7 +91,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b *bytes_read = has_read + read_size; _statistics.merged_io++; _statistics.request_bytes += read_size; - _statistics.read_bytes += read_size; + _statistics.merged_bytes += read_size; return Status::OK(); } @@ -186,7 +186,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b *bytes_read = has_read + read_size; _statistics.merged_io++; _statistics.request_bytes += read_size; - _statistics.read_bytes += read_size; + _statistics.merged_bytes += read_size; return Status::OK(); } @@ -314,7 +314,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz RETURN_IF_ERROR( _reader->read_at(start_offset, Slice(_read_slice, to_read), bytes_read, io_ctx)); _statistics.merged_io++; - _statistics.read_bytes += *bytes_read; + _statistics.merged_bytes += *bytes_read; } SCOPED_RAW_TIMER(&_statistics.copy_time); diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 0bde47d79d9..8e5d87fcc2b 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -80,7 +80,8 @@ public: int64_t request_io = 0; int64_t merged_io = 0; int64_t request_bytes = 0; - int64_t read_bytes = 0; + int64_t merged_bytes = 0; + int64_t apply_bytes = 0; }; struct RangeCachedData { @@ -147,6 +148,9 @@ public: // Equivalent min size of each IO that can reach the maximum storage speed limit: // 512KB for oss, 4KB for hdfs _equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE; + for (const PrefetchRange& range : _random_access_ranges) { + _statistics.apply_bytes += range.end_offset - range.start_offset; + } if (_profile != nullptr) { const char* random_profile = "MergedSmallIO"; ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1); @@ -158,8 +162,10 @@ public: random_profile, 1); _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES, random_profile, 1); - _read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES, - random_profile, 1); + _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES, + random_profile, 1); + _apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ApplyBytes", TUnit::BYTES, + random_profile, 1); } } @@ -184,7 +190,8 @@ public: COUNTER_UPDATE(_request_io, _statistics.request_io); COUNTER_UPDATE(_merged_io, _statistics.merged_io); COUNTER_UPDATE(_request_bytes, _statistics.request_bytes); - COUNTER_UPDATE(_read_bytes, _statistics.read_bytes); + COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes); + COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes); } } return Status::OK(); @@ -220,7 +227,8 @@ private: RuntimeProfile::Counter* _request_io; RuntimeProfile::Counter* _merged_io; RuntimeProfile::Counter* _request_bytes; - RuntimeProfile::Counter* _read_bytes; + RuntimeProfile::Counter* _merged_bytes; + RuntimeProfile::Counter* _apply_bytes; int _search_read_range(size_t start_offset, size_t end_offset); void _clean_cached_data(RangeCachedData& cached_data); diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index f17dadfad71..7dcff69824b 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -2188,6 +2188,7 @@ void ORCFileInputStream::beforeReadStripe( // Generate prefetch ranges, build stripe file reader. uint64_t offset = current_strip_information->getOffset(); std::vector<io::PrefetchRange> prefetch_ranges; + size_t total_io_size = 0; for (uint64_t stream_id = 0; stream_id < current_strip_information->getNumberOfStreams(); ++stream_id) { std::unique_ptr<orc::StreamInformation> stream = @@ -2195,13 +2196,20 @@ void ORCFileInputStream::beforeReadStripe( uint32_t columnId = stream->getColumnId(); uint64_t length = stream->getLength(); if (selected_columns[columnId]) { + total_io_size += length; doris::io::PrefetchRange prefetch_range = {offset, offset + length}; prefetch_ranges.emplace_back(std::move(prefetch_range)); } offset += length; } - // The underlying page reader will prefetch data in column. - _file_reader.reset(new io::MergeRangeFileReader(_profile, _inner_reader, prefetch_ranges)); + size_t num_columns = std::count_if(selected_columns.begin(), selected_columns.end(), + [](bool selected) { return selected; }); + if (total_io_size / num_columns < io::MergeRangeFileReader::SMALL_IO) { + // The underlying page reader will prefetch data in column. + _file_reader.reset(new io::MergeRangeFileReader(_profile, _inner_reader, prefetch_ranges)); + } else { + _file_reader = _inner_reader; + } } } // namespace doris::vectorized diff --git a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp index 1f20e007f99..f805889471b 100644 --- a/be/test/io/fs/buffered_reader_test.cpp +++ b/be/test/io/fs/buffered_reader_test.cpp @@ -295,13 +295,13 @@ TEST_F(BufferedReaderTest, test_read_amplify) { static_cast<void>(merge_reader.read_at(1024 * kb, result, &bytes_read, nullptr)); EXPECT_EQ(bytes_read, 1024 * kb); EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb); - EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb); + EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb); // read column0 result.size = 1 * kb; // will merge column 0 ~ 3 static_cast<void>(merge_reader.read_at(0, result, &bytes_read, nullptr)); EXPECT_EQ(bytes_read, 1 * kb); - EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb); + EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb + 12 * kb); // read column1 result.size = 1 * kb; static_cast<void>(merge_reader.read_at(3 * kb, result, &bytes_read, nullptr)); @@ -312,7 +312,7 @@ TEST_F(BufferedReaderTest, test_read_amplify) { result.size = 5 * kb; static_cast<void>(merge_reader.read_at(7 * kb, result, &bytes_read, nullptr)); EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb + 8 * kb); - EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb); + EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb + 12 * kb); } TEST_F(BufferedReaderTest, test_merged_io) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org