This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dd65cc1d145 [opt](MergedIO) no need to merge large columns (#27315)
dd65cc1d145 is described below

commit dd65cc1d1450771578d0411ffd9c8ed4fa83e90d
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Thu Nov 23 19:15:47 2023 +0800

    [opt](MergedIO) no need to merge large columns (#27315)
    
    1. Fix a profile bug of `MergeRangeFileReader`, and add a profile 
`ApplyBytes` to show the total bytes of ranges.
    2. There's no need to merge large columns, because `MergeRangeFileReader` 
will increase the copy time.
---
 be/src/io/fs/buffered_reader.cpp           | 10 +++++-----
 be/src/io/fs/buffered_reader.h             | 18 +++++++++++++-----
 be/src/vec/exec/format/orc/vorc_reader.cpp | 12 ++++++++++--
 be/test/io/fs/buffered_reader_test.cpp     |  6 +++---
 4 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index caa80616d35..e7f2cc95334 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -54,7 +54,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
         Status st = _reader->read_at(offset, result, bytes_read, io_ctx);
         _statistics.merged_io++;
         _statistics.request_bytes += *bytes_read;
-        _statistics.read_bytes += *bytes_read;
+        _statistics.merged_bytes += *bytes_read;
         return st;
     }
     if (offset + result.size > _random_access_ranges[range_index].end_offset) {
@@ -68,10 +68,10 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
     if (cached_data.contains(offset)) {
         // has cached data in box
         _read_in_box(cached_data, offset, result, &has_read);
+        _statistics.request_bytes += has_read;
         if (has_read == result.size) {
             // all data is read in cache
             *bytes_read = has_read;
-            _statistics.request_bytes += has_read;
             return Status::OK();
         }
     } else if (!cached_data.empty()) {
@@ -91,7 +91,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
         *bytes_read = has_read + read_size;
         _statistics.merged_io++;
         _statistics.request_bytes += read_size;
-        _statistics.read_bytes += read_size;
+        _statistics.merged_bytes += read_size;
         return Status::OK();
     }
 
@@ -186,7 +186,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
         *bytes_read = has_read + read_size;
         _statistics.merged_io++;
         _statistics.request_bytes += read_size;
-        _statistics.read_bytes += read_size;
+        _statistics.merged_bytes += read_size;
         return Status::OK();
     }
 
@@ -314,7 +314,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, 
size_t start_offset, siz
         RETURN_IF_ERROR(
                 _reader->read_at(start_offset, Slice(_read_slice, to_read), 
bytes_read, io_ctx));
         _statistics.merged_io++;
-        _statistics.read_bytes += *bytes_read;
+        _statistics.merged_bytes += *bytes_read;
     }
 
     SCOPED_RAW_TIMER(&_statistics.copy_time);
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 0bde47d79d9..8e5d87fcc2b 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -80,7 +80,8 @@ public:
         int64_t request_io = 0;
         int64_t merged_io = 0;
         int64_t request_bytes = 0;
-        int64_t read_bytes = 0;
+        int64_t merged_bytes = 0;
+        int64_t apply_bytes = 0;
     };
 
     struct RangeCachedData {
@@ -147,6 +148,9 @@ public:
         // Equivalent min size of each IO that can reach the maximum storage 
speed limit:
         // 512KB for oss, 4KB for hdfs
         _equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE;
+        for (const PrefetchRange& range : _random_access_ranges) {
+            _statistics.apply_bytes += range.end_offset - range.start_offset;
+        }
         if (_profile != nullptr) {
             const char* random_profile = "MergedSmallIO";
             ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
@@ -158,8 +162,10 @@ public:
                                                       random_profile, 1);
             _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, 
"RequestBytes", TUnit::BYTES,
                                                           random_profile, 1);
-            _read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, 
"MergedBytes", TUnit::BYTES,
-                                                       random_profile, 1);
+            _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, 
"MergedBytes", TUnit::BYTES,
+                                                         random_profile, 1);
+            _apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, 
"ApplyBytes", TUnit::BYTES,
+                                                        random_profile, 1);
         }
     }
 
