This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 67e4292533 [fix](iceberg-v2) icebergv2 filter data path (#14470)
67e4292533 is described below

commit 67e4292533e5822e3a40559b09a364fd548c5f92
Author:     slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Thu Dec 15 10:18:12 2022 +0800

    [fix](iceberg-v2) icebergv2 filter data path (#14470)

    1. An iceberg v2 delete file may cross many data paths, so the path of a
       file split is required as a predicate to filter the rows of the delete file:
       - create a delete file structure to save the predicate parameters
       - create a predicate for the file path
    2. Add some logs to print the row ranges.
    3. Fix a bug when creating file metadata.
---
 be/src/vec/exec/format/parquet/parquet_common.h    |   8 ++
 be/src/vec/exec/format/parquet/schema_desc.h       |   4 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  10 +-
 .../vec/exec/format/parquet/vparquet_page_index.h  |   4 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  58 +++++++-----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   3 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 104 ++++++++++++++++-----
 be/src/vec/exec/format/table/iceberg_reader.h      |  26 ++++--
 be/src/vec/exec/format/table/table_format_reader.h |   3 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |   6 +-
 .../planner/external/ExternalFileScanNode.java     |   2 +-
 .../apache/doris/planner/external/HiveSplit.java   |   1 +
 .../planner/external/IcebergScanProvider.java      |  86 +++++++++++++++--
 .../doris/planner/external/IcebergSplit.java       |   5 +
 .../doris/planner/external/QueryScanProvider.java  |   9 +-
 gensrc/thrift/PlanNodes.thrift                     |   7 +-
 16 files changed, 252 insertions(+), 84 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index d124c2fb9a..611ce969f1 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -46,6 +46,14 @@
 struct RowRange {
     int64_t first_row;
     int64_t last_row;
+
+    bool operator<(const RowRange& range) const { return first_row < range.first_row; }
+
+    std::string debug_string() const {
+        std::stringstream ss;
+        ss << "[" << first_row << "," << last_row << ")";
+        return ss.str();
+    }
 };
 
 struct ParquetReadColumn {
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index 73f9f97d97..c45e96f236 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -40,11 +40,13 @@
 struct FieldSchema {
     tparquet::Type::type physical_type;
     // The index order in FieldDescriptor._physical_fields
     int physical_column_index = -1;
-
    int16_t definition_level = 0;
     int16_t repetition_level = 0;
     std::vector<FieldSchema> children;
 
+    FieldSchema() = default;
+    ~FieldSchema() = default;
+    FieldSchema(const FieldSchema& fieldSchema) = default;
     std::string debug_string() const;
 };
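As context for the RowRange changes in parquet_common.h above: the ranges are half-open,
[first_row, last_row), and operator< orders them by first_row so they can be kept sorted in a
std::set (the container merge_delete_row_ranges now takes). A minimal standalone sketch of that
convention, not code from this patch:

    #include <cstdint>
    #include <iostream>
    #include <set>
    #include <sstream>
    #include <string>

    struct RowRange {
        int64_t first_row;
        int64_t last_row; // exclusive: the range is [first_row, last_row)

        RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {}
        bool operator<(const RowRange& range) const { return first_row < range.first_row; }
        std::string debug_string() const {
            std::stringstream ss;
            ss << "[" << first_row << "," << last_row << ")";
            return ss.str();
        }
    };

    int main() {
        // std::set keeps the ranges sorted by first_row, which is what the
        // merge logic in vparquet_reader.cpp relies on when walking them in order.
        std::set<RowRange> ranges;
        ranges.emplace(10, 15);
        ranges.emplace(0, 3);
        for (const RowRange& r : ranges) {
            std::cout << r.debug_string() << " "; // prints: [0,3) [10,15)
        }
        return 0;
    }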
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 235a5f3e3b..c0557c482a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -67,17 +67,17 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end
     }
     int index = _row_range_index;
     while (index < _row_ranges->size()) {
-        const RowRange& read_range = (*_row_ranges)[index];
-        if (read_range.last_row <= start_index) {
+        const RowRange& row_range = (*_row_ranges)[index];
+        if (row_range.last_row <= start_index) {
             index++;
             _row_range_index++;
             continue;
         }
-        if (read_range.first_row >= end_index) {
+        if (row_range.first_row >= end_index) {
             break;
         }
-        int64_t start = read_range.first_row < start_index ? start_index : read_range.first_row;
-        int64_t end = read_range.last_row < end_index ? read_range.last_row : end_index;
+        int64_t start = row_range.first_row < start_index ? start_index : row_range.first_row;
+        int64_t end = row_range.last_row < end_index ? row_range.last_row : end_index;
         read_ranges.emplace_back(start, end);
         index++;
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index 978a798bf4..4a27593abf 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -36,9 +36,9 @@ public:
                               std::vector<int>& skipped_ranges);
     bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
     Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
-                              tparquet::ColumnIndex* _column_index);
+                              tparquet::ColumnIndex* column_index);
     Status parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
-                              tparquet::OffsetIndex* _offset_index);
+                              tparquet::OffsetIndex* offset_index);
 
 private:
     friend class ParquetReader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 01c226b1e4..c8d4f6b3c4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -324,11 +324,13 @@ Status ParquetReader::get_parsered_schema(std::vector<std::string>* col_names,
                 _scan_range.start_offset, _scan_range.file_size, 0, _file_reader));
     }
 
-    RETURN_IF_ERROR(_file_reader->open());
-    if (_file_reader->size() == 0) {
-        return Status::EndOfFile("Empty Parquet File");
+    if (_file_metadata == nullptr) {
+        RETURN_IF_ERROR(_file_reader->open());
+        if (_file_reader->size() == 0) {
+            return Status::EndOfFile("Empty Parquet File");
+        }
+        RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
     }
-    RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
 
     _t_metadata = &_file_metadata->to_thrift();
     _total_groups = _t_metadata->row_groups.size();
@@ -360,57 +362,64 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
-void ParquetReader::merge_delete_row_ranges(const std::vector<RowRange>& delete_row_ranges) {
+void ParquetReader::merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges) {
     if (_row_ranges.empty()) {
+        _current_group_reader->set_row_ranges(_row_ranges);
         return;
     }
-    std::vector<RowRange> candidate_ranges;
-    auto start_range = _row_ranges.begin();
     if (!delete_row_ranges.empty()) {
+        std::vector<RowRange> candidate_ranges;
+        auto start_range = _row_ranges.begin();
         auto delete_range = delete_row_ranges.begin();
-        int64_t range_start_idx = start_range->first_row;
+        int64_t processed_range_start_idx = start_range->first_row;
         while (start_range != _row_ranges.end() && delete_range != delete_row_ranges.end()) {
            int64_t delete_start = delete_range->first_row;
            int64_t delete_end = delete_range->last_row;
            int64_t range_start = start_range->first_row;
            int64_t range_end = start_range->last_row;
-
-            if (delete_end >= range_end) {
-                if (range_start < delete_start) {
+            if (delete_end > range_end) {
+                if (range_start < processed_range_start_idx) {
+                    // rows before processed_range_start_idx have been processed
+                    range_start = processed_range_start_idx;
+                }
+                if (range_end < delete_start) {
+                    /**
+                     *        start_range
+                     * || --------- || - |--------- |
+                     *                     delete_range
+                     */
+                    candidate_ranges.emplace_back(range_start, range_end);
+                } else if (range_start < delete_start) {
                     /**
                      *            row_range
                      * || --------|-------- || ----- |
                      *     delete_start   delete_end
                      */
                     candidate_ranges.emplace_back(range_start, delete_start);
-                } else if (range_end <= delete_start) {
-                    /**
-                     *    start_range
-                     * || --------- || ----------- |
-                     *                 delete_range
-                     */
-                    candidate_ranges.emplace_back(range_start, range_end);
                 }
+                // range_end > delete_end && range_start > delete_start
                start_range++;
            } else {
                // delete_end < range_end, most of the time, we will use this branch
-                if (range_start <= delete_start) {
+                if (processed_range_start_idx < delete_start) {
                     /**
                      * row_range_start      row_range_end
                      *    || --- | --------- | --- ||
                      *           delete_range
                      */
-                    candidate_ranges.emplace_back(range_start_idx, delete_start);
-                    range_start_idx = delete_end + 1;
+                    candidate_ranges.emplace_back(processed_range_start_idx, delete_start);
                }
+                // delete_end is in row_range, so it can be assigned to processed_range_start_idx
+                processed_range_start_idx = delete_end;
                delete_range++;
                if (delete_range == delete_row_ranges.end()) {
-                    candidate_ranges.emplace_back(delete_end + 1, range_end);
+                    range_end = _row_ranges[_row_ranges.size() - 1].last_row;
+                    if (processed_range_start_idx != range_end) {
+                        candidate_ranges.emplace_back(processed_range_start_idx, range_end);
+                    }
                }
            }
        }
-    }
-    if (!candidate_ranges.empty()) {
         _row_ranges.assign(candidate_ranges.begin(), candidate_ranges.end());
     }
     _current_group_reader->set_row_ranges(_row_ranges);
@@ -545,7 +554,6 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
     if (!_has_page_index(row_group.columns, page_index)) {
         return Status::OK();
     }
-    // int64_t buffer_size = page_index._column_index_size;
     uint8_t col_index_buff[page_index._column_index_size];
     int64_t bytes_read = 0;
     RETURN_IF_ERROR(_file_reader->readat(page_index._column_index_start,
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index aaae115bd3..f5891754e7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -79,7 +79,7 @@ public:
 
     Status file_metadata(FileMetaData** metadata);
 
-    void merge_delete_row_ranges(const std::vector<RowRange>& delete_row_ranges);
+    void merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges);
 
     int64_t size() const { return _file_reader->size(); }
 
@@ -149,7 +149,6 @@ private:
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
     std::unique_ptr<FileReader> _file_reader = nullptr;
-    std::vector<RowRange> _delete_row_ranges;
     std::vector<RowRange> _row_ranges;
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
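The merge_delete_row_ranges hunk above is easier to follow as plain interval subtraction: remove a
sorted set of delete ranges from the sorted read ranges, keeping whatever is left. A simplified
standalone sketch of the same idea (not the committed implementation, which rewrites _row_ranges in
place while walking both iterators):

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <set>
    #include <vector>

    struct Range {
        int64_t first, last; // half-open: [first, last)
        bool operator<(const Range& o) const { return first < o.first; }
    };

    // Subtract every delete range from every read range; both inputs are sorted.
    std::vector<Range> subtract(const std::vector<Range>& reads, const std::set<Range>& deletes) {
        std::vector<Range> out;
        auto del = deletes.begin();
        for (Range r : reads) {
            while (del != deletes.end() && del->last <= r.first) ++del;     // delete ends before read
            auto d = del;
            while (d != deletes.end() && d->first < r.last) {               // overlapping delete
                if (r.first < d->first) out.push_back({r.first, d->first}); // keep the prefix
                r.first = std::max(r.first, d->last);                       // skip the deleted rows
                ++d;
            }
            if (r.first < r.last) out.push_back({r.first, r.last});         // keep the suffix
        }
        return out;
    }

    int main() {
        std::vector<Range> reads = {{0, 10}, {20, 30}};
        std::set<Range> deletes = {{3, 5}, {8, 22}};
        for (const Range& r : subtract(reads, deletes)) {
            std::cout << "[" << r.first << "," << r.last << ") "; // [0,3) [5,8) [22,30)
        }
        return 0;
    }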
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 041c70438f..9e2f659772 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -29,38 +29,64 @@ namespace doris::vectorized {
 
 const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
 const std::string ICEBERG_ROW_POS = "pos";
 
+IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
+                                       RuntimeState* state, const TFileScanRangeParams& params)
+        : TableFormatReader(file_format_reader), _profile(profile), _state(state), _params(params) {
+    static const char* iceberg_profile = "IcebergProfile";
+    ADD_TIMER(_profile, iceberg_profile);
+    _iceberg_profile._delete_files_init_time =
+            ADD_CHILD_TIMER(_profile, "DeleteFileInitTime", iceberg_profile);
+    _iceberg_profile._delete_files_read_total_time =
+            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
+}
+
+IcebergTableReader::~IcebergTableReader() {
+    if (_data_path_conjunct_ctx != nullptr) {
+        _data_path_conjunct_ctx->close(_state);
+    }
+}
+
 Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
     return _file_format_reader->get_next_block(block, read_rows, eof);
 }
 
+Status IcebergTableReader::set_fill_columns(
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns,
+        const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+    return _file_format_reader->set_fill_columns(partition_columns, missing_columns);
+}
+
 Status IcebergTableReader::get_columns(
         std::unordered_map<std::string, TypeDescriptor>* name_to_type,
         std::unordered_set<std::string>* missing_cols) {
     return _file_format_reader->get_columns(name_to_type, missing_cols);
 }
 
-void IcebergTableReader::filter_rows() {
+void IcebergTableReader::filter_rows(const TFileRangeDesc& range) {
     if (_cur_delete_file_reader == nullptr) {
         return;
     }
-    auto& table_desc = _params.table_format_params.iceberg_params;
+    SCOPED_TIMER(_iceberg_profile._delete_files_read_total_time);
+    auto& table_desc = range.table_format_params.iceberg_params;
     auto& version = table_desc.format_version;
     if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
         return;
     }
     bool eof = false;
-    std::vector<RowRange> delete_row_ranges;
+    std::set<RowRange> delete_row_ranges;
     while (!eof) {
         size_t read_rows = 0;
         Block block = Block();
-        for (const FieldSchema* field : _column_schemas) {
-            DataTypePtr data_type = DataTypeFactory::instance().create_data_type(field->type, true);
+        for (const FieldSchema& field : _column_schemas) {
+            DataTypePtr data_type = DataTypeFactory::instance().create_data_type(field.type, true);
             MutableColumnPtr data_column = data_type->create_column();
-            block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, field->name));
+            block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, field.name));
         }
         Status st = _cur_delete_file_reader->get_next_block(&block, &read_rows, &eof);
         if (!st.ok() || eof) {
             if (!_delete_file_readers.empty()) {
+                eof = false;
                 _cur_delete_file_reader = std::move(_delete_file_readers.front());
                 _delete_file_readers.pop_front();
             }
@@ -88,40 +114,58 @@
             int64_t row_id = delete_row_ids[i];
             int64_t row_range_start = row_id;
             int64_t row_range_end = row_id;
-            // todo: add debug info
-            // todo: asure reading delete file data in file_range only
             while (i + 1 < num_deleted_ids) {
                 if (delete_row_ids[i + 1] == delete_row_ids[i] + 1) {
                     row_range_end = delete_row_ids[i + 1];
                     i++;
                     continue;
                 } else {
-                    delete_row_ranges.emplace_back(row_range_start, row_range_end);
+                    delete_row_ranges.emplace(row_range_start, row_range_end + 1);
                     row_range_start = ++row_range_end;
                     break;
                 }
             }
             if (i == num_deleted_ids - 1) {
-                delete_row_ranges.emplace_back(row_range_start,
-                                               delete_row_ids[num_deleted_ids - 1]);
+                delete_row_ranges.emplace(row_range_start,
+                                          delete_row_ids[num_deleted_ids - 1] + 1);
             }
             row_range_start = delete_row_ids[i + 1];
             i++;
         }
     }
+    if (VLOG_IS_ON(3)) {
+        if (!delete_row_ranges.empty()) {
+            std::stringstream out;
+            out << "[";
+            for (const RowRange& delete_row_range : delete_row_ranges) {
+                out << " " << delete_row_range.debug_string();
+            }
+            out << " ]";
+            VLOG_NOTICE << "Delete row range info: " << out.str();
+        }
+    }
     ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
     parquet_reader->merge_delete_row_ranges(delete_row_ranges);
 }
 
-Status IcebergTableReader::init_row_filters() {
-    auto& table_desc = _params.table_format_params.iceberg_params;
+Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
+    auto& table_desc = range.table_format_params.iceberg_params;
     auto& version = table_desc.format_version;
     if (version >= MIN_SUPPORT_DELETE_FILES_VERSION) {
+        SCOPED_TIMER(_iceberg_profile._delete_files_init_time);
         auto& delete_file_type = table_desc.content;
         auto files = table_desc.delete_files;
         if (delete_file_type == POSITON_DELELE) {
             // position delete
+            auto row_desc = RowDescriptor(_state->desc_tbl(),
+                                          std::vector<TupleId>({table_desc.delete_table_tuple_id}),
+                                          std::vector<bool>({false}));
+            RETURN_IF_ERROR(VExpr::create_expr_tree(
+                    _state->obj_pool(), table_desc.file_select_conjunct, &_data_path_conjunct_ctx));
+            RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc));
+            RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state));
+            vector<std::string> names;
             for (auto& delete_file : files) {
                 _position_delete_params.low_bound_index = delete_file.position_lower_bound;
                 _position_delete_params.upper_bound_index = delete_file.position_upper_bound;
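The loop in the hunk above collapses the sorted "pos" column of a position-delete file into maximal
contiguous runs, storing each run as a half-open RowRange (note the +1 that turns the inclusive last
row id into an exclusive bound). The same idea as a standalone sketch, not the committed code:

    #include <cstdint>
    #include <iostream>
    #include <set>
    #include <vector>

    struct RowRange {
        int64_t first_row, last_row; // half-open: [first_row, last_row)
        bool operator<(const RowRange& o) const { return first_row < o.first_row; }
    };

    // Group sorted, distinct delete positions into maximal contiguous runs,
    // emitting each run as a half-open RowRange, e.g. {3,4,5,9} -> [3,6) [9,10).
    std::set<RowRange> to_ranges(const std::vector<int64_t>& delete_row_ids) {
        std::set<RowRange> ranges;
        size_t i = 0;
        while (i < delete_row_ids.size()) {
            int64_t start = delete_row_ids[i];
            int64_t end = start;
            while (i + 1 < delete_row_ids.size() && delete_row_ids[i + 1] == end + 1) {
                end = delete_row_ids[++i]; // extend the run while ids stay contiguous
            }
            ranges.insert({start, end + 1}); // +1: inclusive run end -> exclusive bound
            ++i;
        }
        return ranges;
    }

    int main() {
        for (const RowRange& r : to_ranges({3, 4, 5, 9})) {
            std::cout << "[" << r.first_row << "," << r.last_row << ") "; // [3,6) [9,10)
        }
        return 0;
    }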
@@ -134,17 +178,29 @@
                 ParquetReader* delete_reader = new ParquetReader(
                         _profile, _params, delete_range, _state->query_options().batch_size,
                         const_cast<cctz::time_zone*>(&_state->timezone_obj()));
-                FileMetaData* metadata = nullptr;
-                RETURN_IF_ERROR(delete_reader->file_metadata(&metadata));
-
-                auto& delete_file_schema = metadata->schema();
-                vector<std::string> names;
-                for (auto i = 0; i < delete_file_schema.size(); ++i) {
-                    const FieldSchema* field = delete_file_schema.get_column(i);
-                    _column_schemas.emplace_back(field);
-                    names.emplace_back(field->name);
+                if (_delete_file_schema == nullptr) {
+                    FileMetaData* metadata = nullptr;
+                    RETURN_IF_ERROR(delete_reader->file_metadata(&metadata));
+                    if (metadata == nullptr) {
+                        break;
+                    }
+                    _delete_file_schema = &metadata->schema();
+                    int num_of_col = _delete_file_schema->size();
+                    for (auto i = 0; i < num_of_col; ++i) {
+                        const FieldSchema* field = _delete_file_schema->get_column(i);
+                        _column_schemas.emplace_back(*field);
+                        names.emplace_back(field->name);
+                    }
                 }
-                Status d_st = delete_reader->init_reader(names, false);
+                DCHECK_EQ(_column_schemas.size(), _delete_file_schema->size());
+                // The expr assures reading delete file data from the current file range only
+                Status d_st =
+                        delete_reader->init_reader(names, nullptr, _data_path_conjunct_ctx, false);
+                std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                        partition_columns;
+                std::unordered_map<std::string, VExprContext*> missing_columns;
+                delete_reader->set_fill_columns(partition_columns, missing_columns);
+                _delete_file_readers.emplace_back((GenericReader*)delete_reader);
 
                 ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
@@ -161,7 +217,7 @@
         }
     }
     // todo: equality delete
-    filter_rows();
+    filter_rows(range);
     return Status::OK();
 }
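The _data_path_conjunct_ctx built in init_row_filters above is the heart of the fix: one iceberg v2
position-delete file can hold (file_path, pos) rows for many data files, and the pushed-down
conjunct file_path = '<current data file>' keeps only the rows that apply to the split being
scanned. Conceptually it behaves like the hand-written filter below (a sketch with made-up paths,
not the expression machinery the patch actually uses):

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    // One row of an iceberg v2 position-delete file: which data file, which row.
    struct PositionDelete {
        std::string file_path;
        int64_t pos;
    };

    // Keep only the delete rows whose file_path equals the data file of the
    // split being scanned, since one delete file can cover many data files.
    std::vector<int64_t> positions_for_split(const std::vector<PositionDelete>& delete_rows,
                                             const std::string& split_data_file_path) {
        std::vector<int64_t> positions;
        for (const PositionDelete& row : delete_rows) {
            if (row.file_path == split_data_file_path) {
                positions.push_back(row.pos);
            }
        }
        return positions;
    }

    int main() {
        // Hypothetical paths, for illustration only.
        std::vector<PositionDelete> delete_rows = {
                {"s3://warehouse/tbl/data/a.parquet", 4},
                {"s3://warehouse/tbl/data/b.parquet", 7},
                {"s3://warehouse/tbl/data/a.parquet", 5},
        };
        for (int64_t pos : positions_for_split(delete_rows, "s3://warehouse/tbl/data/a.parquet")) {
            std::cout << pos << " "; // 4 5
        }
        return 0;
    }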
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index e862fe6171..c71ffdaa9c 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -23,22 +23,25 @@
 
 #include "table_format_reader.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vec/exprs/vexpr.h"
 
 namespace doris::vectorized {
 
 class IcebergTableReader : public TableFormatReader {
 public:
     IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
-                       RuntimeState* state, const TFileScanRangeParams& params)
-            : TableFormatReader(file_format_reader),
-              _profile(profile),
-              _state(state),
-              _params(params) {}
-    Status init_row_filters();
-    void filter_rows() override;
+                       RuntimeState* state, const TFileScanRangeParams& params);
+    ~IcebergTableReader() override;
+    Status init_row_filters(const TFileRangeDesc& range);
+    void filter_rows(const TFileRangeDesc& range) override;
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
+    Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
+
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
@@ -52,13 +55,20 @@ public:
     };
 
 private:
+    struct IcebergProfile {
+        RuntimeProfile::Counter* _delete_files_init_time;
+        RuntimeProfile::Counter* _delete_files_read_total_time;
+    };
     RuntimeProfile* _profile;
     RuntimeState* _state;
     const TFileScanRangeParams& _params;
-    std::vector<const FieldSchema*> _column_schemas;
+    std::vector<FieldSchema> _column_schemas;
     std::deque<std::unique_ptr<GenericReader>> _delete_file_readers;
     std::unique_ptr<GenericReader> _cur_delete_file_reader;
     PositionDeleteParams _position_delete_params;
+    const FieldDescriptor* _delete_file_schema = nullptr;
+    VExprContext* _data_path_conjunct_ctx = nullptr;
+    IcebergProfile _iceberg_profile;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 3409135dee..188d41a2b9 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -29,6 +29,7 @@ namespace doris::vectorized {
 class TableFormatReader : public GenericReader {
 public:
     TableFormatReader(GenericReader* file_format_reader);
+    ~TableFormatReader() override = default;
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override {
         return _file_format_reader->get_next_block(block, read_rows, eof);
     }
@@ -37,7 +38,7 @@ public:
         return _file_format_reader->get_columns(name_to_type, missing_cols);
     }
 
-    virtual void filter_rows() = 0;
+    virtual void filter_rows(const TFileRangeDesc& range) = 0;
 
 protected:
     std::string _table_format; // hudi, iceberg
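table_format_reader.h above shows the wrapper shape this patch extends: a TableFormatReader
decorates an inner file-format reader, forwarding reads while layering table-format filtering on
top (iceberg_reader.cpp wires a ParquetReader in as _file_format_reader). A toy sketch of that
pattern with illustrative names, not the Doris classes:

    #include <iostream>
    #include <memory>
    #include <string>

    struct Reader {
        virtual ~Reader() = default;
        virtual std::string next_block() = 0;
    };

    struct ParquetLikeReader : Reader {
        std::string next_block() override { return "raw block"; }
    };

    // Decorator: forwards to the wrapped reader, then applies format-specific
    // filtering (for Iceberg, the delete-file handling shown in the hunks above).
    struct IcebergLikeReader : Reader {
        explicit IcebergLikeReader(std::unique_ptr<Reader> inner) : _inner(std::move(inner)) {}
        std::string next_block() override {
            return _inner->next_block() + " (delete rows filtered)";
        }
        std::unique_ptr<Reader> _inner;
    };

    int main() {
        std::unique_ptr<Reader> reader =
                std::make_unique<IcebergLikeReader>(std::make_unique<ParquetLikeReader>());
        std::cout << reader->next_block() << "\n"; // raw block (delete rows filtered)
        return 0;
    }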
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 75107b4d1d..1d67056920 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -488,11 +488,11 @@ Status VFileScanner::_get_next_reader() {
             }
             init_status = parquet_reader->init_reader(_file_col_names, _colname_to_value_range,
                                                       _push_down_expr);
-            if (_params.__isset.table_format_params &&
-                _params.table_format_params.table_format_type == "iceberg") {
+            if (range.__isset.table_format_params &&
+                range.table_format_params.table_format_type == "iceberg") {
                 IcebergTableReader* iceberg_reader = new IcebergTableReader(
                         (GenericReader*)parquet_reader, _profile, _state, _params);
-                iceberg_reader->init_row_filters();
+                iceberg_reader->init_row_filters(range);
                 _cur_reader.reset((GenericReader*)iceberg_reader);
             } else {
                 _cur_reader.reset((GenericReader*)parquet_reader);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index ab0b25b08d..9d77a33278 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -207,7 +207,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                 scanProvider = new HudiScanProvider(hmsTable, desc, columnNameToRange);
                 break;
             case ICEBERG:
-                scanProvider = new IcebergScanProvider(hmsTable, desc, columnNameToRange);
+                scanProvider = new IcebergScanProvider(hmsTable, analyzer, desc, columnNameToRange);
                 break;
             case HIVE:
                 scanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
index a1a1749c2f..6c8f916a5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.FileSplit;
 
 @Data
 public class HiveSplit extends FileSplit {
+    public HiveSplit() {}
 
     public HiveSplit(Path file, long start, long length, String[] hosts) {
         super(file, start, length, hosts);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index 30b8888fd7..7681d6a66d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -17,8 +17,19 @@
 
 package org.apache.doris.planner.external;
 
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BaseTableRef;
+import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -26,10 +37,15 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TIcebergDeleteFileDesc;
 import org.apache.doris.thrift.TIcebergFileDesc;
+import org.apache.doris.thrift.TIcebergTable;
+import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableFormatFileDesc;
+import org.apache.doris.thrift.TTableType;
 
+import lombok.Data;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputSplit;
@@ -59,13 +75,20 @@ import java.util.OptionalLong;
 public class IcebergScanProvider extends HiveScanProvider {
 
     private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
+    public static final String V2_DELETE_TBL = "iceberg#delete#tbl";
+    public static final String V2_DELETE_DB = "iceberg#delete#db";
+    private static final DeleteFileTempTable scanDeleteTable =
+            new DeleteFileTempTable(TableIf.TableType.HMS_EXTERNAL_TABLE);
+    private final Analyzer analyzer;
 
-    public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
-                               Map<String, ColumnRange> columnNameToRange) {
+    public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer, TupleDescriptor desc,
+                               Map<String, ColumnRange> columnNameToRange) {
         super(hmsTable, desc, columnNameToRange);
+        this.analyzer = analyzer;
     }
 
-    public static void setIcebergParams(ExternalFileScanNode.ParamCreateContext context, IcebergSplit icebergSplit) {
+    public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit)
+            throws UserException {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
@@ -74,6 +97,7 @@ public class IcebergScanProvider extends HiveScanProvider {
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
         } else {
+            setPathSelectConjunct(fileDesc, icebergSplit);
             for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
                 TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
                 deleteFileDesc.setPath(filter.getDeleteFilePath());
@@ -99,7 +123,20 @@ public class IcebergScanProvider extends HiveScanProvider {
             }
         }
         tableFormatFileDesc.setIcebergParams(fileDesc);
-        context.params.setTableFormatParams(tableFormatFileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    private static void setPathSelectConjunct(TIcebergFileDesc fileDesc, IcebergSplit icebergSplit)
+            throws UserException {
+        BaseTableRef tableRef = icebergSplit.getDeleteTableRef();
+        fileDesc.setDeleteTableTupleId(tableRef.getDesc().getId().asInt());
+        SlotRef lhs = new SlotRef(tableRef.getName(), DeleteFileTempTable.DATA_FILE_PATH);
+        lhs.analyze(icebergSplit.getAnalyzer());
+        lhs.getDesc().setIsMaterialized(true);
+        StringLiteral rhs = new StringLiteral(icebergSplit.getPath().toUri().toString());
+        BinaryPredicate pathSelectConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs);
+        pathSelectConjunct.analyze(icebergSplit.getAnalyzer());
+        fileDesc.setFileSelectConjunct(pathSelectConjunct.treeToThrift());
     }
 
     @Override
@@ -134,18 +171,27 @@
             scan = scan.filter(predicate);
         }
         List<InputSplit> splits = new ArrayList<>();
-
         int formatVersion = ((BaseTable) table).operations().current().formatVersion();
+        BaseTableRef tableRef = null;
+        if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
+            TableName fullName = analyzer.getFqTableName(scanDeleteTable.getTableName());
+            fullName.analyze(analyzer);
+            TableRef ref = new TableRef(fullName, fullName.toString(), null);
+            tableRef = new BaseTableRef(ref, scanDeleteTable, scanDeleteTable.getTableName());
+            tableRef.analyze(analyzer);
+        }
         for (FileScanTask task : scan.planFiles()) {
             for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) {
                 String dataFilePath = spitTask.file().path().toString();
                 IcebergSplit split = new IcebergSplit(new Path(dataFilePath), spitTask.start(),
                         spitTask.length(), new String[0]);
                 split.setFormatVersion(formatVersion);
-                if (formatVersion == 2) {
+                if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     split.setDeleteFileFilters(getDeleteFileFilters(spitTask));
+                    split.setDeleteTableRef(tableRef);
                 }
                 split.setTableFormatType(TableFormatType.ICEBERG);
+                split.setAnalyzer(analyzer);
                 splits.add(split);
             }
         }
@@ -193,4 +239,32 @@
     public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
         return Collections.emptyList();
     }
+
+    @Data
+    static class DeleteFileTempTable extends ExternalTable {
+        public static final String DATA_FILE_PATH = "file_path";
+        private final TableName tableName;
+        private final List<Column> fullSchema = new ArrayList<>();
+
+        public DeleteFileTempTable(TableType type) {
+            super(0, V2_DELETE_TBL, null, V2_DELETE_DB, type);
+            this.tableName = new TableName(null, V2_DELETE_DB, V2_DELETE_TBL);
+            Column dataFilePathCol = new Column(DATA_FILE_PATH, PrimitiveType.STRING, true);
+            this.fullSchema.add(dataFilePathCol);
+        }
+
+        @Override
+        public List<Column> getFullSchema() {
+            return fullSchema;
+        }
+
+        @Override
+        public TTableDescriptor toThrift() {
+            TIcebergTable tIcebergTable = new TIcebergTable(V2_DELETE_DB, V2_DELETE_TBL, new HashMap<>());
+            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
+                    fullSchema.size(), 0, getName(), "");
+            tTableDescriptor.setIcebergTable(tIcebergTable);
+            return tTableDescriptor;
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
index 05ae3c69d3..14fce6caf8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.planner.external;
 
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BaseTableRef;
+
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
 
@@ -28,8 +31,10 @@ public class IcebergSplit extends HiveSplit {
         super(file, start, length, hosts);
     }
 
+    private Analyzer analyzer;
     private String dataFilePath;
     private Integer formatVersion;
+    private BaseTableRef deleteTableRef;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 3066b7354a..a2ce044bb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -73,9 +73,6 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
                 context.params.setFileAttributes(getFileAttributes());
             }
 
-            if (inputSplit instanceof IcebergSplit) {
-                IcebergScanProvider.setIcebergParams(context, (IcebergSplit) inputSplit);
-            }
             // set hdfs params for hdfs file type.
             Map<String, String> locationProperties = getLocationProperties();
             if (locationType == TFileType.FILE_HDFS) {
@@ -96,7 +93,6 @@
             } else if (locationType == TFileType.FILE_S3) {
                 context.params.setProperties(locationProperties);
             }
-
             TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
 
             FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
@@ -108,7 +104,10 @@
                         pathPartitionKeys, false);
                 TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath,
                         pathPartitionKeys);
-
+                // external data lake table
+                if (split instanceof IcebergSplit) {
+                    IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) inputSplit);
+                }
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
                         curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2725c67acc..0dd3373b74 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -268,7 +268,10 @@
     1: optional i32 format_version;
     // Iceberg file type, 0: data, 1: position delete, 2: equality delete.
     2: optional i32 content;
+    // When opening a delete file, filter the data file path with the 'file_path' property
     3: optional list<TIcebergDeleteFileDesc> delete_files;
+    4: optional Types.TTupleId delete_table_tuple_id;
+    5: optional Exprs.TExpr file_select_conjunct;
 }
 
 struct TTableFormatFileDesc {
@@ -308,7 +311,7 @@ struct TFileScanRangeParams {
     14: optional list<Types.TNetworkAddress> broker_addresses
     15: optional TFileAttributes file_attributes
     16: optional Exprs.TExpr pre_filter_exprs
-    // For data lake table format
+    // Deprecated. For data lake table format
     17: optional TTableFormatFileDesc table_format_params
     // For csv query task, same the column index in file, order by dest_tuple
     18: optional list<i32> column_idxs
@@ -329,6 +332,8 @@ struct TFileRangeDesc {
     6: optional list<string> columns_from_path;
     // column names from file path, in the same order with columns_from_path
     7: optional list<string> columns_from_path_keys;
+    // For data lake table format
+    8: optional TTableFormatFileDesc table_format_params
 }
 
 // TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org