This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 20634ab7e3  [feature-wip](multi-catalog) support partition&missing columns in parquet lazy read (#14264)
20634ab7e3 is described below

commit 20634ab7e3d90de41395543e1f4cdb5569bca6f2
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Wed Nov 16 08:43:11 2022 +0800

    [feature-wip](multi-catalog) support partition&missing columns in parquet lazy read (#14264)

    PR https://github.com/apache/doris/pull/13917 added support for lazy read of
    non-predicate columns in ParquetReader, but lazy read could not be triggered
    when the predicate columns are partition or missing columns. This PR supports
    such cases by filling the partition and missing columns in `FileReader` itself
    (simplified sketches of the resulting control flow are appended after the diff).
---
 be/src/exec/text_converter.cpp | 164 ++++++++++++++++++++-
 be/src/exec/text_converter.h | 5 +
 be/src/vec/exec/format/generic_reader.h | 19 +++
 .../exec/format/parquet/vparquet_group_reader.cpp | 135 ++++++++++++-----
 .../exec/format/parquet/vparquet_group_reader.h | 35 +++--
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 59 ++++++--
 be/src/vec/exec/format/parquet/vparquet_reader.h | 6 +-
 be/src/vec/exec/scan/vfile_scanner.cpp | 66 +++++++--
 be/src/vec/exec/scan/vfile_scanner.h | 1 +
 be/test/vec/exec/parquet/parquet_reader_test.cpp | 4 +
 10 files changed, 425 insertions(+), 69 deletions(-)

diff --git a/be/src/exec/text_converter.cpp b/be/src/exec/text_converter.cpp
index 5fac00569c..5888eefc43 100644
--- a/be/src/exec/text_converter.cpp
+++ b/be/src/exec/text_converter.cpp
@@ -17,15 +17,177 @@
 #include "text_converter.h"
 
-#include <boost/algorithm/string.hpp>
+#include <sql.h>
 
+#include "runtime/decimalv2_value.h"
+#include "runtime/descriptors.h"
 #include "runtime/mem_pool.h"
 #include "runtime/string_value.h"
+#include "runtime/tuple.h"
+#include "util/string_parser.hpp"
+#include "util/types.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/runtime/vdatetime_value.h"
 
 namespace doris {
 
 TextConverter::TextConverter(char escape_char) : _escape_char(escape_char) {}
 
+bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
+                                     vectorized::IColumn* nullable_col_ptr, const char* data,
+                                     size_t len, bool copy_string, bool need_escape, size_t rows) {
+    vectorized::IColumn* col_ptr = nullable_col_ptr;
+    // \N means it's NULL
+    if (slot_desc->is_nullable()) {
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr);
+        if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
+            nullable_column->insert_many_defaults(rows);
+            return true;
+        } else {
+            auto& null_map = nullable_column->get_null_map_data();
+            null_map.resize_fill(null_map.size() + rows, 0);
+            col_ptr = &nullable_column->get_nested_column();
+        }
+    }
+
+    StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
+    size_t origin_size = col_ptr->size();
+    // Parse the raw-text data. Translate the text string to internal format.
+    switch (slot_desc->type().type) {
+    case TYPE_HLL: {
+        HyperLogLog hyper_log_log(Slice(data, len));
+        auto& hyper_data = reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data();
+        for (size_t i = 0; i < rows; ++i) {
+            hyper_data.emplace_back(hyper_log_log);
+        }
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR: {
+        if (need_escape) {
+            unescape_string_on_spot(data, &len);
+        }
+        reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_many_data(data, len, rows);
+        break;
+    }
+
+    case TYPE_BOOLEAN: {
+        bool num = StringParser::string_to_bool(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, (uint8_t)num);
+        break;
+    }
+    case TYPE_TINYINT: {
+        int8_t num = StringParser::string_to_int<int8_t>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_SMALLINT: {
+        int16_t num = StringParser::string_to_int<int16_t>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_INT: {
+        int32_t num = StringParser::string_to_int<int32_t>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_BIGINT: {
+        int64_t num = StringParser::string_to_int<int64_t>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_LARGEINT: {
+        __int128 num = StringParser::string_to_int<__int128>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+
+    case TYPE_FLOAT: {
+        float num = StringParser::string_to_float<float>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_DOUBLE: {
+        double num = StringParser::string_to_float<double>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_DATE: {
+        vectorized::VecDateTimeValue ts_slot;
+        if (!ts_slot.from_date_str(data, len)) {
+            parse_result = StringParser::PARSE_FAILURE;
+            break;
+        }
+        ts_slot.cast_to_date();
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, *reinterpret_cast<int64_t*>(&ts_slot));
+        break;
+    }
+
+    case TYPE_DATETIME: {
+        vectorized::VecDateTimeValue ts_slot;
+        if (!ts_slot.from_date_str(data, len)) {
+            parse_result = StringParser::PARSE_FAILURE;
+            break;
+        }
+        ts_slot.to_datetime();
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, *reinterpret_cast<int64_t*>(&ts_slot));
+        break;
+    }
+
+    case TYPE_DECIMALV2: {
+        DecimalV2Value decimal_slot;
+        if (decimal_slot.parse_from_str(data, len)) {
+            parse_result = StringParser::PARSE_FAILURE;
+            break;
+        }
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, decimal_slot.value());
+        break;
+    }
+
+    default:
+        DCHECK(false) << "bad slot type: " << slot_desc->type();
+        break;
+    }
+
+    if (UNLIKELY(parse_result == StringParser::PARSE_FAILURE)) {
+        if (slot_desc->is_nullable()) {
+            auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr);
+            size_t size = nullable_column->get_null_map_data().size();
+            doris::vectorized::NullMap& null_map_data = nullable_column->get_null_map_data();
+            for (int i = 1; i <= rows; ++i) {
+                null_map_data[size - i] = 1;
+            }
+            nullable_column->get_nested_column().insert_many_defaults(rows);
+        }
+        return false;
+    }
+    return true;
+}
+
 void TextConverter::unescape_string(StringValue* value, MemPool* pool) {
     char* new_data = reinterpret_cast<char*>(pool->allocate(value->len));
     unescape_string(value->ptr, new_data, &value->len);
diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h
index a3d3fe5ff6..79deac95ff 100644
--- a/be/src/exec/text_converter.h
+++ b/be/src/exec/text_converter.h
@@ -56,6 +56,11 @@ public:
     bool write_vec_column(const SlotDescriptor* slot_desc, vectorized::IColumn* nullable_col_ptr,
                           const char* data, size_t len, bool copy_string, bool need_escape);
 
+    /// Write consecutive rows of the same data.
+    bool write_vec_column(const SlotDescriptor* slot_desc, vectorized::IColumn* nullable_col_ptr,
+                          const char* data, size_t len, bool copy_string, bool need_escape,
+                          size_t rows);
+
     // Removes escape characters from len characters of the null-terminated string src,
     // and copies the unescaped string into dest, changing *len to the unescaped length.
     // No null-terminator is added to dest.
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index e098557b82..dd2bdd249c 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -19,6 +19,7 @@
 
 #include "common/status.h"
 #include "runtime/types.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris::vectorized {
 
@@ -43,6 +44,24 @@ public:
         return Status::NotSupported("get_parser_schema is not implemented for this reader.");
     }
     virtual ~GenericReader() = default;
+
+    /// If the underlying FileReader has already filled the partition&missing columns,
+    /// the FileScanner does not need to fill them again.
+    bool fill_all_columns() const { return _fill_all_columns; }
+
+    /// Tell the underlying FileReader which partition&missing columns exist,
+    /// and let the FileReader decide whether to fill them itself.
+    /// Implementations that fill the columns should set _fill_all_columns = true.
+    virtual Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+        return Status::OK();
+    }
+
+protected:
+    /// Whether the underlying FileReader has filled the partition&missing columns
+    bool _fill_all_columns = false;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0f71990b2b..e607cd510d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -33,13 +33,7 @@ RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
           _row_group_meta(row_group),
           _remaining_rows(row_group.num_rows),
           _ctz(ctz),
-          _vconjunct_ctx(lazy_read_ctx.vconjunct_ctx),
-          _can_lazy_read(lazy_read_ctx.can_lazy_read),
-          _resize_first_column(lazy_read_ctx.resize_first_column),
-          _all_read_columns(lazy_read_ctx.all_read_columns),
-          _predicate_columns(lazy_read_ctx.predicate_columns),
-          _predicate_col_ids(lazy_read_ctx.predicate_col_ids),
-          _lazy_read_columns(lazy_read_ctx.lazy_read_columns) {}
+          _lazy_read_ctx(lazy_read_ctx) {}
 
 RowGroupReader::~RowGroupReader() {
     _column_readers.clear();
@@ -54,7 +48,6 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
     const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
     const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
     size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
-    std::set<std::string> predicate_columns(_predicate_columns.begin(), _predicate_columns.end());
    for (auto& read_col : _read_columns) {
         auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
         std::unique_ptr<ParquetColumnReader> reader;
@@ -70,13 +63,6 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
             return Status::Corruption("Init row group reader failed");
         }
         _column_readers[read_col._file_slot_name] = std::move(reader);
-        PrimitiveType column_type = field->type.type;
-        if (column_type == TYPE_ARRAY || column_type == TYPE_MAP || column_type == TYPE_STRUCT) {
-            _can_lazy_read = false;
-        }
-    }
-    if (_vconjunct_ctx == nullptr) {
-        _can_lazy_read = false;
     }
     return Status::OK();
 }
@@ -85,16 +71,29 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
                                   bool* _batch_eof) {
     // Process external table query task that select columns are all from path.
     if (_read_columns.empty()) {
-        return _read_empty_batch(batch_size, read_rows, _batch_eof);
+        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, _batch_eof));
+        RETURN_IF_ERROR(
+                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
+        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
+
+        Status st =
+                VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns());
+        *read_rows = block->rows();
+        return st;
     }
-    if (_can_lazy_read) {
+    if (_lazy_read_ctx.can_lazy_read) {
         // call _do_lazy_read recursively when current batch is skipped
         return _do_lazy_read(block, batch_size, read_rows, _batch_eof);
     } else {
         ColumnSelectVector run_length_vector;
-        RETURN_IF_ERROR(_read_column_data(block, _all_read_columns, batch_size, read_rows,
-                                          _batch_eof, run_length_vector));
-        Status st = VExprContext::filter_block(_vconjunct_ctx, block, block->columns());
+        RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
+                                          read_rows, _batch_eof, run_length_vector));
+        RETURN_IF_ERROR(
+                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
+        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
+
+        Status st =
+                VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns());
         *read_rows = block->rows();
         return st;
     }
@@ -132,7 +131,6 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
         col_idx++;
     }
     *read_rows = batch_read_rows;
-    _read_rows += batch_read_rows;
     *_batch_eof = has_eof;
     return Status::OK();
 }
@@ -143,19 +141,23 @@
     size_t pre_read_rows;
     bool pre_eof;
     ColumnSelectVector run_length_vector;
-    RETURN_IF_ERROR(_read_column_data(block, _predicate_columns, batch_size, &pre_read_rows,
-                                      &pre_eof, run_length_vector));
+    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size,
+                                      &pre_read_rows, &pre_eof, run_length_vector));
+    RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
+                                            _lazy_read_ctx.predicate_partition_columns));
+    RETURN_IF_ERROR(
+            _fill_missing_columns(block, pre_read_rows, _lazy_read_ctx.predicate_missing_columns));
     // generate filter vector
-    if (_resize_first_column) {
+    if (_lazy_read_ctx.resize_first_column) {
         // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
         // The following process may be tricky and time-consuming, but we have no other way.
         block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
     }
     size_t origin_column_num = block->columns();
     int filter_column_id = -1;
-    RETURN_IF_ERROR(_vconjunct_ctx->execute(block, &filter_column_id));
+    RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id));
     ColumnPtr& sv = block->get_by_position(filter_column_id).column;
-    if (_resize_first_column) {
+    if (_lazy_read_ctx.resize_first_column) {
         // We have to clean the first column to insert right data.
         block->get_by_position(0).column->assume_mutable()->clear();
     }
@@ -167,10 +169,16 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     if (select_vector.filter_all() && !pre_eof) {
         // If continuous batches are skipped, we can cache them to skip a whole page
         _cached_filtered_rows += pre_read_rows;
-        for (auto& col : _predicate_columns) {
+        for (auto& col : _lazy_read_ctx.predicate_columns) {
             // clean block to read predicate columns
             block->get_by_name(col).column->assume_mutable()->clear();
         }
+        for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
+            block->get_by_name(col.first).column->assume_mutable()->clear();
+        }
+        for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
+            block->get_by_name(col.first).column->assume_mutable()->clear();
+        }
         Block::erase_useless_column(block, origin_column_num);
         return _do_lazy_read(block, batch_size, read_rows, batch_eof);
     }
@@ -184,8 +192,8 @@
     // lazy read columns
     size_t lazy_read_rows;
     bool lazy_eof;
-    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_columns, pre_read_rows, &lazy_read_rows,
-                                      &lazy_eof, select_vector));
+    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows,
+                                      &lazy_read_rows, &lazy_eof, select_vector));
     if (pre_read_rows != lazy_read_rows) {
         return Status::Corruption("Can't read the same number of rows when doing lazy read");
     }
@@ -194,24 +202,29 @@
     // filter data in predicate columns, and remove filter column
     if (select_vector.has_filter()) {
-        Block::filter_block(block, _predicate_col_ids, filter_column_id, origin_column_num);
+        Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
+                            origin_column_num);
     } else {
         Block::erase_useless_column(block, origin_column_num);
     }
+
     size_t column_num = block->columns();
-    size_t column_size = -1;
+    size_t column_size = 0;
     for (int i = 0; i < column_num; ++i) {
         size_t cz = block->get_by_position(i).column->size();
-        if (column_size != -1) {
+        if (column_size != 0 && cz != 0) {
             DCHECK_EQ(column_size, cz);
         }
-        column_size = cz;
+        if (cz != 0) {
+            column_size = cz;
+        }
     }
     _lazy_read_filtered_rows += pre_read_rows - column_size;
     *read_rows = column_size;
     *batch_eof = pre_eof;
-    return Status::OK();
+    RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
+    return _fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns);
 }
 
 const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows,
@@ -271,6 +284,60 @@ void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
     select_vector.build(map, total_rows, false);
 }
 
+Status RowGroupReader::_fill_partition_columns(
+        Block* block, size_t rows,
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns) {
+    for (auto& kv : partition_columns) {
+        auto doris_column = block->get_by_name(kv.first).column;
+        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
+        auto& [value, slot_desc] = kv.second;
+        if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()),
+                                               value.size(), true, false, rows)) {
+            return Status::InternalError("Failed to fill partition column: {}={}",
+                                         slot_desc->col_name(), value);
+        }
+    }
+    return Status::OK();
+}
+
+Status RowGroupReader::_fill_missing_columns(
+        Block* block, size_t rows,
+        const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+    for (auto& kv : missing_columns) {
+        if (kv.second == nullptr) {
+            // no default column, fill with null
+            auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                    (*std::move(block->get_by_name(kv.first).column)).mutate().get());
+            nullable_column->insert_many_defaults(rows);
+        } else {
+            // fill with default value
+            auto* ctx = kv.second;
+            auto origin_column_num = block->columns();
+            int result_column_id = -1;
+            // PT1 => dest primitive type
+            RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
+            bool is_origin_column = result_column_id < origin_column_num;
+            if (!is_origin_column) {
+                // Call resize because the first column of _src_block_ptr may not be filled by the
+                // reader, so _src_block_ptr->rows() may return a wrong result, and the column
+                // created by `ctx->execute()` would have only one row.
+                std::move(*block->get_by_position(result_column_id).column).mutate()->resize(rows);
+                auto result_column_ptr = block->get_by_position(result_column_id).column;
+                // result_column_ptr may be a ColumnConst, convert it to a normal column
+                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
+                auto origin_column_type = block->get_by_name(kv.first).type;
+                bool is_nullable = origin_column_type->is_nullable();
+                block->replace_by_position(
+                        block->get_position_by_name(kv.first),
+                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
+                block->erase(result_column_id);
+            }
+        }
+    }
+    return Status::OK();
+}
+
 Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof) {
     if (batch_size < _remaining_rows) {
         *read_rows = batch_size;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index d1e0315c68..661d248f12 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <common/status.h>
+#include "exec/text_converter.h"
 #include "io/file_reader.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr_context.h"
@@ -29,11 +30,22 @@ public:
     struct LazyReadContext {
         VExprContext* vconjunct_ctx = nullptr;
         bool can_lazy_read = false;
+        // block->rows() returns the number of rows of the first column,
+        // so we should check and resize the first column
         bool resize_first_column = true;
         std::vector<std::string> all_read_columns;
+        // include predicate_partition_columns & predicate_missing_columns
+        std::vector<uint32_t> all_predicate_col_ids;
         std::vector<std::string> predicate_columns;
-        std::vector<uint32_t> predicate_col_ids;
         std::vector<std::string> lazy_read_columns;
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                predicate_partition_columns;
+        // lazy read partition columns or all partition columns
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                partition_columns;
+        std::unordered_map<std::string, VExprContext*> predicate_missing_columns;
+        // lazy read missing columns or all missing columns
+        std::unordered_map<std::string, VExprContext*> missing_columns;
     };
 
     RowGroupReader(doris::FileReader* file_reader,
@@ -45,7 +57,7 @@
     Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
                 std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
     Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof);
-    int64_t lazy_read_filtered_rows() { return _lazy_read_filtered_rows; }
+    int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }
 
     ParquetColumnReader::Statistics statistics();
 
@@ -58,6 +70,13 @@ private:
     const uint8_t* _build_filter_map(ColumnPtr& sv, size_t num_rows, bool* can_filter_all);
     void _rebuild_select_vector(ColumnSelectVector& select_vector,
                                 std::unique_ptr<uint8_t[]>& filter_map, size_t pre_read_rows);
+    Status _fill_partition_columns(
+            Block* block, size_t rows,
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns);
+    Status _fill_missing_columns(
+            Block* block, size_t rows,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns);
 
     doris::FileReader* _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
@@ -65,20 +84,12 @@
     const int32_t _row_group_id;
     const tparquet::RowGroup& _row_group_meta;
     int64_t _remaining_rows;
-    int64_t _read_rows = 0;
     cctz::time_zone* _ctz;
-    VExprContext* _vconjunct_ctx;
-    bool _can_lazy_read;
-    // block->rows() returns the number of rows of the first column,
-    // so we should check and resize the first column
-    const bool _resize_first_column;
-    const std::vector<std::string>& _all_read_columns;
-    const std::vector<std::string>& _predicate_columns;
-    const std::vector<uint32_t>& _predicate_col_ids;
-    const std::vector<std::string>& _lazy_read_columns;
+    const LazyReadContext& _lazy_read_ctx;
 
     int64_t _lazy_read_filtered_rows = 0;
     // If continuous batches are skipped, we can cache them to skip a whole page
     size_t _cached_filtered_rows = 0;
+    std::unique_ptr<TextConverter> _text_converter = nullptr;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 2ab2a1dfdc..63efce2882 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -160,13 +160,15 @@ Status ParquetReader::init_reader(
     RETURN_IF_ERROR(_init_read_columns());
     // build column predicates for column lazy read
     _lazy_read_ctx.vconjunct_ctx = vconjunct_ctx;
-    _init_lazy_read();
-    RETURN_IF_ERROR(_init_row_group_readers());
-    return Status::OK();
+    return _init_row_group_readers();
 }
 
-void ParquetReader::_init_lazy_read() {
+Status ParquetReader::set_fill_columns(
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns,
+        const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
     std::unordered_map<std::string, uint32_t> predicate_columns;
     std::function<void(VExpr * expr)> visit_slot = [&](VExpr* expr) {
         if (VSlotRef* slot_ref = typeid_cast<VSlotRef*>(expr)) {
@@ -200,27 +202,62 @@
     if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
         visit_slot(_lazy_read_ctx.vconjunct_ctx->root());
     }
+
+    bool has_complex_type = false;
+    const FieldDescriptor& schema = _file_metadata->schema();
     for (auto& read_col : _read_columns) {
         _lazy_read_ctx.all_read_columns.emplace_back(read_col._file_slot_name);
+        PrimitiveType column_type = schema.get_column(read_col._file_slot_name)->type.type;
+        if (column_type == TYPE_ARRAY || column_type == TYPE_MAP || column_type == TYPE_STRUCT) {
+            has_complex_type = true;
+        }
         if (predicate_columns.size() > 0) {
             auto iter = predicate_columns.find(read_col._file_slot_name);
             if (iter == predicate_columns.end()) {
                _lazy_read_ctx.lazy_read_columns.emplace_back(read_col._file_slot_name);
             } else {
                 _lazy_read_ctx.predicate_columns.emplace_back(iter->first);
-                _lazy_read_ctx.predicate_col_ids.emplace_back(iter->second);
+                _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
             }
         }
     }
-    if (_lazy_read_ctx.predicate_columns.size() > 0 &&
+
+    for (auto& kv : partition_columns) {
+        auto iter = predicate_columns.find(kv.first);
+        if (iter == predicate_columns.end()) {
+            _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
+        } else {
+            _lazy_read_ctx.predicate_partition_columns.emplace(kv.first, kv.second);
+            _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
+        }
+    }
+
+    for (auto& kv : missing_columns) {
+        auto iter = predicate_columns.find(kv.first);
+        if (iter == predicate_columns.end()) {
+            _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
+        } else {
+            _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, kv.second);
+            _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
+        }
+    }
+
+    if (!has_complex_type && _lazy_read_ctx.predicate_columns.size() > 0 &&
         _lazy_read_ctx.lazy_read_columns.size() > 0) {
-        if (predicate_columns.size() == _lazy_read_ctx.predicate_columns.size()) {
-            // TODO: support partition columns
-            // _vconjunct_ctx has partition columns, and will push down to row group reader.
-            // However, row group reader can't get partition column values now.
-            _lazy_read_ctx.can_lazy_read = true;
+        _lazy_read_ctx.can_lazy_read = true;
+    }
+
+    if (!_lazy_read_ctx.can_lazy_read) {
+        for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
+            _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
+        }
+        for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
+            _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
         }
     }
+
+    _fill_all_columns = true;
+    return Status::OK();
 }
 
 Status ParquetReader::_init_read_columns() {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 9f602e6901..b726a401ad 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -82,6 +82,11 @@ public:
     Statistics& statistics() { return _statistics; }
 
+    Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
+
 private:
     struct ParquetProfile {
         RuntimeProfile::Counter* filtered_row_groups;
@@ -108,7 +113,6 @@ private:
     void _init_profile();
     bool _next_row_group_reader();
-    void _init_lazy_read();
     Status _init_read_columns();
     Status _init_row_group_readers();
     // Page Index Filter
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a2400b1dad..8e6aabbd47 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -151,10 +151,13 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
     if (read_rows > 0) {
         // Convert the src block columns type to string in-place.
         RETURN_IF_ERROR(_cast_to_input_block(block));
-        // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
-        RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
-        // Fill columns not exist in file with null or default value
-        RETURN_IF_ERROR(_fill_missing_columns(read_rows));
+        // FileReader can fill partition and missing columns itself
+        if (!_cur_reader->fill_all_columns()) {
+            // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
+            RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
+            // Fill columns that don't exist in the file with null or default values
+            RETURN_IF_ERROR(_fill_missing_columns(read_rows));
+        }
         // Apply _pre_conjunct_ctx_ptr to filter src block.
         RETURN_IF_ERROR(_pre_filter_src_block());
         // Convert src block to output block (dest block), string to dest data type and apply filters.
@@ -261,10 +264,11 @@ Status VFileScanner::_fill_columns_from_path(size_t rows) {
 
             auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
             IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-            for (size_t j = 0; j < rows; ++j) {
-                _text_converter->write_vec_column(slot_desc, col_ptr,
-                                                  const_cast<char*>(column_from_path.c_str()),
-                                                  column_from_path.size(), true, false);
+            if (!_text_converter->write_vec_column(slot_desc, col_ptr,
+                                                   const_cast<char*>(column_from_path.c_str()),
+                                                   column_from_path.size(), true, false, rows)) {
+                return Status::InternalError("Failed to fill partition column: {}={}",
+                                             slot_desc->col_name(), column_from_path);
             }
         }
     }
@@ -473,8 +477,7 @@ Status VFileScanner::_get_next_reader() {
             _cur_reader.reset(new ParquetReader(
                     _profile, _params, range, _file_col_names, _state->query_options().batch_size,
                     const_cast<cctz::time_zone*>(&_state->timezone_obj())));
-            if (_push_down_expr == nullptr && _vconjunct_ctx != nullptr &&
-                _partition_slot_descs.empty()) { // TODO: support partition columns
+            if (_push_down_expr == nullptr && _vconjunct_ctx != nullptr) {
                 RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr));
                 _discard_conjuncts();
             }
@@ -521,6 +524,7 @@ Status VFileScanner::_get_next_reader() {
     _name_to_col_type.clear();
     _missing_cols.clear();
     _cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
+    RETURN_IF_ERROR(_generate_fill_columns());
     if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
         fmt::memory_buffer col_buf;
         for (auto& col : _missing_cols) {
@@ -535,6 +539,48 @@
     return Status::OK();
 }
 
+Status VFileScanner::_generate_fill_columns() {
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+            partition_columns;
+    std::unordered_map<std::string, VExprContext*> missing_columns;
+
+    const TFileRangeDesc& range = _ranges.at(_next_range - 1);
+    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
+        for (const auto& slot_desc : _partition_slot_descs) {
+            if (slot_desc) {
+                auto it = _partition_slot_index_map.find(slot_desc->id());
+                if (it == std::end(_partition_slot_index_map)) {
+                    return Status::InternalError("Unknown source slot descriptor, slot_id={}",
+                                                 slot_desc->id());
+                }
+                const std::string& column_from_path = range.columns_from_path[it->second];
+                partition_columns.emplace(slot_desc->col_name(),
+                                          std::make_tuple(column_from_path, slot_desc));
+            }
+        }
+    }
+
+    if (!_missing_cols.empty()) {
+        for (auto slot_desc : _real_tuple_desc->slots()) {
+            if (!slot_desc->is_materialized()) {
+                continue;
+            }
+            if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
+                continue;
+            }
+
+            auto it = _col_default_value_ctx.find(slot_desc->col_name());
+            if (it == _col_default_value_ctx.end()) {
+                return Status::InternalError("failed to find default value expr for slot: {}",
+                                             slot_desc->col_name());
+            }
+            missing_columns.emplace(slot_desc->col_name(), it->second);
+        }
+    }
+
+    return _cur_reader->set_fill_columns(partition_columns, missing_columns);
+}
+
 Status VFileScanner::_init_expr_ctxes() {
     DCHECK(!_ranges.empty());
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 3edd75a5ac..cfe26d9753 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -135,6 +135,7 @@ private:
     Status _fill_missing_columns(size_t rows);
     Status _pre_filter_src_block();
     Status _convert_to_output_block(Block* block);
+    Status _generate_fill_columns();
 
     void _reset_counter() {
         _counter.num_rows_unselected = 0;
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 4b99ccae0b..743b2291cf 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -113,6 +113,10 @@ TEST_F(ParquetReaderTest, normal) {
 
     std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
     p_reader->init_reader(&colname_to_value_range, nullptr);
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+            partition_columns;
+    std::unordered_map<std::string, VExprContext*> missing_columns;
+    p_reader->set_fill_columns(partition_columns, missing_columns);
 
     Block* block = new Block();
     for (const auto& slot_desc : tuple_desc->slots()) {
         auto data_type =
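---

A simplified illustration of the split that ParquetReader::set_fill_columns performs above: partition/missing columns referenced by the pushed-down conjunct must be filled before the predicate is evaluated, while the remaining ones can be filled after filtering. The sketch below is self-contained C++ with invented names (FillPlan, classify_fill_columns); it is not Doris API, only the classification idea taken from the diff.

    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    struct FillPlan {
        std::vector<std::string> before_filter; // predicate partition/missing columns
        std::vector<std::string> after_filter;  // filled only after the predicate ran
    };

    // Split partition/missing columns by whether the predicate references them.
    FillPlan classify_fill_columns(const std::set<std::string>& predicate_slots,
                                   const std::vector<std::string>& partition_or_missing) {
        FillPlan plan;
        for (const auto& col : partition_or_missing) {
            if (predicate_slots.count(col)) {
                plan.before_filter.push_back(col); // conjunct needs it before filtering
            } else {
                plan.after_filter.push_back(col); // fill only the surviving rows
            }
        }
        return plan;
    }

    int main() {
        // Predicate: WHERE dt = '2022-11-16' AND a > 1; "dt" and "city" come
        // from the partition path, "a" is read from the parquet file.
        FillPlan plan = classify_fill_columns({"dt", "a"}, {"dt", "city"});
        std::cout << "fill before filter: " << plan.before_filter[0] << "\n"; // dt
        std::cout << "fill after filter: " << plan.after_filter[0] << "\n";   // city
    }

In the patch itself, the referenced columns land in predicate_partition_columns / predicate_missing_columns and their ids are appended to all_predicate_col_ids, so Block::filter_block can filter them together with the file-backed predicate columns.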
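The other building block is the new TextConverter::write_vec_column overload, which writes one parsed value `rows` times instead of looping row by row, since a partition value is constant for a whole batch. Below is a minimal standalone analogue, assuming a toy NullableColumn type (the real code uses vectorized::ColumnNullable and StringParser):

    #include <charconv>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    struct NullableColumn {
        std::vector<int64_t> data;
        std::vector<uint8_t> null_map; // 1 = NULL
    };

    // Parse `text` once and append it `rows` times; on failure, keep the
    // appended rows but flag them NULL and report the failure, mirroring the
    // PARSE_FAILURE handling in the patch.
    bool write_repeated(NullableColumn& col, const std::string& text, size_t rows) {
        int64_t value = 0;
        auto [ptr, ec] = std::from_chars(text.data(), text.data() + text.size(), value);
        bool ok = ec == std::errc() && ptr == text.data() + text.size();
        col.data.insert(col.data.end(), rows, ok ? value : 0);
        col.null_map.insert(col.null_map.end(), rows, ok ? 0 : 1);
        return ok;
    }

    int main() {
        NullableColumn dt;
        // The partition value parsed from the file path is written once for
        // all 1024 rows of the batch instead of row by row.
        if (!write_repeated(dt, "20221116", 1024)) {
            std::cerr << "Failed to fill partition column\n";
            return 1;
        }
        std::cout << dt.data.size() << " rows filled, first = " << dt.data[0] << "\n";
    }

On a parse failure the real implementation likewise keeps the appended rows, flips their null-map entries, and returns false, which RowGroupReader::_fill_partition_columns turns into an InternalError status.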