This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 54bb19f1fbd [enhance](paimon) opt count pushdown for paimon and refactor be logic (#46911) 54bb19f1fbd is described below commit 54bb19f1fbd582ddd894e007cd644f0284f958a0 Author: Socrates <suyit...@selectdb.com> AuthorDate: Sun Mar 2 22:40:34 2025 +0800 [enhance](paimon) opt count pushdown for paimon and refactor be logic (#46911) ### What problem does this PR solve? Related PR: #44038 1. Obtain directly from statistics for select count(*) from paimon_table 2. Refactor TableFormatReader, move same logic of IcebergTableReader, PaimonTableReader and TransactionalHiveReader to TableFormatReader --- be/src/vec/exec/format/table/iceberg_reader.cpp | 43 ++---- be/src/vec/exec/format/table/iceberg_reader.h | 24 +--- be/src/vec/exec/format/table/paimon_jni_reader.cpp | 23 ++- be/src/vec/exec/format/table/paimon_jni_reader.h | 1 + be/src/vec/exec/format/table/paimon_reader.cpp | 28 ++-- be/src/vec/exec/format/table/paimon_reader.h | 26 ++-- be/src/vec/exec/format/table/table_format_reader.h | 57 ++++++-- .../format/table/transactional_hive_reader.cpp | 15 +- .../exec/format/table/transactional_hive_reader.h | 13 +- be/src/vec/exec/scan/vfile_scanner.cpp | 13 +- .../org/apache/doris/datasource/FileScanNode.java | 46 +++--- .../datasource/iceberg/source/IcebergScanNode.java | 12 +- .../datasource/paimon/source/PaimonScanNode.java | 146 +++++++++++-------- .../datasource/paimon/source/PaimonSplit.java | 15 +- .../paimon/source/PaimonScanNodeTest.java | 155 +++++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 3 + .../paimon/test_paimon_catalog.out | Bin 795479 -> 795039 bytes .../paimon/test_paimon_deletion_vector.out | Bin 0 -> 525 bytes .../paimon/test_paimon_catalog.groovy | 14 -- .../paimon/test_paimon_deletion_vector.groovy | 96 +++++++++++++ 20 files changed, 517 insertions(+), 213 deletions(-) diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 3d6d4df129d..daaf3167369 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -77,13 +77,8 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma const TFileScanRangeParams& params, const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx) - : TableFormatReader(std::move(file_format_reader)), - _profile(profile), - _state(state), - _params(params), - _range(range), - _kv_cache(kv_cache), - _io_ctx(io_ctx) { + : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx), + _kv_cache(kv_cache) { static const char* iceberg_profile = "IcebergProfile"; ADD_TIMER(_profile, iceberg_profile); _iceberg_profile.num_delete_files = @@ -94,31 +89,9 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile); _iceberg_profile.delete_rows_sort_time = ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile); - if (range.table_format_params.iceberg_params.__isset.row_count) { - _remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count; - } else { - _remaining_table_level_row_count = -1; - } } -Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - // already get rows from be - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { - auto rows = std::min(_remaining_table_level_row_count, - 
(int64_t)_state->query_options().batch_size); - _remaining_table_level_row_count -= rows; - auto mutate_columns = block->mutate_columns(); - for (auto& col : mutate_columns) { - col->resize(rows); - } - block->set_columns(std::move(mutate_columns)); - *read_rows = rows; - if (_remaining_table_level_row_count == 0) { - *eof = true; - } - - return Status::OK(); - } +Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_expand_block_if_need(block)); // To support iceberg schema evolution. We change the column name in block to @@ -161,13 +134,13 @@ Status IcebergTableReader::get_columns( return _file_format_reader->get_columns(name_to_type, missing_cols); } -Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) { +Status IcebergTableReader::init_row_filters() { // We get the count value by doris's be, so we don't need to read the delete file - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) { return Status::OK(); } - const auto& table_desc = range.table_format_params.iceberg_params; + const auto& table_desc = _range.table_format_params.iceberg_params; const auto& version = table_desc.format_version; if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { return Status::OK(); @@ -545,7 +518,7 @@ Status IcebergParquetReader::init_reader( _gen_new_colname_to_value_range(); parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); parquet_reader->iceberg_sanitize(_all_required_col_names); - RETURN_IF_ERROR(init_row_filters(_range, _io_ctx)); + RETURN_IF_ERROR(init_row_filters()); return parquet_reader->init_reader( _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, @@ -617,7 +590,7 @@ Status IcebergOrcReader::init_reader( _gen_file_col_names(); _gen_new_colname_to_value_range(); orc_reader->set_table_col_to_file_col(_table_col_to_file_col); - RETURN_IF_ERROR(init_row_filters(_range, _io_ctx)); + RETURN_IF_ERROR(init_row_filters()); return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range, conjuncts, false, tuple_descriptor, row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index e500538b6f9..2fbf7b5904f 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -29,10 +29,8 @@ #include "exec/olap_common.h" #include "runtime/define_primitive_type.h" #include "runtime/primitive_type.h" -#include "runtime/runtime_state.h" #include "runtime/types.h" #include "table_format_reader.h" -#include "util/runtime_profile.h" #include "vec/columns/column_dictionary.h" #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/vparquet_reader.h" @@ -80,9 +78,9 @@ public: io::IOContext* io_ctx); ~IcebergTableReader() override = default; - Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final; + Status init_row_filters() final; - Status get_next_block(Block* block, size_t* read_rows, bool* eof) final; + Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) final; @@ -135,10 +133,6 
@@ protected: // Remove the added delete columns Status _shrink_block_if_need(Block* block); - RuntimeProfile* _profile; - RuntimeState* _state; - const TFileScanRangeParams& _params; - const TFileRangeDesc& _range; // owned by scan node ShardedKVCache* _kv_cache; IcebergProfile _iceberg_profile; @@ -162,13 +156,9 @@ protected: std::vector<std::string> _expand_col_names; std::vector<ColumnWithTypeAndName> _expand_columns; - io::IOContext* _io_ctx; bool _has_schema_change = false; bool _has_iceberg_schema = false; - // the table level row count for optimizing query like: - // select count(*) from table; - int64_t _remaining_table_level_row_count; Fileformat _file_format = Fileformat::NONE; const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; @@ -212,9 +202,9 @@ public: const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); Status _read_position_delete_file(const TFileRangeDesc* delete_range, - DeleteFile* position_delete) override; + DeleteFile* position_delete) final; - void set_delete_rows() override { + void set_delete_rows() final { auto* parquet_reader = (ParquetReader*)(_file_format_reader.get()); parquet_reader->set_delete_rows(&_iceberg_delete_rows); } @@ -223,7 +213,7 @@ public: protected: std::unique_ptr<GenericReader> _create_equality_reader( - const TFileRangeDesc& delete_desc) override { + const TFileRangeDesc& delete_desc) final { return ParquetReader::create_unique( _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE, const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, _state); @@ -234,7 +224,7 @@ public: ENABLE_FACTORY_CREATOR(IcebergOrcReader); Status _read_position_delete_file(const TFileRangeDesc* delete_range, - DeleteFile* position_delete) override; + DeleteFile* position_delete) final; IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, @@ -242,7 +232,7 @@ public: : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, kv_cache, io_ctx) {} - void set_delete_rows() override { + void set_delete_rows() final { auto* orc_reader = (OrcReader*)_file_format_reader.get(); orc_reader->set_position_delete_rowids(&_iceberg_delete_rows); } diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index cbcdd2d81bd..a05ea4511f4 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -20,9 +20,9 @@ #include <map> #include "runtime/descriptors.h" +#include "runtime/runtime_state.h" #include "runtime/types.h" #include "vec/core/types.h" - namespace doris { class RuntimeProfile; class RuntimeState; @@ -64,6 +64,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d if (range_params->__isset.serialized_table) { params["serialized_table"] = range_params->serialized_table; } + if (range.table_format_params.__isset.table_level_row_count) { + _remaining_table_level_row_count = range.table_format_params.table_level_row_count; + } else { + _remaining_table_level_row_count = -1; + } // Used to create paimon option for (const auto& kv : range.table_format_params.paimon_params.paimon_options) { @@ -79,6 +84,22 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d } Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) { 
+ auto rows = std::min(_remaining_table_level_row_count, + (int64_t)_state->query_options().batch_size); + _remaining_table_level_row_count -= rows; + auto mutate_columns = block->mutate_columns(); + for (auto& col : mutate_columns) { + col->resize(rows); + } + block->set_columns(std::move(mutate_columns)); + *read_rows = rows; + if (_remaining_table_level_row_count == 0) { + *eof = true; + } + + return Status::OK(); + } return _jni_connector->get_next_block(block, read_rows, eof); } diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h index 64ef962f0de..b5744428392 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.h +++ b/be/src/vec/exec/format/table/paimon_jni_reader.h @@ -67,6 +67,7 @@ public: private: std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; + int64_t _remaining_table_level_row_count; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index 8e4be026bab..dba8efc20e2 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -20,13 +20,16 @@ #include <vector> #include "common/status.h" +#include "runtime/runtime_state.h" #include "util/deletion_vector.h" namespace doris::vectorized { #include "common/compile_check_begin.h" PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader, - RuntimeProfile* profile, const TFileScanRangeParams& params) - : TableFormatReader(std::move(file_format_reader)), _profile(profile), _params(params) { + RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + io::IOContext* io_ctx) + : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx) { static const char* paimon_profile = "PaimonProfile"; ADD_TIMER(_profile, paimon_profile); _paimon_profile.num_delete_rows = @@ -35,15 +38,18 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader, ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile); } -Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) { - const auto& table_desc = range.table_format_params.paimon_params; +Status PaimonReader::init_row_filters() { + const auto& table_desc = _range.table_format_params.paimon_params; if (!table_desc.__isset.deletion_file) { return Status::OK(); } // set push down agg type to NONE because we can not do count push down opt // if there are delete files. 
- _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); + if (!_range.table_format_params.paimon_params.__isset.row_count) { + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); + } + const auto& deletion_file = table_desc.deletion_file; io::FileSystemProperties properties = { .system_type = _params.file_type, @@ -51,9 +57,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext .hdfs_params = _params.hdfs_params, .broker_addresses {}, }; - if (range.__isset.file_type) { + if (_range.__isset.file_type) { // for compatibility - properties.system_type = range.file_type; + properties.system_type = _range.file_type; } if (_params.__isset.broker_addresses) { properties.broker_addresses.assign(_params.broker_addresses.begin(), @@ -64,7 +70,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext .path = deletion_file.path, .file_size = -1, .mtime = 0, - .fs_name = range.fs_name, + .fs_name = _range.fs_name, }; // TODO: cache the file in local @@ -78,7 +84,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext { SCOPED_TIMER(_paimon_profile.delete_files_read_time); RETURN_IF_ERROR( - delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, io_ctx)); + delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, _io_ctx)); } if (bytes_read != deletion_file.length + 4) { return Status::IOError( @@ -99,5 +105,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext } return Status::OK(); } + +Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { + return _file_format_reader->get_next_block(block, read_rows, eof); +} #include "common/compile_check_end.h" } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h index 3776b485a45..09501e55773 100644 --- a/be/src/vec/exec/format/table/paimon_reader.h +++ b/be/src/vec/exec/format/table/paimon_reader.h @@ -29,10 +29,13 @@ namespace doris::vectorized { class PaimonReader : public TableFormatReader { public: PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, - const TFileScanRangeParams& params); + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx); ~PaimonReader() override = default; - Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final; + Status init_row_filters() final; + + Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; protected: struct PaimonProfile { @@ -40,23 +43,21 @@ protected: RuntimeProfile::Counter* delete_files_read_time; }; std::vector<int64_t> _delete_rows; - RuntimeProfile* _profile; PaimonProfile _paimon_profile; - virtual void set_delete_rows() = 0; -private: - const TFileScanRangeParams& _params; + virtual void set_delete_rows() = 0; }; class PaimonOrcReader final : public PaimonReader { public: ENABLE_FACTORY_CREATOR(PaimonOrcReader); PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, - const TFileScanRangeParams& params) - : PaimonReader(std::move(file_format_reader), profile, params) {}; + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx) + : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {}; ~PaimonOrcReader() final = default; - void set_delete_rows() override { + void 
set_delete_rows() final { (reinterpret_cast<OrcReader*>(_file_format_reader.get())) ->set_position_delete_rowids(&_delete_rows); } @@ -66,11 +67,12 @@ class PaimonParquetReader final : public PaimonReader { public: ENABLE_FACTORY_CREATOR(PaimonParquetReader); PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, - const TFileScanRangeParams& params) - : PaimonReader(std::move(file_format_reader), profile, params) {}; + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx) + : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {}; ~PaimonParquetReader() final = default; - void set_delete_rows() override { + void set_delete_rows() final { (reinterpret_cast<ParquetReader*>(_file_format_reader.get())) ->set_delete_rows(&_delete_rows); } diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h index b143149c8a6..6c03715c59f 100644 --- a/be/src/vec/exec/format/table/table_format_reader.h +++ b/be/src/vec/exec/format/table/table_format_reader.h @@ -17,13 +17,14 @@ #pragma once +#include <algorithm> #include <cstddef> -#include <memory> #include <string> -#include <unordered_map> -#include <unordered_set> #include "common/status.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" #include "vec/exec/format/generic_reader.h" namespace doris { @@ -38,12 +39,44 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" class TableFormatReader : public GenericReader { public: - TableFormatReader(std::unique_ptr<GenericReader> file_format_reader) - : _file_format_reader(std::move(file_format_reader)) {} + TableFormatReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeState* state, + RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx) + : _file_format_reader(std::move(file_format_reader)), + _state(state), + _profile(profile), + _params(params), + _range(range), + _io_ctx(io_ctx) { + if (range.table_format_params.__isset.table_level_row_count) { + _table_level_row_count = range.table_format_params.table_level_row_count; + } else { + _table_level_row_count = -1; + } + } ~TableFormatReader() override = default; - Status get_next_block(Block* block, size_t* read_rows, bool* eof) override { - return _file_format_reader->get_next_block(block, read_rows, eof); + Status get_next_block(Block* block, size_t* read_rows, bool* eof) final { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count >= 0) { + auto rows = + std::min(_table_level_row_count, (int64_t)_state->query_options().batch_size); + _table_level_row_count -= rows; + auto mutate_columns = block->mutate_columns(); + for (auto& col : mutate_columns) { + col->resize(rows); + } + block->set_columns(std::move(mutate_columns)); + *read_rows = rows; + if (_table_level_row_count == 0) { + *eof = true; + } + + return Status::OK(); + } + return get_next_block_inner(block, read_rows, eof); } + + virtual Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) = 0; + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override { return _file_format_reader->get_columns(name_to_type, missing_cols); @@ -63,11 +96,17 @@ public: bool fill_all_columns() const override { return _file_format_reader->fill_all_columns(); } - virtual 
Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) = 0; + virtual Status init_row_filters() = 0; protected: - std::string _table_format; // hudi, iceberg + std::string _table_format; // hudi, iceberg, paimon std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc + RuntimeState* _state = nullptr; // for query options + RuntimeProfile* _profile = nullptr; + const TFileScanRangeParams& _params; + const TFileRangeDesc& _range; + io::IOContext* _io_ctx = nullptr; + int64_t _table_level_row_count = -1; // for optimization of count(*) push down void _collect_profile_before_close() override { if (_file_format_reader != nullptr) { _file_format_reader->collect_profile_before_close(); diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index d550ff5e1d9..406d58813e9 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -19,7 +19,6 @@ #include <re2/re2.h> -#include "runtime/runtime_state.h" #include "transactional_hive_common.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/exec/format/orc/vorc_reader.h" @@ -41,12 +40,7 @@ TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader> RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, io::IOContext* io_ctx) - : TableFormatReader(std::move(file_format_reader)), - _profile(profile), - _state(state), - _params(params), - _range(range), - _io_ctx(io_ctx) { + : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx) { static const char* transactional_hive_profile = "TransactionalHiveProfile"; ADD_TIMER(_profile, transactional_hive_profile); _transactional_orc_profile.num_delete_files = @@ -74,7 +68,7 @@ Status TransactionalHiveReader::init_reader( return status; } -Status TransactionalHiveReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { +Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { for (const auto& i : TransactionalHive::READ_PARAMS) { DataTypePtr data_type = DataTypeFactory::instance().create_data_type(TypeDescriptor(i.type), false); @@ -93,8 +87,7 @@ Status TransactionalHiveReader::get_columns( return _file_format_reader->get_columns(name_to_type, missing_cols); } -Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, - io::IOContext* io_ctx) { +Status TransactionalHiveReader::init_row_filters() { std::string data_file_path = _range.path; // the path in _range is remove the namenode prefix, // and the file_path in delete file is full path, so we should add it back. @@ -128,7 +121,7 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time); for (const auto& delete_delta : - range.table_format_params.transactional_hive_params.delete_deltas) { + _range.table_format_params.transactional_hive_params.delete_deltas) { const std::string file_name = file_path.filename().string(); //need opt. 
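The table_format_reader.h change above is the heart of point 2 of the commit message: the state that IcebergTableReader, PaimonReader and TransactionalHiveReader used to hold separately (_state, _profile, _params, _range, _io_ctx) now lives in TableFormatReader, and the base-class get_next_block() handles the COUNT(*) fast path (emit empty rows from the table-level row count instead of opening data files) before delegating real reads to the new pure virtual get_next_block_inner(). The following is a minimal, self-contained C++ sketch of that pattern only; Block, PushAggOp and FakePaimonReader are simplified stand-ins and the bool return value replaces Status, so this is not the actual Doris code.

// Standalone sketch (simplified stand-ins, not the real Doris classes): models how
// the refactored TableFormatReader centralizes the COUNT(*) fast path and delegates
// real reads to format-specific subclasses via get_next_block_inner().
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>

// Hypothetical, simplified stand-ins for doris::vectorized types.
struct Block {
    int64_t rows = 0;
    void resize(int64_t n) { rows = n; }  // the real code resizes every column of the block
};
enum class PushAggOp { NONE, COUNT };

class TableFormatReader {
public:
    TableFormatReader(PushAggOp agg, int64_t table_level_row_count, int64_t batch_size)
            : _push_down_agg_type(agg),
              _table_level_row_count(table_level_row_count),
              _batch_size(batch_size) {}
    virtual ~TableFormatReader() = default;

    // Shared entry point: if the planner pushed down COUNT and supplied a
    // table-level row count, emit empty rows without touching any data file.
    bool get_next_block(Block* block, size_t* read_rows, bool* eof) {
        if (_push_down_agg_type == PushAggOp::COUNT && _table_level_row_count >= 0) {
            int64_t rows = std::min(_table_level_row_count, _batch_size);
            _table_level_row_count -= rows;
            block->resize(rows);
            *read_rows = static_cast<size_t>(rows);
            *eof = (_table_level_row_count == 0);
            return true;
        }
        return get_next_block_inner(block, read_rows, eof);
    }

protected:
    // Format-specific readers (Iceberg / Paimon / transactional Hive) implement this.
    virtual bool get_next_block_inner(Block* block, size_t* read_rows, bool* eof) = 0;

    PushAggOp _push_down_agg_type;
    int64_t _table_level_row_count;  // -1 when the planner sent no row count
    int64_t _batch_size;
};

class FakePaimonReader final : public TableFormatReader {
public:
    using TableFormatReader::TableFormatReader;

protected:
    bool get_next_block_inner(Block*, size_t* read_rows, bool* eof) override {
        *read_rows = 0;  // a real reader would scan Parquet/ORC here
        *eof = true;
        return true;
    }
};

int main() {
    // Pushed-down count of 10, batch size 4 -> blocks of 4, 4 and 2 rows, then eof.
    FakePaimonReader reader(PushAggOp::COUNT, /*row_count=*/10, /*batch_size=*/4);
    Block block;
    size_t read_rows = 0;
    bool eof = false;
    while (!eof) {
        reader.get_next_block(&block, &read_rows, &eof);
        std::printf("emitted %zu rows, eof=%d\n", read_rows, eof);
    }
    return 0;
}

With a pushed-down count of 10 and a batch size of 4 the sketch emits blocks of 4, 4 and 2 rows and then reports eof, which is the batching behavior the new base class applies regardless of the underlying file format.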
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h index 9c3f284464c..217f40b3e78 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.h +++ b/be/src/vec/exec/format/table/transactional_hive_reader.h @@ -28,7 +28,6 @@ #include "common/status.h" #include "exec/olap_common.h" #include "table_format_reader.h" -#include "util/runtime_profile.h" #include "vec/common/hash_table/phmap_fwd_decl.h" namespace doris { @@ -87,12 +86,12 @@ public: io::IOContext* io_ctx); ~TransactionalHiveReader() override = default; - Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) override; + Status init_row_filters() final; - Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, - std::unordered_set<std::string>* missing_cols) override; + std::unordered_set<std::string>* missing_cols) final; Status init_reader( const std::vector<std::string>& column_names, @@ -109,16 +108,10 @@ private: RuntimeProfile::Counter* delete_files_read_time = nullptr; }; - RuntimeProfile* _profile = nullptr; - RuntimeState* _state = nullptr; - const TFileScanRangeParams& _params; - const TFileRangeDesc& _range; TransactionalHiveProfile _transactional_orc_profile; AcidRowIDSet _delete_rows; std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr; std::vector<std::string> _col_names; - - io::IOContext* _io_ctx = nullptr; }; inline bool operator<(const TransactionalHiveReader::AcidRowID& lhs, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 1812ddbc737..fe0c7315c5d 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -27,7 +27,6 @@ #include <algorithm> #include <boost/iterator/iterator_facade.hpp> -#include <iterator> #include <map> #include <ranges> #include <tuple> @@ -1006,8 +1005,8 @@ Status VFileScanner::_get_next_reader() { &_slot_id_to_filter_conjuncts); std::unique_ptr<PaimonParquetReader> paimon_reader = PaimonParquetReader::create_unique(std::move(parquet_reader), _profile, - *_params); - RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get())); + _state, *_params, range, _io_ctx.get()); + RETURN_IF_ERROR(paimon_reader->init_row_filters()); _cur_reader = std::move(paimon_reader); } else { bool hive_parquet_use_column_names = true; @@ -1048,7 +1047,7 @@ Status VFileScanner::_get_next_reader() { _file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); - RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range, _io_ctx.get())); + RETURN_IF_ERROR(tran_orc_reader->init_row_filters()); _cur_reader = std::move(tran_orc_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "iceberg") { @@ -1068,9 +1067,9 @@ Status VFileScanner::_get_next_reader() { &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); - std::unique_ptr<PaimonOrcReader> paimon_reader = - PaimonOrcReader::create_unique(std::move(orc_reader), _profile, *_params); - RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get())); + 
std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique( + std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get()); + RETURN_IF_ERROR(paimon_reader->init_row_filters()); _cur_reader = std::move(paimon_reader); } else { bool hive_orc_use_column_names = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index b7d34312313..8d3aeaa6a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -61,6 +61,8 @@ public abstract class FileScanNode extends ExternalScanNode { // For explain protected long totalFileSize = 0; protected long totalPartitionNum = 0; + // For display pushdown agg result + protected long tableLevelRowCount = -1; public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -82,11 +84,12 @@ public abstract class FileScanNode extends ExternalScanNode { super.toThrift(planNode); } - public long getPushDownCount() { - // 1. Do not use `0`: If the number of entries in the table is 0, - // it is unclear whether optimization has been performed. - // 2. Do not use `null` or `-`: This makes it easier for the program to parse the `explain` data. - return -1; + protected void setPushDownCount(long count) { + tableLevelRowCount = count; + } + + private long getPushDownCount() { + return tableLevelRowCount; } @Override @@ -106,9 +109,9 @@ public abstract class FileScanNode extends ExternalScanNode { output.append("(approximate)"); } output.append("inputSplitNum=").append(selectedSplitNum).append(", totalFileSize=") - .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n"); + .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n"); output.append(prefix).append("partition=").append(selectedPartitionNum).append("/").append(totalPartitionNum) - .append("\n"); + .append("\n"); if (detailLevel == TExplainLevel.VERBOSE && !isBatchMode()) { output.append(prefix).append("backends:").append("\n"); @@ -133,25 +136,25 @@ public abstract class FileScanNode extends ExternalScanNode { if (size <= 4) { for (TFileRangeDesc file : fileRangeDescs) { output.append(prefix).append(" ").append(file.getPath()) - .append(" start: ").append(file.getStartOffset()) - .append(" length: ").append(file.getSize()) - .append("\n"); + .append(" start: ").append(file.getStartOffset()) + .append(" length: ").append(file.getSize()) + .append("\n"); } } else { for (int i = 0; i < 3; i++) { TFileRangeDesc file = fileRangeDescs.get(i); output.append(prefix).append(" ").append(file.getPath()) - .append(" start: ").append(file.getStartOffset()) - .append(" length: ").append(file.getSize()) - .append("\n"); + .append(" start: ").append(file.getStartOffset()) + .append(" length: ").append(file.getSize()) + .append("\n"); } int other = size - 4; output.append(prefix).append(" ... 
other ").append(other).append(" files ...\n"); TFileRangeDesc file = fileRangeDescs.get(size - 1); output.append(prefix).append(" ").append(file.getPath()) - .append(" start: ").append(file.getStartOffset()) - .append(" length: ").append(file.getSize()) - .append("\n"); + .append(" start: ").append(file.getStartOffset()) + .append(" length: ").append(file.getSize()) + .append("\n"); } } } @@ -182,10 +185,10 @@ public abstract class FileScanNode extends ExternalScanNode { } protected void setDefaultValueExprs(TableIf tbl, - Map<String, SlotDescriptor> slotDescByName, - Map<String, Expr> exprByName, - TFileScanRangeParams params, - boolean useVarcharAsNull) throws UserException { + Map<String, SlotDescriptor> slotDescByName, + Map<String, Expr> exprByName, + TFileScanRangeParams params, + boolean useVarcharAsNull) throws UserException { Preconditions.checkNotNull(tbl); TExpr tExpr = new TExpr(); tExpr.setNodes(Lists.newArrayList()); @@ -222,7 +225,8 @@ public abstract class FileScanNode extends ExternalScanNode { // if slot desc is null, which mean it is an unrelated slot, just skip. // eg: // (a, b, c) set (x=a, y=b, z=c) - // c does not exist in file, the z will be filled with null, even if z has default value. + // c does not exist in file, the z will be filled with null, even if z has + // default value. // and if z is not nullable, the load will fail. if (slotDesc != null) { if (expr != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 0a9269ce860..e5c140da53b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -153,12 +153,12 @@ public class IcebergScanNode extends FileQueryScanNode { private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value()); + if (tableLevelPushDownCount) { + tableFormatFileDesc.setTableLevelRowCount(icebergSplit.getTableLevelRowCount()); + } TIcebergFileDesc fileDesc = new TIcebergFileDesc(); fileDesc.setFormatVersion(formatVersion); fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath()); - if (tableLevelPushDownCount) { - fileDesc.setRowCount(icebergSplit.getTableLevelRowCount()); - } if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { fileDesc.setContent(FileContent.DATA.id()); } else { @@ -336,6 +336,7 @@ public class IcebergScanNode extends FileQueryScanNode { } else { pushDownCountSplits = Collections.singletonList(splits.get(0)); } + setPushDownCount(countFromSnapshot); assignCountToSplits(pushDownCountSplits, countFromSnapshot); return pushDownCountSplits; } @@ -476,11 +477,6 @@ public class IcebergScanNode extends FileQueryScanNode { super.toThrift(planNode); } - @Override - public long getPushDownCount() { - return getCountFromSnapshot(); - } - @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { if (pushdownIcebergPredicates.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 59e7eed5d42..beb59e40462 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -40,6 +40,7 @@ import org.apache.doris.thrift.TPaimonFileDesc; import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TTableFormatFileDesc; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; @@ -56,6 +57,7 @@ import org.apache.paimon.utils.InstantiationUtil; import java.io.IOException; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -105,8 +107,6 @@ public class PaimonScanNode extends FileQueryScanNode { private int paimonSplitNum = 0; private List<SplitStat> splitStats = new ArrayList<>(); private String serializedTable; - - private boolean pushDownCount = false; private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; public PaimonScanNode(PlanNodeId id, @@ -187,7 +187,8 @@ public class PaimonScanNode extends FileQueryScanNode { fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId()); fileDesc.setTblId(source.getTargetTable().getId()); fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime()); - // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration() + // The hadoop conf should be same with + // PaimonExternalCatalog.createCatalog()#getConfiguration() fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties()); Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile(); if (optDeletionFile.isPresent()) { @@ -198,10 +199,41 @@ public class PaimonScanNode extends FileQueryScanNode { tDeletionFile.setLength(deletionFile.length()); fileDesc.setDeletionFile(tDeletionFile); } + if (paimonSplit.getRowCount().isPresent()) { + tableFormatFileDesc.setTableLevelRowCount(paimonSplit.getRowCount().get()); + } tableFormatFileDesc.setPaimonParams(fileDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); } + @VisibleForTesting + public static Optional<Long> calcuteTableLevelCount(List<org.apache.paimon.table.source.Split> paimonSplits) { + // check if all splits don't have deletion vector or cardinality of every + // deletion vector is not null + long totalCount = 0; + long deletionVectorCount = 0; + + for (org.apache.paimon.table.source.Split s : paimonSplits) { + totalCount += s.rowCount(); + + Optional<List<DeletionFile>> deletionFiles = s.deletionFiles(); + if (deletionFiles.isPresent()) { + for (DeletionFile dv : deletionFiles.get()) { + if (dv != null) { + Long cardinality = dv.cardinality(); + if (cardinality == null) { + // if there is a null deletion vector, we can't calculate the table level count + return Optional.empty(); + } else { + deletionVectorCount += cardinality; + } + } + } + } + } + return Optional.of(totalCount - deletionVectorCount); + } + @Override public List<Split> getSplits(int numBackends) throws UserException { boolean forceJniScanner = sessionVariable.isForceJniScanner(); @@ -245,38 +277,34 @@ public class PaimonScanNode extends FileQueryScanNode { splitStat.setType(SplitReadType.NATIVE); splitStat.setRawFileConvertable(true); List<RawFile> rawFiles = optRawFiles.get(); - if (optDeletionFiles.isPresent()) { - List<DeletionFile> deletionFiles = optDeletionFiles.get(); - for (int i = 0; i < rawFiles.size(); i++) { - 
RawFile file = rawFiles.get(i); - DeletionFile deletionFile = deletionFiles.get(i); - LocationPath locationPath = new LocationPath(file.path(), - source.getCatalog().getProperties()); - try { - List<Split> dorisSplits = FileSplitter.splitFile( - locationPath, - getRealFileSplitSize(0), - null, - file.length(), - -1, - true, - null, - PaimonSplit.PaimonSplitCreator.DEFAULT); - for (Split dorisSplit : dorisSplits) { - // the element in DeletionFiles might be null - if (deletionFile != null) { - splitStat.setHasDeletionVector(true); - ((PaimonSplit) dorisSplit).setDeletionFile(deletionFile); - } - splits.add(dorisSplit); + for (int i = 0; i < rawFiles.size(); i++) { + RawFile file = rawFiles.get(i); + LocationPath locationPath = new LocationPath(file.path(), + source.getCatalog().getProperties()); + try { + List<Split> dorisSplits = FileSplitter.splitFile( + locationPath, + // if applyCountPushdown is true, we can't to split the file + // becasue the raw file and deletion vector is one-to-one mapping + getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0), + null, + file.length(), + -1, + true, + null, + PaimonSplit.PaimonSplitCreator.DEFAULT); + for (Split dorisSplit : dorisSplits) { + // try to set deletion file + if (optDeletionFiles.isPresent() && optDeletionFiles.get().get(i) != null) { + ((PaimonSplit) dorisSplit).setDeletionFile(optDeletionFiles.get().get(i)); + splitStat.setHasDeletionVector(true); } - ++rawFileSplitNum; - } catch (IOException e) { - throw new UserException("Paimon error to split file: " + e.getMessage(), e); } + splits.addAll(dorisSplits); + ++rawFileSplitNum; + } catch (IOException e) { + throw new UserException("Paimon error to split file: " + e.getMessage(), e); } - } else { - createRawFileSplits(rawFiles, splits, applyCountPushdown ? 
Long.MAX_VALUE : 0); } } else { if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) { @@ -295,31 +323,30 @@ public class PaimonScanNode extends FileQueryScanNode { splitStats.add(splitStat); } - this.selectedPartitionNum = selectedPartitionValues.size(); - // TODO: get total partition number - return splits; - } - - private void createRawFileSplits(List<RawFile> rawFiles, List<Split> splits, long blockSize) throws UserException { - for (RawFile file : rawFiles) { - LocationPath locationPath = new LocationPath(file.path(), - source.getCatalog().getProperties()); - try { - splits.addAll( - FileSplitter.splitFile( - locationPath, - getRealFileSplitSize(blockSize), - null, - file.length(), - -1, - true, - null, - PaimonSplit.PaimonSplitCreator.DEFAULT)); - ++rawFileSplitNum; - } catch (IOException e) { - throw new UserException("Paimon error to split file: " + e.getMessage(), e); + // if applyCountPushdown is true, calcute row count for count pushdown + if (applyCountPushdown) { + // we can create a special empty split and skip the plan process + if (splits.isEmpty()) { + return splits; + } + Optional<Long> optTableLevelCount = calcuteTableLevelCount(paimonSplits); + if (optTableLevelCount.isPresent()) { + long tableLevelRowCount = optTableLevelCount.get(); + List<Split> pushDownCountSplits; + if (tableLevelRowCount > COUNT_WITH_PARALLEL_SPLITS) { + int minSplits = sessionVariable.getParallelExecInstanceNum() * numBackends; + pushDownCountSplits = splits.subList(0, Math.min(splits.size(), minSplits)); + } else { + pushDownCountSplits = Collections.singletonList(splits.get(0)); + } + setPushDownCount(tableLevelRowCount); + assignCountToSplits(pushDownCountSplits, tableLevelRowCount); + return pushDownCountSplits; } } + + this.selectedPartitionNum = selectedPartitionValues.size(); + return splits; } private String getFileFormat(String path) { @@ -405,4 +432,13 @@ public class PaimonScanNode extends FileQueryScanNode { } return sb.toString(); } + + private void assignCountToSplits(List<Split> splits, long totalCount) { + int size = splits.size(); + long countPerSplit = totalCount / size; + for (int i = 0; i < size - 1; i++) { + ((PaimonSplit) splits.get(i)).setRowCount(countPerSplit); + } + ((PaimonSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java index 988f043ad0e..f4d3d724089 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java @@ -36,13 +36,13 @@ public class PaimonSplit extends FileSplit { private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap()); private Split split; private TableFormatType tableFormatType; - private Optional<DeletionFile> optDeletionFile; + private Optional<DeletionFile> optDeletionFile = Optional.empty(); + private Optional<Long> optRowCount = Optional.empty(); public PaimonSplit(Split split) { super(DUMMY_PATH, 0, 0, 0, 0, null, null); this.split = split; this.tableFormatType = TableFormatType.PAIMON; - this.optDeletionFile = Optional.empty(); if (split instanceof DataSplit) { List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles(); @@ -57,7 +57,6 @@ public class PaimonSplit extends FileSplit { String[] hosts, List<String> partitionList) { super(file, start, 
length, fileLength, modificationTime, hosts, partitionList); this.tableFormatType = TableFormatType.PAIMON; - this.optDeletionFile = Optional.empty(); this.selfSplitWeight = length; } @@ -90,6 +89,14 @@ public class PaimonSplit extends FileSplit { this.optDeletionFile = Optional.of(deletionFile); } + public Optional<Long> getRowCount() { + return optRowCount; + } + + public void setRowCount(long rowCount) { + this.optRowCount = Optional.of(rowCount); + } + public static class PaimonSplitCreator implements SplitCreator { static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator(); @@ -103,7 +110,7 @@ public class PaimonSplit extends FileSplit { long modificationTime, String[] hosts, List<String> partitionValues) { - PaimonSplit split = new PaimonSplit(path, start, length, fileLength, + PaimonSplit split = new PaimonSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); split.setTargetSplitSize(fileSplitSize); return split; diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java new file mode 100644 index 00000000000..f67f3a93977 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon.source; + +import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.table.source.Split; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class PaimonScanNodeTest { + + @Test + public void testCalcuteTableLevelCount() { + List<Split> splits = new ArrayList<>(); + + // Create mock splits with row count and deletion files + Split split1 = new Split() { + @Override + public long rowCount() { + return 100; + } + + @Override + public Optional<List<DeletionFile>> deletionFiles() { + List<DeletionFile> deletionFiles = new ArrayList<>(); + deletionFiles.add(new DeletionFile("path1", 0, 10, 10L)); + deletionFiles.add(new DeletionFile("path2", 0, 20, 20L)); + return Optional.of(deletionFiles); + } + }; + + Split split2 = new Split() { + @Override + public long rowCount() { + return 200; + } + + @Override + public Optional<List<DeletionFile>> deletionFiles() { + List<DeletionFile> deletionFiles = new ArrayList<>(); + deletionFiles.add(new DeletionFile("path3", 0, 30, 30L)); + deletionFiles.add(new DeletionFile("path4", 0, 40, 40L)); + return Optional.of(deletionFiles); + } + }; + + splits.add(split1); + splits.add(split2); + + Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(200, result.get().longValue()); + } + + @Test + public void testCalcuteTableLevelCountWithNullDeletionFile() { + List<Split> splits = new ArrayList<>(); + + // Create mock splits with row count and null deletion files + Split split1 = new Split() { + @Override + public long rowCount() { + return 100; + } + + @Override + public Optional<List<DeletionFile>> deletionFiles() { + List<DeletionFile> deletionFiles = new ArrayList<>(); + deletionFiles.add(null); + deletionFiles.add(new DeletionFile("path2", 0, 20, 20L)); + return Optional.of(deletionFiles); + } + }; + + Split split2 = new Split() { + @Override + public long rowCount() { + return 200; + } + + @Override + public Optional<List<DeletionFile>> deletionFiles() { + return Optional.empty(); + } + }; + + splits.add(split1); + splits.add(split2); + + Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(280, result.get().longValue()); + } + + @Test + public void testCalcuteTableLevelCountWithNullCardinality() { + List<Split> splits = new ArrayList<>(); + + // Create mock splits with row count and deletion files with null cardinality + Split split1 = new Split() { + @Override + public long rowCount() { + return 100; + } + + @Override + public Optional<List<DeletionFile>> deletionFiles() { + List<DeletionFile> deletionFiles = new ArrayList<>(); + deletionFiles.add(new DeletionFile("path1", 0, 10, null)); + deletionFiles.add(new DeletionFile("path2", 0, 20, 20L)); + return Optional.of(deletionFiles); + } + }; + + Split split2 = new Split() { + @Override + public long rowCount() { + return 200; + } + + @Override + public Optional<List<DeletionFile>> deletionFiles() { + List<DeletionFile> deletionFiles = new ArrayList<>(); + deletionFiles.add(new DeletionFile("path3", 0, 30, 30L)); + deletionFiles.add(null); + return Optional.of(deletionFiles); + } + }; + + splits.add(split1); + splits.add(split2); + + Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits); + Assert.assertFalse(result.isPresent()); + } +} diff --git a/gensrc/thrift/PlanNodes.thrift 
b/gensrc/thrift/PlanNodes.thrift index a46cc8f7299..f6d795259c1 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -296,6 +296,7 @@ struct TIcebergFileDesc { // Deprecated 5: optional Exprs.TExpr file_select_conjunct; 6: optional string original_file_path; + // Deprecated 7: optional i64 row_count; } @@ -320,6 +321,7 @@ struct TPaimonFileDesc { 12: optional TPaimonDeletionFileDesc deletion_file; 13: optional map<string, string> hadoop_conf // deprecated 14: optional string paimon_table // deprecated + 15: optional i64 row_count // deprecated } struct TTrinoConnectorFileDesc { @@ -387,6 +389,7 @@ struct TTableFormatFileDesc { 6: optional TMaxComputeFileDesc max_compute_params 7: optional TTrinoConnectorFileDesc trino_connector_params 8: optional TLakeSoulFileDesc lakesoul_params + 9: optional i64 table_level_row_count } enum TTextSerdeType { diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out index a394836625d..f3b44964915 100644 Binary files a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out and b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out differ diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out new file mode 100644 index 00000000000..f0b1e92a088 Binary files /dev/null and b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out differ diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy index 9668cbb0950..41afb02e0f9 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy @@ -181,13 +181,6 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_ def c108= """ select id from tb_with_upper_case where id = 1 """ def c109= """ select id from tb_with_upper_case where id < 1 """ - def c110 = """select count(*) from deletion_vector_orc;""" - def c111 = """select count(*) from deletion_vector_parquet;""" - def c112 = """select count(*) from deletion_vector_orc where id > 2;""" - def c113 = """select count(*) from deletion_vector_parquet where id > 2;""" - def c114 = """select * from deletion_vector_orc where id > 2;""" - def c115 = """select * from deletion_vector_parquet where id > 2;""" - String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") String catalog_name = "ctl_test_paimon_catalog" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") @@ -296,13 +289,6 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_ qt_c107 c107 qt_c108 c108 qt_c109 c109 - - qt_c110 c110 - qt_c111 c111 - qt_c112 c112 - qt_c113 c113 - qt_c114 c114 - qt_c115 c115 } test_cases("false", "false") diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy new file mode 100644 index 00000000000..fade251ed56 --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_deletion_vector", "p0,external,doris,external_docker,external_docker_doris") { + + logger.info("start paimon test") + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + + try { + String catalog_name = "test_paimon_deletion_vector" + String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type" = "paimon", + "paimon.catalog.type"="filesystem", + "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1" + );""" + sql """use `${catalog_name}`.`db1`""" + + def test_cases = { String force -> + sql """ set force_jni_scanner=${force} """ + qt_1 """select count(*) from deletion_vector_orc;""" + qt_2 """select count(*) from deletion_vector_parquet;""" + qt_3 """select count(*) from deletion_vector_orc where id > 2;""" + qt_4 """select count(*) from deletion_vector_parquet where id > 2;""" + qt_5 """select * from deletion_vector_orc where id > 2 order by id;""" + qt_6 """select * from deletion_vector_parquet where id > 2 order by id;""" + qt_7 """select * from deletion_vector_table_1_0 order by id;""" + qt_8 """select count(*) from deletion_vector_table_1_0;""" + qt_9 """select count(*) from deletion_vector_table_1_0 where id > 2;""" + } + + def test_table_count_push_down = { String force -> + sql """ set force_jni_scanner=${force} """ + explain { + sql("select count(*) from deletion_vector_orc;") + contains "pushdown agg=COUNT (-1)" + } + explain { + sql("select count(*) from deletion_vector_parquet;") + contains "pushdown agg=COUNT (-1)" + } + explain { + sql("select count(*) from deletion_vector_table_1_0;") + contains "pushdown agg=COUNT (8)" + } + } + + def test_not_table_count_push_down = { String force -> + sql """ set enable_count_push_down_for_external_table=false; """ + sql """ set force_jni_scanner=${force} """ + explain { + sql("select count(*) from deletion_vector_orc;") + contains "pushdown agg=NONE" + } + explain { + sql("select count(*) from deletion_vector_parquet;") + contains "pushdown agg=NONE" + } + explain { + sql("select count(*) from deletion_vector_table_1_0;") + contains "pushdown agg=NONE" + } + } + + test_cases("false") + test_cases("true") + test_table_count_push_down("false") + test_table_count_push_down("true") + test_not_table_count_push_down("false") + test_not_table_count_push_down("true") + } finally { + sql """ set enable_count_push_down_for_external_table=true; """ + sql """set force_jni_scanner=false""" + } + +} \ No newline at end of file 