AshinGau commented on code in PR #12652: URL: https://github.com/apache/doris/pull/12652#discussion_r972541784
########## be/src/vec/exec/format/parquet/vparquet_column_reader.h: ########## @@ -77,6 +80,9 @@ class ParquetColumnReader { std::unique_ptr<ColumnChunkReader> _chunk_reader; std::unique_ptr<level_t[]> _def_levels_buf = nullptr; size_t _def_levels_buf_size = 0; + std::unique_ptr<tparquet::OffsetIndex> _offset_index; Review Comment: Why does `tparquet::OffsetIndex` need to be released here? Is it a heap or a stack variable? If it is a heap variable, it's better to release it in the parent class. ########## be/src/vec/exec/format/parquet/vparquet_reader.cpp: ########## @@ -233,28 +233,57 @@ Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group) { int64_t bytes_read = 0; RETURN_IF_ERROR( _file_reader->readat(_page_index->_column_index_start, buffer_size, &bytes_read, buff)); - for (auto col_id : _include_column_ids) { - auto conjunct_iter = _slot_conjuncts.find(col_id); + + std::vector<RowRange> skipped_row_ranges; + for (auto& read_col : _read_columns) { + auto conjunct_iter = _slot_conjuncts.find(read_col._parquet_col_id); if (_slot_conjuncts.end() == conjunct_iter) { continue; } - auto& chunk = row_group.columns[col_id]; + auto& chunk = row_group.columns[read_col._parquet_col_id]; tparquet::ColumnIndex column_index; RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff, &column_index)); - const int num_of_page = column_index.null_pages.size(); - if (num_of_page <= 1) { + const int num_of_pages = column_index.null_pages.size(); + if (num_of_pages <= 0) { break; } auto& conjuncts = conjunct_iter->second; - std::vector<int> candidate_page_range; - _page_index->collect_skipped_page_range(conjuncts, candidate_page_range); + std::vector<int> skipped_page_range; + _page_index->collect_skipped_page_range(&column_index, conjuncts, skipped_page_range); + if (skipped_page_range.empty()) { + return Status::OK(); + } tparquet::OffsetIndex offset_index; RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, buffer_size, &offset_index)); - for (int page_id : 
candidate_page_range) { + for (int page_id : skipped_page_range) { RowRange skipped_row_range; _page_index->create_skipped_row_range(offset_index, row_group.num_rows, page_id, &skipped_row_range); - _skipped_row_ranges.emplace_back(skipped_row_range); + // use the union row range + skipped_row_ranges.emplace_back(skipped_row_range); + } + _col_offsets.emplace(read_col._parquet_col_id, offset_index); + } + if (skipped_row_ranges.empty()) { + return Status::OK(); + } + + std::sort(skipped_row_ranges.begin(), skipped_row_ranges.end(), + [](const RowRange& lhs, const RowRange& rhs) { + return std::tie(lhs.first_row, lhs.last_row) < + std::tie(rhs.first_row, rhs.last_row); + }); + int skip_end = -1; + for (auto& skip_range : skipped_row_ranges) { + LOG(WARNING) << skip_range.first_row << " " << skip_range.last_row << " | "; Review Comment: Too many logs ########## be/src/vec/exec/format/parquet/vparquet_reader.cpp: ########## @@ -232,28 +232,57 @@ Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group) { int64_t bytes_read = 0; RETURN_IF_ERROR( _file_reader->readat(_page_index->_column_index_start, buffer_size, &bytes_read, buff)); - for (auto col_id : _include_column_ids) { - auto conjunct_iter = _slot_conjuncts.find(col_id); + + std::vector<RowRange> skipped_row_ranges; + for (auto& read_col : _read_columns) { + auto conjunct_iter = _slot_conjuncts.find(read_col._parquet_col_id); if (_slot_conjuncts.end() == conjunct_iter) { continue; } - auto& chunk = row_group.columns[col_id]; + auto& chunk = row_group.columns[read_col._parquet_col_id]; tparquet::ColumnIndex column_index; RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff, &column_index)); - const int num_of_page = column_index.null_pages.size(); - if (num_of_page <= 1) { + const int num_of_pages = column_index.null_pages.size(); + if (num_of_pages <= 0) { break; } auto& conjuncts = conjunct_iter->second; - std::vector<int> candidate_page_range; - 
_page_index->collect_skipped_page_range(conjuncts, candidate_page_range); + std::vector<int> skipped_page_range; + _page_index->collect_skipped_page_range(&column_index, conjuncts, skipped_page_range); + if (skipped_page_range.empty()) { + return Status::OK(); + } tparquet::OffsetIndex offset_index; RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, buffer_size, &offset_index)); - for (int page_id : candidate_page_range) { + for (int page_id : skipped_page_range) { RowRange skipped_row_range; _page_index->create_skipped_row_range(offset_index, row_group.num_rows, page_id, &skipped_row_range); - _skipped_row_ranges.emplace_back(skipped_row_range); + // use the union row range + skipped_row_ranges.emplace_back(skipped_row_range); + } + _col_offsets.emplace(read_col._parquet_col_id, offset_index); + } + if (skipped_row_ranges.empty()) { + return Status::OK(); + } + + std::sort(skipped_row_ranges.begin(), skipped_row_ranges.end(), + [](const RowRange& lhs, const RowRange& rhs) { + return std::tie(lhs.first_row, lhs.last_row) < + std::tie(rhs.first_row, rhs.last_row); + }); + int skip_end = -1; + for (auto& skip_range : skipped_row_ranges) { + VLOG_DEBUG << skip_range.first_row << " " << skip_range.last_row << " | "; Review Comment: There is no description. Only numbers are output. It's kind of meaningless. 
########## be/src/vec/exec/format/parquet/vparquet_page_index.cpp: ########## @@ -35,9 +36,34 @@ Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index, return Status::OK(); } -Status PageIndex::collect_skipped_page_range(std::vector<ExprContext*> conjuncts, - std::vector<int> page_range) { - return Status(); +Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index, + std::vector<ExprContext*> conjuncts, + std::vector<int> skipped_ranges) { + const vector<std::string>& encoded_min_vals = column_index->min_values; + const vector<std::string>& encoded_max_vals = column_index->max_values; + DCHECK_EQ(encoded_min_vals.size(), encoded_max_vals.size()); + + const int num_of_pages = column_index->null_pages.size(); + for (int page_id = 0; page_id < num_of_pages; page_id++) { + for (int i = 0; i < conjuncts.size(); i++) { + ExprContext* conjunct_expr = conjuncts[i]; + if (conjunct_expr->root()->get_child(1) == nullptr) { + // conjunct value is null + continue; + } + // bool is_null_page = column_index->null_pages[page_id]; + // if (UNLIKELY(is_null_page) && is_not_null_predicate()) { + // skipped_ranges.emplace_back(page_id); + // } + if (_filter_page_by_min_max(conjunct_expr, encoded_min_vals[page_id], + encoded_max_vals[page_id])) { + skipped_ranges.emplace_back(page_id); + break; + } + } + } + LOG(WARNING) << "skipped_ranges.size()=" << skipped_ranges.size(); Review Comment: Too many logs. This should be removed, or logged at `DEBUG` level instead. ########## be/src/vec/exec/format/parquet/vparquet_reader.cpp: ########## @@ -87,14 +89,14 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup auto parquet_col_id = iter->second; if (iter != _map_column.end()) { _include_column_ids.emplace_back(parquet_col_id); + ParquetReadColumn column(parquet_col_id, slot_desc); + _read_columns.emplace_back(column); Review Comment: You can write `_read_columns.emplace_back(parquet_col_id, slot_desc)` directly. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org