This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 36b61d4826 [optimize](multi-catalog) use dictionary encode&filter to process delete files (#15441)
36b61d4826 is described below

commit 36b61d4826eeb1f172d7043a32d471aaeb64e1e1
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Fri Dec 30 08:57:55 2022 +0800

    [optimize](multi-catalog) use dictionary encode&filter to process delete files (#15441)

    **Optimize**
    PR #14470 used `Expr` to filter delete rows so that they match the current data file,
    but the rows in a delete file are [sorted by file_path then position](https://iceberg.apache.org/spec/#position-delete-files)
    to optimize filtering while scanning, so this PR removes the `Expr` and uses binary search
    to filter delete rows. In addition, delete files are likely to be dictionary-encoded, and it is
    time-consuming to decode the `file_path` column into `ColumnString`, so this PR uses
    `ColumnDictionary` to read the `file_path` column. After testing, the performance of
    Iceberg v2's MOR (merge-on-read) is improved by 30%+.

    **Fix Bug**
    A lazy-read block may not contain the filter column if the whole row group is filtered by
    `Expr` and `batch_eof` is generated from the next batch.
---
 be/src/vec/columns/column_dictionary.h             |  11 ++
 be/src/vec/exec/format/parquet/parquet_common.cpp  |  56 +++++++
 be/src/vec/exec/format/parquet/parquet_common.h    |  21 ++-
 .../parquet/vparquet_column_chunk_reader.cpp       |   4 +
 .../format/parquet/vparquet_column_chunk_reader.h  |   1 +
 .../exec/format/parquet/vparquet_group_reader.cpp  |  10 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  19 ++-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   4 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 184 ++++++++++++++++-----
 be/src/vec/exec/format/table/iceberg_reader.h      |  28 +++-
 be/src/vec/exec/scan/vfile_scanner.cpp             |   4 +-
 .../planner/external/IcebergScanProvider.java      |  69 --------
 .../doris/planner/external/IcebergSplit.java       |   2 -
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 14 files changed, 274 insertions(+), 141 deletions(-)

diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h
index 729957946f..422e2fdfbb 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -211,6 +211,17 @@ public:
         LOG(FATAL) << "should not call replace_column_data_default in ColumnDictionary";
     }

+    /**
+     * Just insert dictionary data items, the items will append into _dict.
+ */ + void insert_many_dict_data(const StringRef* dict_array, uint32_t dict_num) { + _dict.reserve(_dict.size() + dict_num); + for (uint32_t i = 0; i < dict_num; ++i) { + auto value = StringValue(dict_array[i].data, dict_array[i].size); + _dict.insert_value(value); + } + } + void insert_many_dict_data(const int32_t* data_array, size_t start_index, const StringRef* dict_array, size_t data_num, uint32_t dict_num) override { diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp index 98f0a7de1a..5f8656fcd9 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.cpp +++ b/be/src/vec/exec/format/parquet/parquet_common.cpp @@ -18,6 +18,7 @@ #include "parquet_common.h" #include "util/coding.h" +#include "vec/columns/column_dictionary.h" #include "vec/data_types/data_type_nullable.h" namespace doris::vectorized { @@ -277,6 +278,36 @@ void Decoder::init(FieldSchema* field_schema, cctz::time_zone* ctz) { } } +Status Decoder::_decode_dict_values(MutableColumnPtr& doris_column, + ColumnSelectVector& select_vector) { + DCHECK(doris_column->is_column_dictionary()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + auto& column_data = assert_cast<ColumnDictI32&>(*doris_column).get_data(); + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + uint32_t* start_index = &_indexes[0]; + column_data.insert(start_index + dict_index, start_index + dict_index + run_length); + dict_index += run_length; + break; + } + case ColumnSelectVector::NULL_DATA: { + doris_column->insert_many_defaults(run_length); + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + break; + } + } + } + return Status::OK(); +} + Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) { if (num_values * _type_length != length) { @@ -321,11 +352,26 @@ Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP ColumnSelectVector& select_vector) { size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); if (_has_dict) { + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back(_dict_items[i], _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } _indexes.resize(non_null_size); _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); } else if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) { return Status::IOError("Out-of-bounds access in parquet data decoder"); } + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); switch (logical_type) { #define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ @@ -507,9 +553,19 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP ColumnSelectVector& select_vector) { size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); if (_has_dict) { + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + assert_cast<ColumnDictI32&>(*doris_column) + 
.insert_many_dict_data(&_dict_items[0], _dict_items.size()); + } _indexes.resize(non_null_size); _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); } + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); switch (logical_type) { case TypeIndex::String: diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index 112be6a223..a3d3e0d8a4 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -178,11 +178,23 @@ public: } protected: + /** + * Decode dictionary-coded values into doris_column, ensure that doris_column is ColumnDictI32 type, + * and the coded values must be read into _indexes previously. + */ + Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); + int32_t _type_length; Slice* _data = nullptr; uint32_t _offset = 0; FieldSchema* _field_schema = nullptr; std::unique_ptr<DecodeParams> _decode_params = nullptr; + + // For dictionary encoding + bool _has_dict = false; + std::unique_ptr<uint8_t[]> _dict = nullptr; + std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr; + std::vector<uint32_t> _indexes; }; template <typename DecimalPrimitiveType> @@ -253,12 +265,9 @@ protected: if (!_has_dict) _offset += _type_length tparquet::Type::type _physical_type; + // For dictionary encoding - bool _has_dict = false; - std::unique_ptr<uint8_t[]> _dict = nullptr; std::vector<char*> _dict_items; - std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr; - std::vector<uint32_t> _indexes; }; template <typename Numeric> @@ -567,11 +576,7 @@ protected: ColumnSelectVector& select_vector); // For dictionary encoding - bool _has_dict = false; - std::unique_ptr<uint8_t[]> _dict = nullptr; std::vector<StringRef> _dict_items; - std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr; - std::vector<uint32_t> _indexes; }; template <typename DecimalPrimitiveType> diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 2d398b5409..e1c2c0af64 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -176,6 +176,7 @@ Status ColumnChunkReader::_decode_dict_page() { header.dictionary_page_header.num_values)); _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] = std::move(page_decoder); + _has_dict = true; return Status::OK(); } @@ -218,6 +219,9 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) { Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, ColumnSelectVector& select_vector) { SCOPED_RAW_TIMER(&_statistics.decode_value_time); + if (UNLIKELY(doris_column->is_column_dictionary() && !_has_dict)) { + return Status::IOError("Not dictionary coded"); + } if (UNLIKELY(_remaining_num_values < select_vector.num_values())) { return Status::IOError("Decode too many values in current page"); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index f76c05b735..1bb1d017ce 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -168,6 +168,7 @@ 
private: Slice _page_data; std::unique_ptr<uint8_t[]> _decompress_buf; size_t _decompress_buf_size = 0; + bool _has_dict = false; Decoder* _page_decoder = nullptr; // Map: encoding -> Decoder // Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index e3f841f1b1..0d7df17e4b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -257,8 +257,14 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re // filter data in predicate columns, and remove filter column if (select_vector.has_filter()) { - Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id, - origin_column_num); + if (block->columns() == origin_column_num) { + // the whole row group has been filtered by _lazy_read_ctx.vconjunct_ctx, and batch_eof is + // generated from next batch, so the filter column is removed ahead. + DCHECK_EQ(block->rows(), 0); + } else { + Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id, + origin_column_num); + } } else { Block::erase_useless_column(block, origin_column_num); } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 6613b1f987..a27eddce71 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -141,7 +141,6 @@ Status ParquetReader::_open_file() { _scan_range.file_size, 0, _file_reader)); } if (_file_metadata == nullptr) { - SCOPED_RAW_TIMER(&_statistics.parse_meta_time); RETURN_IF_ERROR(_file_reader->open()); if (_file_reader->size() == 0) { return Status::EndOfFile("Empty Parquet File"); @@ -341,31 +340,31 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor } Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - if (_current_group_reader == nullptr) { + if (_current_group_reader == nullptr || _row_group_eof) { if (_read_row_groups.size() > 0) { RETURN_IF_ERROR(_next_row_group_reader()); } else { + _current_group_reader.reset(nullptr); + _row_group_eof = true; *read_rows = 0; *eof = true; return Status::OK(); } } DCHECK(_current_group_reader != nullptr); - bool batch_eof = false; { SCOPED_RAW_TIMER(&_statistics.column_read_time); RETURN_IF_ERROR( - _current_group_reader->next_batch(block, _batch_size, read_rows, &batch_eof)); + _current_group_reader->next_batch(block, _batch_size, read_rows, &_row_group_eof)); } - if (batch_eof) { + if (_row_group_eof) { auto column_st = _current_group_reader->statistics(); _column_statistics.merge(column_st); _statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows(); - Status st = _next_row_group_reader(); - if (st.is_end_of_file()) { + if (_read_row_groups.size() == 0) { *eof = true; - } else if (!st.ok()) { - return st; + } else { + *eof = false; } } return Status::OK(); @@ -390,6 +389,7 @@ RowGroupReader::PositionDeleteContext ParquetReader::_get_position_delete_ctx( Status ParquetReader::_next_row_group_reader() { if (_read_row_groups.empty()) { + _row_group_eof = true; _current_group_reader.reset(nullptr); return Status::EndOfFile("No next RowGroupReader"); } @@ -406,6 +406,7 @@ Status ParquetReader::_next_row_group_reader() { _current_group_reader.reset(new 
RowGroupReader(_file_reader.get(), _read_columns, row_group_index.row_group_id, row_group, _ctz, position_delete_ctx, _lazy_read_ctx)); + _row_group_eof = false; return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets); } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 2bfc74f823..19a7b2534d 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -154,7 +154,9 @@ private: std::unique_ptr<FileReader> _file_reader = nullptr; std::shared_ptr<FileMetaData> _file_metadata; const tparquet::FileMetaData* _t_metadata; - std::unique_ptr<RowGroupReader> _current_group_reader; + std::unique_ptr<RowGroupReader> _current_group_reader = nullptr; + // read to the end of current reader + bool _row_group_eof = true; int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file std::map<std::string, int> _map_column; // column-name <---> column-index std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 332aa44e54..1bdae3a6e2 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -26,10 +26,16 @@ namespace doris::vectorized { const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; const std::string ICEBERG_ROW_POS = "pos"; +const std::string ICEBERG_FILE_PATH = "file_path"; IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile, - RuntimeState* state, const TFileScanRangeParams& params) - : TableFormatReader(file_format_reader), _profile(profile), _state(state), _params(params) { + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range) + : TableFormatReader(file_format_reader), + _profile(profile), + _state(state), + _params(params), + _range(range) { static const char* iceberg_profile = "IcebergProfile"; ADD_TIMER(_profile, iceberg_profile); _iceberg_profile.num_delete_files = @@ -38,12 +44,8 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile); _iceberg_profile.delete_files_read_time = ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile); -} - -IcebergTableReader::~IcebergTableReader() { - if (_data_path_conjunct_ctx != nullptr) { - _data_path_conjunct_ctx->close(_state); - } + _iceberg_profile.delete_rows_sort_time = + ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile); } Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { @@ -76,15 +78,6 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { } if (delete_file_type == POSITION_DELETE) { // position delete - SCOPED_TIMER(_iceberg_profile.delete_files_read_time); - auto row_desc = RowDescriptor(_state->desc_tbl(), - std::vector<TupleId>({table_desc.delete_table_tuple_id}), - std::vector<bool>({false})); - RETURN_IF_ERROR(VExpr::create_expr_tree(_state->obj_pool(), table_desc.file_select_conjunct, - &_data_path_conjunct_ctx)); - RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc)); - RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state)); - ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get()); RowRange whole_range = parquet_reader->get_whole_range(); bool init_schema = false; @@ 
-100,6 +93,7 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { delete_rows_iter++; continue; } + SCOPED_TIMER(_iceberg_profile.delete_files_read_time); std::vector<int64_t>& delete_rows = *delete_rows_iter; TFileRangeDesc delete_range; delete_range.path = delete_file.path; @@ -112,49 +106,90 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types); init_schema = true; } - RETURN_IF_ERROR(delete_reader.init_reader(delete_file_col_names, nullptr, - _data_path_conjunct_ctx, false)); + std::string data_file_path = _range.path; + // the path in _range is remove the namenode prefix, + // and the file_path in delete file is full path, so we should add it back. + if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) { + std::string fs_name = _params.hdfs_params.fs_name; + if (!starts_with(data_file_path, fs_name)) { + data_file_path = fs_name + data_file_path; + } + } + RETURN_IF_ERROR( + delete_reader.init_reader(delete_file_col_names, nullptr, nullptr, false)); std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> partition_columns; std::unordered_map<std::string, VExprContext*> missing_columns; delete_reader.set_fill_columns(partition_columns, missing_columns); bool eof = false; + // We can only know whether a parquet file is encoded in dictionary after reading the first block, + // so we assume it dictionary encoded first, and reset it false if error thrown. + bool dictionary_coded = true; while (!eof) { Block block = Block(); for (int i = 0; i < delete_file_col_names.size(); ++i) { DataTypePtr data_type = DataTypeFactory::instance().create_data_type( - delete_file_col_types[i], true); - MutableColumnPtr data_column = data_type->create_column(); - block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, - delete_file_col_names[i])); + delete_file_col_types[i], false); + if (delete_file_col_names[i] == ICEBERG_FILE_PATH && dictionary_coded) { + // the dictionary data in ColumnDictI32 is referenced by StringValue, it does keep + // the dictionary data in its life circle, so the upper caller should keep the + // dictionary data alive after ColumnDictI32. 
+ MutableColumnPtr dict_column = ColumnDictI32::create(); + block.insert(ColumnWithTypeAndName(std::move(dict_column), data_type, + delete_file_col_names[i])); + } else { + MutableColumnPtr data_column = data_type->create_column(); + block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, + delete_file_col_names[i])); + } } eof = false; size_t read_rows = 0; - RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof)); + Status st = delete_reader.get_next_block(&block, &read_rows, &eof); + if (!st.ok()) { + if (st.to_string() == "[IO_ERROR]Not dictionary coded") { + dictionary_coded = false; + continue; + } + return st; + } if (read_rows > 0) { - auto& pos_type_column = block.get_by_name(ICEBERG_ROW_POS); - ColumnPtr pos_column = pos_type_column.column; - using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType; - if (pos_type_column.type->is_nullable()) { - pos_column = assert_cast<const ColumnNullable&>(*pos_column) - .get_nested_column_ptr(); + ColumnPtr path_column = block.get_by_name(ICEBERG_FILE_PATH).column; + DCHECK_EQ(path_column->size(), read_rows); + std::pair<int, int> path_range; + if (dictionary_coded) { + path_range = _binary_search(assert_cast<const ColumnDictI32&>(*path_column), + data_file_path); + } else { + path_range = _binary_search(assert_cast<const ColumnString&>(*path_column), + data_file_path); } - const int64_t* src_data = - assert_cast<const ColumnType&>(*pos_column).get_data().data(); - const int64_t* src_data_end = src_data + read_rows; - const int64_t* cpy_start = - std::lower_bound(src_data, src_data_end, whole_range.first_row); - const int64_t* cpy_end = - std::lower_bound(cpy_start, src_data_end, whole_range.last_row); - int64_t cpy_count = cpy_end - cpy_start; - - if (cpy_count > 0) { - int64_t origin_size = delete_rows.size(); - delete_rows.resize(origin_size + cpy_count); - int64_t* dest_position = &delete_rows[origin_size]; - memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t)); - num_delete_rows += cpy_count; + + int skip_count = path_range.first; + int valid_count = path_range.second; + if (valid_count > 0) { + // delete position + ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column; + CHECK_EQ(pos_column->size(), read_rows); + using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType; + const int64_t* src_data = + assert_cast<const ColumnType&>(*pos_column).get_data().data() + + skip_count; + const int64_t* src_data_end = src_data + valid_count; + const int64_t* cpy_start = + std::lower_bound(src_data, src_data_end, whole_range.first_row); + const int64_t* cpy_end = + std::lower_bound(cpy_start, src_data_end, whole_range.last_row); + int64_t cpy_count = cpy_end - cpy_start; + + if (cpy_count > 0) { + int64_t origin_size = delete_rows.size(); + delete_rows.resize(origin_size + cpy_count); + int64_t* dest_position = &delete_rows[origin_size]; + memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t)); + num_delete_rows += cpy_count; + } } } } @@ -168,6 +203,7 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { iter++; } } + SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time); _merge_sort(delete_rows_list, num_delete_rows); parquet_reader->set_delete_rows(&_delete_rows); COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows); @@ -178,6 +214,64 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { return Status::OK(); } +std::pair<int, int> IcebergTableReader::_binary_search(const ColumnDictI32& 
file_path_column, + const std::string& data_file_path) { + size_t read_rows = file_path_column.get_data().size(); + + int data_file_code = file_path_column.find_code(StringValue(data_file_path)); + if (data_file_code == -2) { // -1 is null code + return std::make_pair(read_rows, 0); + } + + const int* coded_path = file_path_column.get_data().data(); + const int* coded_path_end = coded_path + read_rows; + const int* path_start = std::lower_bound(coded_path, coded_path_end, data_file_code); + const int* path_end = std::lower_bound(path_start, coded_path_end, data_file_code + 1); + int skip_count = path_start - coded_path; + int valid_count = path_end - path_start; + + return std::make_pair(skip_count, valid_count); +} + +std::pair<int, int> IcebergTableReader::_binary_search(const ColumnString& file_path_column, + const std::string& data_file_path) { + const int read_rows = file_path_column.size(); + if (read_rows == 0) { + return std::make_pair(0, 0); + } + StringRef data_file(data_file_path); + + int left = 0; + int right = read_rows - 1; + if (file_path_column.get_data_at(left) > data_file || + file_path_column.get_data_at(right) < data_file) { + return std::make_pair(read_rows, 0); + } + while (left < right) { + int mid = (left + right) / 2; + if (file_path_column.get_data_at(mid) < data_file) { + left = mid; + } else { + right = mid; + } + } + if (file_path_column.get_data_at(left) == data_file) { + int start = left; + int end = read_rows - 1; + while (start < end) { + int pivot = (start + end) / 2; + if (file_path_column.get_data_at(pivot) > data_file) { + end = pivot; + } else { + start = pivot; + } + } + return std::make_pair(left, end - left + 1); + } else { + return std::make_pair(read_rows, 0); + } +} + void IcebergTableReader::_merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows) { if (delete_rows_list.empty()) { diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 0a9d4ef71f..d689fbcf51 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -20,6 +20,7 @@ #include <queue> #include "table_format_reader.h" +#include "vec/columns/column_dictionary.h" #include "vec/exec/format/generic_reader.h" #include "vec/exec/format/parquet/parquet_common.h" #include "vec/exprs/vexpr.h" @@ -29,8 +30,9 @@ namespace doris::vectorized { class IcebergTableReader : public TableFormatReader { public: IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile, - RuntimeState* state, const TFileScanRangeParams& params); - ~IcebergTableReader() override; + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range); + ~IcebergTableReader() override = default; Status init_row_filters(const TFileRangeDesc& range) override; @@ -51,14 +53,34 @@ private: RuntimeProfile::Counter* num_delete_files; RuntimeProfile::Counter* num_delete_rows; RuntimeProfile::Counter* delete_files_read_time; + RuntimeProfile::Counter* delete_rows_sort_time; }; + /** + * https://iceberg.apache.org/spec/#position-delete-files + * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning. + * Sorting by file_path allows filter pushdown by file in columnar storage formats. + * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory. + * + * So, use merge-sort to merge delete rows from different files. 
+ */ void _merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows); + /** + * Delete rows is sorted by file_path, using binary-search to locate the right delete rows for current data file. + * @return a pair of \<skip_count, valid_count\>, + * and the range of [skip_count, skip_count + valid_count) is the delete rows for current data file. + */ + std::pair<int, int> _binary_search(const ColumnDictI32& file_path_column, + const std::string& data_file_path); + + std::pair<int, int> _binary_search(const ColumnString& file_path_column, + const std::string& data_file_path); + RuntimeProfile* _profile; RuntimeState* _state; const TFileScanRangeParams& _params; - VExprContext* _data_path_conjunct_ctx = nullptr; + const TFileRangeDesc& _range; IcebergProfile _iceberg_profile; std::vector<int64_t> _delete_rows; }; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index e675583bd3..4043174a4a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -493,8 +493,8 @@ Status VFileScanner::_get_next_reader() { if (range.__isset.table_format_params && range.table_format_params.table_format_type == "iceberg") { IcebergTableReader* iceberg_reader = new IcebergTableReader( - (GenericReader*)parquet_reader, _profile, _state, _params); - iceberg_reader->init_row_filters(range); + (GenericReader*)parquet_reader, _profile, _state, _params, range); + RETURN_IF_ERROR(iceberg_reader->init_row_filters(range)); _cur_reader.reset((GenericReader*)iceberg_reader); } else { _cur_reader.reset((GenericReader*)parquet_reader); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java index e51b39144f..46adddfde6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java @@ -18,19 +18,9 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.BaseTableRef; -import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.StringLiteral; -import org.apache.doris.analysis.TableName; -import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.HMSResource; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -41,12 +31,8 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TIcebergDeleteFileDesc; import org.apache.doris.thrift.TIcebergFileDesc; -import org.apache.doris.thrift.TIcebergTable; -import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableFormatFileDesc; -import org.apache.doris.thrift.TTableType; -import lombok.Data; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.InputSplit; @@ -76,10 +62,6 @@ import java.util.OptionalLong; public class IcebergScanProvider extends 
HiveScanProvider { private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; - public static final String V2_DELETE_TBL = "iceberg#delete#tbl"; - public static final String V2_DELETE_DB = "iceberg#delete#db"; - private static final DeleteFileTempTable scanDeleteTable = - new DeleteFileTempTable(TableIf.TableType.HMS_EXTERNAL_TABLE); private final Analyzer analyzer; public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer, TupleDescriptor desc, @@ -98,7 +80,6 @@ public class IcebergScanProvider extends HiveScanProvider { if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { fileDesc.setContent(FileContent.DATA.id()); } else { - setPathSelectConjunct(fileDesc, icebergSplit); for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) { TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc(); deleteFileDesc.setPath(filter.getDeleteFilePath()); @@ -127,19 +108,6 @@ public class IcebergScanProvider extends HiveScanProvider { rangeDesc.setTableFormatParams(tableFormatFileDesc); } - private static void setPathSelectConjunct(TIcebergFileDesc fileDesc, IcebergSplit icebergSplit) - throws UserException { - BaseTableRef tableRef = icebergSplit.getDeleteTableRef(); - fileDesc.setDeleteTableTupleId(tableRef.getDesc().getId().asInt()); - SlotRef lhs = new SlotRef(tableRef.getName(), DeleteFileTempTable.DATA_FILE_PATH); - lhs.analyze(icebergSplit.getAnalyzer()); - lhs.getDesc().setIsMaterialized(true); - StringLiteral rhs = new StringLiteral(icebergSplit.getPath().toUri().toString()); - BinaryPredicate pathSelectConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs); - pathSelectConjunct.analyze(icebergSplit.getAnalyzer()); - fileDesc.setFileSelectConjunct(pathSelectConjunct.treeToThrift()); - } - @Override public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { TFileFormatType type; @@ -173,14 +141,6 @@ public class IcebergScanProvider extends HiveScanProvider { } List<InputSplit> splits = new ArrayList<>(); int formatVersion = ((BaseTable) table).operations().current().formatVersion(); - BaseTableRef tableRef = null; - if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { - TableName fullName = analyzer.getFqTableName(scanDeleteTable.getTableName()); - fullName.analyze(analyzer); - TableRef ref = new TableRef(fullName, fullName.toString(), null); - tableRef = new BaseTableRef(ref, scanDeleteTable, scanDeleteTable.getTableName()); - tableRef.analyze(analyzer); - } for (FileScanTask task : scan.planFiles()) { for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) { String dataFilePath = spitTask.file().path().toString(); @@ -189,7 +149,6 @@ public class IcebergScanProvider extends HiveScanProvider { split.setFormatVersion(formatVersion); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { split.setDeleteFileFilters(getDeleteFileFilters(spitTask)); - split.setDeleteTableRef(tableRef); } split.setTableFormatType(TableFormatType.ICEBERG); split.setAnalyzer(analyzer); @@ -239,32 +198,4 @@ public class IcebergScanProvider extends HiveScanProvider { public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException { return Collections.emptyList(); } - - @Data - static class DeleteFileTempTable extends ExternalTable { - public static final String DATA_FILE_PATH = "file_path"; - private final TableName tableName; - private final List<Column> fullSchema = new ArrayList<>(); - - public DeleteFileTempTable(TableType type) { - super(0, V2_DELETE_TBL, null, V2_DELETE_DB, type); - 
this.tableName = new TableName(null, V2_DELETE_DB, V2_DELETE_TBL);
-            Column dataFilePathCol = new Column(DATA_FILE_PATH, PrimitiveType.STRING, true);
-            this.fullSchema.add(dataFilePathCol);
-        }
-
-        @Override
-        public List<Column> getFullSchema() {
-            return fullSchema;
-        }
-
-        @Override
-        public TTableDescriptor toThrift() {
-            TIcebergTable tIcebergTable = new TIcebergTable(V2_DELETE_DB, V2_DELETE_TBL, new HashMap<>());
-            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
-                    fullSchema.size(), 0, getName(), "");
-            tTableDescriptor.setIcebergTable(tIcebergTable);
-            return tTableDescriptor;
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
index 14fce6caf8..b9607a7f00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
@@ -18,7 +18,6 @@
 package org.apache.doris.planner.external;
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.BaseTableRef;
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
@@ -34,7 +33,6 @@ public class IcebergSplit extends HiveSplit {
     private Analyzer analyzer;
     private String dataFilePath;
     private Integer formatVersion;
-    private BaseTableRef deleteTableRef;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 613c884a06..ff97361c6e 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -274,7 +274,9 @@ struct TIcebergFileDesc {
     2: optional i32 content;
     // When open a delete file, filter the data file path with the 'file_path' property
     3: optional list<TIcebergDeleteFileDesc> delete_files;
+    // Deprecated
     4: optional Types.TTupleId delete_table_tuple_id;
+    // Deprecated
     5: optional Exprs.TExpr file_select_conjunct;
 }
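
The **Optimize** note above relies on the Iceberg guarantee that rows in a position-delete file are sorted by `file_path` and then by `pos`. Below is a minimal, standalone sketch of that idea; it is not the Doris `ColumnDictI32`/`IcebergTableReader` code, and `DeleteBatch`, `find_delete_rows`, and the assumption that dictionary codes are assigned in first-seen (hence sorted-path) order are illustrative only.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for one decoded batch of a position-delete file.
// Rows are sorted by file_path then pos (per the Iceberg spec); the file_path
// column is kept as integer dictionary codes instead of decoded strings.
struct DeleteBatch {
    std::unordered_map<std::string, int32_t> dict; // file_path -> dictionary code
    std::vector<int32_t> path_codes;               // one code per row, non-decreasing
    std::vector<int64_t> positions;                // deleted row position in the data file
};

// Returns {skip_count, valid_count}: the contiguous run of rows in this batch that
// belong to data_file_path. Assumes codes were assigned in first-seen order, so a
// batch sorted by file_path yields a non-decreasing code column and plain binary
// search over integers is enough -- no per-row string decoding or Expr evaluation.
std::pair<size_t, size_t> find_delete_rows(const DeleteBatch& batch,
                                           const std::string& data_file_path) {
    auto it = batch.dict.find(data_file_path);
    if (it == batch.dict.end()) {
        return {batch.path_codes.size(), 0}; // no deletes for this data file
    }
    auto range = std::equal_range(batch.path_codes.begin(), batch.path_codes.end(), it->second);
    return {static_cast<size_t>(range.first - batch.path_codes.begin()),
            static_cast<size_t>(range.second - range.first)};
}
```

Because both the codes and the positions are already sorted, locating a data file's delete rows costs O(log n) per batch instead of evaluating a string-equality expression on every row, which is the gain the commit message describes.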
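
Once the matching run is known, only the positions that fall inside the current row group matter. A hedged sketch of that clipping step follows (names such as `clip_to_row_group`, `first_row`, and `last_row` are assumptions, not the actual Doris API); it mirrors how the diff copies the clipped span into `delete_rows` before handing it to the Parquet row-group reader.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Keep only the matched positions that fall inside the current row group's
// range [first_row, last_row). positions[skip, skip + valid) is already sorted,
// so two lower_bound calls find the sub-span to copy.
std::vector<int64_t> clip_to_row_group(const std::vector<int64_t>& positions,
                                       size_t skip, size_t valid,
                                       int64_t first_row, int64_t last_row) {
    const int64_t* begin = positions.data() + skip;
    const int64_t* end = begin + valid;
    const int64_t* lo = std::lower_bound(begin, end, first_row);
    const int64_t* hi = std::lower_bound(lo, end, last_row);
    return std::vector<int64_t>(lo, hi); // delete rows to apply to this row group
}
```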