AshinGau commented on code in PR #32785:
URL: https://github.com/apache/doris/pull/32785#discussion_r1543938247


##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp:
##########
@@ -66,44 +68,48 @@ Status ColumnChunkReader::init() {
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, 
&_block_compress_codec));
     if (_metadata.__isset.dictionary_page_offset) {
         // seek to the directory page
-        _page_reader->seek_to_page(_metadata.dictionary_page_offset);
-        // Parse dictionary data when reading
-        // RETURN_IF_ERROR(_page_reader->next_page_header());
-        // RETURN_IF_ERROR(_decode_dict_page());
+        _page_reader->seek_page(_metadata.dictionary_page_offset);

Review Comment:
   why change `seek_to_page ` to `seek_page `, and remove the above comments.



##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h:
##########
@@ -84,41 +84,34 @@ class ColumnChunkReader {
     };
 
     ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk* 
column_chunk,
-                      FieldSchema* field_schema, cctz::time_zone* ctz, 
io::IOContext* io_ctx);
+                      FieldSchema* field_schema, const tparquet::OffsetIndex* 
offset_index,
+                      cctz::time_zone* ctz, io::IOContext* io_ctx);
     ~ColumnChunkReader() = default;
 
     // Initialize chunk reader, will generate the decoder and codec.
     Status init();
 
     // Whether the chunk reader has a more page to read.
-    bool has_next_page() { return _chunk_parsed_values < _metadata.num_values; 
}
-
-    // Deprecated
-    // Seek to the specific page, page_header_offset must be the start offset 
of the page header.
-    // _end_offset may exceed the actual data area, so we can only use the 
number of parsed values
-    // to determine whether there are remaining pages to read. That's to say 
we can't use the
-    // PageLocation in parquet metadata to seek to the specified page. We 
should call next_page()
-    // and skip_page() to skip pages one by one.
-    // todo: change this interface to seek_to_page(int64_t page_header_offset, 
size_t num_parsed_values)
-    // and set _chunk_parsed_values = num_parsed_values
-    // [[deprecated]]

Review Comment:
   It's best to keep this interface, only the page with `OffsetIndex` can call 
this function.



##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp:
##########
@@ -66,44 +68,48 @@ Status ColumnChunkReader::init() {
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, 
&_block_compress_codec));
     if (_metadata.__isset.dictionary_page_offset) {
         // seek to the directory page
-        _page_reader->seek_to_page(_metadata.dictionary_page_offset);
-        // Parse dictionary data when reading
-        // RETURN_IF_ERROR(_page_reader->next_page_header());
-        // RETURN_IF_ERROR(_decode_dict_page());
+        _page_reader->seek_page(_metadata.dictionary_page_offset);
     } else {
         // seek to the first data page
-        _page_reader->seek_to_page(_metadata.data_page_offset);
+        _page_reader->seek_page(_metadata.data_page_offset);
     }
     _state = INITIALIZED;
+
+    // the first page maybe directory page even if 
_metadata.__isset.dictionary_page_offset == false
+    RETURN_IF_ERROR(_page_reader->parse_page_header());
+    if (_page_reader->get_page_header()->type == 
tparquet::PageType::DICTIONARY_PAGE) {
+        // Parse dictionary data when reading
+        RETURN_IF_ERROR(_decode_dict_page());
+    }
     return Status::OK();
 }
 
 Status ColumnChunkReader::next_page() {
-    if (_state == HEADER_PARSED) {
-        return Status::OK();
-    }
     if (UNLIKELY(_state == NOT_INIT)) {
         return Status::Corruption("Should initialize chunk reader");
     }
     if (UNLIKELY(_remaining_num_values != 0)) {
         return Status::Corruption("Should skip current page");
     }
-    RETURN_IF_ERROR(_page_reader->next_page_header());
-    if (_page_reader->get_page_header()->type == 
tparquet::PageType::DICTIONARY_PAGE) {
-        // the first page maybe directory page even if 
_metadata.__isset.dictionary_page_offset == false,
-        // so we should parse the directory page in next_page()
-        RETURN_IF_ERROR(_decode_dict_page());
-        // parse the real first data page
-        return next_page();
-    } else if (_page_reader->get_page_header()->type == 
tparquet::PageType::DATA_PAGE_V2) {
+
+    if (_offset_index) {

Review Comment:
   Don't add is-else logic, the underlying `PageReader` supply the unified 
interface.



##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h:
##########
@@ -186,13 +180,23 @@ class ColumnChunkReader {
     }
 
 private:
-    enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, 
DATA_LOADED, PAGE_SKIPPED };
+    enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, DATA_LOADED };
 
     Status _decode_dict_page();
     void _reserve_decompress_buf(size_t size);
     int32_t _get_type_length();
     void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, 
Slice& page_data);
 
+    //Returns the number of values in the current page.
+    int64_t _get_page_num_values() const {

Review Comment:
   Move to `PageReader`



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -776,9 +776,6 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
         const FieldSchema* col_schema = schema_desc.get_column(read_col);
         static_cast<void>(page_index.collect_skipped_page_range(
                 &column_index, conjuncts, col_schema, skipped_page_range, 
*_ctz));
-        if (skipped_page_range.empty()) {

Review Comment:
   Never change the main logic in `ParquetReader`



##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h:
##########
@@ -84,41 +84,34 @@ class ColumnChunkReader {
     };
 
     ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk* 
column_chunk,
-                      FieldSchema* field_schema, cctz::time_zone* ctz, 
io::IOContext* io_ctx);
+                      FieldSchema* field_schema, const tparquet::OffsetIndex* 
offset_index,
+                      cctz::time_zone* ctz, io::IOContext* io_ctx);
     ~ColumnChunkReader() = default;
 
     // Initialize chunk reader, will generate the decoder and codec.
     Status init();
 
     // Whether the chunk reader has a more page to read.
