This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a55b504c0 [feature-wip](parquet-reader) bug fix, get the correct group reader (#12294)
4a55b504c0 is described below

commit 4a55b504c0ba8bb548d54c4011e6326435e75043
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Tue Sep 6 13:59:35 2022 +0800

    [feature-wip](parquet-reader) bug fix, get the correct group reader (#12294)
    
    Fix the failure to read the TPC-H lineitem table, and fix the memory allocation error.
    Co-authored-by: jinzhe <jin...@selectdb.com>
---
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 51 ++++++++++------------
 be/src/vec/exec/format/parquet/vparquet_reader.h   | 11 ++---
 2 files changed, 30 insertions(+), 32 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index bbb6a169b4..e29cca08e4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -44,6 +44,8 @@ void ParquetReader::close() {
         conjuncts.second.clear();
     }
     _row_group_readers.clear();
+    _read_row_groups.clear();
+    _skipped_row_ranges.clear();
     _slot_conjuncts.clear();
     _file_reader->close();
     delete _file_reader;
@@ -98,31 +100,31 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup
 }
 
 Status ParquetReader::read_next_batch(Block* block, bool* eof) {
-    if (_row_group_readers.empty()) {
+    int32_t num_of_readers = _row_group_readers.size();
+    DCHECK(num_of_readers <= _read_row_groups.size());
+    if (_read_row_groups.empty()) {
         *eof = true;
         return Status::OK();
     }
-    int32_t num_of_readers = _row_group_readers.size();
-    DCHECK(num_of_readers <= _total_groups);
     bool _batch_eof = false;
-    auto row_group_reader = _row_group_readers[_current_row_group_id];
-    RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size, &_batch_eof));
+    RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof));
     if (_batch_eof) {
-        _current_row_group_id = _next_row_group_id();
-        if (_current_row_group_id == -1 || _current_row_group_id >= num_of_readers) {
+        if (!_next_row_group_reader()) {
             *eof = true;
+        } else {
+            _read_row_groups.pop_front();
         }
     }
     return Status::OK();
 }
 
-int32_t ParquetReader::_next_row_group_id() {
-    if (_read_row_groups.empty()) {
-        return -1;
+bool ParquetReader::_next_row_group_reader() {
+    if (_row_group_readers.empty()) {
+        return false;
     }
-    auto group_id = _read_row_groups.front();
-    _read_row_groups.pop_front();
-    return group_id;
+    _current_group_reader = _row_group_readers.front();
+    _row_group_readers.pop_front();
+    return true;
 }
 
 Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& conjunct_ctxs) {
@@ -131,18 +133,19 @@ Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& c
     for (auto row_group_id : _read_row_groups) {
         auto row_group = _t_metadata->row_groups[row_group_id];
         auto column_chunks = row_group.columns;
-        std::vector<RowRange> skipped_row_ranges;
         if (_has_page_index(column_chunks)) {
-            RETURN_IF_ERROR(_process_page_index(row_group, skipped_row_ranges));
+            RETURN_IF_ERROR(_process_page_index(row_group));
         }
         std::shared_ptr<RowGroupReader> row_group_reader;
         row_group_reader.reset(
                 new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz));
         // todo: can filter row with candidate ranges rather than skipped ranges
-        RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), skipped_row_ranges));
+        RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), _skipped_row_ranges));
         _row_group_readers.emplace_back(row_group_reader);
     }
-    _current_row_group_id = _next_row_group_id();
+    if (!_next_row_group_reader()) {
+        return Status::EndOfFile("No next reader");
+    }
     return Status::OK();
 }
 
@@ -191,21 +194,16 @@ Status ParquetReader::_filter_row_groups() {
     if (_total_groups == 0 || _file_metadata->num_rows() == 0 || _range_size < 0) {
         return Status::EndOfFile("No row group need read");
     }
-    int32_t row_group_idx = 0;
-    while (row_group_idx < _total_groups) {
+    for (int32_t row_group_idx = 0; row_group_idx < _total_groups; row_group_idx++) {
         const tparquet::RowGroup& row_group = _t_metadata->row_groups[row_group_idx];
         if (_is_misaligned_range_group(row_group)) {
-            row_group_idx++;
             continue;
         }
         bool filter_group = false;
         RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
         if (!filter_group) {
-            _read_row_groups.push_back(row_group_idx);
-            row_group_idx++;
-            break;
+            _read_row_groups.emplace_back(row_group_idx);
         }
-        row_group_idx++;
     }
     return Status::OK();
 }
@@ -229,8 +227,7 @@ bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk>& columns)
     return _page_index->check_and_get_page_index_ranges(columns);
 }
 
-Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group,
-                                          std::vector<RowRange>& skipped_row_ranges) {
+Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group) {
     int64_t buffer_size = _page_index->_column_index_size + _page_index->_offset_index_size;
     for (auto col_id : _include_column_ids) {
         uint8_t buff[buffer_size];
@@ -257,7 +254,7 @@ Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group,
             RowRange skipped_row_range;
             _page_index->create_skipped_row_range(offset_index, row_group.num_rows, page_id,
                                                   &skipped_row_range);
-            skipped_row_ranges.emplace_back(skipped_row_range);
+            _skipped_row_ranges.emplace_back(skipped_row_range);
         }
     }
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index d98825b6f0..9facffa623 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -82,14 +82,13 @@ public:
     int64_t size() const { return _file_reader->size(); }
 
 private:
-    int32_t _next_row_group_id();
+    bool _next_row_group_reader();
     Status _init_read_columns(const std::vector<SlotDescriptor*>& tuple_slot_descs);
     Status _init_row_group_readers(const std::vector<ExprContext*>& conjunct_ctxs);
     void _init_conjuncts(const std::vector<ExprContext*>& conjunct_ctxs);
     // Page Index Filter
     bool _has_page_index(std::vector<tparquet::ColumnChunk>& columns);
-    Status _process_page_index(tparquet::RowGroup& row_group,
-                               std::vector<RowRange>& skipped_row_ranges);
+    Status _process_page_index(tparquet::RowGroup& row_group);
 
     // Row Group Filter
     bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
@@ -113,8 +112,9 @@ private:
     FileReader* _file_reader;
     std::shared_ptr<FileMetaData> _file_metadata;
     tparquet::FileMetaData* _t_metadata;
-    std::shared_ptr<PageIndex> _page_index;
-    std::vector<std::shared_ptr<RowGroupReader>> _row_group_readers;
+    std::unique_ptr<PageIndex> _page_index;
+    std::list<std::shared_ptr<RowGroupReader>> _row_group_readers;
+    std::shared_ptr<RowGroupReader> _current_group_reader;
     int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
     int32_t _current_row_group_id;
     //        std::shared_ptr<Statistics> _statistics;
@@ -129,6 +129,7 @@ private:
     int64_t _range_start_offset;
     int64_t _range_size;
     cctz::time_zone* _ctz;
+    std::vector<RowRange> _skipped_row_ranges;
 
     const TupleDescriptor* _tuple_desc; // get all slot info
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to