This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new fa14b7ea9c [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024) fa14b7ea9c is described below commit fa14b7ea9c23bb56293c564ee3a5c0a0368b868a Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Sat Jan 28 00:04:27 2023 +0800 [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024) close #16023 --- .../exec/format/parquet/vparquet_group_reader.cpp | 197 +++++++++++++++++---- .../exec/format/parquet/vparquet_group_reader.h | 8 + 2 files changed, 168 insertions(+), 37 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 0c4ff2802c..f4818783f6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -98,42 +98,35 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns)); - Status st = - VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns()); + if (block->rows() == 0) { + *read_rows = block->rows(); + return Status::OK(); + } + + RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows)); + + std::vector<uint32_t> columns_to_filter; + int column_to_keep = block->columns(); + columns_to_filter.resize(column_to_keep); + for (uint32_t i = 0; i < column_to_keep; ++i) { + columns_to_filter[i] = i; + } + if (_lazy_read_ctx.vconjunct_ctx != nullptr) { + int result_column_id = -1; + RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &result_column_id)); + ColumnPtr filter_column = block->get_by_position(result_column_id).column; + RETURN_IF_ERROR(_filter_block(block, filter_column, column_to_keep, columns_to_filter)); + } else { + RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)); + } + *read_rows = block->rows(); - return st; + return Status::OK(); } } void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) { - // row_ranges is generated from page index, and the row index begins with 0 in each row group. - // _position_delete_ctx is generated from delete file, and the row index begins with 0 in parquet file - for (auto& range : row_ranges) { - int64_t start_row_id = range.first_row; - while (_position_delete_ctx.index < _position_delete_ctx.end_index) { - const int64_t& delete_row_id = - _position_delete_ctx.delete_rows[_position_delete_ctx.index] - - _position_delete_ctx.first_row_id; - if (delete_row_id < range.first_row) { - _position_delete_ctx.index++; - } else if (delete_row_id < range.last_row) { - if (start_row_id < delete_row_id) { - _read_ranges.emplace_back(start_row_id, delete_row_id); - } - start_row_id = delete_row_id + 1; - _position_delete_ctx.index++; - } else { // delete_row_id >= range.last_row - if (start_row_id < range.last_row) { - _read_ranges.emplace_back(start_row_id, range.last_row); - start_row_id = range.last_row + 1; - } - break; - } - } - if (start_row_id < range.last_row) { - _read_ranges.emplace_back(start_row_id, range.last_row); - } - } + _read_ranges = row_ranges; } Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns, @@ -194,6 +187,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re _lazy_read_ctx.predicate_partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows, _lazy_read_ctx.predicate_missing_columns)); + + RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows)); + // generate filter vector if (_lazy_read_ctx.resize_first_column) { // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 @@ -262,8 +258,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re // generated from next batch, so the filter column is removed ahead. DCHECK_EQ(block->rows(), 0); } else { - Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id, - origin_column_num); + ColumnPtr filter_column = block->get_by_position(filter_column_id).column; + RETURN_IF_ERROR(_filter_block(block, filter_column, origin_column_num, + _lazy_read_ctx.all_predicate_col_ids)); } } else { Block::erase_useless_column(block, origin_column_num); @@ -302,16 +299,23 @@ const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows, nullable_column->get_nested_column_ptr()->assume_mutable().get()); uint8_t* filter_data = concrete_column->get_data().data(); for (int i = 0; i < num_rows; ++i) { - // filter null if filter_column if nullable - filter_data[i] &= !null_map_column[i]; + (*_filter_ptr)[i] &= (!null_map_column[i]) & filter_data[i]; } - filter_map = filter_data; + filter_map = _filter_ptr->data(); } } else if (auto* const_column = check_and_get_column<ColumnConst>(*sv)) { // filter all *can_filter_all = !const_column->get_bool(0); } else { - filter_map = assert_cast<const ColumnVector<UInt8>&>(*sv).get_data().data(); + const IColumn::Filter& filter = + assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*sv).get_data(); + + auto* __restrict filter_data = filter.data(); + const size_t size = filter.size(); + for (size_t i = 0; i < size; ++i) { + (*_filter_ptr)[i] &= filter_data[i]; + } + filter_map = filter.data(); } return filter_map; } @@ -434,6 +438,125 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, b return Status::OK(); } +Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) { + _filter_ptr.reset(new IColumn::Filter(read_rows, 1)); + if (!_position_delete_ctx.has_filter) { + _total_read_rows += read_rows; + return Status::OK(); + } + while (_position_delete_ctx.index < _position_delete_ctx.end_index) { + const int64_t delete_row_index_in_row_group = + _position_delete_ctx.delete_rows[_position_delete_ctx.index] - + _position_delete_ctx.first_row_id; + int64_t read_range_rows = 0; + size_t remaining_read_rows = _total_read_rows + read_rows; + for (auto& range : _read_ranges) { + if (delete_row_index_in_row_group < range.first_row) { + ++_position_delete_ctx.index; + break; + } else if (delete_row_index_in_row_group < range.last_row) { + int64_t index = (delete_row_index_in_row_group - range.first_row) + + read_range_rows - _total_read_rows; + if (index > read_rows - 1) { + _total_read_rows += read_rows; + return Status::OK(); + } + (*_filter_ptr)[index] = 0; + ++_position_delete_ctx.index; + break; + } else { // delete_row >= range.last_row + } + + int64_t range_size = range.last_row - range.first_row; + // Don't search next range when there is no remaining_read_rows. + if (remaining_read_rows <= range_size) { + _total_read_rows += read_rows; + return Status::OK(); + } else { + remaining_read_rows -= range_size; + read_range_rows += range_size; + } + } + } + _total_read_rows += read_rows; + return Status::OK(); +} + +Status RowGroupReader::_filter_block(Block* block, const ColumnPtr filter_column, + int column_to_keep, std::vector<uint32_t> columns_to_filter) { + if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { + ColumnPtr nested_column = nullable_column->get_nested_column_ptr(); + + MutableColumnPtr mutable_holder = + nested_column->use_count() == 1 + ? nested_column->assume_mutable() + : nested_column->clone_resized(nested_column->size()); + + ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>(mutable_holder.get()); + if (!concrete_column) { + return Status::InvalidArgument( + "Illegal type {} of column for filter. Must be UInt8 or Nullable(UInt8).", + filter_column->get_name()); + } + auto* __restrict null_map = nullable_column->get_null_map_data().data(); + IColumn::Filter& filter = concrete_column->get_data(); + auto* __restrict filter_data = filter.data(); + + const size_t size = filter.size(); + for (size_t i = 0; i < size; ++i) { + (*_filter_ptr)[i] &= (!null_map[i]) & filter_data[i]; + } + RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter)); + } else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { + bool ret = const_column->get_bool(0); + if (!ret) { + for (auto& col : columns_to_filter) { + std::move(*block->get_by_position(col).column).assume_mutable()->clear(); + } + } + } else { + const IColumn::Filter& filter = + assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*filter_column) + .get_data(); + + auto* __restrict filter_data = filter.data(); + const size_t size = filter.size(); + for (size_t i = 0; i < size; ++i) { + (*_filter_ptr)[i] &= filter_data[i]; + } + RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter)); + } + Block::erase_useless_column(block, column_to_keep); + return Status::OK(); +} + +Status RowGroupReader::_filter_block(Block* block, int column_to_keep, + const std::vector<uint32_t>& columns_to_filter) { + RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter)); + Block::erase_useless_column(block, column_to_keep); + + return Status::OK(); +} + +Status RowGroupReader::_filter_block_internal(Block* block, + const std::vector<uint32_t>& columns_to_filter) { + size_t count = _filter_ptr->size() - + simd::count_zero_num((int8_t*)_filter_ptr->data(), _filter_ptr->size()); + if (count == 0) { + for (auto& col : columns_to_filter) { + std::move(*block->get_by_position(col).column).assume_mutable()->clear(); + } + } else { + for (auto& col : columns_to_filter) { + if (block->get_by_position(col).column->size() != count) { + block->get_by_position(col).column = + block->get_by_position(col).column->filter(*_filter_ptr, count); + } + } + } + return Status::OK(); +} + ParquetColumnReader::Statistics RowGroupReader::statistics() { ParquetColumnReader::Statistics st; for (auto& reader : _column_readers) { diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index d53e32de2a..15ff153b60 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -131,6 +131,12 @@ private: Status _fill_missing_columns( Block* block, size_t rows, const std::unordered_map<std::string, VExprContext*>& missing_columns); + Status _build_pos_delete_filter(size_t read_rows); + Status _filter_block(Block* block, const ColumnPtr filter_column, int column_to_keep, + std::vector<uint32_t> columns_to_filter); + Status _filter_block(Block* block, int column_to_keep, + const vector<uint32_t>& columns_to_filter); + Status _filter_block_internal(Block* block, const vector<uint32_t>& columns_to_filter); io::FileReaderSPtr _file_reader; std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers; @@ -148,5 +154,7 @@ private: // If continuous batches are skipped, we can cache them to skip a whole page size_t _cached_filtered_rows = 0; std::unique_ptr<TextConverter> _text_converter = nullptr; + std::unique_ptr<IColumn::Filter> _filter_ptr = nullptr; + int64_t _total_read_rows = 0; }; } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org