-    bool has_next_page() { return _chunk_parsed_values < _metadata.num_values; 
}
-
-    // Deprecated
-    // Seek to the specific page, page_header_offset must be the start offset 
of the page header.
-    // _end_offset may exceed the actual data area, so we can only use the 
number of parsed values
-    // to determine whether there are remaining pages to read. That's to say 
we can't use the
-    // PageLocation in parquet metadata to seek to the specified page. We 
should call next_page()
-    // and skip_page() to skip pages one by one.
-    // todo: change this interface to seek_to_page(int64_t page_header_offset, 
size_t num_parsed_values)
-    // and set _chunk_parsed_values = num_parsed_values
-    // [[deprecated]]
-    void seek_to_page(int64_t page_header_offset) {
-        _remaining_num_values = 0;
-        _page_reader->seek_to_page(page_header_offset);
-        _state = INITIALIZED;
-    }
+    bool has_next_page() const { return _chunk_parsed_values < 
_metadata.num_values; }
 
     // Seek to next page. Only read and parse the page header.
     Status next_page();
 
     // Skip current page(will not read and parse) if the page is filtered by 
predicates.
     Status skip_page() {
         Status res = Status::OK();
-        _remaining_num_values = 0;
-        if (_state == HEADER_PARSED) {
-            res = _page_reader->skip_page();
+        if (_offset_index) {
+            _page_index++;
+            if (_page_index == _offset_index->page_locations.size()) {
+                return Status::EndOfFile("End of file");

Review Comment:
   Return other errors, don't return `EndOfFile`, because is has special 
meaning in `VfileScanner`.



##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp:
##########
@@ -66,44 +68,48 @@ Status ColumnChunkReader::init() {
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, 
&_block_compress_codec));
     if (_metadata.__isset.dictionary_page_offset) {
         // seek to the directory page
-        _page_reader->seek_to_page(_metadata.dictionary_page_offset);
-        // Parse dictionary data when reading
-        // RETURN_IF_ERROR(_page_reader->next_page_header());
-        // RETURN_IF_ERROR(_decode_dict_page());
+        _page_reader->seek_page(_metadata.dictionary_page_offset);
     } else {
         // seek to the first data page
-        _page_reader->seek_to_page(_metadata.data_page_offset);
+        _page_reader->seek_page(_metadata.data_page_offset);
     }
     _state = INITIALIZED;
+

Review Comment:
   dictionary page can be parsed lazily.



##########
be/src/vec/exec/format/parquet/vparquet_column_reader.cpp:
##########
@@ -118,13 +118,13 @@ static void fill_array_offset(FieldSchema* field, 
ColumnArray::Offsets64& offset
 Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                                    const tparquet::RowGroup& row_group,
                                    const std::vector<RowRange>& row_ranges, 
cctz::time_zone* ctz,
-                                   io::IOContext* io_ctx,
+                                   io::IOContext* io_ctx, const 
tparquet::OffsetIndex* offset_index,

Review Comment:
   The complex type column will get wrong `OffsetIndex`



##########
be/src/vec/exec/format/parquet/vparquet_page_reader.h:
##########
@@ -45,36 +45,41 @@ class PageReader {
                uint64_t length);
     ~PageReader() = default;
 
-    // Deprecated
-    // Parquet file may not be standardized,
-    // _end_offset may exceed the actual data area.
-    // ColumnChunkReader::has_next_page() use the number of parsed values for 
judgment
-    // [[deprecated]]
-    bool has_next_page() const { return _offset < _end_offset; }
+    Status parse_page_header();
 
-    Status next_page_header();
-
-    Status skip_page();
+    Status get_page_data(Slice& slice);
 
-    const tparquet::PageHeader* get_page_header() const { return 
&_cur_page_header; }
+    bool has_header_parsed() const { return _state == HEADER_PARSED; }
 
-    Status get_page_data(Slice& slice);
+    const tparquet::PageHeader* get_page_header() const {
+        DCHECK_EQ(_state, HEADER_PARSED);
+        return &_cur_page_header;
+    }
 
-    Statistics& statistics() { return _statistics; }
+    Status skip_page() {
+        if (UNLIKELY(_state != HEADER_PARSED)) {
+            return Status::IOError("Should generate page header first to skip 
current page");
+        }
+        _offset = _next_header_offset;
+        _state = INITIALIZED;
+        return Status::OK();
+    }
 
-    void seek_to_page(int64_t page_header_offset) {
+    void seek_page(int64_t page_header_offset) {
         _offset = page_header_offset;
         _next_header_offset = page_header_offset;
         _state = INITIALIZED;
     }
 
+    Statistics& statistics() { return _statistics; }
+
 private:
     enum PageReaderState { INITIALIZED, HEADER_PARSED };
 
     io::BufferedStreamReader* _reader = nullptr;
     io::IOContext* _io_ctx = nullptr;
-    tparquet::PageHeader _cur_page_header;
-    Statistics _statistics;
+    tparquet::PageHeader _cur_page_header {};

Review Comment:
   Why add `{}`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to