This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 692176ec07 [feature-wip](parquet-reader) pre-read page data in advance to avoid frequent seeks (#12898)
692176ec07 is described below

commit 692176ec07b386bac3ef1ca5626a0b11dfd53d15
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Sun Sep 25 21:21:06 2022 +0800

    [feature-wip](parquet-reader) pre-read page data in advance to avoid frequent seeks (#12898)
    
    1. Fix the file position bug in `HdfsFileReader`
    2. Reserve enough buffer for `ColumnChunkReader` to read large contiguous memory
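    
    The gist of the pre-read strategy, as a minimal sketch (illustrative only;
    `RawReader` and `PrefetchBuffer` are hypothetical stand-ins, not the
    committed classes): fetch one large contiguous range with a single read,
    then serve page-sized requests from memory so the underlying reader never
    seeks between pages.
    
        #include <cstddef>
        #include <cstdint>
        #include <cstring>
        #include <vector>
        
        // Hypothetical stand-in for FileReader: a single positional bulk read.
        struct RawReader {
            std::vector<uint8_t> file; // pretend file content
            void readat(uint64_t pos, size_t n, uint8_t* out) {
                std::memcpy(out, file.data() + pos, n);
            }
        };
        
        // Pre-read a whole column chunk once; each later page access is plain
        // pointer arithmetic instead of a seek plus a small read.
        class PrefetchBuffer {
        public:
            PrefetchBuffer(RawReader* r, uint64_t start, size_t len)
                    : _start(start), _buf(len) {
                r->readat(start, len, _buf.data()); // the only I/O
            }
            const uint8_t* page(uint64_t offset) const {
                return _buf.data() + (offset - _start); // no seek needed
            }
        private:
            uint64_t _start;
            std::vector<uint8_t> _buf;
        };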
---
 be/src/common/config.h                             |  7 ++-
 be/src/io/buffered_reader.cpp                      | 51 ++++++++++------------
 be/src/io/buffered_reader.h                        | 11 +++--
 be/src/io/hdfs_file_reader.cpp                     |  8 +---
 .../parquet/vparquet_column_chunk_reader.cpp       |  5 ++-
 .../exec/format/parquet/vparquet_column_reader.cpp | 17 +++++---
 .../exec/format/parquet/vparquet_column_reader.h   |  7 +--
 .../exec/format/parquet/vparquet_group_reader.cpp  |  5 ++-
 .../exec/format/parquet/vparquet_page_reader.cpp   | 11 +++--
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 38 +++++++++++-----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  1 +
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  2 +-
 12 files changed, 93 insertions(+), 70 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index ad02df2efb..5b5eccdf0d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -823,7 +823,12 @@ CONF_Int32(object_pool_buffer_size, "100");
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
 CONF_Bool(parquet_predicate_push_down, "true");
-CONF_Int32(parquet_header_max_size, "8388608");
+// Max size of the parquet page header, in MB
+CONF_mInt32(parquet_header_max_size_mb, "1");
+// Max buffer size for parquet row group
+CONF_mInt32(parquet_rowgroup_max_buffer_mb, "128");
+// Max buffer size for parquet chunk column
+CONF_mInt32(parquet_column_max_buffer_mb, "8");
 CONF_Bool(parquet_reader_using_internal, "false");
 
 // When the rows number reaches this limit, will check the filter rate of the bloomfilter
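
The three new knobs are megabyte counts (note the `_mb` suffix), and call
sites convert them to bytes with a 20-bit shift. A quick check of the
arithmetic, assuming the defaults above:

    #include <cstddef>
    
    // 1 MiB = 2^20 bytes, so `mb << 20` turns a megabyte count into bytes.
    constexpr size_t header_max   = 1   << 20; // parquet_header_max_size_mb
    constexpr size_t rowgroup_max = 128 << 20; // parquet_rowgroup_max_buffer_mb
    constexpr size_t column_max   = 8   << 20; // parquet_column_max_buffer_mb
    
    static_assert(header_max == 1048576, "1 MiB");
    static_assert(rowgroup_max == 134217728, "128 MiB");
    static_assert(column_max == 8388608, "8 MiB");
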
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index ca40979321..c98b365235 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -186,40 +186,31 @@ bool BufferedReader::closed() {
 }
 
 BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t offset,
-                                                   uint64_t length)
-        : _file(file), _file_start_offset(offset), _file_end_offset(offset + length) {}
-
-Status BufferedFileStreamReader::seek(uint64_t position) {
-    if (_file_position != position) {
-        RETURN_IF_ERROR(_file->seek(position));
-        _file_position = position;
-    }
-    return Status::OK();
-}
+                                                   uint64_t length, size_t max_buf_size)
+        : _file(file),
+          _file_start_offset(offset),
+          _file_end_offset(offset + length),
+          _max_buf_size(max_buf_size) {}
 
 Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset,
-                                            size_t* bytes_to_read) {
-    if (offset < _file_start_offset) {
+                                            const size_t bytes_to_read) {
+    if (offset < _file_start_offset || offset >= _file_end_offset) {
         return Status::IOError("Out-of-bounds Access");
     }
-    if (offset >= _file_end_offset) {
-        *bytes_to_read = 0;
-        return Status::OK();
-    }
-    int64_t end_offset = offset + *bytes_to_read;
+    int64_t end_offset = offset + bytes_to_read;
     if (_buf_start_offset <= offset && _buf_end_offset >= end_offset) {
         *buf = _buf.get() + offset - _buf_start_offset;
         return Status::OK();
     }
-    if (_buf_size < *bytes_to_read) {
-        size_t new_size = BitUtil::next_power_of_two(*bytes_to_read);
-        std::unique_ptr<uint8_t[]> new_buf(new uint8_t[new_size]);
+    size_t buf_size = std::max(_max_buf_size, bytes_to_read);
+    if (_buf_size < buf_size) {
+        std::unique_ptr<uint8_t[]> new_buf(new uint8_t[buf_size]);
         if (offset >= _buf_start_offset && offset < _buf_end_offset) {
             memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset,
                    _buf_end_offset - offset);
         }
         _buf = std::move(new_buf);
-        _buf_size = new_size;
+        _buf_size = buf_size;
     } else if (offset > _buf_start_offset && offset < _buf_end_offset) {
         memmove(_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset);
     }
@@ -227,19 +218,25 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
         _buf_end_offset = offset;
     }
     _buf_start_offset = offset;
-    int64_t to_read = end_offset - _buf_end_offset;
-    RETURN_IF_ERROR(seek(_buf_end_offset));
-    bool eof = false;
     int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
-    RETURN_IF_ERROR(_file->read(_buf.get() + buf_remaining, to_read, &to_read, &eof));
-    *bytes_to_read = buf_remaining + to_read;
+    int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
+    int64_t has_read = 0;
+    while (has_read < to_read) {
+        int64_t loop_read = 0;
+        RETURN_IF_ERROR(_file->readat(_buf_end_offset + has_read, to_read - has_read, &loop_read,
+                                      _buf.get() + buf_remaining + has_read));
+        has_read += loop_read;
+    }
+    if (has_read != to_read) {
+        return Status::Corruption("Try to read {} bytes, but received {} 
bytes", to_read, has_read);
+    }
     _buf_end_offset += to_read;
     *buf = _buf.get();
     return Status::OK();
 }
 
 Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset) {
-    return read_bytes((const uint8_t**)&slice.data, offset, &slice.size);
+    return read_bytes((const uint8_t**)&slice.data, offset, slice.size);
 }
 
 } // namespace doris
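
Note on the new read loop: `readat` may deliver fewer bytes than requested,
so `read_bytes` keeps issuing positional reads until the window is full.
The same pattern in isolation (a sketch; the `ReadAt` callback stands in
for `FileReader::readat`, and the zero-byte EOF guard is an addition for
the sketch, not part of the patch):

    #include <cstdint>
    #include <functional>
    
    // readat(pos, nbytes, &read, out): reads up to nbytes, reports bytes read.
    using ReadAt = std::function<bool(int64_t, int64_t, int64_t*, uint8_t*)>;
    
    // Keep reading until to_read bytes arrive; a single positional read is
    // allowed to return less than requested.
    bool read_fully(const ReadAt& readat, int64_t pos, int64_t to_read, uint8_t* out) {
        int64_t has_read = 0;
        while (has_read < to_read) {
            int64_t loop_read = 0;
            if (!readat(pos + has_read, to_read - has_read, &loop_read, out + has_read)) {
                return false;          // propagate the I/O error
            }
            if (loop_read <= 0) break; // EOF guard so the loop cannot spin
            has_read += loop_read;
        }
        return has_read == to_read;    // mirrors the Corruption check above
    }
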
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index 2cfcaaa413..97ec01cc7d 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -93,7 +93,7 @@ public:
     * @param offset start offset to read in stream
      * @param bytes_to_read bytes to read
      */
-    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, size_t* bytes_to_read) = 0;
+    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read) = 0;
     /**
     * Save the data address to slice.data, and the slice.size is the bytes to read.
      */
@@ -103,10 +103,11 @@ public:
 
 class BufferedFileStreamReader : public BufferedStreamReader {
 public:
-    BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t length);
+    BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t length,
+                             size_t max_buf_size);
     ~BufferedFileStreamReader() override = default;
 
-    Status read_bytes(const uint8_t** buf, uint64_t offset, size_t* bytes_to_read) override;
+    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read) override;
     Status read_bytes(Slice& slice, uint64_t offset) override;
 
 private:
@@ -115,12 +116,10 @@ private:
     uint64_t _file_start_offset;
     uint64_t _file_end_offset;
 
-    int64_t _file_position = -1;
     uint64_t _buf_start_offset = 0;
     uint64_t _buf_end_offset = 0;
     size_t _buf_size = 0;
-
-    Status seek(uint64_t position);
+    size_t _max_buf_size;
 };
 
 } // namespace doris
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
index 0112e880aa..37b2d73bba 100644
--- a/be/src/io/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -141,12 +141,7 @@ Status HdfsFileReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
 
 Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
     if (position != _current_offset) {
-        int ret = hdfsSeek(_hdfs_fs, _hdfs_file, position);
-        if (ret != 0) { // check fseek return value
-            return Status::InternalError("hdfsSeek failed.(BE: {}) 
namenode:{}, path:{}, err: {}",
-                                         BackendOptions::get_localhost(), 
_namenode, _path,
-                                         hdfsGetLastError());
-        }
+        seek(position);
     }
 
     *bytes_read = hdfsRead(_hdfs_fs, _hdfs_file, out, nbytes);
@@ -191,6 +186,7 @@ Status HdfsFileReader::seek(int64_t position) {
         return Status::InternalError("Seek to offset failed. (BE: {}) 
offset={}, err: {}",
                                      BackendOptions::get_localhost(), 
position, hdfsGetLastError());
     }
+    _current_offset = position;
     return Status::OK();
 }
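
The file position bug fixed here: `seek()` moved the HDFS handle but never
updated `_current_offset`, so the `position != _current_offset` test in
`readat()` compared against a stale value and every positional read paid
for another `hdfsSeek`. A reduced model of the corrected bookkeeping (the
I/O calls are stubbed; only the offset tracking matters):

    #include <cstdint>
    
    class OffsetTrackingReader {
    public:
        int64_t readat(int64_t position, int64_t nbytes, uint8_t* out) {
            if (position != _current_offset) {
                seek(position); // seek only when the read actually jumps
            }
            int64_t n = read(out, nbytes);
            _current_offset += n; // advance past the bytes just consumed
            return n;
        }
    
    private:
        // The fix: record the new position so the next readat() can skip the seek.
        void seek(int64_t position) { _current_offset = position; }
        int64_t read(uint8_t*, int64_t nbytes) { return nbytes; } // stubbed I/O
        int64_t _current_offset = 0;
    };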
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 0e7d1cd4ec..193200f28d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -40,8 +40,9 @@ Status ColumnChunkReader::init() {
     if (_metadata.__isset.dictionary_page_offset) {
         // seek to the dictionary page
         _page_reader->seek_to_page(_metadata.dictionary_page_offset);
-        RETURN_IF_ERROR(_page_reader->next_page_header());
-        RETURN_IF_ERROR(_decode_dict_page());
+        // Parse dictionary data when reading
+        // RETURN_IF_ERROR(_page_reader->next_page_header());
+        // RETURN_IF_ERROR(_decode_dict_page());
     } else {
         // seek to the first data page
         _page_reader->seek_to_page(_metadata.data_page_offset);
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 4c54d45aaa..3074705ffa 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -31,7 +31,8 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
                                    const ParquetReadColumn& column,
                                    const tparquet::RowGroup& row_group,
                                    std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
-                                   std::unique_ptr<ParquetColumnReader>& reader) {
+                                   std::unique_ptr<ParquetColumnReader>& reader,
+                                   size_t max_buf_size) {
     if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
         return Status::Corruption("not supported type");
     }
@@ -39,13 +40,13 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
         tparquet::ColumnChunk chunk = row_group.columns[field->children[0].physical_column_index];
         ArrayColumnReader* array_reader = new ArrayColumnReader(ctz);
         array_reader->init_column_metadata(chunk);
-        RETURN_IF_ERROR(array_reader->init(file, field, &chunk, row_ranges));
+        RETURN_IF_ERROR(array_reader->init(file, field, &chunk, row_ranges, max_buf_size));
         reader.reset(array_reader);
     } else {
         tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
         ScalarColumnReader* scalar_reader = new ScalarColumnReader(ctz);
         scalar_reader->init_column_metadata(chunk);
-        RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges));
+        RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges, max_buf_size));
         reader.reset(scalar_reader);
     }
     return Status::OK();
@@ -84,9 +85,10 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end
 }
 
 Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
-                                std::vector<RowRange>& row_ranges) {
+                                std::vector<RowRange>& row_ranges, size_t max_buf_size) {
     _stream_reader =
-            new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size());
+            new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size(),
+                                         std::min((size_t)_metadata->size(), max_buf_size));
     _row_ranges = row_ranges;
     _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field, _ctz));
     RETURN_IF_ERROR(_chunk_reader->init());
@@ -210,9 +212,10 @@ void ArrayColumnReader::_reserve_def_levels_buf(size_t size) {
 }
 
 Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
-                               std::vector<RowRange>& row_ranges) {
+                               std::vector<RowRange>& row_ranges, size_t max_buf_size) {
     _stream_reader =
-            new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size());
+            new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size(),
+                                         std::min((size_t)_metadata->size(), max_buf_size));
     _row_ranges = row_ranges;
     _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, &field->children[0], _ctz));
     RETURN_IF_ERROR(_chunk_reader->init());
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 644da79abb..b245ff6aa4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -60,7 +60,8 @@ public:
                                     size_t* read_rows, bool* eof) = 0;
     static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column,
                          const tparquet::RowGroup& row_group, std::vector<RowRange>& row_ranges,
-                         cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader);
+                         cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader,
+                         size_t max_buf_size);
     void init_column_metadata(const tparquet::ColumnChunk& chunk);
     void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
     virtual void close() = 0;
@@ -84,7 +85,7 @@ public:
     ScalarColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {};
     ~ScalarColumnReader() override { close(); };
     Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
-                std::vector<RowRange>& row_ranges);
+                std::vector<RowRange>& row_ranges, size_t max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
                             size_t* read_rows, bool* eof) override;
     Status _skip_values(size_t num_values);
@@ -97,7 +98,7 @@ public:
     ArrayColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {};
     ~ArrayColumnReader() override { close(); };
     Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
-                std::vector<RowRange>& row_ranges);
+                std::vector<RowRange>& row_ranges, size_t max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
                             size_t* read_rows, bool* eof) override;
     void close() override;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index ae4505bdd7..0168a97a43 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -46,11 +46,14 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
 Status RowGroupReader::_init_column_readers(
         const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
         std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
+    const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
+    const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
+    size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
     for (auto& read_col : _read_columns) {
         auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
         std::unique_ptr<ParquetColumnReader> reader;
         RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta,
-                                                    row_ranges, _ctz, reader));
+                                                    row_ranges, _ctz, reader, max_buf_size));
         auto col_iter = col_offsets.find(read_col._parquet_col_id);
         if (col_iter != col_offsets.end()) {
             tparquet::OffsetIndex oi = col_iter->second;
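
The budget above gives each column an equal share of the row-group ceiling,
clamped by the per-column ceiling, so wide tables cannot exceed
`parquet_rowgroup_max_buffer_mb` in aggregate. The arithmetic in isolation,
assuming the shipped defaults (128 MiB per row group, 8 MiB per column):

    #include <algorithm>
    #include <cstddef>
    
    size_t column_budget(size_t group_cap, size_t column_cap, size_t num_columns) {
        // Equal share of the row-group budget, never above the per-column cap.
        return std::min(column_cap, group_cap / num_columns);
    }
    
    // column_budget(128 << 20, 8 << 20, 8)  ->  8 MiB (the column cap wins)
    // column_budget(128 << 20, 8 << 20, 64) ->  2 MiB (the group share wins)
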
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 80c6ff228b..baa88036c2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -23,7 +23,7 @@
 
 namespace doris::vectorized {
 
-static constexpr size_t initPageHeaderSize = 128;
+static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
 
 PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length)
         : _reader(reader), _start_offset(offset), _end_offset(offset + length) {}
@@ -41,19 +41,19 @@ Status PageReader::next_page_header() {
 
     const uint8_t* page_header_buf = nullptr;
     size_t max_size = _end_offset - _offset;
-    size_t header_size = std::min(initPageHeaderSize, max_size);
+    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
+    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
     uint32_t real_header_size = 0;
     while (true) {
         header_size = std::min(header_size, max_size);
-        RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, &header_size));
+        RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size));
         real_header_size = header_size;
         auto st =
                 deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
         if (st.ok()) {
             break;
         }
-        if (_offset + header_size >= _end_offset ||
-            real_header_size > config::parquet_header_max_size) {
+        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
             return Status::IOError("Failed to deserialize parquet page header");
         }
         header_size <<= 2;
@@ -80,7 +80,6 @@ Status PageReader::get_page_data(Slice& slice) {
     }
     slice.size = _cur_page_header.compressed_page_size;
     RETURN_IF_ERROR(_reader->read_bytes(slice, _offset));
-    DCHECK_EQ(slice.size, _cur_page_header.compressed_page_size);
     _offset += slice.size;
     _state = INITIALIZED;
     return Status::OK();
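
The header loop above probes with a growing window: start at
INIT_PAGE_HEADER_SIZE (128 bytes), attempt the thrift decode, and on
failure widen the window fourfold (`header_size <<= 2`) until the header
parses, the chunk is exhausted, or the configured cap is crossed. The same
control flow as a standalone sketch, with `try_decode` standing in for
`deserialize_thrift_msg`:

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <functional>
    
    bool probe_page_header(const uint8_t* chunk, size_t chunk_len, size_t header_cap,
                           const std::function<bool(const uint8_t*, size_t)>& try_decode) {
        size_t header_size = std::min<size_t>(128, chunk_len); // INIT_PAGE_HEADER_SIZE
        while (true) {
            header_size = std::min(header_size, chunk_len);
            if (try_decode(chunk, header_size)) {
                return true;   // header parsed within this window
            }
            if (header_size >= chunk_len || header_size > header_cap) {
                return false;  // mirrors the IOError path above
            }
            header_size <<= 2; // widen the probe window 4x
        }
    }
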
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index b7372a7495..6d5718d181 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -17,6 +17,8 @@
 
 #include "vparquet_reader.h"
 
+#include <algorithm>
+
 #include "io/file_factory.h"
 #include "parquet_thrift_util.h"
 
@@ -45,6 +47,10 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams
 
 ParquetReader::~ParquetReader() {
     close();
+    if (_group_file_reader != _file_reader.get()) {
+        delete _group_file_reader;
+        _group_file_reader = nullptr;
+    }
 }
 
 void ParquetReader::close() {
@@ -63,9 +69,19 @@ void ParquetReader::close() {
 
 Status ParquetReader::init_reader(std::vector<ExprContext*>& conjunct_ctxs) {
     if (_file_reader == nullptr) {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _scan_params, _scan_range, _file_reader,
-                config::remote_storage_read_buffer_mb * 1024 * 1024));
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range,
+                                                        _file_reader, 2048));
+        // RowGroupReader has its own underlying buffer, so we should return the file reader directly.
+        // If RowGroupReaders used the same file reader as ParquetReader, the file position would change
+        // when ParquetReader tries to read the ColumnIndex meta, which causes a performance cost.
+        std::unique_ptr<FileReader> group_file_reader;
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range,
+                                                        group_file_reader, 0));
+        _group_file_reader = group_file_reader.release();
+        RETURN_IF_ERROR(_group_file_reader->open());
+    } else {
+        // test only
+        _group_file_reader = _file_reader.get();
     }
     RETURN_IF_ERROR(_file_reader->open());
     if (_file_reader->size() == 0) {
@@ -90,16 +106,18 @@ Status ParquetReader::init_reader(std::vector<ExprContext*>& conjunct_ctxs) {
 Status ParquetReader::_init_read_columns() {
     _include_column_ids.clear();
     for (auto& file_col_name : _column_names) {
-        // Get the Column Reader for the boolean column
         auto iter = _map_column.find(file_col_name);
-        auto parquet_col_id = iter->second;
         if (iter != _map_column.end()) {
-            _include_column_ids.emplace_back(parquet_col_id);
-            _read_columns.emplace_back(parquet_col_id, file_col_name);
-        } else {
-            continue;
+            _include_column_ids.emplace_back(iter->second);
         }
     }
+    // The same order as physical columns
+    std::sort(_include_column_ids.begin(), _include_column_ids.end());
+    _read_columns.clear();
+    for (int& parquet_col_id : _include_column_ids) {
+        _read_columns.emplace_back(parquet_col_id,
+                                   _file_metadata->schema().get_column(parquet_col_id)->name);
+    }
     return Status::OK();
 }
 
@@ -148,7 +166,7 @@ Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& c
     for (auto row_group_id : _read_row_groups) {
         auto& row_group = _t_metadata->row_groups[row_group_id];
         std::shared_ptr<RowGroupReader> row_group_reader;
-        row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, row_group_id,
+        row_group_reader.reset(new RowGroupReader(_group_file_reader, _read_columns, row_group_id,
                                                   row_group, _ctz));
         std::vector<RowRange> candidate_row_ranges;
         RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges));
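
`_init_read_columns` now resolves the requested names to parquet column ids,
sorts the ids, and rebuilds the read list from the sorted ids, so columns
are visited in on-disk order and the buffered stream readers only move
forward. The same idea in isolation (the container types here are
hypothetical, not the Doris members):

    #include <algorithm>
    #include <map>
    #include <string>
    #include <vector>
    
    std::vector<int> physical_column_order(const std::vector<std::string>& names,
                                           const std::map<std::string, int>& name_to_id) {
        std::vector<int> ids;
        for (const auto& name : names) {
            auto it = name_to_id.find(name);
            if (it != name_to_id.end()) {
                ids.push_back(it->second); // silently skip names not in the file
            }
        }
        std::sort(ids.begin(), ids.end()); // same order as the physical columns
        return ids;
    }
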
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 098aa154e8..c91bc08059 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -123,6 +123,7 @@ private:
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
     std::unique_ptr<FileReader> _file_reader = nullptr;
+    FileReader* _group_file_reader = nullptr;
 
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 8785c297b1..4272954214 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -153,7 +153,7 @@ static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk*
                                   ? chunk_meta.dictionary_page_offset
                                   : chunk_meta.data_page_offset;
     size_t chunk_size = chunk_meta.total_compressed_size;
-    BufferedFileStreamReader stream_reader(file_reader, start_offset, chunk_size);
+    BufferedFileStreamReader stream_reader(file_reader, start_offset, chunk_size, 1024);
 
     cctz::time_zone ctz;
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);

