github-actions[bot] commented on code in PR #16818: URL: https://github.com/apache/doris/pull/16818#discussion_r1109529424
########## be/src/vec/columns/column_string.h: ########## @@ -280,6 +280,98 @@ class ColumnString final : public COWHelper<IColumn, ColumnString> { } } +#define MAX_STRINGS_OVERFLOW_SIZE 128 + template <typename T, size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) + __attribute__((noinline)); + + template <size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) { + size_t new_size = 0; + for (size_t i = 0; i < num; i++) { + new_size += strings[i].size; + } + + const size_t old_size = chars.size(); + check_chars_length(old_size + new_size, offsets.size() + num); + chars.resize(old_size + new_size + copy_length); + + Char* data = chars.data(); + size_t offset = old_size; + for (size_t i = 0; i < num; i++) { + uint32_t len = strings[i].size; + if (len) { + memcpy(data + offset, strings[i].data, copy_length); + offset += len; + } + offsets.push_back(offset); + } + chars.resize(old_size + new_size); + } + + void insert_many_strings_overflow(const StringRef* strings, size_t num, + size_t max_length) override { + if (max_length <= 8) { + insert_many_strings_fixed_length<8>(strings, num); + } else if (max_length <= 16) { + insert_many_strings_fixed_length<16>(strings, num); + } else if (max_length <= 32) { + insert_many_strings_fixed_length<32>(strings, num); + } else if (max_length <= 64) { + insert_many_strings_fixed_length<64>(strings, num); + } else if (max_length <= 128) { + insert_many_strings_fixed_length<128>(strings, num); + } else { + insert_many_strings(strings, num); + } + } + +#define MAX_STRINGS_OVERFLOW_SIZE 128 + template <typename T, size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) Review Comment: warning: class member cannot be redeclared [clang-diagnostic-error] ```cpp void insert_many_strings_fixed_length(const StringRef* strings, size_t num) ^ ``` **be/src/vec/columns/column_string.h:284:** previous declaration is here ```cpp void insert_many_strings_fixed_length(const StringRef* strings, size_t num) ^ ``` ########## be/src/vec/columns/column_string.h: ########## @@ -280,6 +280,98 @@ } } +#define MAX_STRINGS_OVERFLOW_SIZE 128 + template <typename T, size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) + __attribute__((noinline)); + + template <size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) { + size_t new_size = 0; + for (size_t i = 0; i < num; i++) { + new_size += strings[i].size; + } + + const size_t old_size = chars.size(); + check_chars_length(old_size + new_size, offsets.size() + num); + chars.resize(old_size + new_size + copy_length); + + Char* data = chars.data(); + size_t offset = old_size; + for (size_t i = 0; i < num; i++) { + uint32_t len = strings[i].size; + if (len) { + memcpy(data + offset, strings[i].data, copy_length); + offset += len; + } + offsets.push_back(offset); + } + chars.resize(old_size + new_size); + } + + void insert_many_strings_overflow(const StringRef* strings, size_t num, + size_t max_length) override { + if (max_length <= 8) { + insert_many_strings_fixed_length<8>(strings, num); + } else if (max_length <= 16) { + insert_many_strings_fixed_length<16>(strings, num); + } else if (max_length <= 32) { + insert_many_strings_fixed_length<32>(strings, num); + } else if (max_length <= 64) { + insert_many_strings_fixed_length<64>(strings, num); + } else if (max_length <= 128) { + insert_many_strings_fixed_length<128>(strings, num); + } else { + insert_many_strings(strings, num); + } + } + +#define MAX_STRINGS_OVERFLOW_SIZE 128 + template <typename T, size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) + __attribute__((noinline)); + + template <size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) { Review Comment: warning: class member cannot be redeclared [clang-diagnostic-error] ```cpp void insert_many_strings_fixed_length(const StringRef* strings, size_t num) { ^ ``` **be/src/vec/columns/column_string.h:288:** previous definition is here ```cpp void insert_many_strings_fixed_length(const StringRef* strings, size_t num) { ^ ``` ########## be/src/vec/exec/format/parquet/decoder.h: ########## @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> + +#include "common/status.h" +#include "gen_cpp/parquet_types.h" +#include "schema_desc.h" +#include "util/rle_encoding.h" +#include "vec/columns/column_dictionary.h" +#include "vec/data_types/data_type.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::UInt8, UInt8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::UInt16, UInt16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::UInt32, UInt32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::UInt64, UInt64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +struct DecodeParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; +}; + +class Decoder { +public: + Decoder() = default; + virtual ~Decoder() = default; + + static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding, + std::unique_ptr<Decoder>& decoder); + + // The type with fix length + void set_type_length(int32_t type_length) { _type_length = type_length; } + + // Set the data to be decoded + virtual void set_data(Slice* data) { + _data = data; + _offset = 0; + } + + void init(FieldSchema* field_schema, cctz::time_zone* ctz); + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type); + + // Write the decoded values batch to doris's column + virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) = 0; + + virtual Status skip_values(size_t num_values) = 0; + + virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) { + return Status::NotSupported("set_dict is not supported"); + } + +protected: + int32_t _type_length; + Slice* _data = nullptr; + uint32_t _offset = 0; + FieldSchema* _field_schema = nullptr; + std::unique_ptr<DecodeParams> _decode_params = nullptr; +}; + +class BaseDictDecoder : public Decoder { +public: + BaseDictDecoder() = default; + virtual ~BaseDictDecoder() override = default; Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ~BaseDictDecoder() override = default; ``` ########## be/src/vec/exec/format/parquet/decoder.h: ########## @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> + +#include "common/status.h" +#include "gen_cpp/parquet_types.h" +#include "schema_desc.h" +#include "util/rle_encoding.h" +#include "vec/columns/column_dictionary.h" +#include "vec/data_types/data_type.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::UInt8, UInt8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::UInt16, UInt16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::UInt32, UInt32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::UInt64, UInt64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +struct DecodeParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; +}; + +class Decoder { +public: + Decoder() = default; + virtual ~Decoder() = default; + + static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding, + std::unique_ptr<Decoder>& decoder); + + // The type with fix length + void set_type_length(int32_t type_length) { _type_length = type_length; } + + // Set the data to be decoded + virtual void set_data(Slice* data) { + _data = data; + _offset = 0; + } + + void init(FieldSchema* field_schema, cctz::time_zone* ctz); + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type); + + // Write the decoded values batch to doris's column + virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) = 0; + + virtual Status skip_values(size_t num_values) = 0; + + virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) { + return Status::NotSupported("set_dict is not supported"); + } + +protected: + int32_t _type_length; + Slice* _data = nullptr; + uint32_t _offset = 0; + FieldSchema* _field_schema = nullptr; + std::unique_ptr<DecodeParams> _decode_params = nullptr; +}; + +class BaseDictDecoder : public Decoder { +public: + BaseDictDecoder() = default; + virtual ~BaseDictDecoder() override = default; + + // Set the data to be decoded + virtual void set_data(Slice* data) override { Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion void set_data(Slice* data) override { ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { Review Comment: warning: expected class name [clang-diagnostic-error] ```cpp class FixLengthDictDecoder final : public BaseDictDecoder { ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; Review Comment: warning: only virtual member functions can be marked 'override' [clang-diagnostic-error] ```suggestion ~FixLengthDictDecoder() = default; ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \ + return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector); \ + } + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case TypeIndex::Date: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateV2: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, + select_vector); + } + break; + case TypeIndex::DateTime: + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } + break; + case TypeIndex::Decimal32: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal64: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128I: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + break; + default: + break; + } + + return Status::InvalidArgument( + "Can't decode parquet physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); + } + + Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override { + if (num_values * _type_length != length) { + return Status::Corruption("Wrong dictionary data for fixed length type"); + } + _dict = std::move(dict); + char* dict_item_address = reinterpret_cast<char*>(_dict.get()); + _dict_items.resize(num_values); + for (size_t i = 0; i < num_values; ++i) { + _dict_items[i] = *(T*)dict_item_address; + dict_item_address += _type_length; + } + return Status::OK(); + } + +protected: + template <typename Numeric> + Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + column_data[data_index++] = + static_cast<Numeric>(_dict_items[_indexes[dict_index++]]); + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + int64_t date_value = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast<CppType&>(column_data[data_index++]); + v.from_unixtime(date_value * 24 * 60 * 60, + *_decode_params->ctz); // day to seconds + if constexpr (std::is_same_v<CppType, VecDateTimeValue>) { + // we should cast to date if using date v1. + v.cast_to_date(); + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { Review Comment: warning: unknown type name 'ColumnSelectVector' [clang-diagnostic-error] ```cpp Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \ + return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector); \ + } + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case TypeIndex::Date: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateV2: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, + select_vector); + } + break; + case TypeIndex::DateTime: + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } + break; + case TypeIndex::Decimal32: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal64: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128I: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + break; + default: + break; + } + + return Status::InvalidArgument( + "Can't decode parquet physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); + } + + Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override { + if (num_values * _type_length != length) { + return Status::Corruption("Wrong dictionary data for fixed length type"); + } + _dict = std::move(dict); + char* dict_item_address = reinterpret_cast<char*>(_dict.get()); + _dict_items.resize(num_values); + for (size_t i = 0; i < num_values; ++i) { + _dict_items[i] = *(T*)dict_item_address; + dict_item_address += _type_length; + } + return Status::OK(); + } + +protected: + template <typename Numeric> + Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + column_data[data_index++] = + static_cast<Numeric>(_dict_items[_indexes[dict_index++]]); + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + int64_t date_value = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast<CppType&>(column_data[data_index++]); + v.from_unixtime(date_value * 24 * 60 * 60, + *_decode_params->ctz); // day to seconds + if constexpr (std::is_same_v<CppType, VecDateTimeValue>) { + // we should cast to date if using date v1. + v.cast_to_date(); + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + int64_t date_value = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast<CppType&>(column_data[data_index++]); + v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz); + if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + // nanoseconds will be ignored. + v.set_microsecond((date_value % _decode_params->second_mask) * + _decode_params->scale_to_nano_factor / 1000); + // TODO: the precision of datetime v1 + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { Review Comment: warning: unknown type name 'ColumnSelectVector' [clang-diagnostic-error] ```cpp Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \ + return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector); \ + } + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case TypeIndex::Date: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateV2: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, + select_vector); + } + break; + case TypeIndex::DateTime: + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } + break; + case TypeIndex::Decimal32: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal64: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128I: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + break; + default: + break; + } + + return Status::InvalidArgument( + "Can't decode parquet physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); + } + + Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override { + if (num_values * _type_length != length) { + return Status::Corruption("Wrong dictionary data for fixed length type"); + } + _dict = std::move(dict); + char* dict_item_address = reinterpret_cast<char*>(_dict.get()); + _dict_items.resize(num_values); + for (size_t i = 0; i < num_values; ++i) { + _dict_items[i] = *(T*)dict_item_address; + dict_item_address += _type_length; + } + return Status::OK(); + } + +protected: + template <typename Numeric> + Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + column_data[data_index++] = + static_cast<Numeric>(_dict_items[_indexes[dict_index++]]); + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + int64_t date_value = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast<CppType&>(column_data[data_index++]); + v.from_unixtime(date_value * 24 * 60 * 60, + *_decode_params->ctz); // day to seconds + if constexpr (std::is_same_v<CppType, VecDateTimeValue>) { + // we should cast to date if using date v1. + v.cast_to_date(); + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + int64_t date_value = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast<CppType&>(column_data[data_index++]); + v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz); + if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + // nanoseconds will be ignored. + v.set_microsecond((date_value % _decode_params->second_mask) * + _decode_params->scale_to_nano_factor / 1000); + // TODO: the precision of datetime v1 + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + ParquetInt96& datetime96 = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast<CppType&>(column_data[data_index++]); + int64_t micros = datetime96.to_timestamp_micros(); + v.from_unixtime(micros / 1000000, *_decode_params->ctz); + if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + // spark.sql.parquet.outputTimestampType = INT96(NANOS) will lost precision. + // only keep microseconds. + v.set_microsecond(micros % 1000000); + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename DecimalPrimitiveType, typename DecimalPhysicalType> + Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + init_decimal_converter<DecimalPrimitiveType>(data_type); + auto& column_data = + static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column) + .get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + DecimalScaleParams& scale_params = _decode_params->decimal_scale; + + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + // we should use decimal128 to scale up/down + Int128 value = static_cast<Int128>(_dict_items[_indexes[dict_index++]]); + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]); + v = (DecimalPrimitiveType)value; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + tparquet::Type::type _physical_type; Review Comment: warning: use of undeclared identifier 'tparquet' [clang-diagnostic-error] ```cpp tparquet::Type::type _physical_type; ^ ``` ########## be/src/vec/columns/column_string.h: ########## @@ -280,6 +280,98 @@ } } +#define MAX_STRINGS_OVERFLOW_SIZE 128 + template <typename T, size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) + __attribute__((noinline)); + + template <size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) { + size_t new_size = 0; + for (size_t i = 0; i < num; i++) { + new_size += strings[i].size; + } + + const size_t old_size = chars.size(); + check_chars_length(old_size + new_size, offsets.size() + num); + chars.resize(old_size + new_size + copy_length); + + Char* data = chars.data(); + size_t offset = old_size; + for (size_t i = 0; i < num; i++) { + uint32_t len = strings[i].size; + if (len) { + memcpy(data + offset, strings[i].data, copy_length); + offset += len; + } + offsets.push_back(offset); + } + chars.resize(old_size + new_size); + } + + void insert_many_strings_overflow(const StringRef* strings, size_t num, + size_t max_length) override { + if (max_length <= 8) { + insert_many_strings_fixed_length<8>(strings, num); + } else if (max_length <= 16) { + insert_many_strings_fixed_length<16>(strings, num); + } else if (max_length <= 32) { + insert_many_strings_fixed_length<32>(strings, num); + } else if (max_length <= 64) { + insert_many_strings_fixed_length<64>(strings, num); + } else if (max_length <= 128) { + insert_many_strings_fixed_length<128>(strings, num); + } else { + insert_many_strings(strings, num); + } + } + +#define MAX_STRINGS_OVERFLOW_SIZE 128 + template <typename T, size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) + __attribute__((noinline)); + + template <size_t copy_length> + void insert_many_strings_fixed_length(const StringRef* strings, size_t num) { + size_t new_size = 0; + for (size_t i = 0; i < num; i++) { + new_size += strings[i].size; + } + + const size_t old_size = chars.size(); + check_chars_length(old_size + new_size, offsets.size() + num); + chars.resize(old_size + new_size + copy_length); + + Char* data = chars.data(); + size_t offset = old_size; + for (size_t i = 0; i < num; i++) { + uint32_t len = strings[i].size; + if (len) { + memcpy(data + offset, strings[i].data, copy_length); + offset += len; + } + offsets.push_back(offset); + } + chars.resize(old_size + new_size); + } + + void insert_many_strings_overflow(const StringRef* strings, size_t num, Review Comment: warning: class member cannot be redeclared [clang-diagnostic-error] ```cpp void insert_many_strings_overflow(const StringRef* strings, size_t num, ^ ``` **be/src/vec/columns/column_string.h:311:** previous definition is here ```cpp void insert_many_strings_overflow(const StringRef* strings, size_t num, ^ ``` ########## be/src/vec/exec/format/parquet/decoder.h: ########## @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> + +#include "common/status.h" +#include "gen_cpp/parquet_types.h" +#include "schema_desc.h" +#include "util/rle_encoding.h" +#include "vec/columns/column_dictionary.h" +#include "vec/data_types/data_type.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::UInt8, UInt8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::UInt16, UInt16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::UInt32, UInt32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::UInt64, UInt64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +struct DecodeParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; +}; + +class Decoder { +public: + Decoder() = default; + virtual ~Decoder() = default; + + static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding, + std::unique_ptr<Decoder>& decoder); + + // The type with fix length + void set_type_length(int32_t type_length) { _type_length = type_length; } + + // Set the data to be decoded + virtual void set_data(Slice* data) { + _data = data; + _offset = 0; + } + + void init(FieldSchema* field_schema, cctz::time_zone* ctz); + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type); + + // Write the decoded values batch to doris's column + virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) = 0; + + virtual Status skip_values(size_t num_values) = 0; + + virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) { + return Status::NotSupported("set_dict is not supported"); + } + +protected: + int32_t _type_length; + Slice* _data = nullptr; + uint32_t _offset = 0; + FieldSchema* _field_schema = nullptr; + std::unique_ptr<DecodeParams> _decode_params = nullptr; +}; + +class BaseDictDecoder : public Decoder { +public: + BaseDictDecoder() = default; + virtual ~BaseDictDecoder() override = default; + + // Set the data to be decoded + virtual void set_data(Slice* data) override { + _data = data; + _offset = 0; + uint8_t bit_width = *data->data; + _index_batch_decoder.reset( + new RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1, + static_cast<int>(data->size) - 1, bit_width)); + } + +protected: + /** + * Decode dictionary-coded values into doris_column, ensure that doris_column is ColumnDictI32 type, + * and the coded values must be read into _indexes previously. + */ + Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + DCHECK(doris_column->is_column_dictionary()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + auto& column_data = assert_cast<ColumnDictI32&>(*doris_column).get_data(); + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + uint32_t* start_index = &_indexes[0]; + column_data.insert(start_index + dict_index, start_index + dict_index + run_length); + dict_index += run_length; + break; + } + case ColumnSelectVector::NULL_DATA: { + doris_column->insert_many_defaults(run_length); + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + break; + } + } + } + return Status::OK(); + } + + Status skip_values(size_t num_values) override { + _indexes.resize(num_values); + _index_batch_decoder->GetBatch(&_indexes[0], num_values); + return Status::OK(); + } + +protected: Review Comment: warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers] ```suggestion ``` **be/src/vec/exec/format/parquet/decoder.h:109:** previously declared here ```cpp protected: ^ ``` ########## be/src/vec/columns/column.h: ########## @@ -265,6 +265,16 @@ class IColumn : public COW<IColumn> { LOG(FATAL) << "Method insert_many_binary_data is not supported for " << get_name(); } + virtual void insert_many_strings_overflow(const StringRef* strings, size_t num, + size_t max_length) { + LOG(FATAL) << "Method insert_many_strings_overflow is not supported for " << get_name(); + } + + virtual void insert_many_strings_overflow(const StringRef* strings, size_t num, Review Comment: warning: class member cannot be redeclared [clang-diagnostic-error] ```cpp virtual void insert_many_strings_overflow(const StringRef* strings, size_t num, ^ ``` **be/src/vec/columns/column.h:267:** previous definition is here ```cpp virtual void insert_many_strings_overflow(const StringRef* strings, size_t num, ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { Review Comment: warning: unknown type name 'ColumnSelectVector' [clang-diagnostic-error] ```cpp ColumnSelectVector& select_vector) override { ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); Review Comment: warning: use of undeclared identifier '_indexes' [clang-diagnostic-error] ```cpp _indexes.resize(non_null_size); ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; Review Comment: warning: member initializer 'BaseDictDecoder' does not name a non-static data member or base class [clang-diagnostic-error] ```cpp : BaseDictDecoder(), _physical_type(physical_type) {}; ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); Review Comment: warning: use of undeclared identifier '_index_batch_decoder' [clang-diagnostic-error] ```cpp _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) Review Comment: warning: use of undeclared identifier 'tparquet' [clang-diagnostic-error] ```cpp FixLengthDictDecoder(tparquet::Type::type physical_type) ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); Review Comment: warning: use of undeclared identifier '_type_length' [clang-diagnostic-error] ```cpp dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \ + return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector); \ + } + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case TypeIndex::Date: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateV2: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, + select_vector); + } + break; + case TypeIndex::DateTime: + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } + break; + case TypeIndex::Decimal32: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal64: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128I: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + break; + default: + break; + } + + return Status::InvalidArgument( + "Can't decode parquet physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); + } + + Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override { + if (num_values * _type_length != length) { + return Status::Corruption("Wrong dictionary data for fixed length type"); + } + _dict = std::move(dict); + char* dict_item_address = reinterpret_cast<char*>(_dict.get()); + _dict_items.resize(num_values); + for (size_t i = 0; i < num_values; ++i) { + _dict_items[i] = *(T*)dict_item_address; + dict_item_address += _type_length; + } + return Status::OK(); + } + +protected: + template <typename Numeric> + Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { Review Comment: warning: unknown type name 'ColumnSelectVector' [clang-diagnostic-error] ```cpp Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \ + return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector); \ + } + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case TypeIndex::Date: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateV2: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, + select_vector); + } + break; + case TypeIndex::DateTime: + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } + break; + case TypeIndex::Decimal32: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal64: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128I: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + break; + default: + break; + } + + return Status::InvalidArgument( + "Can't decode parquet physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); + } + + Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override { Review Comment: warning: only virtual member functions can be marked 'override' [clang-diagnostic-error] ```suggestion Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) { ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \ + return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector); \ + } + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case TypeIndex::Date: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateV2: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, + select_vector); + } + break; + case TypeIndex::DateTime: + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } + break; + case TypeIndex::Decimal32: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal64: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128I: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + break; + default: + break; + } + + return Status::InvalidArgument( + "Can't decode parquet physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); + } + + Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override { + if (num_values * _type_length != length) { + return Status::Corruption("Wrong dictionary data for fixed length type"); + } + _dict = std::move(dict); + char* dict_item_address = reinterpret_cast<char*>(_dict.get()); + _dict_items.resize(num_values); + for (size_t i = 0; i < num_values; ++i) { + _dict_items[i] = *(T*)dict_item_address; + dict_item_address += _type_length; + } + return Status::OK(); + } + +protected: + template <typename Numeric> + Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + column_data[data_index++] = + static_cast<Numeric>(_dict_items[_indexes[dict_index++]]); + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { Review Comment: warning: unknown type name 'ColumnSelectVector' [clang-diagnostic-error] ```cpp Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { ^ ``` ########## be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template <typename T> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { + std::vector<StringRef> dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast<ColumnDictI32&>(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \ + return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector); \ + } + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case TypeIndex::Date: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateV2: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, + select_vector); + } + break; + case TypeIndex::DateTime: + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if constexpr (std::is_same_v<T, ParquetInt96>) { + return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column, + select_vector); + } + break; + case TypeIndex::Decimal32: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal64: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128I: + if constexpr (std::is_same_v<T, Int32>) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v<T, Int64>) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + break; + default: + break; + } + + return Status::InvalidArgument( + "Can't decode parquet physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); + } + + Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override { + if (num_values * _type_length != length) { + return Status::Corruption("Wrong dictionary data for fixed length type"); + } + _dict = std::move(dict); + char* dict_item_address = reinterpret_cast<char*>(_dict.get()); + _dict_items.resize(num_values); + for (size_t i = 0; i < num_values; ++i) { + _dict_items[i] = *(T*)dict_item_address; + dict_item_address += _type_length; + } + return Status::OK(); + } + +protected: + template <typename Numeric> + Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + column_data[data_index++] = + static_cast<Numeric>(_dict_items[_indexes[dict_index++]]); + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + int64_t date_value = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast<CppType&>(column_data[data_index++]); + v.from_unixtime(date_value * 24 * 60 * 60, + *_decode_params->ctz); // day to seconds + if constexpr (std::is_same_v<CppType, VecDateTimeValue>) { + // we should cast to date if using date v1. + v.cast_to_date(); + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + int64_t date_value = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast<CppType&>(column_data[data_index++]); + v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz); + if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + // nanoseconds will be ignored. + v.set_microsecond((date_value % _decode_params->second_mask) * + _decode_params->scale_to_nano_factor / 1000); + // TODO: the precision of datetime v1 + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename CppType, typename ColumnType> + Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + ParquetInt96& datetime96 = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast<CppType&>(column_data[data_index++]); + int64_t micros = datetime96.to_timestamp_micros(); + v.from_unixtime(micros / 1000000, *_decode_params->ctz); + if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + // spark.sql.parquet.outputTimestampType = INT96(NANOS) will lost precision. + // only keep microseconds. + v.set_microsecond(micros % 1000000); + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template <typename DecimalPrimitiveType, typename DecimalPhysicalType> + Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) { Review Comment: warning: unknown type name 'ColumnSelectVector' [clang-diagnostic-error] ```cpp ColumnSelectVector& select_vector) { ^ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org