This is an automated email from the ASF dual-hosted git repository. ashingau pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1a5933c354b [feature](Paimon) support deletion vector for Paimon naive reader (#34743) 1a5933c354b is described below commit 1a5933c354b00c0b98df23d631a9bc65c01df61b Author: 苏小刚 <suxiaogang...@icloud.com> AuthorDate: Mon May 20 15:26:30 2024 +0800 [feature](Paimon) support deletion vector for Paimon naive reader (#34743) support deletion vector for Paimon naive reader --- be/src/util/deletion_vector.h | 81 +++++++++++++ be/src/vec/exec/format/orc/vorc_reader.cpp | 20 +++- be/src/vec/exec/format/orc/vorc_reader.h | 2 + be/src/vec/exec/format/table/iceberg_reader.cpp | 11 -- be/src/vec/exec/format/table/iceberg_reader.h | 7 -- .../{paimon_reader.cpp => paimon_jni_reader.cpp} | 2 +- .../table/{paimon_reader.h => paimon_jni_reader.h} | 4 +- be/src/vec/exec/format/table/paimon_reader.cpp | 126 +++++++++++---------- be/src/vec/exec/format/table/paimon_reader.h | 88 +++++++------- be/src/vec/exec/format/table/table_format_reader.h | 9 ++ .../format/table/transactional_hive_reader.cpp | 11 -- .../exec/format/table/transactional_hive_reader.h | 7 -- be/src/vec/exec/scan/vfile_scanner.cpp | 24 ++++ .../datasource/paimon/source/PaimonScanNode.java | 117 +++++++++++++++++-- .../datasource/paimon/source/PaimonSplit.java | 27 +++-- fe/pom.xml | 4 +- gensrc/thrift/PlanNodes.thrift | 7 ++ .../paimon/test_paimon_catalog.out | 20 ++++ .../paimon/test_paimon_catalog.groovy | 11 ++ 19 files changed, 407 insertions(+), 171 deletions(-) diff --git a/be/src/util/deletion_vector.h b/be/src/util/deletion_vector.h new file mode 100644 index 00000000000..45bd066c2ed --- /dev/null +++ b/be/src/util/deletion_vector.h @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <algorithm> +#include <cstdint> +#include <cstring> +#include <stdexcept> + +#include "common/status.h" +#include "roaring/roaring.hh" + +namespace doris { +class DeletionVector { +public: + const static uint32_t MAGIC_NUMBER = 1581511376; + DeletionVector(roaring::Roaring roaring_bitmap) : _roaring_bitmap(std::move(roaring_bitmap)) {}; + ~DeletionVector() = default; + + bool checked_delete(uint32_t postition) { return _roaring_bitmap.addChecked(postition); } + + bool is_delete(uint32_t postition) const { return _roaring_bitmap.contains(postition); } + + bool is_empty() const { return _roaring_bitmap.isEmpty(); } + + uint32_t maximum() const { return _roaring_bitmap.maximum(); } + + uint32_t minimum() const { return _roaring_bitmap.minimum(); } + + static Result<DeletionVector> deserialize(const char* buf, size_t length) { + uint32_t actual_length; + std::memcpy(reinterpret_cast<char*>(&actual_length), buf, 4); + // change byte order to big endian + std::reverse(reinterpret_cast<char*>(&actual_length), + reinterpret_cast<char*>(&actual_length) + 4); + buf += 4; + if (actual_length != length - 4) { + return ResultError( + Status::RuntimeError("DeletionVector deserialize error: length not match, " + "actual length: {}, expect length: {}", + actual_length, length - 4)); + } + uint32_t magic_number; + std::memcpy(reinterpret_cast<char*>(&magic_number), buf, 4); 
+ // convert from big-endian (on-disk order) to host order; assumes a little-endian host + std::reverse(reinterpret_cast<char*>(&magic_number), + reinterpret_cast<char*>(&magic_number) + 4); + buf += 4; + if (magic_number != MAGIC_NUMBER) { + return ResultError(Status::RuntimeError( + "DeletionVector deserialize error: invalid magic number {}", magic_number)); + } + roaring::Roaring roaring_bitmap; + try { + roaring_bitmap = roaring::Roaring::readSafe(buf, length - 8); // 8 = length prefix + magic number already consumed + } catch (const std::runtime_error&) { + return ResultError(Status::RuntimeError( + "DeletionVector deserialize error: failed to deserialize roaring bitmap")); + } + return DeletionVector(std::move(roaring_bitmap)); + } + +private: + roaring::Roaring _roaring_bitmap; +}; +} // namespace doris diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 942eba59d08..44cfe8b4060 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -192,6 +192,7 @@ void OrcReader::_collect_profile_before_close() { COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time); COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time); COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time); + COUNTER_UPDATE(_orc_profile.filter_block_time, _statistics.filter_block_time); if (_file_input_stream != nullptr) { _file_input_stream->collect_profile_before_close(); @@ -225,6 +226,8 @@ void OrcReader::_init_profile() { ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", orc_profile, 1); _orc_profile.decode_null_map_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime", orc_profile, 1); + _orc_profile.filter_block_time = + ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FilterBlockTime", orc_profile, 1); } } @@ -1605,8 +1608,11 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { return Status::OK(); } _execute_filter_position_delete_rowids(*_filter); - 
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, *_filter)); + { + SCOPED_RAW_TIMER(&_statistics.filter_block_time); + RETURN_IF_CATCH_EXCEPTION( + Block::filter_block_internal(block, columns_to_filter, *_filter)); + } if (!_not_single_slot_filter_conjuncts.empty()) { static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec)); RETURN_IF_CATCH_EXCEPTION( @@ -1733,8 +1739,11 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { return Status::OK(); } _execute_filter_position_delete_rowids(result_filter); - RETURN_IF_CATCH_EXCEPTION( - Block::filter_block_internal(block, columns_to_filter, result_filter)); + { + SCOPED_RAW_TIMER(&_statistics.filter_block_time); + RETURN_IF_CATCH_EXCEPTION( + Block::filter_block_internal(block, columns_to_filter, result_filter)); + } if (!_not_single_slot_filter_conjuncts.empty()) { static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec)); RETURN_IF_CATCH_EXCEPTION( @@ -1748,15 +1757,16 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { } else { if (_delete_rows_filter_ptr) { _execute_filter_position_delete_rowids(*_delete_rows_filter_ptr); + SCOPED_RAW_TIMER(&_statistics.filter_block_time); RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, (*_delete_rows_filter_ptr))); } else { std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1)); _execute_filter_position_delete_rowids(*filter); + SCOPED_RAW_TIMER(&_statistics.filter_block_time); RETURN_IF_CATCH_EXCEPTION( Block::filter_block_internal(block, columns_to_filter, (*filter))); } - Block::erase_useless_column(block, column_to_keep); static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec)); } diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index f5bb7004ca2..08b576fe90c 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++
b/be/src/vec/exec/format/orc/vorc_reader.h @@ -127,6 +127,7 @@ public: int64_t set_fill_column_time = 0; int64_t decode_value_time = 0; int64_t decode_null_map_time = 0; + int64_t filter_block_time = 0; }; OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, @@ -222,6 +223,7 @@ private: RuntimeProfile::Counter* set_fill_column_time = nullptr; RuntimeProfile::Counter* decode_value_time = nullptr; RuntimeProfile::Counter* decode_null_map_time = nullptr; + RuntimeProfile::Counter* filter_block_time = nullptr; }; class ORCFilterImpl : public orc::ORCFilter { diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 8e4cdbd311a..730f7e44aef 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -151,17 +151,6 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* return _shrink_block_if_need(block); } -Status IcebergTableReader::set_fill_columns( - const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& - partition_columns, - const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) { - return _file_format_reader->set_fill_columns(partition_columns, missing_columns); -} - -bool IcebergTableReader::fill_all_columns() const { - return _file_format_reader->fill_all_columns(); -}; - Status IcebergTableReader::get_columns( std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) { diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index cda50015911..c0992095c83 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -84,13 +84,6 @@ public: Status get_next_block(Block* block, size_t* read_rows, bool* eof) final; - Status set_fill_columns( - const std::unordered_map<std::string, 
std::tuple<std::string, const SlotDescriptor*>>& - partition_columns, - const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) final; - - bool fill_all_columns() const final; - Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) final; diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp similarity index 99% copy from be/src/vec/exec/format/table/paimon_reader.cpp copy to be/src/vec/exec/format/table/paimon_jni_reader.cpp index 28282a2bf97..06d24466104 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "paimon_reader.h" +#include "paimon_jni_reader.h" #include <map> #include <ostream> diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h similarity index 97% copy from be/src/vec/exec/format/table/paimon_reader.h copy to be/src/vec/exec/format/table/paimon_jni_reader.h index 9a0ae8d5648..162c6ff2cdb 100644 --- a/be/src/vec/exec/format/table/paimon_reader.h +++ b/be/src/vec/exec/format/table/paimon_jni_reader.h @@ -17,8 +17,7 @@ #pragma once -#include <stddef.h> - +#include <cstddef> #include <memory> #include <string> #include <unordered_map> @@ -28,6 +27,7 @@ #include "common/status.h" #include "exec/olap_common.h" #include "vec/exec/format/generic_reader.h" +#include "vec/exec/format/table/table_format_reader.h" #include "vec/exec/jni_connector.h" namespace doris { diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index 28282a2bf97..6a9bad3fb1d 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -17,78 +17,80 @@ #include "paimon_reader.h" -#include <map> -#include 
<ostream> +#include <vector> -#include "runtime/descriptors.h" -#include "runtime/types.h" -#include "vec/core/types.h" - -namespace doris { -class RuntimeProfile; -class RuntimeState; - -namespace vectorized { -class Block; -} // namespace vectorized -} // namespace doris +#include "common/status.h" +#include "util/deletion_vector.h" namespace doris::vectorized { +PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader, + RuntimeProfile* profile, const TFileScanRangeParams& params) + : TableFormatReader(std::move(file_format_reader)), _profile(profile), _params(params) { + static const char* paimon_profile = "PaimonProfile"; + ADD_TIMER(_profile, paimon_profile); + _paimon_profile.num_delete_rows = + ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, paimon_profile); + _paimon_profile.delete_files_read_time = + ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile); +} -const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix."; - -PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, - RuntimeState* state, RuntimeProfile* profile, - const TFileRangeDesc& range) - : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) { - std::vector<std::string> column_names; - std::vector<std::string> column_types; - for (auto& desc : _file_slot_descs) { - column_names.emplace_back(desc->col_name()); - column_types.emplace_back(JniConnector::get_jni_type(desc->type())); +Status PaimonReader::init_row_filters(const TFileRangeDesc& range) { + const auto& table_desc = range.table_format_params.paimon_params; + if (!table_desc.__isset.deletion_file) { + return Status::OK(); } - std::map<String, String> params; - params["db_name"] = range.table_format_params.paimon_params.db_name; - params["table_name"] = range.table_format_params.paimon_params.table_name; - params["paimon_split"] = range.table_format_params.paimon_params.paimon_split; - params["paimon_column_names"] = 
range.table_format_params.paimon_params.paimon_column_names; - params["paimon_predicate"] = range.table_format_params.paimon_params.paimon_predicate; - params["ctl_id"] = std::to_string(range.table_format_params.paimon_params.ctl_id); - params["db_id"] = std::to_string(range.table_format_params.paimon_params.db_id); - params["tbl_id"] = std::to_string(range.table_format_params.paimon_params.tbl_id); - params["last_update_time"] = - std::to_string(range.table_format_params.paimon_params.last_update_time); - params["required_fields"] = join(column_names, ","); - params["columns_types"] = join(column_types, "#"); - // Used to create paimon option - for (auto& kv : range.table_format_params.paimon_params.paimon_options) { - params[PAIMON_OPTION_PREFIX + kv.first] = kv.second; + const auto& deletion_file = table_desc.deletion_file; + io::FileSystemProperties properties = { + .system_type = _params.file_type, + .properties = _params.properties, + .hdfs_params = _params.hdfs_params, + }; + if (range.__isset.file_type) { + // for compatibility + properties.system_type = range.file_type; } - _jni_connector = std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner", - params, column_names); -} - -Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof)); - if (*eof) { - RETURN_IF_ERROR(_jni_connector->close()); + if (_params.__isset.broker_addresses) { + properties.broker_addresses.assign(_params.broker_addresses.begin(), + _params.broker_addresses.end()); } - return Status::OK(); -} -Status PaimonJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, - std::unordered_set<std::string>* missing_cols) { - for (auto& desc : _file_slot_descs) { - name_to_type->emplace(desc->col_name(), desc->type()); + io::FileDescription file_description = { + .path = deletion_file.path, + .file_size = -1, + .mtime = 0, + .fs_name = range.fs_name, + 
+ }; + + // TODO: cache the file in local + auto delete_file_reader = DORIS_TRY(FileFactory::create_file_reader( + properties, file_description, io::FileReaderOptions::DEFAULT)); + // the reason of adding 4: https://github.com/apache/paimon/issues/3313 + size_t bytes_read = deletion_file.length + 4; + // TODO: better way to alloc memory + std::vector<char> buf(bytes_read); + Slice result(buf.data(), bytes_read); + { + SCOPED_TIMER(_paimon_profile.delete_files_read_time); + RETURN_IF_ERROR(delete_file_reader->read_at(deletion_file.offset, result, &bytes_read)); + } + if (bytes_read != deletion_file.length + 4) { + return Status::IOError( + "failed to read deletion vector, deletion file path: {}, offset: {}, expect " + "length: {}, real " + "length: {}", + deletion_file.path, deletion_file.offset, deletion_file.length + 4, bytes_read); + } + auto deletion_vector = DORIS_TRY(DeletionVector::deserialize(result.data, result.size)); + if (!deletion_vector.is_empty()) { + for (uint64_t i = deletion_vector.minimum(); i <= deletion_vector.maximum(); ++i) { // 64-bit index: a uint32_t would wrap forever when maximum() == UINT32_MAX + if (deletion_vector.is_delete(static_cast<uint32_t>(i))) { + _delete_rows.push_back(static_cast<int64_t>(i)); + } + } + COUNTER_UPDATE(_paimon_profile.num_delete_rows, _delete_rows.size()); + set_delete_rows(); } return Status::OK(); } - -Status PaimonJniReader::init_reader( - std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { - _colname_to_value_range = colname_to_value_range; - RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); - return _jni_connector->open(_state, _profile); -} } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h index 9a0ae8d5648..942faf8d7be 100644 --- a/be/src/vec/exec/format/table/paimon_reader.h +++ b/be/src/vec/exec/format/table/paimon_reader.h @@ -17,61 +17,61 @@ #pragma once -#include <stddef.h> - #include <memory> -#include <string> -#include 
<unordered_map> -#include <unordered_set> #include <vector> -#include "
"common/status.h" -#include "exec/olap_common.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/jni_connector.h" - -namespace doris { -class RuntimeProfile; -class RuntimeState; -class SlotDescriptor; -namespace vectorized { -class Block; -} // namespace vectorized -struct TypeDescriptor; -} // namespace doris +#include "vec/exec/format/orc/vorc_reader.h" +#include "vec/exec/format/parquet/vparquet_reader.h" +#include "vec/exec/format/table/table_format_reader.h" namespace doris::vectorized { - -/** - * The demo usage of JniReader, showing how to read data from java scanner. - * The java side is also a mock reader that provide values for each type. - * This class will only be retained during the functional testing phase to verify that - * the communication and data exchange with the jvm are correct. - */ -class PaimonJniReader : public GenericReader { - ENABLE_FACTORY_CREATOR(PaimonJniReader); - +class PaimonReader : public TableFormatReader { public: - static const std::string PAIMON_OPTION_PREFIX; - PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, - RuntimeProfile* profile, const TFileRangeDesc& range); + PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, + const TFileScanRangeParams& params); + ~PaimonReader() override = default; - ~PaimonJniReader() override = default; + Status init_row_filters(const TFileRangeDesc& range) final; - Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; +protected: + struct PaimonProfile { + RuntimeProfile::Counter* num_delete_rows; + RuntimeProfile::Counter* delete_files_read_time; + }; + std::vector<int64_t> _delete_rows; + RuntimeProfile* _profile; + PaimonProfile _paimon_profile; + virtual void set_delete_rows() = 0; - Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, - std::unordered_set<std::string>* missing_cols) override; +private: + const TFileScanRangeParams& _params; +}; 
- - Status init_reader( - std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); +class PaimonOrcReader final : public PaimonReader { +public: + ENABLE_FACTORY_CREATOR(PaimonOrcReader); + PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, + const TFileScanRangeParams& params) + : PaimonReader(std::move(file_format_reader), profile, params) {}; + ~PaimonOrcReader() final = default; -private: - const std::vector<SlotDescriptor*>& _file_slot_descs; - RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; - std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; - std::unique_ptr<JniConnector> _jni_connector; + void set_delete_rows() override { + (static_cast<OrcReader*>(_file_format_reader.get())) + ->set_position_delete_rowids(&_delete_rows); + } }; +class PaimonParquetReader final : public PaimonReader { +public: + ENABLE_FACTORY_CREATOR(PaimonParquetReader); + PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, + const TFileScanRangeParams& params) + : PaimonReader(std::move(file_format_reader), profile, params) {}; + ~PaimonParquetReader() final = default; + + void set_delete_rows() override { + (static_cast<ParquetReader*>(_file_format_reader.get())) + ->set_delete_rows(&_delete_rows); + } +}; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h index 9426d116334..7660a443712 100644 --- a/be/src/vec/exec/format/table/table_format_reader.h +++ b/be/src/vec/exec/format/table/table_format_reader.h @@ -55,6 +55,15 @@ public: return _file_format_reader->get_parsed_schema(col_names, col_types); } + Status set_fill_columns( + const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& + partition_columns, + const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override { + 
return _file_format_reader->set_fill_columns(partition_columns, missing_columns); + } + + bool fill_all_columns() const override { return _file_format_reader->fill_all_columns(); } + virtual Status init_row_filters(const TFileRangeDesc& range) = 0; protected: diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index c2ce9353694..85bfbed0713 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -84,17 +84,6 @@ Status TransactionalHiveReader::get_next_block(Block* block, size_t* read_rows, return res; } -Status TransactionalHiveReader::set_fill_columns( - const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& - partition_columns, - const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) { - return _file_format_reader->set_fill_columns(partition_columns, missing_columns); -} - -bool TransactionalHiveReader::fill_all_columns() const { - return _file_format_reader->fill_all_columns(); -}; - Status TransactionalHiveReader::get_columns( std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) { diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h index 41d611a468e..81f472728b4 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.h +++ b/be/src/vec/exec/format/table/transactional_hive_reader.h @@ -93,13 +93,6 @@ public: Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; - Status set_fill_columns( - const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& - partition_columns, - const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override; - - bool fill_all_columns() const override; - Status get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 421f7ab5b22..afa5ad21457 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -62,6 +62,7 @@ #include "vec/exec/format/table/hudi_jni_reader.h" #include "vec/exec/format/table/iceberg_reader.h" #include "vec/exec/format/table/max_compute_jni_reader.h" +#include "vec/exec/format/table/paimon_jni_reader.h" #include "vec/exec/format/table/paimon_reader.h" #include "vec/exec/format/table/transactional_hive_reader.h" #include "vec/exec/format/table/trino_connector_jni_reader.h" @@ -851,6 +852,19 @@ Status VFileScanner::_get_next_reader() { _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); _cur_reader = std::move(iceberg_reader); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "paimon") { + std::vector<std::string> place_holder; + init_status = parquet_reader->init_reader( + _file_col_names, place_holder, _colname_to_value_range, + _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), + _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); + std::unique_ptr<PaimonParquetReader> paimon_reader = + PaimonParquetReader::create_unique(std::move(parquet_reader), _profile, + *_params); + RETURN_IF_ERROR(paimon_reader->init_row_filters(range)); + _cur_reader = std::move(paimon_reader); } else { std::vector<std::string> place_holder; init_status = parquet_reader->init_reader( @@ -902,6 +916,16 @@ Status VFileScanner::_get_next_reader() { _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); _cur_reader = std::move(iceberg_reader); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "paimon") { + init_status = 
orc_reader->init_reader( + &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, + _real_tuple_desc, _default_val_row_desc.get(), + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); + std::unique_ptr<PaimonOrcReader> paimon_reader = + PaimonOrcReader::create_unique(std::move(orc_reader), _profile, *_params); + RETURN_IF_ERROR(paimon_reader->init_row_filters(range)); + _cur_reader = std::move(paimon_reader); } else { init_status = orc_reader->init_reader( &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index b9672f70c41..003ced7ead7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -38,6 +38,7 @@ import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPaimonDeletionFileDesc; import org.apache.doris.thrift.TPaimonFileDesc; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TTableFormatFileDesc; @@ -51,6 +52,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.utils.InstantiationUtil; @@ -66,11 +68,46 @@ import java.util.Set; import java.util.stream.Collectors; public class PaimonScanNode extends FileQueryScanNode { + private enum SplitReadType { + JNI, + NATIVE, + } + + private class 
SplitStat { + SplitReadType type = SplitReadType.JNI; + private long rowCount = 0; + private boolean rawFileConvertable = false; + private boolean hasDeletionVector = false; + + public void setType(SplitReadType type) { + this.type = type; + } + + public void setRowCount(long rowCount) { + this.rowCount = rowCount; + } + + public void setRawFileConvertable(boolean rawFileConvertable) { + this.rawFileConvertable = rawFileConvertable; + } + + public void setHasDeletionVector(boolean hasDeletionVector) { + this.hasDeletionVector = hasDeletionVector; + } + + @Override + public String toString() { + return "SplitStat [type=" + type + ", rowCount=" + rowCount + ", rawFileConvertable=" + rawFileConvertable + + ", hasDeletionVector=" + hasDeletionVector + "]"; + } + } + private static final Logger LOG = LogManager.getLogger(PaimonScanNode.class); private PaimonSource source = null; private List<Predicate> predicates; private int rawFileSplitNum = 0; private int paimonSplitNum = 0; + private List<SplitStat> splitStats = new ArrayList<>(); public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv); @@ -133,6 +170,15 @@ public class PaimonScanNode extends FileQueryScanNode { fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId()); fileDesc.setTblId(source.getTargetTable().getId()); fileDesc.setLastUpdateTime(source.getTargetTable().getLastUpdateTime()); + Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile(); + if (optDeletionFile.isPresent()) { + DeletionFile deletionFile = optDeletionFile.get(); + TPaimonDeletionFileDesc tDeletionFile = new TPaimonDeletionFileDesc(); + tDeletionFile.setPath(deletionFile.path()); + tDeletionFile.setOffset(deletionFile.offset()); + tDeletionFile.setLength(deletionFile.length()); + fileDesc.setDeletionFile(tDeletionFile); + } tableFormatFileDesc.setPaimonParams(fileDesc); 
rangeDesc.setTableFormatParams(tableFormatFileDesc); } @@ -152,19 +198,28 @@ public class PaimonScanNode extends FileQueryScanNode { // Just for counting the number of selected partitions for this paimon table Set<BinaryRow> selectedPartitionValues = Sets.newHashSet(); for (org.apache.paimon.table.source.Split split : paimonSplits) { + SplitStat splitStat = new SplitStat(); + splitStat.setRowCount(split.rowCount()); if (!forceJniScanner && supportNative && split instanceof DataSplit) { DataSplit dataSplit = (DataSplit) split; BinaryRow partitionValue = dataSplit.partition(); selectedPartitionValues.add(partitionValue); Optional<List<RawFile>> optRawFiles = dataSplit.convertToRawFiles(); + Optional<List<DeletionFile>> optDeletionFiles = dataSplit.deletionFiles(); if (optRawFiles.isPresent()) { + splitStat.setType(SplitReadType.NATIVE); + splitStat.setRawFileConvertable(true); List<RawFile> rawFiles = optRawFiles.get(); - for (RawFile file : rawFiles) { - LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties()); - Path finalDataFilePath = locationPath.toStorageLocation(); - try { - splits.addAll( - splitFile( + if (optDeletionFiles.isPresent()) { + List<DeletionFile> deletionFiles = optDeletionFiles.get(); + for (int i = 0; i < rawFiles.size(); i++) { + RawFile file = rawFiles.get(i); + DeletionFile deletionFile = deletionFiles.get(i); + LocationPath locationPath = new LocationPath(file.path(), + source.getCatalog().getProperties()); + Path finalDataFilePath = locationPath.toStorageLocation(); + try { + List<Split> dorisSplits = splitFile( finalDataFilePath, 0, null, @@ -172,10 +227,40 @@ public class PaimonScanNode extends FileQueryScanNode { -1, true, null, - PaimonSplit.PaimonSplitCreator.DEFAULT)); - ++rawFileSplitNum; - } catch (IOException e) { - throw new UserException("Paimon error to split file: " + e.getMessage(), e); + PaimonSplit.PaimonSplitCreator.DEFAULT); + for (Split dorisSplit : dorisSplits) { + // the element in 
DeletionFiles might be null + if (deletionFile != null) { + splitStat.setHasDeletionVector(true); + ((PaimonSplit) dorisSplit).setDeletionFile(deletionFile); + } + splits.add(dorisSplit); + } + ++rawFileSplitNum; + } catch (IOException e) { + throw new UserException("Paimon error to split file: " + e.getMessage(), e); + } + } + } else { + for (RawFile file : rawFiles) { + LocationPath locationPath = new LocationPath(file.path(), + source.getCatalog().getProperties()); + Path finalDataFilePath = locationPath.toStorageLocation(); + try { + splits.addAll( + splitFile( + finalDataFilePath, + 0, + null, + file.length(), + -1, + true, + null, + PaimonSplit.PaimonSplitCreator.DEFAULT)); + ++rawFileSplitNum; + } catch (IOException e) { + throw new UserException("Paimon error to split file: " + e.getMessage(), e); + } } } } else { @@ -186,6 +271,7 @@ public class PaimonScanNode extends FileQueryScanNode { splits.add(new PaimonSplit(split)); ++paimonSplitNum; } + splitStats.add(splitStat); } this.readPartitionNum = selectedPartitionValues.size(); // TODO: get total partition number @@ -260,8 +346,15 @@ public class PaimonScanNode extends FileQueryScanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - return super.getNodeExplainString(prefix, detailLevel) + String result = super.getNodeExplainString(prefix, detailLevel) + String.format("%spaimonNativeReadSplits=%d/%d\n", - prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum)); + prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum)); + if (detailLevel == TExplainLevel.VERBOSE) { + result += prefix + "PaimonSplitStats: \n"; + for (SplitStat splitStat : splitStats) { + result += String.format("%s %s\n", prefix, splitStat); + } + } + return result; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java index 932620d5674..6cca70577f8 100644 
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java @@ -22,24 +22,29 @@ import org.apache.doris.datasource.SplitCreator; import org.apache.doris.datasource.TableFormatType; import org.apache.hadoop.fs.Path; +import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.Split; import java.util.List; +import java.util.Optional; public class PaimonSplit extends FileSplit { private Split split; private TableFormatType tableFormatType; + private Optional<DeletionFile> optDeletionFile; public PaimonSplit(Split split) { super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null); this.split = split; this.tableFormatType = TableFormatType.PAIMON; + this.optDeletionFile = Optional.empty(); } public PaimonSplit(Path file, long start, long length, long fileLength, String[] hosts, - List<String> partitionList) { + List<String> partitionList) { super(file, start, length, fileLength, hosts, partitionList); this.tableFormatType = TableFormatType.PAIMON; + this.optDeletionFile = Optional.empty(); } public Split getSplit() { @@ -58,18 +63,26 @@ public class PaimonSplit extends FileSplit { this.tableFormatType = tableFormatType; } + public Optional<DeletionFile> getDeletionFile() { + return optDeletionFile; + } + + public void setDeletionFile(DeletionFile deletionFile) { + this.optDeletionFile = Optional.of(deletionFile); + } + public static class PaimonSplitCreator implements SplitCreator { static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator(); @Override public org.apache.doris.spi.Split create(Path path, - long start, - long length, - long fileLength, - long modificationTime, - String[] hosts, - List<String> partitionValues) { + long start, + long length, + long fileLength, + long modificationTime, + String[] hosts, + List<String> partitionValues) { return new PaimonSplit(path, start, length, fileLength, hosts, 
partitionValues); } } diff --git a/fe/pom.xml b/fe/pom.xml index a574f95ae12..cd90827f217 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -228,7 +228,7 @@ under the License. <doris.home>${fe.dir}/../</doris.home> <revision>1.2-SNAPSHOT</revision> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <doris.hive.catalog.shade.version>2.0.2</doris.hive.catalog.shade.version> + <doris.hive.catalog.shade.version>2.1.0</doris.hive.catalog.shade.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <!--plugin parameters--> @@ -349,7 +349,7 @@ under the License. <!--todo waiting release--> <quartz.version>2.3.2</quartz.version> <!-- paimon --> - <paimon.version>0.7.0-incubating</paimon.version> + <paimon.version>0.8.0</paimon.version> <disruptor.version>3.4.4</disruptor.version> <!-- arrow flight sql --> <arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier> diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 6b0cecb9144..1795fc7cec0 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -309,6 +309,12 @@ struct TIcebergFileDesc { 5: optional Exprs.TExpr file_select_conjunct; } +struct TPaimonDeletionFileDesc { + 1: optional string path; + 2: optional i64 offset; + 3: optional i64 length; +} + struct TPaimonFileDesc { 1: optional string paimon_split 2: optional string paimon_column_names @@ -321,6 +327,7 @@ struct TPaimonFileDesc { 9: optional i64 tbl_id 10: optional i64 last_update_time 11: optional string file_format + 12: optional TPaimonDeletionFileDesc deletion_file; } struct TTrinoConnectorFileDesc { diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out index 92cfe08294c..262ee6a79de 100644 --- a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out +++ 
b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out @@ -558,3 +558,23 @@ bbb 2 {"c_boolean": 1, "c_tinyint": 5, "c_smallint": 6, "c_int": 7, "c_bigint": 8, "c_float": 10.2, "c_double": 11.4, "c_decimal": 12.60, "c_char": "7", "c_varchar": "10", "c_binary": "c", "c_varbinary": "dddd", "c_date": "2021-06-15", "c_timestamp": "2021-06-15 06:00:00.000000", "c_array": [4, 5, 6], "c_map": {"a":6, "b":7}, "c_row": {"c_int": 8, "c_float": 9.2, "c_varchar": "10"}} 3 {"c_boolean": 1, "c_tinyint": 9, "c_smallint": 10, "c_int": 11, "c_bigint": 12, "c_float": 13.3, "c_double": 14.6, "c_decimal": 15.90, "c_char": "8", "c_varchar": "11", "c_binary": "e", "c_varbinary": "ffff", "c_date": "2022-12-31", "c_timestamp": "2022-12-31 12:59:59.000000", "c_array": [7, 8, 9], "c_map": {"a":11, "b":12}, "c_row": {"c_int": 13, "c_float": 14.3, "c_varchar": "15"}} +-- !c104 -- +2 2_2 +3 3_1 +4 4_1 + +-- !c105 -- +2 2_2 +3 3_1 +4 4_1 + +-- !c106 -- +2 2_2 +3 3_1 +4 4_1 + +-- !c107 -- +2 2_2 +3 3_1 +4 4_1 + diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy index 53ab596a82f..b8bc174d7c5 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy @@ -174,6 +174,11 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_ def c102= """select * from row_native_test order by id;""" def c103= """select * from row_jni_test order by id;""" + def c104= """select * from deletion_vector_orc;""" + def c105= """select * from deletion_vector_parquet;""" + def c106= """select * from deletion_vector_orc;""" + def c107= """select * from deletion_vector_parquet;""" + String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") String catalog_name = "ctl_test_paimon_catalog" String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp") @@ -275,6 +280,12 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_ sql """ set force_jni_scanner=true; """ qt_c103 c103 sql """ set force_jni_scanner=false; """ + qt_c104 c104 + qt_c105 c105 + sql """ set force_jni_scanner=true; """ + qt_c106 c106 + qt_c107 c107 + sql """ set force_jni_scanner=false; """ // test view from jion paimon sql """ switch internal """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org