This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit f251343fa42878e4e4454723c8d92924eca381ea
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Sat Dec 24 16:02:07 2022 +0800

    [fix](multi-catalog) fix and optimize iceberg v2 reader (#15274)

    Fix three bugs when reading Iceberg v2 tables:
    1. The `delete position` in a `delete file` represents the position of the deleted row in the entire file, but the `read range` in `RowGroupReader` represents the position within the current row group. Therefore, we need to subtract the position of the first row of the current row group from the `delete position`.
    2. When only the partition columns are read, `RowGroupReader` skips processing the `delete position`.
    3. If the `delete position`s have deleted all rows in a row group, the `read range` is empty, but we still read the whole row group in that case.

    Optimize four performance issues:
    1. Previously we converted each `delete position` into a `delete range` and then merged the `delete range`s and `read range`s into the final read ranges; this process is tedious and time-consuming. Now we merge `delete position` and `read range` directly (sketched below).
    2. `delete position`s are ordered within a `delete file`, so we can use a merge sort instead of an ordered set.
    3. Initialize each `RowGroupReader` when reading, instead of initializing all row groups when opening a `ParquetReader`, to save memory; `IcebergReader` does the same.
    4. Change the recursive call of `_do_lazy_read` into a loop.
---
 be/src/service/internal_service.cpp | 2 +-
 be/src/vec/exec/format/csv/csv_reader.cpp | 4 +-
 be/src/vec/exec/format/csv/csv_reader.h | 4 +-
 be/src/vec/exec/format/generic_reader.h | 6 +-
 be/src/vec/exec/format/json/new_json_reader.cpp | 4 +-
 be/src/vec/exec/format/json/new_json_reader.h | 4 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp | 4 +-
 be/src/vec/exec/format/orc/vorc_reader.h | 4 +-
 be/src/vec/exec/format/parquet/parquet_common.h | 2 +-
 .../exec/format/parquet/vparquet_column_reader.cpp | 30 +--
 .../exec/format/parquet/vparquet_column_reader.h | 19 +-
 .../exec/format/parquet/vparquet_group_reader.cpp | 199 +++++++++-----
 .../exec/format/parquet/vparquet_group_reader.h | 83 ++++--
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 243 +++++++----------
 be/src/vec/exec/format/parquet/vparquet_reader.h | 30 ++-
 be/src/vec/exec/format/table/iceberg_reader.cpp | 293 ++++++++++-----------
 be/src/vec/exec/format/table/iceberg_reader.h | 30 +--
 be/src/vec/exec/format/table/table_format_reader.h | 5 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp | 12 +-
 19 files changed, 512 insertions(+), 466 deletions(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 28b9b8f4ca..381782af85 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -478,7 +478,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c } std::vector<std::string> col_names; std::vector<TypeDescriptor> col_types; - st = reader->get_parsered_schema(&col_names, &col_types); + st = reader->get_parsed_schema(&col_names, &col_types); if (!st.ok()) { LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg(); st.to_protobuf(result->mutable_status()); diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 2e710ca44f..48f8d84dc7 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -202,8 +202,8 @@ Status CsvReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* n return Status::OK(); } -Status 
CsvReader::get_parsered_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) { +Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) { size_t read_line = 0; bool is_parse_name = false; RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name)); diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 5083c00d2d..5bb14523e3 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -49,8 +49,8 @@ public: // 1. header_type is empty, get schema from first line. // 2. header_type is CSV_WITH_NAMES, get schema from first line. // 3. header_type is CSV_WITH_NAMES_AND_TYPES, get schema from first two line. - Status get_parsered_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) override; + Status get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) override; private: // used for stream/broker load of csv file. diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index dd2bdd249c..30e93aacd8 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -39,9 +39,9 @@ public: return Status::NotSupported("get_columns is not implemented"); } - virtual Status get_parsered_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) { - return Status::NotSupported("get_parser_schema is not implemented for this reader."); + virtual Status get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) { + return Status::NotSupported("get_parsed_schema is not implemented for this reader."); } virtual ~GenericReader() = default; diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 2228f9701d..68a3f089e5 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -139,8 +139,8 @@ Status NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor return Status::OK(); } -Status NewJsonReader::get_parsered_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) { +Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) { RETURN_IF_ERROR(_get_range_params()); RETURN_IF_ERROR(_open_file_reader()); diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 6b003c30fe..5a057d32fe 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -48,8 +48,8 @@ public: Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; - Status get_parsered_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) override; + Status get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) override; private: Status _get_range_params(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 4998bc2486..2748160e97 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -176,8 +176,8 @@ 
Status OrcReader::init_reader( return Status::OK(); } -Status OrcReader::get_parsered_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) { +Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) { if (_file_reader == nullptr) { std::unique_ptr<FileReader> inner_reader; RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path, diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 9b0e4b6a44..98c2fc7c02 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -99,8 +99,8 @@ public: Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; - Status get_parsered_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) override; + Status get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) override; private: struct OrcProfile { diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index 611ce969f1..112be6a223 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -41,7 +41,7 @@ namespace doris::vectorized { using level_t = int16_t; struct RowRange { - RowRange() {} + RowRange() = default; RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {} int64_t first_row; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index c0557c482a..75f43b4730 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -19,7 +19,6 @@ #include <common/status.h> #include <gen_cpp/parquet_types.h> -#include <vec/columns/columns_number.h> #include "schema_desc.h" #include "vec/data_types/data_type_array.h" @@ -28,7 +27,9 @@ namespace doris::vectorized { Status ParquetColumnReader::create(FileReader* file, FieldSchema* field, - const tparquet::RowGroup& row_group, cctz::time_zone* ctz, + const ParquetReadColumn& column, + const tparquet::RowGroup& row_group, + const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size) { if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) { @@ -36,13 +37,13 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field, } if (field->type.type == TYPE_ARRAY) { tparquet::ColumnChunk chunk = row_group.columns[field->children[0].physical_column_index]; - ArrayColumnReader* array_reader = new ArrayColumnReader(ctz); + ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz); array_reader->init_column_metadata(chunk); RETURN_IF_ERROR(array_reader->init(file, field, &chunk, max_buf_size)); reader.reset(array_reader); } else { tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index]; - ScalarColumnReader* scalar_reader = new ScalarColumnReader(ctz); + ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, ctz); scalar_reader->init_column_metadata(chunk); RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, max_buf_size)); reader.reset(scalar_reader); @@ -61,23 +62,19 @@ void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chun void 
ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end_index, std::list<RowRange>& read_ranges) { - if (_row_ranges->empty()) { - read_ranges.emplace_back(start_index, end_index); - return; - } int index = _row_range_index; - while (index < _row_ranges->size()) { - const RowRange& row_range = (*_row_ranges)[index]; - if (row_range.last_row <= start_index) { + while (index < _row_ranges.size()) { + const RowRange& read_range = _row_ranges[index]; + if (read_range.last_row <= start_index) { index++; _row_range_index++; continue; } - if (row_range.first_row >= end_index) { + if (read_range.first_row >= end_index) { break; } - int64_t start = row_range.first_row < start_index ? start_index : row_range.first_row; - int64_t end = row_range.last_row < end_index ? row_range.last_row : end_index; + int64_t start = read_range.first_row < start_index ? start_index : read_range.first_row; + int64_t end = read_range.last_row < end_index ? read_range.last_row : end_index; read_ranges.emplace_back(start, end); index++; } @@ -216,7 +213,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr // lazy read size_t remaining_num_values = 0; for (auto& range : read_ranges) { - remaining_num_values = range.last_row - range.first_row; + remaining_num_values += range.last_row - range.first_row; } if (batch_size >= remaining_num_values && select_vector.can_filter_all(remaining_num_values)) { @@ -330,6 +327,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& data_column = doris_column->assume_mutable(); } + // generate array offset size_t real_batch_size = 0; size_t num_values = 0; std::vector<size_t> element_offsets; @@ -339,7 +337,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& level_t* definitions = _def_levels_buf.get(); _chunk_reader->get_def_levels(definitions, num_values); _def_offset = 0; - // read_range delete_row_range + // generate the row ranges that should be read std::list<RowRange> read_ranges; _generate_read_ranges(_current_row_index, _current_row_index + real_batch_size, read_ranges); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index b7d062e7c6..de0ec185b9 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -96,7 +96,8 @@ public: } }; - ParquetColumnReader(cctz::time_zone* ctz) : _ctz(ctz) {}; + ParquetColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz) + : _row_ranges(row_ranges), _ctz(ctz) {}; virtual ~ParquetColumnReader() { if (_stream_reader != nullptr) { delete _stream_reader; @@ -106,12 +107,12 @@ public: virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, bool* eof) = 0; - static Status create(FileReader* file, FieldSchema* field, const tparquet::RowGroup& row_group, - cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader, - size_t max_buf_size); + static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column, + const tparquet::RowGroup& row_group, + const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz, + std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size); void init_column_metadata(const tparquet::ColumnChunk& chunk); void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; } - void 
set_row_ranges(const std::vector<RowRange>* row_ranges) { _row_ranges = row_ranges; }; Statistics statistics() { return Statistics(_stream_reader->statistics(), _chunk_reader->statistics(), _decode_null_map_time); @@ -124,7 +125,7 @@ protected: BufferedFileStreamReader* _stream_reader; std::unique_ptr<ParquetColumnMetadata> _metadata; - const std::vector<RowRange>* _row_ranges; + const std::vector<RowRange>& _row_ranges; cctz::time_zone* _ctz; std::unique_ptr<ColumnChunkReader> _chunk_reader; tparquet::OffsetIndex* _offset_index; @@ -135,7 +136,8 @@ protected: class ScalarColumnReader : public ParquetColumnReader { public: - ScalarColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {}; + ScalarColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz) + : ParquetColumnReader(row_ranges, ctz) {}; ~ScalarColumnReader() override { close(); }; Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, size_t max_buf_size); @@ -150,7 +152,8 @@ public: class ArrayColumnReader : public ParquetColumnReader { public: - ArrayColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {}; + ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz) + : ParquetColumnReader(row_ranges, ctz) {}; ~ArrayColumnReader() override { close(); }; Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, size_t max_buf_size); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 42baf3a283..24441896a9 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -23,26 +23,31 @@ namespace doris::vectorized { +const std::vector<int64_t> RowGroupReader::NO_DELETE = {}; + RowGroupReader::RowGroupReader(doris::FileReader* file_reader, const std::vector<ParquetReadColumn>& read_columns, - const RowGroupIndex& row_group_idx, - const tparquet::RowGroup& row_group, cctz::time_zone* ctz, + const int32_t row_group_id, const tparquet::RowGroup& row_group, + cctz::time_zone* ctz, + const PositionDeleteContext& position_delete_ctx, const LazyReadContext& lazy_read_ctx) : _file_reader(file_reader), _read_columns(read_columns), - _row_group_idx(row_group_idx), + _row_group_id(row_group_id), _row_group_meta(row_group), _remaining_rows(row_group.num_rows), _ctz(ctz), + _position_delete_ctx(position_delete_ctx), _lazy_read_ctx(lazy_read_ctx) {} RowGroupReader::~RowGroupReader() { _column_readers.clear(); } -Status RowGroupReader::init(const FieldDescriptor& schema, +Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges, std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) { - if (_read_columns.size() == 0) { + _merge_read_ranges(row_ranges); + if (_read_columns.empty()) { // Query task that only select columns in path. 
return Status::OK(); } @@ -52,15 +57,15 @@ Status RowGroupReader::init(const FieldDescriptor& schema, for (auto& read_col : _read_columns) { auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name)); std::unique_ptr<ParquetColumnReader> reader; - RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta, _ctz, - reader, max_buf_size)); + RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta, + _read_ranges, _ctz, reader, max_buf_size)); auto col_iter = col_offsets.find(read_col._parquet_col_id); if (col_iter != col_offsets.end()) { tparquet::OffsetIndex oi = col_iter->second; reader->add_offset_index(&oi); } if (reader == nullptr) { - VLOG_DEBUG << "Init row group(" << _row_group_idx.row_group_id << ") reader failed"; + VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed"; return Status::Corruption("Init row group reader failed"); } _column_readers[read_col._file_slot_name] = std::move(reader); @@ -68,17 +73,11 @@ Status RowGroupReader::init(const FieldDescriptor& schema, return Status::OK(); } -void RowGroupReader::set_row_ranges(const std::vector<doris::vectorized::RowRange>& row_ranges) { - for (auto& read_col : _read_columns) { - _column_readers[read_col._file_slot_name]->set_row_ranges(&row_ranges); - } -} - Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows, - bool* _batch_eof) { + bool* batch_eof) { // Process external table query task that select columns are all from path. if (_read_columns.empty()) { - RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, _batch_eof)); + RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof)); RETURN_IF_ERROR( _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns)); @@ -90,11 +89,11 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ } if (_lazy_read_ctx.can_lazy_read) { // call _do_lazy_read recursively when current batch is skipped - return _do_lazy_read(block, batch_size, read_rows, _batch_eof); + return _do_lazy_read(block, batch_size, read_rows, batch_eof); } else { ColumnSelectVector run_length_vector; RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size, - read_rows, _batch_eof, run_length_vector)); + read_rows, batch_eof, run_length_vector)); RETURN_IF_ERROR( _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns)); @@ -106,8 +105,39 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ } } +void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) { + // row_ranges is generated from page index, and the row index begins with 0 in each row group. 
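A minimal standalone sketch of the merge that `_merge_read_ranges` performs (a hypothetical free function with simplified types, not the committed code). It assumes `delete_rows` is sorted and already shifted to row-group-relative indices, as the bug-1 fix does by subtracting `first_row_id`, and that ranges are half-open `[first_row, last_row)`:

    #include <cstdint>
    #include <vector>

    struct Range { int64_t first_row; int64_t last_row; };

    // For row_ranges = {[0,10)} and delete_rows = {3, 4, 7},
    // the result is {[0,3), [5,7), [8,10)}.
    std::vector<Range> merge_read_ranges(const std::vector<Range>& row_ranges,
                                         const std::vector<int64_t>& delete_rows) {
        std::vector<Range> read_ranges;
        size_t i = 0;
        for (const Range& range : row_ranges) {
            int64_t start = range.first_row;
            while (i < delete_rows.size() && delete_rows[i] < range.last_row) {
                int64_t del = delete_rows[i++];
                if (del < start) continue;                 // delete row before this range
                if (start < del) read_ranges.push_back({start, del});
                start = del + 1;                           // skip the deleted row
            }
            if (start < range.last_row) read_ranges.push_back({start, range.last_row});
        }
        return read_ranges;
    }

Both sorted inputs are consumed in a single forward pass, which is what makes this cheaper than materializing `delete range`s first.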
+ // _position_delete_ctx is generated from delete file, and the row index begins with 0 in parquet file + for (auto& range : row_ranges) { + int64_t start_row_id = range.first_row; + while (_position_delete_ctx.index < _position_delete_ctx.end_index) { + const int64_t& delete_row_id = + _position_delete_ctx.delete_rows[_position_delete_ctx.index] - + _position_delete_ctx.first_row_id; + if (delete_row_id < range.first_row) { + _position_delete_ctx.index++; + } else if (delete_row_id < range.last_row) { + if (start_row_id < delete_row_id) { + _read_ranges.emplace_back(start_row_id, delete_row_id); + } + start_row_id = delete_row_id + 1; + _position_delete_ctx.index++; + } else { // delete_row_id >= range.last_row + if (start_row_id < range.last_row) { + _read_ranges.emplace_back(start_row_id, range.last_row); + start_row_id = range.last_row + 1; + } + break; + } + } + if (start_row_id < range.last_row) { + _read_ranges.emplace_back(start_row_id, range.last_row); + } + } +} + Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns, - size_t batch_size, size_t* read_rows, bool* _batch_eof, + size_t batch_size, size_t* read_rows, bool* batch_eof, ColumnSelectVector& select_vector) { size_t batch_read_rows = 0; bool has_eof = false; @@ -138,57 +168,74 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st col_idx++; } *read_rows = batch_read_rows; - *_batch_eof = has_eof; + *batch_eof = has_eof; return Status::OK(); } Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof) { - // read predicate columns + std::unique_ptr<ColumnSelectVector> select_vector_ptr = nullptr; size_t pre_read_rows; bool pre_eof; - ColumnSelectVector run_length_vector; - RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size, - &pre_read_rows, &pre_eof, run_length_vector)); - RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows, - _lazy_read_ctx.predicate_partition_columns)); - RETURN_IF_ERROR( - _fill_missing_columns(block, pre_read_rows, _lazy_read_ctx.predicate_missing_columns)); - // generate filter vector - if (_lazy_read_ctx.resize_first_column) { - // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 - // The following process may be tricky and time-consuming, but we have no other way. - block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows); - } size_t origin_column_num = block->columns(); int filter_column_id = -1; - RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id)); - ColumnPtr& sv = block->get_by_position(filter_column_id).column; - if (_lazy_read_ctx.resize_first_column) { - // We have to clean the first column to insert right data. 
- block->get_by_position(0).column->assume_mutable()->clear(); - } - - // build filter map - bool can_filter_all = false; - const uint8_t* filter_map = _build_filter_map(sv, pre_read_rows, &can_filter_all); - ColumnSelectVector select_vector(filter_map, pre_read_rows, can_filter_all); - if (select_vector.filter_all() && !pre_eof) { - // If continuous batches are skipped, we can cache them to skip a whole page - _cached_filtered_rows += pre_read_rows; - for (auto& col : _lazy_read_ctx.predicate_columns) { - // clean block to read predicate columns - block->get_by_name(col).column->assume_mutable()->clear(); + while (true) { + // read predicate columns + pre_read_rows = 0; + pre_eof = false; + ColumnSelectVector run_length_vector; + RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size, + &pre_read_rows, &pre_eof, run_length_vector)); + if (pre_read_rows == 0) { + DCHECK_EQ(pre_eof, true); + break; } - for (auto& col : _lazy_read_ctx.predicate_partition_columns) { - block->get_by_name(col.first).column->assume_mutable()->clear(); + RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows, + _lazy_read_ctx.predicate_partition_columns)); + RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows, + _lazy_read_ctx.predicate_missing_columns)); + // generate filter vector + if (_lazy_read_ctx.resize_first_column) { + // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 + // The following process may be tricky and time-consuming, but we have no other way. + block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows); } - for (auto& col : _lazy_read_ctx.predicate_missing_columns) { - block->get_by_name(col.first).column->assume_mutable()->clear(); + RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id)); + ColumnPtr& sv = block->get_by_position(filter_column_id).column; + if (_lazy_read_ctx.resize_first_column) { + // We have to clean the first column to insert right data. 
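The recursion-to-loop rewrite of `_do_lazy_read` (optimization 4) has roughly the following shape; this is a hedged skeleton only, where the three callables are hypothetical stand-ins for reading predicate columns, evaluating the conjuncts, and clearing the block for reuse:

    #include <cstddef>
    #include <functional>

    // Instead of calling itself when a whole batch is filtered out, the reader
    // caches the filtered row count and reads the next batch in the same frame.
    size_t lazy_read_loop(size_t batch_size,
                          std::function<size_t(size_t)> read_predicate_columns,
                          std::function<bool()> batch_filtered_out,
                          std::function<void()> clear_predicate_columns) {
        size_t cached_filtered_rows = 0;
        while (true) {
            size_t pre_read_rows = read_predicate_columns(batch_size);
            if (pre_read_rows == 0) break;     // row group exhausted
            if (batch_filtered_out()) {
                // previously a recursive call; now cache the skipped rows and iterate
                cached_filtered_rows += pre_read_rows;
                clear_predicate_columns();
                continue;
            }
            break;                             // some rows survive: proceed to lazy read
        }
        return cached_filtered_rows;
    }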
+ block->get_by_position(0).column->assume_mutable()->clear(); } - Block::erase_useless_column(block, origin_column_num); - return _do_lazy_read(block, batch_size, read_rows, batch_eof); + + // build filter map + bool can_filter_all = false; + const uint8_t* filter_map = _build_filter_map(sv, pre_read_rows, &can_filter_all); + select_vector_ptr.reset(new ColumnSelectVector(filter_map, pre_read_rows, can_filter_all)); + if (select_vector_ptr->filter_all() && !pre_eof) { + // If continuous batches are skipped, we can cache them to skip a whole page + _cached_filtered_rows += pre_read_rows; + for (auto& col : _lazy_read_ctx.predicate_columns) { + // clean block to read predicate columns + block->get_by_name(col).column->assume_mutable()->clear(); + } + for (auto& col : _lazy_read_ctx.predicate_partition_columns) { + block->get_by_name(col.first).column->assume_mutable()->clear(); + } + for (auto& col : _lazy_read_ctx.predicate_missing_columns) { + block->get_by_name(col.first).column->assume_mutable()->clear(); + } + Block::erase_useless_column(block, origin_column_num); + } else { + break; + } + } + if (select_vector_ptr == nullptr) { + DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0); + *read_rows = 0; + *batch_eof = true; } + + ColumnSelectVector& select_vector = *select_vector_ptr; std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr; if (_cached_filtered_rows != 0) { _rebuild_select_vector(select_vector, rebuild_filter_map, pre_read_rows); @@ -345,15 +392,37 @@ Status RowGroupReader::_fill_missing_columns( return Status::OK(); } -Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof) { - if (batch_size < _remaining_rows) { - *read_rows = batch_size; - _remaining_rows -= batch_size; - *_batch_eof = false; +Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof) { + if (_position_delete_ctx.has_filter) { + int64_t start_row_id = _position_delete_ctx.current_row_id; + int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + (int64_t)batch_size, + _position_delete_ctx.last_row_id); + int64_t num_delete_rows = 0; + while (_position_delete_ctx.index < _position_delete_ctx.end_index) { + const int64_t& delete_row_id = + _position_delete_ctx.delete_rows[_position_delete_ctx.index]; + if (delete_row_id < start_row_id) { + _position_delete_ctx.index++; + } else if (delete_row_id < end_row_id) { + num_delete_rows++; + _position_delete_ctx.index++; + } else { // delete_row_id >= end_row_id + break; + } + } + *read_rows = end_row_id - start_row_id - num_delete_rows; + _position_delete_ctx.current_row_id = end_row_id; + *batch_eof = _position_delete_ctx.current_row_id == _position_delete_ctx.last_row_id; } else { - *read_rows = _remaining_rows; - _remaining_rows = 0; - *_batch_eof = true; + if (batch_size < _remaining_rows) { + *read_rows = batch_size; + _remaining_rows -= batch_size; + *batch_eof = false; + } else { + *read_rows = _remaining_rows; + _remaining_rows = 0; + *batch_eof = true; + } } return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index ef73eba21c..53e8a052c0 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -25,16 +25,18 @@ namespace doris::vectorized { -struct RowGroupIndex { - int32_t row_group_id; - int32_t first_row; - int32_t last_row; - RowGroupIndex(int32_t id, int32_t first, int32_t last) - : 
row_group_id(id), first_row(first), last_row(last) {} -}; - class RowGroupReader { public: + static const std::vector<int64_t> NO_DELETE; + + struct RowGroupIndex { + int32_t row_group_id; + int64_t first_row; + int64_t last_row; + RowGroupIndex(int32_t id, int64_t first, int64_t last) + : row_group_id(id), first_row(first), last_row(last) {} + }; + struct LazyReadContext { VExprContext* vconjunct_ctx = nullptr; bool can_lazy_read = false; @@ -56,26 +58,66 @@ public: std::unordered_map<std::string, VExprContext*> missing_columns; }; + /** + * Support row-level delete in iceberg: + * https://iceberg.apache.org/spec/#position-delete-files + */ + struct PositionDeleteContext { + // the filtered rows in current row group + const std::vector<int64_t>& delete_rows; + // the first row id of current row group in parquet file + const int64_t first_row_id; + // the number of rows in current row group + const int64_t num_rows; + const int64_t last_row_id; + // current row id to read in the row group + int64_t current_row_id; + // start index in delete_rows + const int64_t start_index; + // end index in delete_rows + const int64_t end_index; + // current index in delete_rows + int64_t index; + const bool has_filter; + + PositionDeleteContext(const std::vector<int64_t>& delete_rows, const int64_t num_rows, + const int64_t first_row_id, const int64_t start_index, + const int64_t end_index) + : delete_rows(delete_rows), + first_row_id(first_row_id), + num_rows(num_rows), + last_row_id(first_row_id + num_rows), + current_row_id(first_row_id), + start_index(start_index), + end_index(end_index), + index(start_index), + has_filter(end_index > start_index) {} + + PositionDeleteContext(const int64_t num_rows, const int64_t first_row) + : PositionDeleteContext(NO_DELETE, num_rows, first_row, 0, 0) {} + + PositionDeleteContext(const PositionDeleteContext& filter) = default; + }; + RowGroupReader(doris::FileReader* file_reader, - const std::vector<ParquetReadColumn>& read_columns, - const RowGroupIndex& _row_group_idx, const tparquet::RowGroup& row_group, - cctz::time_zone* ctz, const LazyReadContext& lazy_read_ctx); + const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id, + const tparquet::RowGroup& row_group, cctz::time_zone* ctz, + const PositionDeleteContext& position_delete_ctx, + const LazyReadContext& lazy_read_ctx); ~RowGroupReader(); - Status init(const FieldDescriptor& schema, + Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges, std::unordered_map<int, tparquet::OffsetIndex>& col_offsets); - Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof); - int64_t lazy_read_filtered_rows() { return _lazy_read_filtered_rows; } - const RowGroupIndex& index() { return _row_group_idx; } - void set_row_ranges(const std::vector<doris::vectorized::RowRange>& row_ranges); + Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof); int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; } ParquetColumnReader::Statistics statistics(); private: - Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof); + void _merge_read_ranges(std::vector<RowRange>& row_ranges); + Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof); Status _read_column_data(Block* block, const std::vector<std::string>& columns, - size_t batch_size, size_t* read_rows, bool* _batch_eof, + size_t batch_size, size_t* read_rows, bool* batch_eof, ColumnSelectVector& 
select_vector); Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof); const uint8_t* _build_filter_map(ColumnPtr& sv, size_t num_rows, bool* can_filter_all); @@ -92,10 +134,13 @@ private: doris::FileReader* _file_reader; std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers; const std::vector<ParquetReadColumn>& _read_columns; - const RowGroupIndex& _row_group_idx; + const int32_t _row_group_id; const tparquet::RowGroup& _row_group_meta; int64_t _remaining_rows; cctz::time_zone* _ctz; + PositionDeleteContext _position_delete_ctx; + // merge the row ranges generated from page index and position delete. + std::vector<RowRange> _read_ranges; const LazyReadContext& _lazy_read_ctx; int64_t _lazy_read_filtered_rows = 0; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index c8d4f6b3c4..6613b1f987 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -19,6 +19,7 @@ #include <algorithm> +#include "common/status.h" #include "io/file_factory.h" #include "parquet_pred_cmp.h" #include "parquet_thrift_util.h" @@ -150,24 +151,12 @@ Status ParquetReader::_open_file() { return Status::OK(); } -Status ParquetReader::file_metadata(FileMetaData** metadata) { - Status open_status = _open_file(); - if (!open_status.ok()) { - return open_status; - } - *metadata = _file_metadata.get(); - return Status::OK(); -} - Status ParquetReader::init_reader( const std::vector<std::string>& column_names, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, VExprContext* vconjunct_ctx, bool filter_groups) { SCOPED_RAW_TIMER(&_statistics.parse_meta_time); - Status open_status = _open_file(); - if (!open_status.ok()) { - return open_status; - } + RETURN_IF_ERROR(_open_file()); _column_names = &column_names; _t_metadata = &_file_metadata->to_thrift(); _total_groups = _t_metadata->row_groups.size(); @@ -182,7 +171,7 @@ Status ParquetReader::init_reader( RETURN_IF_ERROR(_init_read_columns()); // build column predicates for column lazy read _lazy_read_ctx.vconjunct_ctx = vconjunct_ctx; - RETURN_IF_ERROR(_init_row_group_readers(filter_groups)); + RETURN_IF_ERROR(_init_row_groups(filter_groups)); return Status::OK(); } @@ -317,20 +306,9 @@ std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type( return map; } -Status ParquetReader::get_parsered_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) { - if (_file_reader == nullptr) { - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path, - _scan_range.start_offset, - _scan_range.file_size, 0, _file_reader)); - } - if (_file_metadata == nullptr) { - RETURN_IF_ERROR(_file_reader->open()); - if (_file_reader->size() == 0) { - return Status::EndOfFile("Empty Parquet File"); - } - RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata)); - } +Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) { + RETURN_IF_ERROR(_open_file()); _t_metadata = &_file_metadata->to_thrift(); _total_groups = _t_metadata->row_groups.size(); @@ -362,143 +340,90 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor return Status::OK(); } -void ParquetReader::merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges) { - if (_row_ranges.empty()) { - 
_current_group_reader->set_row_ranges(_row_ranges); - return; - } - if (!delete_row_ranges.empty()) { - std::vector<RowRange> candidate_ranges; - auto start_range = _row_ranges.begin(); - auto delete_range = delete_row_ranges.begin(); - int64_t processed_range_start_idx = start_range->first_row; - while (start_range != _row_ranges.end() && delete_range != delete_row_ranges.end()) { - int64_t delete_start = delete_range->first_row; - int64_t delete_end = delete_range->last_row; - int64_t range_start = start_range->first_row; - int64_t range_end = start_range->last_row; - if (delete_end > range_end) { - if (range_start < processed_range_start_idx) { - // rows before processed_range_start_idx have been processed - range_start = processed_range_start_idx; - } - if (range_end < delete_start) { - /** - * start_range - * || --------- || - |--------- | - * delete_range - */ - candidate_ranges.emplace_back(range_start, range_end); - } else if (range_start < delete_start) { - /** - * row_range - * || --------|-------- || ----- | - * delete_start delete_end - */ - candidate_ranges.emplace_back(range_start, delete_start); - } - // range_end > delete_end && range_start > delete_start - start_range++; - } else { - // delete_end < range_end,most of the time, we will use this branch - if (processed_range_start_idx < delete_start) { - /** - * row_range_start row_range_end - * || --- | --------- | --- || - * delete_range - */ - candidate_ranges.emplace_back(processed_range_start_idx, delete_start); - } - // delete_end is in row_range, so it can assign to processed_range_start_idx - processed_range_start_idx = delete_end; - delete_range++; - if (delete_range == delete_row_ranges.end()) { - range_end = _row_ranges[_row_ranges.size() - 1].last_row; - if (processed_range_start_idx != range_end) { - candidate_ranges.emplace_back(processed_range_start_idx, range_end); - } - } - } - } - _row_ranges.assign(candidate_ranges.begin(), candidate_ranges.end()); - } - _current_group_reader->set_row_ranges(_row_ranges); -} - Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - int32_t num_of_readers = _row_group_readers.size(); - DCHECK(num_of_readers <= _read_row_groups.size()); - if (_read_row_groups.empty()) { - *eof = true; - return Status::OK(); + if (_current_group_reader == nullptr) { + if (_read_row_groups.size() > 0) { + RETURN_IF_ERROR(_next_row_group_reader()); + } else { + *read_rows = 0; + *eof = true; + return Status::OK(); + } } - bool _batch_eof = false; + DCHECK(_current_group_reader != nullptr); + bool batch_eof = false; { SCOPED_RAW_TIMER(&_statistics.column_read_time); RETURN_IF_ERROR( - _current_group_reader->next_batch(block, _batch_size, read_rows, &_batch_eof)); + _current_group_reader->next_batch(block, _batch_size, read_rows, &batch_eof)); } - if (_batch_eof) { + if (batch_eof) { auto column_st = _current_group_reader->statistics(); _column_statistics.merge(column_st); _statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows(); - if (!_next_row_group_reader()) { + Status st = _next_row_group_reader(); + if (st.is_end_of_file()) { *eof = true; + } else if (!st.ok()) { + return st; } } - VLOG_DEBUG << "ParquetReader::get_next_block: " << block->rows(); return Status::OK(); } -bool ParquetReader::_next_row_group_reader() { - if (_row_group_readers.empty()) { - return false; - } - _current_group_reader = _row_group_readers.front(); - _row_group_readers.pop_front(); - return true; +RowGroupReader::PositionDeleteContext 
ParquetReader::_get_position_delete_ctx( + const tparquet::RowGroup& row_group, const RowGroupReader::RowGroupIndex& row_group_index) { + if (_delete_rows == nullptr) { + return RowGroupReader::PositionDeleteContext(row_group.num_rows, row_group_index.first_row); + } + int64_t* delete_rows = const_cast<int64_t*>(&(*_delete_rows)[0]); + int64_t* delete_rows_end = delete_rows + _delete_rows->size(); + int64_t* start_pos = std::lower_bound(delete_rows + _delete_rows_index, delete_rows_end, + row_group_index.first_row); + int64_t start_index = start_pos - delete_rows; + int64_t* end_pos = std::lower_bound(start_pos, delete_rows_end, row_group_index.last_row); + int64_t end_index = end_pos - delete_rows; + _delete_rows_index = end_index; + return RowGroupReader::PositionDeleteContext(*_delete_rows, row_group.num_rows, + row_group_index.first_row, start_index, end_index); } -Status ParquetReader::_init_row_group_readers(const bool& filter_groups) { - std::vector<RowGroupIndex> group_indexes; - RETURN_IF_ERROR(_filter_row_groups(filter_groups, group_indexes)); - DCHECK_EQ(group_indexes.size(), _read_row_groups.size()); - auto group_index = group_indexes.begin(); - for (auto row_group_id : _read_row_groups) { - auto& row_group = _t_metadata->row_groups[row_group_id]; - std::shared_ptr<RowGroupReader> row_group_reader; - row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, *group_index, - row_group, _ctz, _lazy_read_ctx)); - group_index++; - RETURN_IF_ERROR(_process_page_index(row_group)); - if (_row_ranges.empty()) { - _row_ranges.emplace_back(0, row_group.num_rows); - _statistics.read_rows += row_group.num_rows; - } - RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), _col_offsets)); - row_group_reader->set_row_ranges(_row_ranges); - _row_group_readers.emplace_back(row_group_reader); - } - if (!_next_row_group_reader()) { - return Status::EndOfFile("No next reader"); - } - return Status::OK(); +Status ParquetReader::_next_row_group_reader() { + if (_read_row_groups.empty()) { + _current_group_reader.reset(nullptr); + return Status::EndOfFile("No next RowGroupReader"); + } + RowGroupReader::RowGroupIndex row_group_index = _read_row_groups.front(); + _read_row_groups.pop_front(); + + // process page index and generate the ranges to read + auto& row_group = _t_metadata->row_groups[row_group_index.row_group_id]; + std::vector<RowRange> candidate_row_ranges; + RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges)); + + RowGroupReader::PositionDeleteContext position_delete_ctx = + _get_position_delete_ctx(row_group, row_group_index); + _current_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, + row_group_index.row_group_id, row_group, _ctz, + position_delete_ctx, _lazy_read_ctx)); + return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, + _col_offsets); } -Status ParquetReader::_filter_row_groups(const bool& enabled, - std::vector<RowGroupIndex>& group_indexes) { +Status ParquetReader::_init_row_groups(const bool& is_filter_groups) { SCOPED_RAW_TIMER(&_statistics.row_group_filter_time); - if (enabled && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) { - return Status::EndOfFile("No row group need read"); + if (is_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) { + return Status::EndOfFile("No row group to read"); } - int32_t start_row_id = 0; + int64_t row_index = 0; for (int32_t row_group_idx = 0; row_group_idx < _total_groups; 
row_group_idx++) { const tparquet::RowGroup& row_group = _t_metadata->row_groups[row_group_idx]; - if (enabled && _is_misaligned_range_group(row_group)) { + if (is_filter_groups && _is_misaligned_range_group(row_group)) { + row_index += row_group.num_rows; continue; } bool filter_group = false; - if (enabled) { + if (is_filter_groups) { RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group)); } int64_t group_size = 0; // only calculate the needed columns @@ -508,13 +433,12 @@ Status ParquetReader::_filter_row_groups(const bool& enabled, group_size += row_group.columns[parquet_col_id].meta_data.total_compressed_size; } } - // record row group physical id - int32_t first_row_index = start_row_id; - int32_t last_row_index = first_row_index + row_group.num_rows; - start_row_id = last_row_index + 1; if (!filter_group) { - group_indexes.emplace_back(row_group_idx, first_row_index, last_row_index); - _read_row_groups.emplace_back(row_group_idx); + _read_row_groups.emplace_back(row_group_idx, row_index, row_index + row_group.num_rows); + if (_statistics.read_row_groups == 0) { + _whole_range.first_row = row_index; + } + _whole_range.last_row = row_index + row_group.num_rows; _statistics.read_row_groups++; _statistics.read_bytes += group_size; } else { @@ -522,6 +446,11 @@ Status ParquetReader::_filter_row_groups(const bool& enabled, _statistics.filtered_bytes += group_size; _statistics.filtered_group_rows += row_group.num_rows; } + row_index += row_group.num_rows; + } + + if (_read_row_groups.empty()) { + return Status::EndOfFile("No row group to read"); } return Status::OK(); } @@ -545,13 +474,22 @@ bool ParquetReader::_has_page_index(const std::vector<tparquet::ColumnChunk>& co return page_index.check_and_get_page_index_ranges(columns); } -Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) { +Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, + std::vector<RowRange>& candidate_row_ranges) { SCOPED_RAW_TIMER(&_statistics.page_index_filter_time); + + std::function<void()> read_whole_row_group = [&]() { + candidate_row_ranges.emplace_back(0, row_group.num_rows); + _statistics.read_rows += row_group.num_rows; + }; + if (_colname_to_value_range == nullptr || _colname_to_value_range->empty()) { + read_whole_row_group(); return Status::OK(); } PageIndex page_index; if (!_has_page_index(row_group.columns, page_index)) { + read_whole_row_group(); return Status::OK(); } uint8_t col_index_buff[page_index._column_index_size]; @@ -573,12 +511,12 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) { auto& chunk = row_group.columns[read_col._parquet_col_id]; tparquet::ColumnIndex column_index; if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) { - return Status::OK(); + continue; } RETURN_IF_ERROR(page_index.parse_column_index(chunk, col_index_buff, &column_index)); const int num_of_pages = column_index.null_pages.size(); if (num_of_pages <= 0) { - break; + continue; } auto& conjuncts = conjunct_iter->second; std::vector<int> skipped_page_range; @@ -600,6 +538,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) { _col_offsets.emplace(read_col._parquet_col_id, offset_index); } if (skipped_row_ranges.empty()) { + read_whole_row_group(); return Status::OK(); } @@ -617,14 +556,14 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) { } } else { // read row with candidate ranges rather than skipped ranges - 
_row_ranges.emplace_back(skip_end, skip_range.first_row); + candidate_row_ranges.emplace_back(skip_end, skip_range.first_row); read_rows += skip_range.first_row - skip_end; skip_end = skip_range.last_row; } } DCHECK_LE(skip_end, row_group.num_rows); if (skip_end != row_group.num_rows) { - _row_ranges.emplace_back(skip_end, row_group.num_rows); + candidate_row_ranges.emplace_back(skip_end, row_group.num_rows); read_rows += row_group.num_rows - skip_end; } _statistics.read_rows += read_rows; @@ -676,13 +615,13 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co void ParquetReader::_init_chunk_dicts() {} Status ParquetReader::_process_dict_filter(bool* filter_group) { - return Status(); + return Status::OK(); } void ParquetReader::_init_bloom_filter() {} Status ParquetReader::_process_bloom_filter(bool* filter_group) { - return Status(); + return Status::OK(); } int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index f5891754e7..2bfc74f823 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -77,9 +77,10 @@ public: void close(); - Status file_metadata(FileMetaData** metadata); + RowRange get_whole_range() { return _whole_range; } - void merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges); + // set the delete rows in current parquet file + void set_delete_rows(const std::vector<int64_t>* delete_rows) { _delete_rows = delete_rows; } int64_t size() const { return _file_reader->size(); } @@ -87,8 +88,8 @@ public: Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; - Status get_parsered_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) override; + Status get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) override; Statistics& statistics() { return _statistics; } @@ -125,12 +126,16 @@ private: Status _open_file(); void _init_profile(); - bool _next_row_group_reader(); + Status _next_row_group_reader(); + RowGroupReader::PositionDeleteContext _get_position_delete_ctx( + const tparquet::RowGroup& row_group, + const RowGroupReader::RowGroupIndex& row_group_index); Status _init_read_columns(); - Status _init_row_group_readers(const bool& filter_groups); + Status _init_row_groups(const bool& is_filter_groups); // Page Index Filter bool _has_page_index(const std::vector<tparquet::ColumnChunk>& columns, PageIndex& page_index); - Status _process_page_index(const tparquet::RowGroup& row_group); + Status _process_page_index(const tparquet::RowGroup& row_group, + std::vector<RowRange>& candidate_row_ranges); // Row Group Filter bool _is_misaligned_range_group(const tparquet::RowGroup& row_group); @@ -141,28 +146,27 @@ private: Status _process_dict_filter(bool* filter_group); void _init_bloom_filter(); Status _process_bloom_filter(bool* filter_group); - Status _filter_row_groups(const bool& enabled, std::vector<RowGroupIndex>& group_indexes); int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column_init_column_readers); -private: RuntimeProfile* _profile; const TFileScanRangeParams& _scan_params; const TFileRangeDesc& _scan_range; std::unique_ptr<FileReader> _file_reader = nullptr; - std::vector<RowRange> _row_ranges; std::shared_ptr<FileMetaData> _file_metadata; 
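Because the delete rows are sorted once for the whole file, `_get_position_delete_ctx` (shown earlier) can slice them per row group with binary search. A simplified sketch under those assumptions, with hypothetical names (`scan_from` plays the role of `_delete_rows_index`):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    struct DeleteSlice { int64_t start_index; int64_t end_index; };

    // Returns the index window of delete positions falling inside
    // [first_row, last_row); advancing scan_from between row groups means
    // consecutive row groups never rescan earlier entries.
    DeleteSlice slice_for_row_group(const std::vector<int64_t>& delete_rows,
                                    int64_t scan_from, int64_t first_row,
                                    int64_t last_row) {
        auto begin = delete_rows.begin() + scan_from;
        auto start = std::lower_bound(begin, delete_rows.end(), first_row);
        auto end = std::lower_bound(start, delete_rows.end(), last_row);
        return {static_cast<int64_t>(start - delete_rows.begin()),
                static_cast<int64_t>(end - delete_rows.begin())};
    }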
const tparquet::FileMetaData* _t_metadata; - std::list<std::shared_ptr<RowGroupReader>> _row_group_readers; - std::shared_ptr<RowGroupReader> _current_group_reader; + std::unique_ptr<RowGroupReader> _current_group_reader; int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file std::map<std::string, int> _map_column; // column-name <---> column-index std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; std::vector<ParquetReadColumn> _read_columns; + RowRange _whole_range = RowRange(0, 0); + const std::vector<int64_t>* _delete_rows = nullptr; + int64_t _delete_rows_index = 0; // Used for column lazy read. RowGroupReader::LazyReadContext _lazy_read_ctx; - std::list<int32_t> _read_row_groups; + std::list<RowGroupReader::RowGroupIndex> _read_row_groups; // parquet file reader object size_t _batch_size; int64_t _range_start_offset; diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 9e2f659772..332aa44e54 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -17,12 +17,10 @@ #include "iceberg_reader.h" -#include <vec/core/column_with_type_and_name.h> -#include <vec/exec/format/parquet/vparquet_reader.h> - -#include <vec/data_types/data_type_factory.hpp> - #include "vec/common/assert_cast.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/exec/format/parquet/vparquet_reader.h" namespace doris::vectorized { @@ -34,9 +32,11 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim : TableFormatReader(file_format_reader), _profile(profile), _state(state), _params(params) { static const char* iceberg_profile = "IcebergProfile"; ADD_TIMER(_profile, iceberg_profile); - _iceberg_profile._delete_files_init_time = - ADD_CHILD_TIMER(_profile, "DeleteFileInitTime", iceberg_profile); - _iceberg_profile._delete_files_read_total_time = + _iceberg_profile.num_delete_files = + ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, iceberg_profile); + _iceberg_profile.num_delete_rows = + ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile); + _iceberg_profile.delete_files_read_time = ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile); } @@ -63,162 +63,159 @@ Status IcebergTableReader::get_columns( return _file_format_reader->get_columns(name_to_type, missing_cols); } -void IcebergTableReader::filter_rows(const TFileRangeDesc& range) { - if (_cur_delete_file_reader == nullptr) { - return; - } - SCOPED_TIMER(_iceberg_profile._delete_files_read_total_time); +Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { auto& table_desc = range.table_format_params.iceberg_params; auto& version = table_desc.format_version; if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { - return; + return Status::OK(); } - bool eof = false; - std::set<RowRange> delete_row_ranges; - while (!eof) { - size_t read_rows = 0; - Block block = Block(); - for (const FieldSchema& field : _column_schemas) { - DataTypePtr data_type = DataTypeFactory::instance().create_data_type(field.type, true); - MutableColumnPtr data_column = data_type->create_column(); - block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, field.name)); - } - Status st = _cur_delete_file_reader->get_next_block(&block, &read_rows, &eof); - if (!st.ok() || eof) { - if (!_delete_file_readers.empty()) { - eof = false; - _cur_delete_file_reader = 
std::move(_delete_file_readers.front()); - _delete_file_readers.pop_front(); - } - } - if (read_rows != 0) { - auto& pos_type_column = block.get_by_name(ICEBERG_ROW_POS); - ColumnPtr pos_column = pos_type_column.column; - using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType; - if (pos_type_column.type->is_nullable()) { - pos_column = - assert_cast<const ColumnNullable&>(*pos_column).get_nested_column_ptr(); - } - auto& data = assert_cast<const ColumnType&>(*pos_column).get_data(); - std::vector<int64_t> delete_row_ids; - for (int row_id = 0; row_id < read_rows; row_id++) { - delete_row_ids.emplace_back(data[row_id]); + auto& delete_file_type = table_desc.content; + auto files = table_desc.delete_files; + if (files.empty()) { + return Status::OK(); + } + if (delete_file_type == POSITION_DELETE) { + // position delete + SCOPED_TIMER(_iceberg_profile.delete_files_read_time); + auto row_desc = RowDescriptor(_state->desc_tbl(), + std::vector<TupleId>({table_desc.delete_table_tuple_id}), + std::vector<bool>({false})); + RETURN_IF_ERROR(VExpr::create_expr_tree(_state->obj_pool(), table_desc.file_select_conjunct, + &_data_path_conjunct_ctx)); + RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc)); + RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state)); + + ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get()); + RowRange whole_range = parquet_reader->get_whole_range(); + bool init_schema = false; + std::vector<std::string> delete_file_col_names; + std::vector<TypeDescriptor> delete_file_col_types; + std::list<std::vector<int64_t>> delete_rows_list; + delete_rows_list.resize(files.size()); + int64_t num_delete_rows = 0; + auto delete_rows_iter = delete_rows_list.begin(); + for (auto& delete_file : files) { + if (whole_range.last_row <= delete_file.position_lower_bound || + whole_range.first_row > delete_file.position_upper_bound) { + delete_rows_iter++; + continue; } - if (delete_row_ids.empty()) { - return; + std::vector<int64_t>& delete_rows = *delete_rows_iter; + TFileRangeDesc delete_range; + delete_range.path = delete_file.path; + delete_range.start_offset = 0; + delete_range.size = -1; + delete_range.file_size = -1; + ParquetReader delete_reader(_profile, _params, delete_range, 102400, + const_cast<cctz::time_zone*>(&_state->timezone_obj())); + if (!init_schema) { + delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types); + init_schema = true; } - - int num_deleted_ids = delete_row_ids.size(); - int i = 0; - while (i < num_deleted_ids) { - int64_t row_id = delete_row_ids[i]; - int64_t row_range_start = row_id; - int64_t row_range_end = row_id; - while (i + 1 < num_deleted_ids) { - if (delete_row_ids[i + 1] == delete_row_ids[i] + 1) { - row_range_end = delete_row_ids[i + 1]; - i++; - continue; - } else { - delete_row_ranges.emplace(row_range_start, row_range_end + 1); - row_range_start = ++row_range_end; - break; - } + RETURN_IF_ERROR(delete_reader.init_reader(delete_file_col_names, nullptr, + _data_path_conjunct_ctx, false)); + std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> + partition_columns; + std::unordered_map<std::string, VExprContext*> missing_columns; + delete_reader.set_fill_columns(partition_columns, missing_columns); + + bool eof = false; + while (!eof) { + Block block = Block(); + for (int i = 0; i < delete_file_col_names.size(); ++i) { + DataTypePtr data_type = DataTypeFactory::instance().create_data_type( + delete_file_col_types[i], true); + MutableColumnPtr 
data_column = data_type->create_column(); + block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, + delete_file_col_names[i])); } - if (i == num_deleted_ids - 1) { - delete_row_ranges.emplace(row_range_start, - delete_row_ids[num_deleted_ids - 1] + 1); + eof = false; + size_t read_rows = 0; + RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof)); + if (read_rows > 0) { + auto& pos_type_column = block.get_by_name(ICEBERG_ROW_POS); + ColumnPtr pos_column = pos_type_column.column; + using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType; + if (pos_type_column.type->is_nullable()) { + pos_column = assert_cast<const ColumnNullable&>(*pos_column) + .get_nested_column_ptr(); + } + const int64_t* src_data = + assert_cast<const ColumnType&>(*pos_column).get_data().data(); + const int64_t* src_data_end = src_data + read_rows; + const int64_t* cpy_start = + std::lower_bound(src_data, src_data_end, whole_range.first_row); + const int64_t* cpy_end = + std::lower_bound(cpy_start, src_data_end, whole_range.last_row); + int64_t cpy_count = cpy_end - cpy_start; + + if (cpy_count > 0) { + int64_t origin_size = delete_rows.size(); + delete_rows.resize(origin_size + cpy_count); + int64_t* dest_position = &delete_rows[origin_size]; + memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t)); + num_delete_rows += cpy_count; + } } - row_range_start = delete_row_ids[i + 1]; - i++; } + delete_rows_iter++; } - } - if (VLOG_IS_ON(3)) { - if (!delete_row_ranges.empty()) { - std::stringstream out; - out << "["; - for (const RowRange& delete_row_range : delete_row_ranges) { - out << " " << delete_row_range.debug_string(); + if (num_delete_rows > 0) { + for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end();) { + if (iter->empty()) { + delete_rows_list.erase(iter++); + } else { + iter++; + } } - out << " ]"; - VLOG_NOTICE << "Delete row range info: " << out.str(); + _merge_sort(delete_rows_list, num_delete_rows); + parquet_reader->set_delete_rows(&_delete_rows); + COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows); } } - ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get()); - parquet_reader->merge_delete_row_ranges(delete_row_ranges); + // todo: equality delete + COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size()); + return Status::OK(); } -Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { - auto& table_desc = range.table_format_params.iceberg_params; - auto& version = table_desc.format_version; - if (version >= MIN_SUPPORT_DELETE_FILES_VERSION) { - SCOPED_TIMER(_iceberg_profile._delete_files_init_time); - auto& delete_file_type = table_desc.content; - auto files = table_desc.delete_files; - if (delete_file_type == POSITON_DELELE) { - // position delete - auto row_desc = RowDescriptor(_state->desc_tbl(), - std::vector<TupleId>({table_desc.delete_table_tuple_id}), - std::vector<bool>({false})); - RETURN_IF_ERROR(VExpr::create_expr_tree( - _state->obj_pool(), table_desc.file_select_conjunct, &_data_path_conjunct_ctx)); - RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc)); - RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state)); - vector<std::string> names; - for (auto& delete_file : files) { - _position_delete_params.low_bound_index = delete_file.position_lower_bound; - _position_delete_params.upper_bound_index = delete_file.position_upper_bound; - - TFileRangeDesc delete_range; - delete_range.path = delete_file.path; - delete_range.start_offset = 0; - 
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index c71ffdaa9c..0a9d4ef71f 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -17,12 +17,11 @@
 
 #pragma once
 
-#include <vec/exec/format/parquet/parquet_common.h>
-
 #include <queue>
 
 #include "table_format_reader.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/parquet/parquet_common.h"
 #include "vec/exprs/vexpr.h"
 
 namespace doris::vectorized {
 
@@ -32,8 +31,8 @@ public:
     IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
                        RuntimeState* state, const TFileScanRangeParams& params);
     ~IcebergTableReader() override;
-    Status init_row_filters(const TFileRangeDesc& range);
-    void filter_rows(const TFileRangeDesc& range) override;
+
+    Status init_row_filters(const TFileRangeDesc& range) override;
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
@@ -45,30 +44,23 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
-public:
-    enum { DATA, POSITON_DELELE, EQULITY_DELELE };
-    struct PositionDeleteParams {
-        int64_t low_bound_index = -1;
-        int64_t upper_bound_index = -1;
-        int64_t last_delete_row_index = -1;
-        int64_t total_file_rows = 0;
-    };
+    enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
 
 private:
     struct IcebergProfile {
-        RuntimeProfile::Counter* _delete_files_init_time;
-        RuntimeProfile::Counter* _delete_files_read_total_time;
+        RuntimeProfile::Counter* num_delete_files;
+        RuntimeProfile::Counter* num_delete_rows;
+        RuntimeProfile::Counter* delete_files_read_time;
     };
+
+    void _merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows);
+
     RuntimeProfile* _profile;
     RuntimeState* _state;
     const TFileScanRangeParams& _params;
-    std::vector<FieldSchema> _column_schemas;
-    std::deque<std::unique_ptr<GenericReader>> _delete_file_readers;
-    std::unique_ptr<GenericReader> _cur_delete_file_reader;
-    PositionDeleteParams _position_delete_params;
-    const FieldDescriptor* _delete_file_schema = nullptr;
     VExprContext* _data_path_conjunct_ctx = nullptr;
     IcebergProfile _iceberg_profile;
+    std::vector<int64_t> _delete_rows;
 };
 
 } // namespace doris::vectorized
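The new `_delete_rows` member declared above is filled per scan range: because positions inside a position-delete file are sorted, `init_row_filters` can clip each fetched batch to the reader's whole range with two binary searches and one bulk copy. A self-contained sketch of that clipping step (illustrative names and types, not the Doris API):

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Half-open scan window [first_row, last_row), matching the reader's RowRange.
    struct ScanWindow {
        int64_t first_row;
        int64_t last_row;
    };

    // Keep only the delete positions inside `win`, mirroring the
    // lower_bound clipping done per block in init_row_filters.
    std::vector<int64_t> clip_positions(const std::vector<int64_t>& sorted_positions,
                                        const ScanWindow& win) {
        auto begin = std::lower_bound(sorted_positions.begin(), sorted_positions.end(),
                                      win.first_row);
        auto end = std::lower_bound(begin, sorted_positions.end(), win.last_row);
        return {begin, end};
    }

    int main() {
        std::vector<int64_t> positions = {2, 5, 9, 14, 21};
        for (int64_t p : clip_positions(positions, {5, 15})) {
            std::cout << p << ' '; // prints: 5 9 14
        }
        std::cout << '\n';
    }

Combined with the lower/upper-bound pre-check on each delete file, this keeps `_delete_rows` bounded by the rows the split actually scans rather than by the file's full delete set.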
"vec/exec/format/generic_reader.h" +#include "vec/exec/format/parquet/parquet_common.h" #include "vec/exprs/vexpr.h" namespace doris::vectorized { @@ -32,8 +31,8 @@ public: IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params); ~IcebergTableReader() override; - Status init_row_filters(const TFileRangeDesc& range); - void filter_rows(const TFileRangeDesc& range) override; + + Status init_row_filters(const TFileRangeDesc& range) override; Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; @@ -45,30 +44,23 @@ public: Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; -public: - enum { DATA, POSITON_DELELE, EQULITY_DELELE }; - struct PositionDeleteParams { - int64_t low_bound_index = -1; - int64_t upper_bound_index = -1; - int64_t last_delete_row_index = -1; - int64_t total_file_rows = 0; - }; + enum { DATA, POSITION_DELETE, EQUALITY_DELETE }; private: struct IcebergProfile { - RuntimeProfile::Counter* _delete_files_init_time; - RuntimeProfile::Counter* _delete_files_read_total_time; + RuntimeProfile::Counter* num_delete_files; + RuntimeProfile::Counter* num_delete_rows; + RuntimeProfile::Counter* delete_files_read_time; }; + + void _merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows); + RuntimeProfile* _profile; RuntimeState* _state; const TFileScanRangeParams& _params; - std::vector<FieldSchema> _column_schemas; - std::deque<std::unique_ptr<GenericReader>> _delete_file_readers; - std::unique_ptr<GenericReader> _cur_delete_file_reader; - PositionDeleteParams _position_delete_params; - const FieldDescriptor* _delete_file_schema = nullptr; VExprContext* _data_path_conjunct_ctx = nullptr; IcebergProfile _iceberg_profile; + std::vector<int64_t> _delete_rows; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h index 188d41a2b9..1c2ebec7d3 100644 --- a/be/src/vec/exec/format/table/table_format_reader.h +++ b/be/src/vec/exec/format/table/table_format_reader.h @@ -17,12 +17,11 @@ #pragma once -#include <vec/exec/format/parquet/parquet_common.h> - #include <string> #include "runtime/runtime_state.h" #include "vec/exec/format/generic_reader.h" +#include "vec/exec/format/parquet/parquet_common.h" namespace doris::vectorized { @@ -38,7 +37,7 @@ public: return _file_format_reader->get_columns(name_to_type, missing_cols); } - virtual void filter_rows(const TFileRangeDesc& range) = 0; + virtual Status init_row_filters(const TFileRangeDesc& range) = 0; protected: std::string _table_format; // hudi, iceberg diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index c8eeb5d00b..d1f59e9234 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -413,13 +413,13 @@ TEST_F(ParquetThriftReaderTest, group_reader) { TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); auto row_group = t_metadata.row_groups[0]; std::shared_ptr<RowGroupReader> row_group_reader; - row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, - RowGroupIndex(0, 0, 10000), row_group, &ctz, - lazy_read_ctx)); + RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0); + row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c8eeb5d00b..d1f59e9234 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -413,13 +413,13 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
     auto row_group = t_metadata.row_groups[0];
     std::shared_ptr<RowGroupReader> row_group_reader;
-    row_group_reader.reset(new RowGroupReader(&file_reader, read_columns,
-                                              RowGroupIndex(0, 0, 10000), row_group, &ctz,
-                                              lazy_read_ctx));
+    RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0);
+    row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group, &ctz,
+                                              position_delete_ctx, lazy_read_ctx));
+    std::vector<RowRange> row_ranges;
+    row_ranges.emplace_back(0, row_group.num_rows);
     auto col_offsets = std::unordered_map<int, tparquet::OffsetIndex>();
-    auto stg = row_group_reader->init(meta_data->schema(), col_offsets);
-    std::vector<RowRange> row_ranges = std::vector<RowRange>();
-    row_group_reader->set_row_ranges(row_ranges);
+    auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets);
     EXPECT_TRUE(stg.ok());
 
     vectorized::Block block;
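The updated test drives the reshaped `RowGroupReader` API: the caller now passes explicit read ranges plus a `PositionDeleteContext` up front, and the reader folds sorted delete positions into those ranges directly instead of building intermediate delete ranges and intersecting sets. A standalone sketch of that merge step (hypothetical helper with simplified types), including the corner case fixed by this commit where deletes cover a whole range and the result must be empty:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct RowRange {
        int64_t first_row; // inclusive
        int64_t last_row;  // exclusive
    };

    // Split one read range around sorted delete positions in a single pass.
    std::vector<RowRange> split_by_deletes(const RowRange& range,
                                           const std::vector<int64_t>& delete_rows) {
        std::vector<RowRange> out;
        int64_t start = range.first_row;
        for (int64_t pos : delete_rows) {
            if (pos < start) continue;          // delete before the current cursor
            if (pos >= range.last_row) break;   // deletes are sorted: nothing left to do
            if (pos > start) out.push_back({start, pos});
            start = pos + 1;                    // skip the deleted row
        }
        if (start < range.last_row) out.push_back({start, range.last_row});
        return out; // empty when every row in the range is deleted
    }

    int main() {
        for (const RowRange& r : split_by_deletes({0, 10}, {3, 4, 7})) {
            std::cout << '[' << r.first_row << ", " << r.last_row << ") ";
        }
        std::cout << '\n'; // prints: [0, 3) [5, 7) [8, 10)
    }

Note that, as in the fixed reader, delete positions are interpreted relative to the same coordinate space as the range being split; file-absolute positions must first be shifted by the row group's first-row offset.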