@@ -184,7 +190,8 @@ public:
                 COUNTER_UPDATE(_request_io, _statistics.request_io);
                 COUNTER_UPDATE(_merged_io, _statistics.merged_io);
                 COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
-                COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
+                COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
+                COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
             }
         }
         return Status::OK();
@@ -220,7 +227,8 @@ private:
     RuntimeProfile::Counter* _request_io;
     RuntimeProfile::Counter* _merged_io;
     RuntimeProfile::Counter* _request_bytes;
-    RuntimeProfile::Counter* _read_bytes;
+    RuntimeProfile::Counter* _merged_bytes;
+    RuntimeProfile::Counter* _apply_bytes;
 
     int _search_read_range(size_t start_offset, size_t end_offset);
     void _clean_cached_data(RangeCachedData& cached_data);
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index f17dadfad71..7dcff69824b 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -2188,6 +2188,7 @@ void ORCFileInputStream::beforeReadStripe(
     // Generate prefetch ranges, build stripe file reader.
     uint64_t offset = current_strip_information->getOffset();
     std::vector<io::PrefetchRange> prefetch_ranges;
+    size_t total_io_size = 0;
     for (uint64_t stream_id = 0; stream_id < 
current_strip_information->getNumberOfStreams();
          ++stream_id) {
         std::unique_ptr<orc::StreamInformation> stream =
@@ -2195,13 +2196,20 @@ void ORCFileInputStream::beforeReadStripe(
         uint32_t columnId = stream->getColumnId();
         uint64_t length = stream->getLength();
         if (selected_columns[columnId]) {
+            total_io_size += length;
             doris::io::PrefetchRange prefetch_range = {offset, offset + 
length};
             prefetch_ranges.emplace_back(std::move(prefetch_range));
         }
         offset += length;
     }
-    // The underlying page reader will prefetch data in column.
-    _file_reader.reset(new io::MergeRangeFileReader(_profile, _inner_reader, 
prefetch_ranges));
+    size_t num_columns = std::count_if(selected_columns.begin(), 
selected_columns.end(),
+                                       [](bool selected) { return selected; });
+    if (total_io_size / num_columns < io::MergeRangeFileReader::SMALL_IO) {
+        // The underlying page reader will prefetch data in column.
+        _file_reader.reset(new io::MergeRangeFileReader(_profile, 
_inner_reader, prefetch_ranges));
+    } else {
+        _file_reader = _inner_reader;
+    }
 }
 
 } // namespace doris::vectorized
diff --git a/be/test/io/fs/buffered_reader_test.cpp 
b/be/test/io/fs/buffered_reader_test.cpp
index 1f20e007f99..f805889471b 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -295,13 +295,13 @@ TEST_F(BufferedReaderTest, test_read_amplify) {
     static_cast<void>(merge_reader.read_at(1024 * kb, result, &bytes_read, 
nullptr));
     EXPECT_EQ(bytes_read, 1024 * kb);
     EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb);
-    EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb);
+    EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb);
     // read column0
     result.size = 1 * kb;
     // will merge column 0 ~ 3
     static_cast<void>(merge_reader.read_at(0, result, &bytes_read, nullptr));
     EXPECT_EQ(bytes_read, 1 * kb);
-    EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb);
+    EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb + 12 * kb);
     // read column1
     result.size = 1 * kb;
     static_cast<void>(merge_reader.read_at(3 * kb, result, &bytes_read, 
nullptr));
@@ -312,7 +312,7 @@ TEST_F(BufferedReaderTest, test_read_amplify) {
     result.size = 5 * kb;
     static_cast<void>(merge_reader.read_at(7 * kb, result, &bytes_read, 
nullptr));
     EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb + 8 * kb);
-    EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb);
+    EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb + 12 * kb);
 }
 
 TEST_F(BufferedReaderTest, test_merged_io) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to