morningman commented on code in PR #59307:
URL: https://github.com/apache/doris/pull/59307#discussion_r2706700447


##########
be/src/vec/exec/format/parquet/vparquet_page_reader.cpp:
##########
@@ -77,11 +83,79 @@ Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
         return Status::IOError("Should skip or load current page to get next page");
     }
 
+    _page_statistics.page_read_counter += 1;
+
+    // Parse page header from file; header bytes are saved for possible cache insertion
     const uint8_t* page_header_buf = nullptr;
     size_t max_size = _end_offset - _offset;
     size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
     const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
     uint32_t real_header_size = 0;
+
+    // Try a header-only lookup in the page cache. Cached pages store
+    // header + optional v2 levels + uncompressed payload, so we can
+    // parse the page header directly from the cached bytes and avoid
+    // a file read for the header.
+    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+        StoragePageCache::instance() != nullptr) {
+        PageCacheHandle handle;
+        StoragePageCache::CacheKey key(fmt::format("{}::{}", _reader->path(), _reader->mtime()),
+                                       _end_offset, _offset);
+        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
+            // Parse header directly from cached data
+            _page_cache_handle = std::move(handle);
+            Slice s = _page_cache_handle.data();
+            real_header_size = cast_set<uint32_t>(s.size);
+            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
+            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
+                                             &real_header_size, true, &_cur_page_header);
+            if (!st.ok()) return st;
+            // Increment page cache counters for a true cache hit on header+payload
+            _page_statistics.page_cache_hit_counter += 1;
+            // Detect whether the cached payload is compressed or decompressed and record
+            bool is_cache_payload_decompressed = true;
+            if (_cur_page_header.compressed_page_size > 0) {

Review Comment:
   Better to extract this check logic into a helper:
   ```
     bool should_cache_decompressed(const tparquet::PageHeader* header,
                                   const tparquet::ColumnMetaData& metadata) {
         if (header->compressed_page_size <= 0) return true;
         if (metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) return true;
   
         double ratio = static_cast<double>(header->uncompressed_page_size) /
                        static_cast<double>(header->compressed_page_size);
         return ratio <= config::parquet_page_cache_decompress_threshold;
     }
   ```
   And reuse it both here and in `be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp`, as sketched below.
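   A minimal sketch of that reuse (the shared-header placement and the `_metadata` member name are illustrative, not taken from the PR):
   ```
   // Hypothetical shared helper, declared once and included by both readers.
   inline bool should_cache_decompressed(const tparquet::PageHeader* header,
                                         const tparquet::ColumnMetaData& metadata) {
       if (header->compressed_page_size <= 0) return true;
       if (metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) return true;
       double ratio = static_cast<double>(header->uncompressed_page_size) /
                      static_cast<double>(header->compressed_page_size);
       return ratio <= config::parquet_page_cache_decompress_threshold;
   }

   // vparquet_page_reader.cpp: classify the cached payload on a cache hit.
   bool is_cache_payload_decompressed = should_cache_decompressed(&_cur_page_header, _metadata);

   // vparquet_column_chunk_reader.cpp: decide which bytes to insert into the cache.
   if (should_cache_decompressed(&_cur_page_header, _metadata)) {
       // cache the decompressed payload; otherwise cache the compressed bytes
   }
   ```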



##########
be/src/io/file_factory.cpp:
##########
@@ -203,6 +203,21 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
         const io::FileSystemProperties& system_properties,
         const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
         RuntimeProfile* profile) {
+    auto reader_res = _create_file_reader_internal(system_properties, file_description,
+                                                   reader_options, profile);
+    if (!reader_res.has_value()) {
+        return unexpected(std::move(reader_res).error());
+    }
+    auto file_reader = std::move(reader_res).value();
+    LOG_INFO("create file reader for path={}, size={}, mtime={}", file_description.path,

Review Comment:
   Remove this log, or use DEBUG level.
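   One hedged option (assuming the BE's `VLOG_DEBUG` verbosity macro is usable here; only the path argument is shown, since the remaining arguments are elided in the diff above):
   ```
   // Debug verbosity instead of INFO, so per-file noise is off by default.
   VLOG_DEBUG << fmt::format("create file reader for path={}", file_description.path);
   ```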



##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp:
##########
@@ -305,6 +495,32 @@ void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_reserve_decompress_buf(siz
     }
 }
 
+template <bool IN_COLLECTION, bool OFFSET_INDEX>
+void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_insert_page_into_cache(
+        const std::vector<uint8_t>& level_bytes, const Slice& payload) {
+    StoragePageCache::CacheKey key(
+            fmt::format("{}::{}", _stream_reader->path(), _stream_reader->mtime()),
+            _page_reader->file_end_offset(), _page_reader->header_start_offset());
+    const std::vector<uint8_t>& header_bytes = _page_reader->header_bytes();
+    size_t total = header_bytes.size() + level_bytes.size() + payload.size;
+    auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);

Review Comment:
   Potential memory leak?
   Better to use a safer, RAII-style approach so the page cannot leak on an early return.
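   One possible shape (the `insert` call mirrors the `lookup` call in the page-reader diff above; its exact signature and the `DataPage::data()` accessor are assumptions):
   ```
   // Own the page via unique_ptr so an early return cannot leak it.
   auto page = std::make_unique<DataPage>(total, true, segment_v2::DATA_PAGE);
   // ... copy header_bytes, level_bytes and payload into page->data() ...
   PageCacheHandle handle;
   StoragePageCache::instance()->insert(key, page.get(), &handle, segment_v2::DATA_PAGE);
   page.release(); // the cache now owns the DataPage
   ```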



##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp:
##########
@@ -305,6 +495,32 @@ void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_reserve_decompress_buf(siz
     }
 }
 
+template <bool IN_COLLECTION, bool OFFSET_INDEX>
+void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_insert_page_into_cache(
+        const std::vector<uint8_t>& level_bytes, const Slice& payload) {
+    StoragePageCache::CacheKey key(
+            fmt::format("{}::{}", _stream_reader->path(), _stream_reader->mtime()),

Review Comment:
   `fmt::format("{}::{}", _stream_reader->path(), _stream_reader->mtime())`
   This part is the same for every page; better to cache it once and reuse it:
   
   ```
     class ParquetPageCacheKeyBuilder {
         std::string _file_key_prefix;  // Cached once per column chunk
     public:
         void init(const std::string& path, int64_t mtime) {
             _file_key_prefix = fmt::format("{}::{}", path, mtime);
         }
         StoragePageCache::CacheKey make_key(uint64_t end_offset, uint64_t offset) const {
             return StoragePageCache::CacheKey(_file_key_prefix, end_offset, offset);
         }
         }
     };
   ```
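   Usage would then be one `init` per column chunk and one `make_key` per page (the `_cache_key_builder` member name is illustrative):
   ```
   // Once, when the column chunk is opened:
   _cache_key_builder.init(_stream_reader->path(), _stream_reader->mtime());
   // For every page insert/lookup:
   StoragePageCache::CacheKey key = _cache_key_builder.make_key(
           _page_reader->file_end_offset(), _page_reader->header_start_offset());
   ```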



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
