This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new d435f0de41 [feature-wip](parquet-reader) add page index row range (#12652)
d435f0de41 is described below

commit d435f0de4120e604e3590088954b5d8d99e91cfb
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Tue Sep 20 10:36:19 2022 +0800

    [feature-wip](parquet-reader) add page index row range (#12652)

    Add some utils and provide the candidate row ranges (generated from the
    skipped row ranges of each column) for the page index filter to read.
    This version supports binary operator filters.

    todo:
    - use a context instead of these structures in close()
    - process complex type filters
    - use this instead of the row group min/max filter
    - refactor _eval_binary() for the row group filter and the page index filter
---
 be/src/exprs/expr_context.h                        |  2 +
 be/src/vec/exec/format/parquet/parquet_pred_cmp.h  | 44 +++++++++++++
 .../exec/format/parquet/vparquet_column_reader.h   |  2 +
 .../exec/format/parquet/vparquet_group_reader.cpp  | 17 +++--
 .../exec/format/parquet/vparquet_group_reader.h    |  6 +-
 .../exec/format/parquet/vparquet_page_index.cpp    | 37 +++++++++--
 .../vec/exec/format/parquet/vparquet_page_index.h  |  7 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 75 +++++++++++++++-------
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  7 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  5 +-
 10 files changed, 161 insertions(+), 41 deletions(-)

diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h
index b1df684a9d..ebd1b5968e 100644
--- a/be/src/exprs/expr_context.h
+++ b/be/src/exprs/expr_context.h
@@ -39,6 +39,7 @@ namespace doris {
 namespace vectorized {
 class VOlapScanNode;
 class ParquetReader;
+class PageIndex;
 } // namespace vectorized
 
 class Expr;
@@ -167,6 +168,7 @@ private:
     friend class EsPredicate;
     friend class RowGroupReader;
     friend class vectorized::ParquetReader;
+    friend class vectorized::PageIndex;
     friend class vectorized::VOlapScanNode;
 
     /// FunctionContexts for each registered expression. The FunctionContexts are created
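
The heart of the change is the range arithmetic in _process_page_index: the row ranges of all pages pruned by the page index are collected across the filtered columns, sorted, and their complement becomes the candidate row ranges the row group reader should read. Below is a minimal, self-contained sketch of that computation; the Range struct and candidate_ranges() helper are illustrative simplifications, not the patch's actual RowRange/_process_page_index code.

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    // Right-open row range [first, last), matching the convention the patch
    // adopts in PageIndex::create_skipped_row_range.
    struct Range {
        int64_t first;
        int64_t last;
    };

    // Given the row ranges of all skipped pages (possibly overlapping, coming
    // from several columns), return the complement: the ranges that still have
    // to be read from the row group.
    std::vector<Range> candidate_ranges(std::vector<Range> skipped, int64_t total_rows) {
        std::sort(skipped.begin(), skipped.end(),
                  [](const Range& a, const Range& b) { return a.first < b.first; });
        std::vector<Range> candidates;
        int64_t cursor = 0; // first row not yet covered by any skipped range
        for (const Range& s : skipped) {
            if (s.first > cursor) {
                candidates.push_back({cursor, s.first}); // readable gap before this skip
            }
            cursor = std::max(cursor, s.last);
        }
        if (cursor < total_rows) {
            candidates.push_back({cursor, total_rows}); // tail after the last skip
        }
        return candidates;
    }

For example (made-up numbers): a 100-row row group whose filtered column has pages starting at rows 0, 40 and 70, with pages 0 and 2 pruned, yields skipped ranges [0, 40) and [70, 100) and a single candidate range [40, 70).
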
diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
index b58701418e..61750d0f29 100644
--- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
+++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
@@ -450,4 +450,48 @@ bool ParquetReader::_determine_filter_min_max(const std::vector<ExprContext*>& c
     return need_filter;
 }
 
+void _eval_binary(Expr* conjunct, void* conjunct_value, const char* min_bytes,
+                  const char* max_bytes, bool& need_filter) {
+    // todo: use this instead of row group minmax filter
+    Expr* expr = conjunct->get_child(1);
+    if (expr == nullptr) {
+        return;
+    }
+    auto conjunct_type = expr->type().type;
+    switch (conjunct->op()) {
+    case TExprOpcode::EQ:
+        need_filter = _eval_eq(conjunct_type, conjunct_value, min_bytes, max_bytes);
+        break;
+    case TExprOpcode::NE:
+        break;
+    case TExprOpcode::GT:
+        need_filter = _eval_gt(conjunct_type, conjunct_value, max_bytes);
+        break;
+    case TExprOpcode::GE:
+        need_filter = _eval_ge(conjunct_type, conjunct_value, max_bytes);
+        break;
+    case TExprOpcode::LT:
+        need_filter = _eval_lt(conjunct_type, conjunct_value, min_bytes);
+        break;
+    case TExprOpcode::LE:
+        need_filter = _eval_le(conjunct_type, conjunct_value, min_bytes);
+        break;
+    default:
+        break;
+    }
+}
+
+bool PageIndex::_filter_page_by_min_max(ExprContext* conjunct_expr, const std::string& encoded_min,
+                                        const std::string& encoded_max) {
+    const char* min_bytes = encoded_min.data();
+    const char* max_bytes = encoded_max.data();
+    bool need_filter = false;
+    Expr* conjunct = conjunct_expr->root();
+    void* conjunct_value = conjunct_expr->get_value(conjunct->get_child(1), nullptr);
+    if (TExprNodeType::BINARY_PRED == conjunct->node_type()) {
+        _eval_binary(conjunct, conjunct_value, min_bytes, max_bytes, need_filter);
+    }
+    return need_filter;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index fc197748b8..316410c34e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -63,6 +63,7 @@ public:
                          const tparquet::RowGroup& row_group, std::vector<RowRange>& row_ranges,
                          cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader);
     void init_column_metadata(const tparquet::ColumnChunk& chunk);
+    void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
     virtual void close() = 0;
 
 protected:
@@ -77,6 +78,7 @@ protected:
     std::unique_ptr<ColumnChunkReader> _chunk_reader;
     std::unique_ptr<level_t[]> _def_levels_buf = nullptr;
     size_t _def_levels_buf_size = 0;
+    tparquet::OffsetIndex* _offset_index;
 };
 
 class ScalarColumnReader : public ParquetColumnReader {
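
_filter_page_by_min_max and _eval_binary above decide, per page, whether a binary predicate can possibly match given the encoded min/max values from the column index. A simplified sketch of that decision on already-decoded values follows; the BinaryOp/PageStats/can_skip_page names and the int64_t type are assumptions for illustration, whereas the patch works on the encoded bytes and Doris' TExprOpcode values.

    #include <cstdint>

    enum class BinaryOp { EQ, NE, GT, GE, LT, LE };

    struct PageStats {
        int64_t min; // decoded minimum value of the page
        int64_t max; // decoded maximum value of the page
    };

    // True when no row of the page can satisfy "column <op> value", so the
    // whole page (and therefore its row range) can be skipped.
    bool can_skip_page(BinaryOp op, int64_t value, const PageStats& s) {
        switch (op) {
        case BinaryOp::EQ:
            return value < s.min || value > s.max;
        case BinaryOp::GT:
            return s.max <= value; // no row can be greater than value
        case BinaryOp::GE:
            return s.max < value;
        case BinaryOp::LT:
            return s.min >= value; // no row can be less than value
        case BinaryOp::LE:
            return s.min > value;
        case BinaryOp::NE:
            return false; // min/max alone rarely prove every row equals value
        }
        return false;
    }
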
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 91b6a5aa18..2e8355dc4d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -17,7 +17,6 @@
 
 #include "vparquet_group_reader.h"
 
-#include "parquet_pred_cmp.h"
 #include "schema_desc.h"
 #include "vparquet_column_reader.h"
 
@@ -37,14 +36,16 @@ RowGroupReader::~RowGroupReader() {
     _column_readers.clear();
 }
 
-Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges) {
+Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
+                            std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
     VLOG_DEBUG << "Row group id: " << _row_group_id;
-    RETURN_IF_ERROR(_init_column_readers(schema, row_ranges));
+    RETURN_IF_ERROR(_init_column_readers(schema, row_ranges, col_offsets));
     return Status::OK();
 }
 
-Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
-                                            std::vector<RowRange>& row_ranges) {
+Status RowGroupReader::_init_column_readers(
+        const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
+        std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
     for (auto& read_col : _read_columns) {
         SlotDescriptor* slot_desc = read_col._slot_desc;
         TypeDescriptor col_type = slot_desc->type();
@@ -52,10 +53,16 @@ Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
         std::unique_ptr<ParquetColumnReader> reader;
         RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col,
                                                     _row_group_meta, row_ranges, _ctz, reader));
+        auto col_iter = col_offsets.find(read_col._parquet_col_id);
+        if (col_iter != col_offsets.end()) {
+            tparquet::OffsetIndex oi = col_iter->second;
+            reader->add_offset_index(&oi);
+        }
         if (reader == nullptr) {
             VLOG_DEBUG << "Init row group reader failed";
             return Status::Corruption("Init row group reader failed");
         }
+
         _column_readers[slot_desc->id()] = std::move(reader);
     }
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 57c72d4863..b6ed8834da 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -35,11 +35,13 @@ public:
                    const std::vector<ParquetReadColumn>& read_columns, const int32_t _row_group_id,
                    tparquet::RowGroup& row_group, cctz::time_zone* ctz);
     ~RowGroupReader();
-    Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges);
+    Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
+                std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
     Status next_batch(Block* block, size_t batch_size, bool* _batch_eof);
 
 private:
-    Status _init_column_readers(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges);
+    Status _init_column_readers(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
+                                std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
 
 private:
     doris::FileReader* _file_reader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
index d5df11a5af..4707e9fa21 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
@@ -17,6 +17,7 @@
 
 #include "vparquet_page_index.h"
 
+#include "parquet_pred_cmp.h"
 #include "util/thrift_util.h"
 
 namespace doris::vectorized {
@@ -27,17 +28,43 @@ Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
     const auto& page_locations = offset_index.page_locations;
     DCHECK_LT(page_idx, page_locations.size());
     row_range->first_row = page_locations[page_idx].first_row_index;
+    // the row range is right open section as "[first_row, last_row)"
     if (page_idx == page_locations.size() - 1) {
-        row_range->last_row = total_rows_of_group - 1;
+        row_range->last_row = total_rows_of_group;
     } else {
-        row_range->last_row = page_locations[page_idx + 1].first_row_index - 1;
+        row_range->last_row = page_locations[page_idx + 1].first_row_index;
     }
     return Status::OK();
 }
 
-Status PageIndex::collect_skipped_page_range(std::vector<ExprContext*> conjuncts,
-                                             std::vector<int> page_range) {
-    return Status();
+Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index,
+                                             std::vector<ExprContext*> conjuncts,
+                                             std::vector<int>& skipped_ranges) {
+    const vector<std::string>& encoded_min_vals = column_index->min_values;
+    const vector<std::string>& encoded_max_vals = column_index->max_values;
+    DCHECK_EQ(encoded_min_vals.size(), encoded_max_vals.size());
+
+    const int num_of_pages = column_index->null_pages.size();
+    for (int page_id = 0; page_id < num_of_pages; page_id++) {
+        for (int i = 0; i < conjuncts.size(); i++) {
+            ExprContext* conjunct_expr = conjuncts[i];
+            if (conjunct_expr->root()->get_child(1) == nullptr) {
+                // conjunct value is null
+                continue;
+            }
+            // bool is_null_page = column_index->null_pages[page_id];
+            // if (UNLIKELY(is_null_page) && is_not_null_predicate()) {
+            //     skipped_ranges.emplace_back(page_id);
+            // }
+            if (_filter_page_by_min_max(conjunct_expr, encoded_min_vals[page_id],
+                                        encoded_max_vals[page_id])) {
+                skipped_ranges.emplace_back(page_id);
+                break;
+            }
+        }
+    }
+    VLOG_DEBUG << "skipped_ranges.size()=" << skipped_ranges.size();
+    return Status::OK();
 }
 
 bool PageIndex::check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index 5894a4e8d6..ea42da8509 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -31,13 +31,16 @@ public:
     ~PageIndex() = default;
     Status create_skipped_row_range(tparquet::OffsetIndex& offset_index, int total_rows_of_group,
                                     int page_idx, RowRange* row_range);
-    Status collect_skipped_page_range(std::vector<ExprContext*> conjuncts,
-                                      std::vector<int> page_range);
+    Status collect_skipped_page_range(tparquet::ColumnIndex* column_index,
+                                      std::vector<ExprContext*> conjuncts,
+                                      std::vector<int>& page_range);
     bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
     Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
                               tparquet::ColumnIndex* _column_index);
     Status parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
                               int64_t buffer_size, tparquet::OffsetIndex* _offset_index);
+    bool _filter_page_by_min_max(ExprContext* conjunct_expr, const std::string& encoded_min,
+                                 const std::string& encoded_max);
 
 private:
     friend class ParquetReader;
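
The offset index is what maps a pruned page back to rows: page i covers [page_locations[i].first_row_index, page_locations[i+1].first_row_index), and the last page runs to the end of the row group, which is exactly the right-open convention create_skipped_row_range switches to above. A small sketch of that mapping; the gen_cpp/parquet_types.h include path and the page_row_range name are assumptions for illustration.

    #include <cstdint>
    #include <utility>

    #include "gen_cpp/parquet_types.h" // assumed header for the tparquet thrift types

    // Right-open row range [first, last) covered by data page `page_idx`,
    // derived from the page locations of the column's offset index.
    std::pair<int64_t, int64_t> page_row_range(const tparquet::OffsetIndex& offset_index,
                                               int64_t rows_in_group, size_t page_idx) {
        const auto& locs = offset_index.page_locations;
        int64_t first = locs[page_idx].first_row_index;
        int64_t last = (page_idx + 1 < locs.size()) ? locs[page_idx + 1].first_row_index
                                                    : rows_in_group; // last page ends the group
        return {first, last};
    }
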
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index e3eb31976b..92ffa1e70c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -42,11 +42,13 @@ void ParquetReader::close() {
     for (auto& conjuncts : _slot_conjuncts) {
         conjuncts.second.clear();
     }
+    // todo: use context instead of these structures
    _row_group_readers.clear();
     _read_row_groups.clear();
-    _skipped_row_ranges.clear();
+    _candidate_row_ranges.clear();
     _slot_conjuncts.clear();
 
     _file_reader->close();
+    _col_offsets.clear();
     delete _file_reader;
 }
 
@@ -86,14 +88,14 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup
         auto parquet_col_id = iter->second;
         if (iter != _map_column.end()) {
             _include_column_ids.emplace_back(parquet_col_id);
+            ParquetReadColumn column(parquet_col_id, slot_desc);
+            _read_columns.emplace_back(column);
         } else {
             std::stringstream str_error;
             str_error << "Invalid Column Name:" << slot_desc->col_name();
             VLOG_DEBUG << str_error.str();
             return Status::InvalidArgument(str_error.str());
         }
-        ParquetReadColumn column(slot_desc);
-        _read_columns.emplace_back(column);
     }
     return Status::OK();
 }
@@ -110,8 +112,6 @@ Status ParquetReader::get_next_block(Block* block, bool* eof) {
     if (_batch_eof) {
         if (!_next_row_group_reader()) {
             *eof = true;
-        } else {
-            // _read_row_groups.pop_front();
         }
     }
     return Status::OK();
@@ -131,12 +131,12 @@ Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& c
     RETURN_IF_ERROR(_filter_row_groups());
     for (auto row_group_id : _read_row_groups) {
         auto& row_group = _t_metadata->row_groups[row_group_id];
-        RETURN_IF_ERROR(_process_page_index(row_group));
         std::shared_ptr<RowGroupReader> row_group_reader;
         row_group_reader.reset(
                 new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz));
-        // todo: can filter row with candidate ranges rather than skipped ranges
-        RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), _skipped_row_ranges));
+        RETURN_IF_ERROR(_process_page_index(row_group));
+        RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), _candidate_row_ranges,
+                                               _col_offsets));
         _row_group_readers.emplace_back(row_group_reader);
     }
     if (!_next_row_group_reader()) {
@@ -172,11 +172,11 @@ void ParquetReader::_init_conjuncts(const std::vector<ExprContext*>& conjunct_ct
             SlotId conjunct_slot_id = slot_ref->slot_id();
             if (conjunct_slot_id == _tuple_desc->slots()[i]->id()) {
                 // Get conjuncts by conjunct_slot_id
-                auto iter = _slot_conjuncts.find(conjunct_slot_id);
+                auto iter = _slot_conjuncts.find(parquet_col_id);
                 if (_slot_conjuncts.end() == iter) {
                     std::vector<ExprContext*> conjuncts;
                     conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
-                    _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts));
+                    _slot_conjuncts.emplace(std::make_pair(parquet_col_id, conjuncts));
                 } else {
                     std::vector<ExprContext*> conjuncts = iter->second;
                     conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
@@ -232,28 +232,57 @@ Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group) {
     int64_t bytes_read = 0;
     RETURN_IF_ERROR(
             _file_reader->readat(_page_index->_column_index_start, buffer_size, &bytes_read, buff));
-    for (auto col_id : _include_column_ids) {
-        auto conjunct_iter = _slot_conjuncts.find(col_id);
+
+    std::vector<RowRange> skipped_row_ranges;
+    for (auto& read_col : _read_columns) {
+        auto conjunct_iter = _slot_conjuncts.find(read_col._parquet_col_id);
         if (_slot_conjuncts.end() == conjunct_iter) {
             continue;
         }
-        auto& chunk = row_group.columns[col_id];
+        auto& chunk = row_group.columns[read_col._parquet_col_id];
         tparquet::ColumnIndex column_index;
         RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff, &column_index));
-        const int num_of_page = column_index.null_pages.size();
-        if (num_of_page <= 1) {
+        const int num_of_pages = column_index.null_pages.size();
+        if (num_of_pages <= 0) {
             break;
         }
         auto& conjuncts = conjunct_iter->second;
-        std::vector<int> candidate_page_range;
-        _page_index->collect_skipped_page_range(conjuncts, candidate_page_range);
+        std::vector<int> skipped_page_range;
+        _page_index->collect_skipped_page_range(&column_index, conjuncts, skipped_page_range);
+        if (skipped_page_range.empty()) {
+            return Status::OK();
+        }
         tparquet::OffsetIndex offset_index;
         RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, buffer_size, &offset_index));
-        for (int page_id : candidate_page_range) {
+        for (int page_id : skipped_page_range) {
             RowRange skipped_row_range;
             _page_index->create_skipped_row_range(offset_index, row_group.num_rows, page_id,
                                                   &skipped_row_range);
-            _skipped_row_ranges.emplace_back(skipped_row_range);
+            // use the union row range
+            skipped_row_ranges.emplace_back(skipped_row_range);
+        }
+        _col_offsets.emplace(read_col._parquet_col_id, offset_index);
+    }
+    if (skipped_row_ranges.empty()) {
+        return Status::OK();
+    }
+
+    std::sort(skipped_row_ranges.begin(), skipped_row_ranges.end(),
+              [](const RowRange& lhs, const RowRange& rhs) {
+                  return std::tie(lhs.first_row, lhs.last_row) <
+                         std::tie(rhs.first_row, rhs.last_row);
+              });
+    int skip_end = -1;
+    for (auto& skip_range : skipped_row_ranges) {
+        VLOG_DEBUG << skip_range.first_row << " " << skip_range.last_row << " | ";
+        if (skip_end + 1 >= skip_range.first_row) {
+            if (skip_end < skip_range.last_row) {
+                skip_end = skip_range.last_row;
+            }
+        } else {
+            // read row with candidate ranges rather than skipped ranges
+            _candidate_row_ranges.push_back({skip_end + 1, skip_range.first_row - 1});
+            skip_end = skip_range.last_row;
         }
     }
     return Status::OK();
@@ -275,16 +304,16 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co
     std::unordered_set<int> _parquet_column_ids(_include_column_ids.begin(),
                                                 _include_column_ids.end());
     for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++) {
-        auto slot_iter = _slot_conjuncts.find(slot_id);
-        if (slot_iter == _slot_conjuncts.end()) {
-            continue;
-        }
         const std::string& col_name = _tuple_desc->slots()[slot_id]->col_name();
         auto col_iter = _map_column.find(col_name);
         if (col_iter == _map_column.end()) {
            continue;
         }
         int parquet_col_id = col_iter->second;
+        auto slot_iter = _slot_conjuncts.find(parquet_col_id);
+        if (slot_iter == _slot_conjuncts.end()) {
+            continue;
+        }
         if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) {
             // Column not exist in parquet file
             continue;
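
The candidate ranges computed above are handed to RowGroupReader::init(); however the column readers end up consuming them, the intent is to decode only rows that fall inside those ranges. A self-contained sketch of that intent (for_each_candidate_row is hypothetical and not part of the patch; RowRange here is a local stand-in for the patch's struct, using its right-open convention):

    #include <cstdint>
    #include <vector>

    struct RowRange {
        int64_t first_row;
        int64_t last_row; // right-open: [first_row, last_row)
    };

    // Visit only the rows inside a candidate range, skipping the rows that the
    // page index already proved cannot match the conjuncts.
    template <typename Visit>
    void for_each_candidate_row(const std::vector<RowRange>& candidates, Visit visit) {
        for (const RowRange& range : candidates) {
            for (int64_t row = range.first_row; row < range.last_row; ++row) {
                visit(row); // e.g. decode this row from the column chunks
            }
        }
    }
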
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 95ffa10bd6..b0a0ea2d2b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -55,10 +55,12 @@ class ParquetReadColumn {
 public:
     friend class ParquetReader;
     friend class RowGroupReader;
-    ParquetReadColumn(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) {};
+    ParquetReadColumn(int parquet_col_id, SlotDescriptor* slot_desc)
+            : _parquet_col_id(parquet_col_id), _slot_desc(slot_desc) {};
     ~ParquetReadColumn() = default;
 
 private:
+    int _parquet_col_id;
     SlotDescriptor* _slot_desc;
     // int64_t start_offset;
     // int64_t chunk_size;
@@ -130,7 +132,8 @@ private:
     int64_t _range_start_offset;
     int64_t _range_size;
     cctz::time_zone* _ctz;
-    std::vector<RowRange> _skipped_row_ranges;
+    std::vector<RowRange> _candidate_row_ranges;
+    std::unordered_map<int, tparquet::OffsetIndex> _col_offsets;
 
     const TupleDescriptor* _tuple_desc; // get all slot info
 };
file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); @@ -400,7 +400,8 @@ TEST_F(ParquetThriftReaderTest, group_reader) { std::shared_ptr<RowGroupReader> row_group_reader; row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group, &ctz)); std::vector<RowRange> row_ranges = std::vector<RowRange>(); - auto stg = row_group_reader->init(meta_data->schema(), row_ranges); + auto col_offsets = std::unordered_map<int, tparquet::OffsetIndex>(); + auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets); EXPECT_TRUE(stg.ok()); vectorized::Block block; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org