This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4a55b504c0 [feature-wip](parquet-reader) bug fix, get the correct group reader (#12294) 4a55b504c0 is described below commit 4a55b504c0ba8bb548d54c4011e6326435e75043 Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Tue Sep 6 13:59:35 2022 +0800 [feature-wip](parquet-reader) bug fix, get the correct group reader (#12294) Fix the failure to read the TPC-H lineitem table and the memory-allocation error Co-authored-by: jinzhe <jin...@selectdb.com> --- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 51 ++++++++++------------ be/src/vec/exec/format/parquet/vparquet_reader.h | 11 ++--- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index bbb6a169b4..e29cca08e4 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -44,6 +44,8 @@ void ParquetReader::close() { conjuncts.second.clear(); } _row_group_readers.clear(); + _read_row_groups.clear(); + _skipped_row_ranges.clear(); _slot_conjuncts.clear(); _file_reader->close(); delete _file_reader; @@ -98,31 +100,31 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup } Status ParquetReader::read_next_batch(Block* block, bool* eof) { - if (_row_group_readers.empty()) { + int32_t num_of_readers = _row_group_readers.size(); + DCHECK(num_of_readers <= _read_row_groups.size()); + if (_read_row_groups.empty()) { *eof = true; return Status::OK(); } - int32_t num_of_readers = _row_group_readers.size(); - DCHECK(num_of_readers <= _total_groups); bool _batch_eof = false; - auto row_group_reader = _row_group_readers[_current_row_group_id]; - RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size, &_batch_eof)); + RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof)); if
(_batch_eof) { - _current_row_group_id = _next_row_group_id(); - if (_current_row_group_id == -1 || _current_row_group_id >= num_of_readers) { + if (!_next_row_group_reader()) { *eof = true; + } else { + _read_row_groups.pop_front(); } } return Status::OK(); } -int32_t ParquetReader::_next_row_group_id() { - if (_read_row_groups.empty()) { - return -1; +bool ParquetReader::_next_row_group_reader() { + if (_row_group_readers.empty()) { + return false; } - auto group_id = _read_row_groups.front(); - _read_row_groups.pop_front(); - return group_id; + _current_group_reader = _row_group_readers.front(); + _row_group_readers.pop_front(); + return true; } Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& conjunct_ctxs) { @@ -131,18 +133,19 @@ Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& c for (auto row_group_id : _read_row_groups) { auto row_group = _t_metadata->row_groups[row_group_id]; auto column_chunks = row_group.columns; - std::vector<RowRange> skipped_row_ranges; if (_has_page_index(column_chunks)) { - RETURN_IF_ERROR(_process_page_index(row_group, skipped_row_ranges)); + RETURN_IF_ERROR(_process_page_index(row_group)); } std::shared_ptr<RowGroupReader> row_group_reader; row_group_reader.reset( new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz)); // todo: can filter row with candidate ranges rather than skipped ranges - RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), skipped_row_ranges)); + RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), _skipped_row_ranges)); _row_group_readers.emplace_back(row_group_reader); } - _current_row_group_id = _next_row_group_id(); + if (!_next_row_group_reader()) { + return Status::EndOfFile("No next reader"); + } return Status::OK(); } @@ -191,21 +194,16 @@ Status ParquetReader::_filter_row_groups() { if (_total_groups == 0 || _file_metadata->num_rows() == 0 || _range_size < 0) { return Status::EndOfFile("No row 
group need read"); } - int32_t row_group_idx = 0; - while (row_group_idx < _total_groups) { + for (int32_t row_group_idx = 0; row_group_idx < _total_groups; row_group_idx++) { const tparquet::RowGroup& row_group = _t_metadata->row_groups[row_group_idx]; if (_is_misaligned_range_group(row_group)) { - row_group_idx++; continue; } bool filter_group = false; RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group)); if (!filter_group) { - _read_row_groups.push_back(row_group_idx); - row_group_idx++; - break; + _read_row_groups.emplace_back(row_group_idx); } - row_group_idx++; } return Status::OK(); } @@ -229,8 +227,7 @@ bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk>& columns) return _page_index->check_and_get_page_index_ranges(columns); } -Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group, - std::vector<RowRange>& skipped_row_ranges) { +Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group) { int64_t buffer_size = _page_index->_column_index_size + _page_index->_offset_index_size; for (auto col_id : _include_column_ids) { uint8_t buff[buffer_size]; @@ -257,7 +254,7 @@ Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group, RowRange skipped_row_range; _page_index->create_skipped_row_range(offset_index, row_group.num_rows, page_id, &skipped_row_range); - skipped_row_ranges.emplace_back(skipped_row_range); + _skipped_row_ranges.emplace_back(skipped_row_range); } } return Status::OK(); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index d98825b6f0..9facffa623 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -82,14 +82,13 @@ public: int64_t size() const { return _file_reader->size(); } private: - int32_t _next_row_group_id(); + bool _next_row_group_reader(); Status _init_read_columns(const std::vector<SlotDescriptor*>& tuple_slot_descs); Status 
_init_row_group_readers(const std::vector<ExprContext*>& conjunct_ctxs); void _init_conjuncts(const std::vector<ExprContext*>& conjunct_ctxs); // Page Index Filter bool _has_page_index(std::vector<tparquet::ColumnChunk>& columns); - Status _process_page_index(tparquet::RowGroup& row_group, - std::vector<RowRange>& skipped_row_ranges); + Status _process_page_index(tparquet::RowGroup& row_group); // Row Group Filter bool _is_misaligned_range_group(const tparquet::RowGroup& row_group); @@ -113,8 +112,9 @@ private: FileReader* _file_reader; std::shared_ptr<FileMetaData> _file_metadata; tparquet::FileMetaData* _t_metadata; - std::shared_ptr<PageIndex> _page_index; - std::vector<std::shared_ptr<RowGroupReader>> _row_group_readers; + std::unique_ptr<PageIndex> _page_index; + std::list<std::shared_ptr<RowGroupReader>> _row_group_readers; + std::shared_ptr<RowGroupReader> _current_group_reader; int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file int32_t _current_row_group_id; // std::shared_ptr<Statistics> _statistics; @@ -129,6 +129,7 @@ private: int64_t _range_start_offset; int64_t _range_size; cctz::time_zone* _ctz; + std::vector<RowRange> _skipped_row_ranges; const TupleDescriptor* _tuple_desc; // get all slot info }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org