This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d494674ff494214da6902ec7969f35626b973c1d Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Thu Jan 11 20:45:53 2024 +0800 [opt](parquet-reader) Opt parquet decimal type reading. (#29825) --- .../exec/format/parquet/fix_length_plain_decoder.h | 29 +++-- .../exec/format/parquet/parquet_column_convert.h | 138 ++++++++++++++++----- .../exec/format/parquet/vparquet_column_reader.cpp | 12 +- .../exec/format/parquet/vparquet_column_reader.h | 4 +- be/test/vec/exec/parquet/parquet_thrift_test.cpp | 4 +- 5 files changed, 137 insertions(+), 50 deletions(-) diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h index b21f58601d3..c06e33c550c 100644 --- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h +++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h @@ -103,14 +103,29 @@ Status FixLengthPlainDecoder<PhysicalType>::_decode_string(MutableColumnPtr& dor while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) { switch (read_type) { case ColumnSelectVector::CONTENT: { - std::vector<StringRef> string_values; - string_values.reserve(run_length); - for (size_t i = 0; i < run_length; ++i) { - char* buf_start = _data->data + _offset; - string_values.emplace_back(buf_start, _type_length); - _offset += _type_length; + auto* column_string = assert_cast<ColumnString*>(doris_column.get()); + auto& chars = column_string->get_chars(); + auto& offsets = column_string->get_offsets(); + size_t bytes_size = chars.size(); + + // copy chars + size_t data_size = run_length * _type_length; + size_t old_size = chars.size(); + chars.resize(old_size + data_size); + memcpy(chars.data() + old_size, _data->data, data_size); + + // copy offsets + offsets.resize(offsets.size() + run_length); + auto* offsets_data = offsets.data() + offsets.size() - run_length; + + int i = 0; + for (; i < run_length; i++) { + bytes_size += _type_length; + *(offsets_data++) = bytes_size; } - doris_column->insert_many_strings(&string_values[0], run_length); + + //doris_column->insert_many_strings_fixed_length<_type_length>(&string_values[0], run_length); + _offset += data_size; break; } case ColumnSelectVector::NULL_DATA: { diff --git a/be/src/vec/exec/format/parquet/parquet_column_convert.h b/be/src/vec/exec/format/parquet/parquet_column_convert.h index e5ee2104176..b5c3ffb7c88 100644 --- a/be/src/vec/exec/format/parquet/parquet_column_convert.h +++ b/be/src/vec/exec/format/parquet/parquet_column_convert.h @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#pragma once + #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/parquet_types.h> @@ -219,7 +221,7 @@ struct ColumnConvert { } public: - ConvertParams* _convert_params = nullptr; + std::unique_ptr<ConvertParams> _convert_params; }; template <tparquet::Type::type parquet_physical_type, typename dst_type> @@ -349,6 +351,91 @@ public: } }; +template <typename DecimalType, DecimalScaleParams::ScaleType ScaleType> +class FixedStringToDecimal : public ColumnConvert { +public: + FixedStringToDecimal(int32_t type_length) : ColumnConvert(), _type_length(type_length) {} + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + +#define M(FixedTypeLength, ValueCopyType) \ + case FixedTypeLength: \ + return _convert_internal<FixedTypeLength, ValueCopyType>(src_col, dst_col); + +#define APPLY_FOR_DECIMALS() \ + M(1, int64_t) \ + M(2, int64_t) \ + M(3, int64_t) \ + M(4, int64_t) \ + M(5, int64_t) \ + M(6, int64_t) \ + M(7, int64_t) \ + M(8, int64_t) \ + M(9, int128_t) \ + M(10, int128_t) \ + M(11, int128_t) \ + M(12, int128_t) \ + M(13, int128_t) \ + M(14, int128_t) \ + M(15, int128_t) \ + M(16, int128_t) + + switch (_type_length) { + APPLY_FOR_DECIMALS() + default: + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + return Status::OK(); +#undef APPLY_FOR_DECIMALS +#undef M + } + + template <int fixed_type_length, typename ValueCopyType> + Status _convert_internal(ColumnPtr& src_col, MutableColumnPtr& dst_col) { + size_t rows = src_col->size(); + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + size_t start_idx = dst_col->size(); + dst_col->resize(start_idx + rows); + + auto& data = static_cast<ColumnDecimal<DecimalType>*>(dst_col.get())->get_data(); + for (int i = 0; i < rows; i++) { + size_t len = offset[i] - offset[i - 1]; + if (len == 0) { + continue; + } + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + //memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], fixed_type_length); + // For performance, we can copy sizeof(value) because `ColumnDecimal::Container` use `PaddedPODArray` which has 15 bytes pad_right and the max fixed_type_length is 16 bytes. + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], sizeof(value)); + //memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - fixed_type_length) * 8); + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) { + // do nothing + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + auto& v = reinterpret_cast<DecimalType&>(data[start_idx + i]); + v = (DecimalType)value; + } + + return Status::OK(); + } + +private: + int32_t _type_length; +}; + template <typename DecimalType, typename ValueCopyType, DecimalScaleParams::ScaleType ScaleType> class StringToDecimal : public ColumnConvert { public: @@ -497,8 +584,11 @@ public: inline Status get_converter(tparquet::Type::type parquet_physical_type, PrimitiveType show_type, std::shared_ptr<const IDataType> dst_data_type, - std::unique_ptr<ColumnConvert>* converter, - ConvertParams* convert_params) { + std::unique_ptr<ColumnConvert>* converter, FieldSchema* field_schema, + cctz::time_zone* ctz) { + std::unique_ptr<ParquetConvert::ConvertParams> convert_params = + std::make_unique<ParquetConvert::ConvertParams>(); + convert_params->init(field_schema, ctz); auto dst_type = remove_nullable(dst_data_type)->get_type_id(); switch (dst_type) { #define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ @@ -592,34 +682,18 @@ inline Status get_converter(tparquet::Type::type parquet_physical_type, Primitiv DecimalScaleParams& scale_params = convert_params->decimal_scale; \ if (tparquet::Type::FIXED_LEN_BYTE_ARRAY == parquet_physical_type) { \ size_t string_length = convert_params->field_schema->parquet_schema.type_length; \ - if (string_length <= 8) { \ - if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { \ - *converter = \ - std::make_unique<StringToDecimal<DECIMAL_TYPE, int64_t, \ - DecimalScaleParams::SCALE_UP>>(); \ - } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { \ - *converter = \ - std::make_unique<StringToDecimal<DECIMAL_TYPE, int64_t, \ - DecimalScaleParams::SCALE_DOWN>>(); \ - } else { \ - *converter = \ - std::make_unique<StringToDecimal<DECIMAL_TYPE, int64_t, \ - DecimalScaleParams::NO_SCALE>>(); \ - } \ - } else if (string_length <= 16) { \ - if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { \ - *converter = \ - std::make_unique<StringToDecimal<DECIMAL_TYPE, int128_t, \ - DecimalScaleParams::SCALE_UP>>(); \ - } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { \ - *converter = \ - std::make_unique<StringToDecimal<DECIMAL_TYPE, int128_t, \ - DecimalScaleParams::SCALE_DOWN>>(); \ - } else { \ - *converter = \ - std::make_unique<StringToDecimal<DECIMAL_TYPE, int128_t, \ - DecimalScaleParams::NO_SCALE>>(); \ - } \ + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { \ + *converter = std::make_unique< \ + FixedStringToDecimal<DECIMAL_TYPE, DecimalScaleParams::SCALE_UP>>( \ + string_length); \ + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { \ + *converter = std::make_unique< \ + FixedStringToDecimal<DECIMAL_TYPE, DecimalScaleParams::SCALE_DOWN>>( \ + string_length); \ + } else { \ + *converter = std::make_unique< \ + FixedStringToDecimal<DECIMAL_TYPE, DecimalScaleParams::NO_SCALE>>( \ + string_length); \ } \ } else if (tparquet::Type::BYTE_ARRAY == parquet_physical_type) { \ convert_params->init_decimal_converter<PRIMARY_TYPE>(dst_data_type); \ @@ -674,7 +748,7 @@ inline Status get_converter(tparquet::Type::type parquet_physical_type, Primitiv tparquet::to_string(parquet_physical_type), getTypeName(dst_type)); } - (*converter)->_convert_params = convert_params; + (*converter)->_convert_params = std::move(convert_params); return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 7c088394131..cfddb5346e8 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -25,7 +25,6 @@ #include <algorithm> #include <utility> -#include "parquet_column_convert.h" #include "runtime/define_primitive_type.h" #include "schema_desc.h" #include "util/runtime_profile.h" @@ -575,13 +574,12 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr } while (false); if (need_convert) { - std::unique_ptr<ParquetConvert::ColumnConvert> converter; - ParquetConvert::ConvertParams convert_params; - convert_params.init(_field_schema, _ctz); - RETURN_IF_ERROR(ParquetConvert::get_converter(parquet_physical_type, show_type, type, - &converter, &convert_params)); + if (_converter == nullptr) { + RETURN_IF_ERROR(ParquetConvert::get_converter(parquet_physical_type, show_type, type, + &_converter, _field_schema, _ctz)); + } auto x = doris_column->assume_mutable(); - RETURN_IF_ERROR(converter->convert(src_column, x)); + RETURN_IF_ERROR(_converter->convert(src_column, x)); } return Status::OK(); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 994908711b2..d15d6d5efa1 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -29,6 +29,7 @@ #include "io/fs/buffered_reader.h" #include "io/fs/file_reader_writer_fwd.h" +#include "parquet_column_convert.h" #include "vec/columns/columns_number.h" #include "vec/data_types/data_type.h" #include "vec/exec/format/parquet/parquet_common.h" @@ -183,6 +184,7 @@ private: std::unique_ptr<ColumnChunkReader> _chunk_reader; std::vector<level_t> _rep_levels; std::vector<level_t> _def_levels; + std::unique_ptr<ParquetConvert::ColumnConvert> _converter = nullptr; Status _skip_values(size_t num_values); Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type, @@ -288,4 +290,4 @@ private: std::vector<std::unique_ptr<ParquetColumnReader>> _child_readers; }; -}; // namespace doris::vectorized \ No newline at end of file +}; // namespace doris::vectorized diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index cb850cf5d57..03a6ac5eed1 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -266,10 +266,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column } if (need_convert) { std::unique_ptr<ParquetConvert::ColumnConvert> converter; - ParquetConvert::ConvertParams convert_params; - convert_params.init(field_schema, &ctz); RETURN_IF_ERROR(ParquetConvert::get_converter(parquet_physical_type, show_type, data_type, - &converter, &convert_params)); + &converter, field_schema, &ctz)); auto x = doris_column->assume_mutable(); RETURN_IF_ERROR(converter->convert(src_column, x)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org