This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8f79742f7d9 branch-2.1: [fix](arrow) Fix Arrow serialization and deserialization of Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types (#49244) 8f79742f7d9 is described below commit 8f79742f7d98a3252abe8a9ebdc056fd2399a56e Author: Xinyi Zou <zouxi...@selectdb.com> AuthorDate: Thu Mar 20 09:57:04 2025 +0800 branch-2.1: [fix](arrow) Fix Arrow serialization and deserialization of Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types (#49244) ### What problem does this PR solve? pick #48944 [fix](arrow) Fix UT DataTypeSerDeArrowTest of Array/Map/Struct/Bitmap/HLL/Decimal256 types pick #48398 [fix](arrow) Fix UT DataTypeSerDeArrowTest of Date type --- be/src/runtime/types.cpp | 27 +- be/src/runtime/types.h | 3 +- be/src/util/arrow/block_convertor.cpp | 272 +----------- be/src/util/arrow/row_batch.cpp | 2 - be/src/vec/columns/column_array.cpp | 1 + be/src/vec/columns/column_map.cpp | 1 + be/src/vec/columns/column_string.h | 4 + be/src/vec/columns/column_struct.cpp | 1 + be/src/vec/columns/column_vector.h | 5 +- be/src/vec/data_types/data_type_time_v2.h | 4 +- .../data_types/serde/data_type_date64_serde.cpp | 33 +- .../vec/data_types/serde/data_type_date64_serde.h | 7 + .../serde/data_type_datetimev2_serde.cpp | 5 +- .../data_types/serde/data_type_datev2_serde.cpp | 11 +- .../data_types/serde/data_type_decimal_serde.cpp | 45 +- .../vec/data_types/serde/data_type_ipv6_serde.cpp | 18 +- .../data_types/serde/data_type_number_serde.cpp | 20 +- .../vec/data_types/common_data_type_serder_test.h | 445 +++++++++++++++++++ be/test/vec/data_types/data_type_map_test.cpp | 178 ++++++++ be/test/vec/data_types/data_type_struct_test.cpp | 115 +++++ .../serde/data_type_serde_arrow_test.cpp | 481 +++++++-------------- be/test/vec/exec/parquet/parquet_thrift_test.cpp | 4 +- be/test/vec/exprs/vexpr_test.cpp | 4 +- 23 files changed, 1037 insertions(+), 649 deletions(-) diff --git 
a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp index 14ba4b2cebd..f0112ebd2bc 100644 --- a/be/src/runtime/types.cpp +++ b/be/src/runtime/types.cpp @@ -46,12 +46,20 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>& types, int* idx) DCHECK(scalar_type.__isset.len); len = scalar_type.len; } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 || - type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type == TYPE_DATETIMEV2 || - type == TYPE_TIMEV2) { + type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) { DCHECK(scalar_type.__isset.precision); DCHECK(scalar_type.__isset.scale); precision = scalar_type.precision; scale = scalar_type.scale; + } else if (type == TYPE_DATETIMEV2) { + DCHECK(scalar_type.__isset.scale); + scale = scalar_type.scale; + } else if (type == TYPE_TIMEV2) { + if (scalar_type.__isset.scale) { + scale = scalar_type.scale; + } else { + scale = 0; + } } else if (type == TYPE_STRING) { if (scalar_type.__isset.len) { len = scalar_type.len; @@ -152,11 +160,14 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type) const { // DCHECK_NE(len, -1); scalar_type.__set_len(len); } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 || - type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type == TYPE_DATETIMEV2) { + type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) { DCHECK_NE(precision, -1); DCHECK_NE(scale, -1); scalar_type.__set_precision(precision); scalar_type.__set_scale(scale); + } else if (type == TYPE_DATETIMEV2) { + DCHECK_NE(scale, -1); + scalar_type.__set_scale(scale); } } } @@ -169,11 +180,14 @@ void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const { if (type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_HLL || type == TYPE_STRING) { scalar_type->set_len(len); } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 || - type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type == TYPE_DATETIMEV2) { + type == 
TYPE_DECIMAL128I || type == TYPE_DECIMAL256) { DCHECK_NE(precision, -1); DCHECK_NE(scale, -1); scalar_type->set_precision(precision); scalar_type->set_scale(scale); + } else if (type == TYPE_DATETIMEV2) { + DCHECK_NE(scale, -1); + scalar_type->set_scale(scale); } else if (type == TYPE_ARRAY) { node->set_type(TTypeNodeType::ARRAY); node->set_contains_null(contains_nulls[0]); @@ -219,11 +233,14 @@ TypeDescriptor::TypeDescriptor(const google::protobuf::RepeatedPtrField<PTypeNod DCHECK(scalar_type.has_len()); len = scalar_type.len(); } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 || - type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type == TYPE_DATETIMEV2) { + type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) { DCHECK(scalar_type.has_precision()); DCHECK(scalar_type.has_scale()); precision = scalar_type.precision(); scale = scalar_type.scale(); + } else if (type == TYPE_DATETIMEV2) { + DCHECK(scalar_type.has_scale()); + scale = scalar_type.scale(); } else if (type == TYPE_STRING) { if (scalar_type.has_len()) { len = scalar_type.len(); diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h index 4cb7d51e4b5..c96480274fc 100644 --- a/be/src/runtime/types.h +++ b/be/src/runtime/types.h @@ -49,6 +49,7 @@ struct TypeDescriptor { /// Only set if type == TYPE_DECIMAL int precision; + /// Only set if type == TYPE_DECIMAL or type = TYPE_DATETIMEV2 int scale; std::vector<TypeDescriptor> children; @@ -68,11 +69,11 @@ struct TypeDescriptor { // explicit TypeDescriptor(PrimitiveType type) : TypeDescriptor(PrimitiveType type) : type(type), len(-1), precision(-1), scale(-1) { + // TODO, should not initialize default values, force initialization by parameters or external. 
if (type == TYPE_DECIMALV2) { precision = 27; scale = 9; } else if (type == TYPE_DATETIMEV2) { - precision = 18; scale = 6; } } diff --git a/be/src/util/arrow/block_convertor.cpp b/be/src/util/arrow/block_convertor.cpp index 817231e02ba..4db60144ea5 100644 --- a/be/src/util/arrow/block_convertor.cpp +++ b/be/src/util/arrow/block_convertor.cpp @@ -66,10 +66,7 @@ class Array; namespace doris { -// Convert Block to an Arrow::Array -// We should keep this function to keep compatible with arrow's type visitor -// Now we inherit TypeVisitor to use default Visit implementation -class FromBlockConverter : public arrow::TypeVisitor { +class FromBlockConverter { public: FromBlockConverter(const vectorized::Block& block, const std::shared_ptr<arrow::Schema>& schema, arrow::MemoryPool* pool, const cctz::time_zone& timezone_obj) @@ -79,276 +76,11 @@ public: _cur_field_idx(-1), _timezone_obj(timezone_obj) {} - ~FromBlockConverter() override = default; - - // Use base class function - using arrow::TypeVisitor::Visit; - -#define PRIMITIVE_VISIT(TYPE) \ - arrow::Status Visit(const arrow::TYPE& type) override { return _visit(type); } - - PRIMITIVE_VISIT(Int8Type) - PRIMITIVE_VISIT(Int16Type) - PRIMITIVE_VISIT(Int32Type) - PRIMITIVE_VISIT(Int64Type) - PRIMITIVE_VISIT(FloatType) - PRIMITIVE_VISIT(DoubleType) - -#undef PRIMITIVE_VISIT - - // process string-transformable field - arrow::Status Visit(const arrow::StringType& type) override { - auto& builder = assert_cast<arrow::StringBuilder&>(*_cur_builder); - size_t start = _cur_start; - size_t num_rows = _cur_rows; - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = start; i < start + num_rows; ++i) { - bool is_null = _cur_col->is_null_at(i); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - const auto& data_ref = _cur_col->get_data_at(i); - vectorized::TypeIndex type_idx = vectorized::remove_nullable(_cur_type)->get_type_id(); - switch (type_idx) { - case vectorized::TypeIndex::String: - 
case vectorized::TypeIndex::FixedString: - case vectorized::TypeIndex::HLL: { - if (data_ref.size == 0) { - // 0x01 is a magic num, not useful actually, just for present "" - //char* tmp_val = reinterpret_cast<char*>(0x01); - ARROW_RETURN_NOT_OK(builder.Append("")); - } else { - ARROW_RETURN_NOT_OK(builder.Append(data_ref.data, data_ref.size)); - } - break; - } - case vectorized::TypeIndex::Date: - case vectorized::TypeIndex::DateTime: { - char buf[64]; - const VecDateTimeValue* time_val = (const VecDateTimeValue*)(data_ref.data); - int len = time_val->to_buffer(buf); - ARROW_RETURN_NOT_OK(builder.Append(buf, len)); - break; - } - case vectorized::TypeIndex::DateV2: { - char buf[64]; - const DateV2Value<DateV2ValueType>* time_val = - (const DateV2Value<DateV2ValueType>*)(data_ref.data); - int len = time_val->to_buffer(buf); - ARROW_RETURN_NOT_OK(builder.Append(buf, len)); - break; - } - case vectorized::TypeIndex::DateTimeV2: { - char buf[64]; - const DateV2Value<DateTimeV2ValueType>* time_val = - (const DateV2Value<DateTimeV2ValueType>*)(data_ref.data); - int len = time_val->to_buffer(buf); - ARROW_RETURN_NOT_OK(builder.Append(buf, len)); - break; - } - case vectorized::TypeIndex::Int128: { - auto string_temp = LargeIntValue::to_string( - reinterpret_cast<const PackedInt128*>(data_ref.data)->value); - ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(), string_temp.size())); - break; - } - case vectorized::TypeIndex::JSONB: { - std::string string_temp = - JsonbToJson::jsonb_to_json_string(data_ref.data, data_ref.size); - ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(), string_temp.size())); - break; - } - default: { - LOG(WARNING) << "can't convert this type = " << vectorized::getTypeName(type_idx) - << " to arrow type"; - return arrow::Status::TypeError("unsupported column type"); - } - } - } - return arrow::Status::OK(); - } - - // process doris Decimal - arrow::Status Visit(const arrow::Decimal128Type& type) override { - auto& builder = 
assert_cast<arrow::Decimal128Builder&>(*_cur_builder); - size_t start = _cur_start; - size_t num_rows = _cur_rows; - if (auto* decimalv2_column = vectorized::check_and_get_column< - vectorized::ColumnDecimal<vectorized::Decimal128V2>>( - *vectorized::remove_nullable(_cur_col))) { - std::shared_ptr<arrow::DataType> s_decimal_ptr = - std::make_shared<arrow::Decimal128Type>(27, 9); - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = start; i < start + num_rows; ++i) { - bool is_null = _cur_col->is_null_at(i); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - const auto& data_ref = decimalv2_column->get_data_at(i); - const PackedInt128* p_value = reinterpret_cast<const PackedInt128*>(data_ref.data); - int64_t high = (p_value->value) >> 64; - uint64 low = p_value->value; - arrow::Decimal128 value(high, low); - ARROW_RETURN_NOT_OK(builder.Append(value)); - } - return arrow::Status::OK(); - } else if (auto* decimal128_column = vectorized::check_and_get_column< - vectorized::ColumnDecimal<vectorized::Decimal128V3>>( - *vectorized::remove_nullable(_cur_col))) { - std::shared_ptr<arrow::DataType> s_decimal_ptr = - std::make_shared<arrow::Decimal128Type>(38, decimal128_column->get_scale()); - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = start; i < start + num_rows; ++i) { - bool is_null = _cur_col->is_null_at(i); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - const auto& data_ref = decimal128_column->get_data_at(i); - const PackedInt128* p_value = reinterpret_cast<const PackedInt128*>(data_ref.data); - int64_t high = (p_value->value) >> 64; - uint64 low = p_value->value; - arrow::Decimal128 value(high, low); - ARROW_RETURN_NOT_OK(builder.Append(value)); - } - return arrow::Status::OK(); - } else if (auto* decimal32_column = vectorized::check_and_get_column< - vectorized::ColumnDecimal<vectorized::Decimal32>>( - *vectorized::remove_nullable(_cur_col))) { - 
std::shared_ptr<arrow::DataType> s_decimal_ptr = - std::make_shared<arrow::Decimal128Type>(8, decimal32_column->get_scale()); - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = start; i < start + num_rows; ++i) { - bool is_null = _cur_col->is_null_at(i); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - const auto& data_ref = decimal32_column->get_data_at(i); - const int32_t* p_value = reinterpret_cast<const int32_t*>(data_ref.data); - int64_t high = *p_value > 0 ? 0 : 1UL << 63; - arrow::Decimal128 value(high, *p_value > 0 ? *p_value : -*p_value); - ARROW_RETURN_NOT_OK(builder.Append(value)); - } - return arrow::Status::OK(); - } else if (auto* decimal64_column = vectorized::check_and_get_column< - vectorized::ColumnDecimal<vectorized::Decimal64>>( - *vectorized::remove_nullable(_cur_col))) { - std::shared_ptr<arrow::DataType> s_decimal_ptr = - std::make_shared<arrow::Decimal128Type>(18, decimal64_column->get_scale()); - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = start; i < start + num_rows; ++i) { - bool is_null = _cur_col->is_null_at(i); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - const auto& data_ref = decimal64_column->get_data_at(i); - const int64_t* p_value = reinterpret_cast<const int64_t*>(data_ref.data); - int64_t high = *p_value > 0 ? 0 : 1UL << 63; - arrow::Decimal128 value(high, *p_value > 0 ? 
*p_value : -*p_value); - ARROW_RETURN_NOT_OK(builder.Append(value)); - } - return arrow::Status::OK(); - } else { - return arrow::Status::TypeError("Unsupported column:" + _cur_col->get_name()); - } - } - // process boolean - arrow::Status Visit(const arrow::BooleanType& type) override { - auto& builder = assert_cast<arrow::BooleanBuilder&>(*_cur_builder); - size_t start = _cur_start; - size_t num_rows = _cur_rows; - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = start; i < start + num_rows; ++i) { - bool is_null = _cur_col->is_null_at(i); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - const auto& data_ref = _cur_col->get_data_at(i); - ARROW_RETURN_NOT_OK(builder.Append(*(const bool*)data_ref.data)); - } - return arrow::Status::OK(); - } - - // process array type - arrow::Status Visit(const arrow::ListType& type) override { - auto& builder = assert_cast<arrow::ListBuilder&>(*_cur_builder); - auto orignal_col = _cur_col; - size_t start = _cur_start; - size_t num_rows = _cur_rows; - - const vectorized::ColumnArray* array_column = nullptr; - if (orignal_col->is_nullable()) { - auto nullable_column = - assert_cast<const vectorized::ColumnNullable*>(orignal_col.get()); - array_column = assert_cast<const vectorized::ColumnArray*>( - &nullable_column->get_nested_column()); - } else { - array_column = assert_cast<const vectorized::ColumnArray*>(orignal_col.get()); - } - const auto& offsets = array_column->get_offsets(); - vectorized::ColumnPtr nested_column = array_column->get_data_ptr(); - - // set current col/type/builder to nested - _cur_col = nested_column; - if (_cur_type->is_nullable()) { - auto nullable_type = assert_cast<const vectorized::DataTypeNullable*>(_cur_type.get()); - _cur_type = assert_cast<const vectorized::DataTypeArray*>( - nullable_type->get_nested_type().get()) - ->get_nested_type(); - } else { - _cur_type = assert_cast<const vectorized::DataTypeArray*>(_cur_type.get()) - ->get_nested_type(); - } - 
_cur_builder = builder.value_builder(); - - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = start; i < start + num_rows; ++i) { - bool is_null = orignal_col->is_null_at(i); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - // append array elements in row i - ARROW_RETURN_NOT_OK(builder.Append()); - _cur_start = offsets[i - 1]; - _cur_rows = offsets[i] - offsets[i - 1]; - ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*type.value_type(), this)); - } - - return arrow::Status::OK(); - } + ~FromBlockConverter() = default; Status convert(std::shared_ptr<arrow::RecordBatch>* out); private: - template <typename T> - arrow::Status _visit(const T& type) { - auto& builder = assert_cast<arrow::NumericBuilder<T>&>(*_cur_builder); - size_t start = _cur_start; - size_t num_rows = _cur_rows; - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - if (_cur_col->is_nullable()) { - for (size_t i = start; i < start + num_rows; ++i) { - bool is_null = _cur_col->is_null_at(i); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - const auto& data_ref = _cur_col->get_data_at(i); - ARROW_RETURN_NOT_OK(builder.Append(*(const typename T::c_type*)data_ref.data)); - } - } else { - ARROW_RETURN_NOT_OK(builder.AppendValues( - (const typename T::c_type*)_cur_col->get_data_at(start).data, num_rows)); - } - return arrow::Status::OK(); - } - const vectorized::Block& _block; const std::shared_ptr<arrow::Schema>& _schema; arrow::MemoryPool* _pool; diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index ecdd733e76b..38dd40ca4c4 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -105,8 +105,6 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow:: } break; case TYPE_DECIMALV2: - *result = std::make_shared<arrow::Decimal128Type>(27, 9); - break; case TYPE_DECIMAL32: case TYPE_DECIMAL64: case TYPE_DECIMAL128I: diff --git 
a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 9c40676af47..949e992af1d 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -410,6 +410,7 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp } void ColumnArray::insert(const Field& x) { + DCHECK_EQ(x.get_type(), Field::Types::Array); if (x.is_null()) { get_data().insert(Null()); get_offsets().push_back(get_offsets().back() + 1); diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index 694d2e39f5c..46df3c4b59e 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -142,6 +142,7 @@ void ColumnMap::insert_data(const char*, size_t) { } void ColumnMap::insert(const Field& x) { + DCHECK_EQ(x.get_type(), Field::Types::Map); const auto& map = doris::vectorized::get<const Map&>(x); CHECK_EQ(map.size(), 2); const auto& k_f = doris::vectorized::get<const Array&>(map[0]); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 81a495eabd1..b9d684eb53a 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -151,6 +151,10 @@ public: const auto& real_field = vectorized::get<const JsonbField&>(x); s = StringRef(real_field.get_value(), real_field.get_size()); } else { + DCHECK_EQ(x.get_type(), Field::Types::String); + // If `x.get_type()` is not String, such as UInt64, may get the error + // `string column length is too large: total_length=13744632839234567870` + // because `<String>(x).size() = 13744632839234567870` s.data = vectorized::get<const String&>(x).data(); s.size = vectorized::get<const String&>(x).size(); } diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index be0f4e7adde..8cd71d822b2 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -134,6 +134,7 @@ bool ColumnStruct::is_default_at(size_t n) 
const { } void ColumnStruct::insert(const Field& x) { + DCHECK_EQ(x.get_type(), Field::Types::Tuple); const auto& tuple = x.get<const Tuple&>(); const size_t tuple_size = columns.size(); if (tuple.size() != tuple_size) { diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 444603c1d87..a7c1ddc7ccb 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -356,8 +356,9 @@ public: // For example, during create column_const(1, uint8), will use NearestFieldType // to cast a uint8 to int64, so that the Field is int64, but the column is created - // using data_type, so that T == uint8. After the field is created, it will be inserted - // into the column, but its type is different from column's data type, so that during column + // using data_type, so that T == uint8, NearestFieldType<T> == uint64. + // After the field is created, it will be inserted into the column, + // but its type is different from column's data type (int64 vs uint64), so that during column // insert method, should use NearestFieldType<T> to get the Field and get it actual // uint8 value and then insert into column. 
void insert(const Field& x) override { diff --git a/be/src/vec/data_types/data_type_time_v2.h b/be/src/vec/data_types/data_type_time_v2.h index 86ce7836c56..28360f7e4de 100644 --- a/be/src/vec/data_types/data_type_time_v2.h +++ b/be/src/vec/data_types/data_type_time_v2.h @@ -114,7 +114,9 @@ public: DataTypeDateTimeV2(const DataTypeDateTimeV2& rhs) : _scale(rhs._scale) {} TypeIndex get_type_id() const override { return TypeIndex::DateTimeV2; } TypeDescriptor get_type_as_type_descriptor() const override { - return TypeDescriptor(TYPE_DATETIMEV2); + auto desc = TypeDescriptor(TYPE_DATETIMEV2); + desc.scale = _scale; + return desc; } doris::FieldType get_storage_field_type() const override { diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.cpp index 7077229d861..d5b2e1e70d7 100644 --- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp @@ -157,6 +157,12 @@ Status DataTypeDateTimeSerDe::deserialize_one_cell_from_json(IColumn& column, Sl return Status::OK(); } +void DataTypeDateTimeSerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { + _read_column_from_arrow<false>(column, arrow_array, start, end, ctz); +} + void DataTypeDate64SerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map, arrow::ArrayBuilder* array_builder, int start, int end, const cctz::time_zone& ctz) const { @@ -196,9 +202,10 @@ static int64_t time_unit_divisor(arrow::TimeUnit::type unit) { } } -void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, - int start, int end, - const cctz::time_zone& ctz) const { +template <bool is_date> +void DataTypeDate64SerDe::_read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { auto& col_data = 
static_cast<ColumnVector<Int64>&>(column).get_data(); int64_t divisor = 1; int64_t multiplier = 1; @@ -237,13 +244,15 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const arrow::A } } else if (arrow_array->type()->id() == arrow::Type::STRING) { // to be compatible with old version, we use string type for date. - auto concrete_array = dynamic_cast<const arrow::StringArray*>(arrow_array); - for (size_t value_i = start; value_i < end; ++value_i) { - Int64 val = 0; + const auto* concrete_array = dynamic_cast<const arrow::StringArray*>(arrow_array); + for (auto value_i = start; value_i < end; ++value_i) { auto val_str = concrete_array->GetString(value_i); - ReadBuffer rb(val_str.data(), val_str.size()); - read_datetime_text_impl(val, rb, ctz); - col_data.emplace_back(val); + VecDateTimeValue v; + v.from_date_str(val_str.c_str(), val_str.length(), ctz); + if constexpr (is_date) { + v.cast_to_date(); + } + col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v)); } } else { throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT, @@ -251,6 +260,12 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const arrow::A } } +void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { + _read_column_from_arrow<true>(column, arrow_array, start, end, ctz); +} + template <bool is_binary_format> Status DataTypeDate64SerDe::_write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.h b/be/src/vec/data_types/serde/data_type_date64_serde.h index 497ac2aeff4..5f5fc4f1c38 100644 --- a/be/src/vec/data_types/serde/data_type_date64_serde.h +++ b/be/src/vec/data_types/serde/data_type_date64_serde.h @@ -73,6 +73,11 @@ public: int start, int end, std::vector<StringRef>& buffer_list) const override; +protected: + template <bool is_date> + void 
_read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const; + private: template <bool is_binary_format> Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, @@ -94,6 +99,8 @@ public: Status deserialize_column_from_json_vector(IColumn& column, std::vector<Slice>& slices, int* num_deserialized, const FormatOptions& options) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp index 12d30961e3e..e8dd2274765 100644 --- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp @@ -160,7 +160,10 @@ void DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column, // convert second v.from_unixtime(utc_epoch / divisor, ctz); // get rest time - v.set_microsecond(utc_epoch % divisor); + // add 0 on the right to make it 6 digits. DateTimeV2Value microsecond is 6 digits, + // the scale decides to keep the first few digits, so the valid digits should be kept at the front. 
+ // "2022-01-01 11:11:11.111", utc_epoch = 1641035471111, divisor = 1000, set_microsecond(111000) + v.set_microsecond((utc_epoch % divisor) * DIVISOR_FOR_MICRO / divisor); col_data.emplace_back(binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(v)); } } else { diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp index 95109ee408c..f07c449851e 100644 --- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp @@ -102,15 +102,10 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow::A int start, int end, const cctz::time_zone& ctz) const { auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data(); - auto concrete_array = dynamic_cast<const arrow::Date32Array*>(arrow_array); - int64_t divisor = 1; - int64_t multiplier = 1; - - multiplier = 24 * 60 * 60; // day => secs - for (size_t value_i = start; value_i < end; ++value_i) { + const auto* concrete_array = dynamic_cast<const arrow::Date32Array*>(arrow_array); + for (auto value_i = start; value_i < end; ++value_i) { DateV2Value<DateV2ValueType> v; - v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / divisor * multiplier, - ctz); + v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold); col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, UInt32>(v)); } } diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp index d98f6cae2b0..92b69dbfca9 100644 --- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp @@ -88,8 +88,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, const arrow::ArrayBuilder* array_builder, int start, int end, const cctz::time_zone& ctz) const { auto& col = reinterpret_cast<const ColumnDecimal<T>&>(column); - auto& 
builder = reinterpret_cast<arrow::Decimal128Builder&>(*array_builder); if constexpr (std::is_same_v<T, Decimal<Int128>>) { + auto& builder = reinterpret_cast<arrow::Decimal128Builder&>(*array_builder); std::shared_ptr<arrow::DataType> s_decimal_ptr = std::make_shared<arrow::Decimal128Type>(27, 9); for (size_t i = start; i < end; ++i) { @@ -108,6 +108,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, const } // TODO: decimal256 } else if constexpr (std::is_same_v<T, Decimal128V3>) { + auto& builder = reinterpret_cast<arrow::Decimal128Builder&>(*array_builder); std::shared_ptr<arrow::DataType> s_decimal_ptr = std::make_shared<arrow::Decimal128Type>(38, col.get_scale()); for (size_t i = start; i < end; ++i) { @@ -125,6 +126,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, const array_builder->type()->name()); } } else if constexpr (std::is_same_v<T, Decimal<Int32>>) { + auto& builder = reinterpret_cast<arrow::Decimal128Builder&>(*array_builder); std::shared_ptr<arrow::DataType> s_decimal_ptr = std::make_shared<arrow::Decimal128Type>(8, col.get_scale()); for (size_t i = start; i < end; ++i) { @@ -139,6 +141,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, const array_builder->type()->name()); } } else if constexpr (std::is_same_v<T, Decimal<Int64>>) { + auto& builder = reinterpret_cast<arrow::Decimal128Builder&>(*array_builder); std::shared_ptr<arrow::DataType> s_decimal_ptr = std::make_shared<arrow::Decimal128Type>(18, col.get_scale()); for (size_t i = start; i < end; ++i) { @@ -152,6 +155,28 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, const checkArrowStatus(builder.Append(value), column.get_name(), array_builder->type()->name()); } + } else if constexpr (std::is_same_v<T, Decimal256>) { + auto& builder = reinterpret_cast<arrow::Decimal256Builder&>(*array_builder); + std::shared_ptr<arrow::DataType> s_decimal_ptr = + 
std::make_shared<arrow::Decimal256Type>(76, col.get_scale()); + for (size_t i = start; i < end; ++i) { + if (null_map && (*null_map)[i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + continue; + } + auto p_value = wide::Int256(col.get_element(i)); + using half_type = wide::Int256::base_type; // uint64_t + half_type a0 = p_value.items[wide::Int256::_impl::little(0)]; + half_type a1 = p_value.items[wide::Int256::_impl::little(1)]; + half_type a2 = p_value.items[wide::Int256::_impl::little(2)]; + half_type a3 = p_value.items[wide::Int256::_impl::little(3)]; + + std::array<uint64_t, 4> word_array = {a0, a1, a2, a3}; + arrow::Decimal256 value(arrow::Decimal256::LittleEndianArray, word_array); + checkArrowStatus(builder.Append(value), column.get_name(), + array_builder->type()->name()); + } } else { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "write_column_to_arrow with type " + column.get_name()); @@ -162,14 +187,14 @@ template <typename T> void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const { - auto concrete_array = dynamic_cast<const arrow::DecimalArray*>(arrow_array); - const auto* arrow_decimal_type = - static_cast<const arrow::DecimalType*>(arrow_array->type().get()); - const auto arrow_scale = arrow_decimal_type->scale(); auto& column_data = static_cast<ColumnDecimal<T>&>(column).get_data(); // Decimal<Int128> for decimalv2 // Decimal<Int128I> for deicmalv3 if constexpr (std::is_same_v<T, Decimal<Int128>>) { + const auto* concrete_array = dynamic_cast<const arrow::DecimalArray*>(arrow_array); + const auto* arrow_decimal_type = + static_cast<const arrow::DecimalType*>(arrow_array->type().get()); + const auto arrow_scale = arrow_decimal_type->scale(); // TODO check precision for (size_t value_i = start; value_i < end; ++value_i) { auto value = *reinterpret_cast<const vectorized::Decimal128V2*>( @@ 
-195,7 +220,13 @@ void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column, } } else if constexpr (std::is_same_v<T, Decimal128V3> || std::is_same_v<T, Decimal64> || std::is_same_v<T, Decimal32>) { - for (size_t value_i = start; value_i < end; ++value_i) { + const auto* concrete_array = dynamic_cast<const arrow::DecimalArray*>(arrow_array); + for (auto value_i = start; value_i < end; ++value_i) { + column_data.emplace_back(*reinterpret_cast<const T*>(concrete_array->Value(value_i))); + } + } else if constexpr (std::is_same_v<T, Decimal256>) { + const auto* concrete_array = dynamic_cast<const arrow::Decimal256Array*>(arrow_array); + for (auto value_i = start; value_i < end; ++value_i) { column_data.emplace_back(*reinterpret_cast<const T*>(concrete_array->Value(value_i))); } } else { @@ -257,7 +288,7 @@ Status DataTypeDecimalSerDe<T>::write_column_to_orc(const std::string& timezone, for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { auto& v = col_data[row_id]; - orc::Int128 value(v >> 64, (uint64_t)v); + orc::Int128 value(v >> 64, (uint64_t)v); // TODO, Decimal256 will lose precision cur_batch->values[row_id] = value; } } diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp index 612c9ce4222..f59fc712d98 100644 --- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp @@ -165,13 +165,19 @@ void DataTypeIPv6SerDe::read_column_from_arrow(IColumn& column, const arrow::Arr buffer->data() + concrete_array->value_offset(offset_i)); const auto raw_data_len = concrete_array->value_length(offset_i); - IPv6 ipv6_val; - if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len)) { - throw doris::Exception(ErrorCode::INVALID_ARGUMENT, - "parse number fail, string: '{}'", - std::string(raw_data, raw_data_len).c_str()); + if (raw_data_len == 0) { + col_data.emplace_back(0); + } else { + IPv6 
ipv6_val; + if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len)) { + throw doris::Exception(ErrorCode::INVALID_ARGUMENT, + "parse number fail, string: '{}'", + std::string(raw_data, raw_data_len).c_str()); + } + col_data.emplace_back(ipv6_val); } - col_data.emplace_back(ipv6_val); + } else { + col_data.emplace_back(0); } } } diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index f4fb6bbbb1f..522cf02c75f 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -215,14 +215,20 @@ void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column, const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i); const auto raw_data_len = concrete_array->value_length(offset_i); - Int128 val = 0; - ReadBuffer rb(raw_data, raw_data_len); - if (!read_int_text_impl(val, rb)) { - throw doris::Exception(ErrorCode::INVALID_ARGUMENT, - "parse number fail, string: '{}'", - std::string(rb.position(), rb.count()).c_str()); + if (raw_data_len == 0) { + col_data.emplace_back(Int128()); // Int128() is NULL + } else { + Int128 val = 0; + ReadBuffer rb(raw_data, raw_data_len); + if (!read_int_text_impl(val, rb)) { + throw doris::Exception(ErrorCode::INVALID_ARGUMENT, + "parse number fail, string: '{}'", + std::string(rb.position(), rb.count()).c_str()); + } + col_data.emplace_back(val); } - col_data.emplace_back(val); + } else { + col_data.emplace_back(Int128()); // Int128() is NULL } } return; diff --git a/be/test/vec/data_types/common_data_type_serder_test.h b/be/test/vec/data_types/common_data_type_serder_test.h new file mode 100644 index 00000000000..c5db1d16f18 --- /dev/null +++ b/be/test/vec/data_types/common_data_type_serder_test.h @@ -0,0 +1,445 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <arrow/record_batch.h> +#include <gen_cpp/data.pb.h> +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> +#include <gtest/gtest.h> + +#include <filesystem> +#include <fstream> +#include <iostream> +#include <memory> + +#include "arrow/array/array_base.h" +#include "arrow/type.h" +#include "runtime/descriptors.h" +#include "util/arrow/block_convertor.h" +#include "util/arrow/row_batch.h" +#include "vec/columns/column.h" +#include "vec/columns/column_array.h" +#include "vec/core/field.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/runtime/ipv6_value.h" +#include "vec/utils/arrow_column_to_doris_column.h" + +// this test covers the data type serialize and deserialize functions +// such as +// 1.
standard hive text ser-deserialize +// deserialize_one_cell_from_hive_text (IColumn &column, Slice &slice, const FormatOptions &options, int hive_text_complex_type_delimiter_level=1) const +// deserialize_column_from_hive_text_vector (IColumn &column, std::vector< Slice > &slices, int *num_deserialized, const FormatOptions &options, int hive_text_complex_type_delimiter_level=1) const +// serialize_one_cell_to_hive_text (const IColumn &column, int row_num, BufferWritable &bw, FormatOptions &options, int hive_text_complex_type_delimiter_level=1) const +// 2. json format ser-deserialize which is used for tables that are not in the doris database +// serialize_one_cell_to_json (const IColumn &column, int row_num, BufferWritable &bw, FormatOptions &options) const =0 +// serialize_column_to_json (const IColumn &column, int start_idx, int end_idx, BufferWritable &bw, FormatOptions &options) const =0 +// deserialize_one_cell_from_json (IColumn &column, Slice &slice, const FormatOptions &options) const =0 +// deserialize_column_from_json_vector (IColumn &column, std::vector< Slice > &slices, uint64_t *num_deserialized, const FormatOptions &options) const =0 +// deserialize_column_from_fixed_json (IColumn &column, Slice &slice, uint64_t rows, uint64_t *num_deserialized, const FormatOptions &options) const +// insert_column_last_value_multiple_times (IColumn &column, uint64_t times) const +// 3. fe|be protobuf ser-deserialize +// write_column_to_pb (const IColumn &column, PValues &result, int start, int end) const =0 +// read_column_from_pb (IColumn &column, const PValues &arg) const =0 +// 4. jsonb ser-deserialize which is used in the row-store situation +// write_one_cell_to_jsonb (const IColumn &column, JsonbWriter &result, Arena *mem_pool, int32_t col_id, int row_num) const =0 +// read_one_cell_from_jsonb (IColumn &column, const JsonbValue *arg) const =0 +// 5.
mysql text ser-deserialize +// write_column_to_mysql (const IColumn &column, MysqlRowBuffer< false > &row_buffer, int row_idx, bool col_const, const FormatOptions &options) const =0 +// write_column_to_mysql (const IColumn &column, MysqlRowBuffer< true > &row_buffer, int row_idx, bool col_const, const FormatOptions &options) const =0 +// 6. arrow ser-deserialize which is used in the spark-flink connector +// write_column_to_arrow (const IColumn &column, const NullMap *null_map, arrow::ArrayBuilder *array_builder, int start, int end, const cctz::time_zone &ctz) const =0 +// read_column_from_arrow (IColumn &column, const arrow::Array *arrow_array, int start, int end, const cctz::time_zone &ctz) const =0 +// 7. rapidjson ser-deserialize +// write_one_cell_to_json (const IColumn &column, rapidjson::Value &result, rapidjson::Document::AllocatorType &allocator, Arena &mem_pool, int row_num) const +// read_one_cell_from_json (IColumn &column, const rapidjson::Value &result) const +// convert_field_to_rapidjson (const vectorized::Field &field, rapidjson::Value &target, rapidjson::Document::AllocatorType &allocator) +// convert_array_to_rapidjson (const vectorized::Array &array, rapidjson::Value &target, rapidjson::Document::AllocatorType &allocator) + +namespace doris::vectorized { + +class CommonDataTypeSerdeTest : public ::testing::Test { +public: + ////================================================================================================================== + // this is a common function that checks column data against expected results according to the assert callback function + // which can be used in all column tests + // such as running regression tests + // step1. we can set gen_check_data_in_assert to true, then we will generate a file for check data, otherwise we will read the file to check data + // step2.
we should write assert callback function to check data + static void check_data( + MutableColumns& columns, DataTypeSerDeSPtrs serders, char col_spliter, + std::set<int> idxes, const std::string& column_data_file, + std::function<void(MutableColumns& load_cols, DataTypeSerDeSPtrs serders)> + assert_callback, + bool is_hive_format = false, DataTypes dataTypes = {}) { + ASSERT_EQ(serders.size(), columns.size()); + // Step 1: Insert data from `column_data_file` into the column and check result with `check_data_file` + // Load column data and expected data from CSV files + std::vector<std::vector<std::string>> res; + struct stat buff; + if (stat(column_data_file.c_str(), &buff) == 0) { + if (S_ISREG(buff.st_mode)) { + // file + if (is_hive_format) { + load_data_and_assert_from_csv<true, true>(serders, columns, column_data_file, + col_spliter, idxes); + } else { + load_data_and_assert_from_csv<false, true>(serders, columns, column_data_file, + col_spliter, idxes); + } + } else if (S_ISDIR(buff.st_mode)) { + // dir + std::filesystem::path fs_path(column_data_file); + for (const auto& entry : std::filesystem::directory_iterator(fs_path)) { + std::string file_path = entry.path().string(); + std::cout << "load data from file: " << file_path << std::endl; + if (is_hive_format) { + load_data_and_assert_from_csv<true, true>(serders, columns, file_path, + col_spliter, idxes); + } else { + load_data_and_assert_from_csv<false, true>(serders, columns, file_path, + col_spliter, idxes); + } + } + } + } + + // Step 2: Validate the data in `column` matches `expected_data` + assert_callback(columns, serders); + } + + // Helper function to load data from CSV, with index which splited by spliter and load to columns + template <bool is_hive_format, bool generate_res_file> + static void load_data_and_assert_from_csv(const DataTypeSerDeSPtrs serders, + MutableColumns& columns, const std::string& file_path, + const char spliter = ';', + const std::set<int> idxes = {0}) { + 
ASSERT_EQ(serders.size(), columns.size()) + << "serder size: " << serders.size() << " column size: " << columns.size(); + ASSERT_EQ(serders.size(), idxes.size()) + << "serder size: " << serders.size() << " idxes size: " << idxes.size(); + std::ifstream file(file_path); + if (!file) { + throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "can not open the file: {} ", + file_path); + } + + std::string line; + DataTypeSerDe::FormatOptions options; + std::vector<std::vector<std::string>> res; + MutableColumns assert_str_cols(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) { + assert_str_cols[i] = ColumnString::create(); + } + + while (std::getline(file, line)) { + std::stringstream lineStream(line); + std::string value; + int l_idx = 0; + int c_idx = 0; + std::vector<std::string> row; + while (std::getline(lineStream, value, spliter)) { + if (!value.starts_with("//") && idxes.contains(l_idx)) { + // load csv data + Slice string_slice(value.data(), value.size()); + Status st; + // deserialize data + if constexpr (is_hive_format) { + st = serders[c_idx]->deserialize_one_cell_from_hive_text( + *columns[c_idx], string_slice, options); + } else { + st = serders[c_idx]->deserialize_one_cell_from_json(*columns[c_idx], + string_slice, options); + } + if (!st.ok()) { + // deserialize if happen error now we do not insert any value for input column + // so we push a default value to column for row alignment + columns[c_idx]->insert_default(); + std::cout << "error in deserialize but continue: " << st.to_string() + << std::endl; + } + // serialize data + size_t row_num = columns[c_idx]->size() - 1; + assert_str_cols[c_idx]->reserve(columns[c_idx]->size()); + VectorBufferWriter bw(assert_cast<ColumnString&>(*assert_str_cols[c_idx])); + if constexpr (is_hive_format) { + st = serders[c_idx]->serialize_one_cell_to_hive_text(*columns[c_idx], + row_num, bw, options); + EXPECT_TRUE(st.ok()) << st.to_string(); + } else { + st = 
serders[c_idx]->serialize_one_cell_to_json(*columns[c_idx], row_num, + bw, options); + EXPECT_TRUE(st.ok()) << st.to_string(); + } + bw.commit(); + // assert data : origin data and serialized data should be equal or generated + // file to check data + size_t assert_size = assert_str_cols[c_idx]->size(); + if constexpr (!generate_res_file) { + EXPECT_EQ(assert_str_cols[c_idx]->get_data_at(assert_size - 1).to_string(), + string_slice.to_string()) + << "column: " << columns[c_idx]->get_name() << " row: " << row_num + << " is_hive_format: " << is_hive_format; + } + ++c_idx; + } + res.push_back(row); + ++l_idx; + } + } + + if (generate_res_file) { + // generate res + auto pos = file_path.find_last_of("."); + std::string hive_format = is_hive_format ? "_hive" : ""; + std::string res_file = file_path.substr(0, pos) + hive_format + "_serde_res.csv"; + std::ofstream res_f(res_file); + if (!res_f.is_open()) { + throw std::ios_base::failure("Failed to open file." + res_file); + } + for (size_t r = 0; r < assert_str_cols[0]->size(); ++r) { + for (size_t c = 0; c < assert_str_cols.size(); ++c) { + res_f << assert_str_cols[c]->get_data_at(r).to_string() << spliter; + } + res_f << std::endl; + } + res_f.close(); + std::cout << "generate res file: " << res_file << std::endl; + } + } + + // standard hive text ser-deserialize assert function + // pb serde now is only used RPCFncall and fold_constant_executor which just write column data to pb value means + // just call write_column_to_pb + static void assert_pb_format(MutableColumns& load_cols, DataTypeSerDeSPtrs serders) { + for (size_t i = 0; i < load_cols.size(); ++i) { + auto& col = load_cols[i]; + std::cout << " now we are testing column : " << col->get_name() << std::endl; + // serialize to pb + PValues pv = PValues(); + Status st = serders[i]->write_column_to_pb(*col, pv, 0, col->size()); + if (!st.ok()) { + std::cerr << "write_column_to_pb error: " << st.msg() << std::endl; + continue; + } + // deserialize from pb + auto 
except_column = col->clone_empty(); + st = serders[i]->read_column_from_pb(*except_column, pv); + EXPECT_TRUE(st.ok()) << st.to_string(); + // check pb value from expected column + PValues as_pv = PValues(); + st = serders[i]->write_column_to_pb(*except_column, as_pv, 0, except_column->size()); + EXPECT_TRUE(st.ok()) << st.to_string(); + EXPECT_EQ(pv.bytes_value_size(), as_pv.bytes_value_size()); + // check column value + for (size_t j = 0; j < col->size(); ++j) { + auto cell = col->operator[](j); + auto except_cell = except_column->operator[](j); + EXPECT_EQ(cell, except_cell) << "column: " << col->get_name() << " row: " << j; + } + } + } + + // actually this is block_to_jsonb and jsonb_to_block test + // static void assert_jsonb_format(MutableColumns& load_cols, DataTypeSerDeSPtrs serders) { + // Arena pool; + // auto jsonb_column = ColumnString::create(); // jsonb column + // // maybe these load_cols has different size, so we keep it same + // size_t max_row_size = load_cols[0]->size(); + // for (size_t i = 1; i < load_cols.size(); ++i) { + // if (load_cols[i]->size() > max_row_size) { + // max_row_size = load_cols[i]->size(); + // } + // } + // // keep same rows + // for (size_t i = 0; i < load_cols.size(); ++i) { + // if (load_cols[i]->size() < max_row_size) { + // load_cols[i]->insert_many_defaults(max_row_size - load_cols[i]->size()); + // } else if (load_cols[i]->size() > max_row_size) { + // load_cols[i]->resize(max_row_size); + // } + // } + // jsonb_column->reserve(load_cols[0]->size()); + // MutableColumns assert_cols; + // for (size_t i = 0; i < load_cols.size(); ++i) { + // assert_cols.push_back(load_cols[i]->assume_mutable()); + // } + // for (size_t r = 0; r < load_cols[0]->size(); ++r) { + // JsonbWriterT<JsonbOutStream> jw; + // jw.writeStartObject(); + // // serialize to jsonb + // for (size_t i = 0; i < load_cols.size(); ++i) { + // auto& col = load_cols[i]; + // serders[i]->write_one_cell_to_jsonb(*col, jw, &pool, i, r); + // } + // 
jw.writeEndObject(); + // jsonb_column->insert_data(jw.getOutput()->getBuffer(), jw.getOutput()->getSize()); + // } + // // deserialize jsonb column to assert column + // EXPECT_EQ(jsonb_column->size(), load_cols[0]->size()); + // for (size_t r = 0; r < jsonb_column->size(); ++r) { + // StringRef jsonb_data = jsonb_column->get_data_at(r); + // auto pdoc = JsonbDocument::checkAndCreateDocument(jsonb_data.data, jsonb_data.size); + // JsonbDocument& doc = *pdoc; + // size_t cIdx = 0; + // for (auto it = doc->begin(); it != doc->end(); ++it) { + // serders[cIdx]->read_one_cell_from_jsonb(*assert_cols[cIdx], it->value()); + // ++cIdx; + // } + // } + // // check column value + // for (size_t i = 0; i < load_cols.size(); ++i) { + // auto& col = load_cols[i]; + // auto& assert_col = assert_cols[i]; + // for (size_t j = 0; j < col->size(); ++j) { + // auto cell = col->operator[](j); + // auto assert_cell = assert_col->operator[](j); + // EXPECT_EQ(cell, assert_cell) << "column: " << col->get_name() << " row: " << j; + // } + // } + // } + + // assert mysql text format, now we just simple assert not to fatal or exception here + static void assert_mysql_format(MutableColumns& load_cols, DataTypeSerDeSPtrs serders) { + MysqlRowBuffer<false> row_buffer; + for (size_t i = 0; i < load_cols.size(); ++i) { + auto& col = load_cols[i]; + for (size_t j = 0; j < col->size(); ++j) { + Status st; + EXPECT_NO_FATAL_FAILURE( + st = serders[i]->write_column_to_mysql(*col, row_buffer, j, false, {})); + EXPECT_TRUE(st.ok()) << st.to_string(); + } + } + } + + // assert arrow serialize + static void assert_arrow_format(MutableColumns& load_cols, DataTypes types) { + // make a block to write to arrow + auto block = std::make_shared<Block>(); + build_block(block, load_cols, types); + auto record_batch = serialize_arrow(block); + auto assert_block = std::make_shared<Block>(block->clone_empty()); + deserialize_arrow(assert_block, record_batch); + compare_two_blocks(block, assert_block); + } + + 
static void build_block(const std::shared_ptr<Block>& block, MutableColumns& load_cols, + DataTypes types) { + // maybe these load_cols has different size, so we keep it same + size_t max_row_size = load_cols[0]->size(); + for (size_t i = 1; i < load_cols.size(); ++i) { + if (load_cols[i]->size() > max_row_size) { + max_row_size = load_cols[i]->size(); + } + } + // keep same rows + for (auto& load_col : load_cols) { + if (load_col->size() < max_row_size) { + load_col->insert_many_defaults(max_row_size - load_col->size()); + } else if (load_col->size() > max_row_size) { + load_col->resize(max_row_size); + } + } + for (size_t i = 0; i < load_cols.size(); ++i) { + auto& col = load_cols[i]; + block->insert(ColumnWithTypeAndName(std::move(col), types[i], types[i]->get_name())); + } + // print block + std::cout << "build block structure: " << block->dump_structure() << std::endl; + std::cout << "build block data: " + << block->dump_data(0, std::min(max_row_size, static_cast<size_t>(5))) + << std::endl; + for (int i = 0; i < block->columns(); i++) { + auto col = block->get_by_position(i); + std::cout << "col: " << i << ", " << col.column->get_name() << ", " + << col.type->get_name() << ", " << col.to_string(0) << std::endl; + } + } + + static std::shared_ptr<arrow::RecordBatch> serialize_arrow( + const std::shared_ptr<Block>& block) { + std::shared_ptr<arrow::Schema> block_arrow_schema; + EXPECT_EQ(get_arrow_schema_from_block(*block, &block_arrow_schema, "UTC"), Status::OK()); + std::cout << "schema: " << block_arrow_schema->ToString(true) << std::endl; + // convert block to arrow + std::shared_ptr<arrow::RecordBatch> result; + cctz::time_zone _timezone_obj; //default UTC + Status stt = convert_to_arrow_batch(*block, block_arrow_schema, + arrow::default_memory_pool(), &result, _timezone_obj); + EXPECT_EQ(Status::OK(), stt) << "convert block to arrow failed" << stt.to_string(); + std::cout << "arrow serialize result: " << result->num_columns() << ", " + << 
result->num_rows() << std::endl; + return result; + } + + static void deserialize_arrow(const std::shared_ptr<Block>& new_block, + std::shared_ptr<arrow::RecordBatch> record_batch) { + // deserialize arrow to block + auto rows = record_batch->num_rows(); + for (size_t i = 0; i < record_batch->num_columns(); ++i) { + auto array = record_batch->column(i); + std::cout << "arrow record_batch pos: " << i << ", array: " << array->ToString() + << std::endl; + auto& column_with_type_and_name = new_block->get_by_position(i); + std::cout << "now we are testing column: " + << column_with_type_and_name.column->get_name() + << ", type: " << column_with_type_and_name.type->get_name() << std::endl; + auto ret = + arrow_column_to_doris_column(array.get(), 0, column_with_type_and_name.column, + column_with_type_and_name.type, rows, "UTC"); + // do check data + std::cout << "arrow_column_to_doris_column done, column data: " + << column_with_type_and_name.to_string(0) + << ", column size: " << column_with_type_and_name.column->size() << std::endl; + EXPECT_EQ(Status::OK(), ret) << "convert arrow to block failed" << ret.to_string(); + } + std::cout << "arrow deserialize block structure: " << new_block->dump_structure() + << std::endl; + std::cout << "arrow deserialize block data: " + << new_block->dump_data( + 0, std::min(static_cast<size_t>(rows), static_cast<size_t>(5))) + << std::endl; + } + + static void compare_two_blocks(const std::shared_ptr<Block>& frist_block, + const std::shared_ptr<Block>& second_block) { + for (size_t i = 0; i < frist_block->columns(); ++i) { + EXPECT_EQ(frist_block->get_by_position(i).type, second_block->get_by_position(i).type); + auto& col = frist_block->get_by_position(i).column; + auto& assert_col = second_block->get_by_position(i).column; + std::cout << "compare_two_blocks, column: " << col->get_name() + << ", type: " << frist_block->get_by_position(i).type->get_name() + << ", size: " << col->size() << ", assert size: " << assert_col->size() + << 
std::endl; + EXPECT_EQ(assert_col->size(), col->size()); + for (size_t j = 0; j < assert_col->size(); ++j) { + EXPECT_EQ(frist_block->get_by_position(i).to_string(j), + second_block->get_by_position(i).to_string(j)); + auto cell = col->operator[](j); + auto assert_cell = assert_col->operator[](j); + EXPECT_EQ(cell, assert_cell) << "column: " << col->get_name() << " row: " << j; + } + } + EXPECT_EQ(frist_block->dump_data(), second_block->dump_data()); + } + + // assert rapidjson format + // now rapidjson write_one_cell_to_json and read_one_cell_from_json only used in column_object + // can just be replaced by jsonb format +}; + +} // namespace doris::vectorized diff --git a/be/test/vec/data_types/data_type_map_test.cpp b/be/test/vec/data_types/data_type_map_test.cpp new file mode 100644 index 00000000000..548802b0d91 --- /dev/null +++ b/be/test/vec/data_types/data_type_map_test.cpp @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/data_types/data_type_map.h" + +#include <execinfo.h> // for backtrace on Linux +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> +#include <gtest/gtest.h> + +#include <iostream> + +#include "common/exception.h" +#include "vec/columns/column.h" +#include "vec/core/types.h" +#include "vec/data_types/common_data_type_serder_test.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_struct.h" +#include "vec/function/function_test_util.h" + +namespace doris::vectorized { + +// TODO `DataTypeMapSerDe::deserialize_one_cell_from_json` has a bug, +// `SerdeArrowTest` cannot test Map type nested Array and Struct and Map, +// so manually construct data to test them. +// Expect to delete this TEST after `deserialize_one_cell_from_json` is fixed. +TEST(DataTypeMapTest, SerdeNestedTypeArrowTest) { + auto block = std::make_shared<Block>(); + { + std::string col_name = "map_nesting_array"; + DataTypePtr f1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr f2 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()); + DataTypePtr dt1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f1)); + DataTypePtr dt2 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f2)); + DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2); + + Array a1, a2, a3, a4; + a1.push_back(Field("cute")); + a1.push_back(Null()); + a2.push_back(Field("clever")); + a1.push_back(Field("hello")); + a3.push_back(1); + a3.push_back(2); + a4.push_back(11); + a4.push_back(22); + + Array k1, v1; + k1.push_back(a1); + k1.push_back(a2); + v1.push_back(a3); + v1.push_back(a4); + + Map m1; + m1.push_back(k1); + m1.push_back(v1); + + MutableColumnPtr map_column = ma->create_column(); + map_column->reserve(1); + map_column->insert(m1); + vectorized::ColumnWithTypeAndName 
type_and_name(map_column->get_ptr(), ma, col_name); + block->insert(type_and_name); + } + { + std::string col_name = "map_nesting_struct"; + DataTypePtr f1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr f2 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>()); + DataTypePtr f3 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()); + DataTypePtr f4 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr dt1 = std::make_shared<DataTypeNullable>( + std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {f1, f2, f3})); + DataTypePtr dt2 = std::make_shared<DataTypeNullable>( + std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {f4})); + DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2); + + Tuple t1, t2, t3, t4; + t1.push_back(Field("clever")); + t1.push_back(__int128_t(37)); + t1.push_back(true); + t2.push_back("null"); + t2.push_back(__int128_t(26)); + t2.push_back(false); + t3.push_back(Field("cute")); + t4.push_back("null"); + + Array k1, v1; + k1.push_back(t1); + k1.push_back(t2); + v1.push_back(t3); + v1.push_back(t4); + + Map m1; + m1.push_back(k1); + m1.push_back(v1); + + MutableColumnPtr map_column = ma->create_column(); + map_column->reserve(1); + map_column->insert(m1); + vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), ma, col_name); + block->insert(type_and_name); + } + { + std::string col_name = "map_nesting_map"; + DataTypePtr f1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()); + DataTypePtr f2 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr f3 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>()); + DataTypePtr f4 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()); + DataTypePtr dt1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f1, f2)); + DataTypePtr dt2 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f3, f4)); + DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2); + + Array k1, k2, k3, k4, v1, v2, v3, v4; + k1.push_back(1); + k1.push_back(2); + k2.push_back(11); + k2.push_back(22); + v1.push_back(Field("map")); + v1.push_back(Null()); + v2.push_back(Field("clever map")); + v2.push_back(Field("hello map")); + k3.push_back(__int128_t(37)); + k3.push_back(__int128_t(26)); + k4.push_back(__int128_t(1111)); + k4.push_back(__int128_t(432535423)); + v3.push_back(true); + v3.push_back(false); + v4.push_back(false); + v4.push_back(true); + + Map m11, m12, m21, m22; + m11.push_back(k1); + m11.push_back(v1); + m12.push_back(k2); + m12.push_back(v2); + m21.push_back(k3); + m21.push_back(v3); + m22.push_back(k4); + m22.push_back(v4); + + Array kk1, vv1; + kk1.push_back(m11); + kk1.push_back(m12); + vv1.push_back(m21); + vv1.push_back(m22); + + Map m1; + m1.push_back(kk1); + m1.push_back(vv1); + + MutableColumnPtr map_column = ma->create_column(); + map_column->reserve(1); + map_column->insert(m1); + vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), ma, col_name); + block->insert(type_and_name); + } + std::shared_ptr<arrow::RecordBatch> record_batch = + CommonDataTypeSerdeTest::serialize_arrow(block); + auto assert_block = std::make_shared<Block>(block->clone_empty()); + CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch); + CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block); +} + +} // namespace doris::vectorized diff --git a/be/test/vec/data_types/data_type_struct_test.cpp b/be/test/vec/data_types/data_type_struct_test.cpp new file mode 100644 index 00000000000..1fc8aac0312 --- /dev/null +++ b/be/test/vec/data_types/data_type_struct_test.cpp @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/data_types/data_type_struct.h" + +#include <execinfo.h> // for backtrace on Linux +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> +#include <gtest/gtest.h> + +#include <iostream> + +#include "vec/columns/column.h" +#include "vec/core/types.h" +#include "vec/data_types/common_data_type_serder_test.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_map.h" +#include "vec/data_types/data_type_struct.h" +#include "vec/function/function_test_util.h" + +namespace doris::vectorized { + +// TODO `DataTypeStructSerDe::deserialize_one_cell_from_json` has a bug, +// `SerdeArrowTest` cannot test Struct type nested Array and Map and Struct, +// so manually construct data to test them. +// Expect to delete this TEST after `deserialize_one_cell_from_json` is fixed. 
+TEST(DataTypeStructTest, SerdeNestedTypeArrowTest) { + auto block = std::make_shared<Block>(); + { + std::string col_name = "struct_nesting_array_map_struct"; + DataTypePtr f1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr f2 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()); + DataTypePtr f3 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr f4 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr f5 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>()); + DataTypePtr f6 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()); + DataTypePtr dt1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f1)); + DataTypePtr dt2 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f2, f3)); + DataTypePtr dt3 = std::make_shared<DataTypeNullable>( + std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {f4, f5, f6})); + DataTypePtr st = std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {dt1, dt2, dt3}); + + // nested Array + Array a1, a2; + a1.push_back(Field("array")); + a1.push_back(Null()); + a2.push_back(Field("lucky array")); + a2.push_back(Field("cute array")); + + // nested Map + Array k1, k2, v1, v2; + k1.push_back(1); + k1.push_back(2); + k2.push_back(11); + k2.push_back(22); + v1.push_back(Field("map")); + v1.push_back(Null()); + v2.push_back(Field("clever map")); + v2.push_back(Field("hello map")); + + Map m1, m2; + m1.push_back(k1); + m1.push_back(v1); + m2.push_back(k2); + m2.push_back(v2); + + // nested Struct + Tuple t1, t2; + t1.push_back(Field("clever")); + t1.push_back(__int128_t(37)); + t1.push_back(true); + t2.push_back("null"); + t2.push_back(__int128_t(26)); + t2.push_back(false); + + // Struct + Tuple tt1, tt2; + tt1.push_back(a1); + tt1.push_back(m1); + tt1.push_back(t1); + tt2.push_back(a2); + tt2.push_back(m2); + tt2.push_back(t2); + + 
MutableColumnPtr struct_column = st->create_column(); + struct_column->reserve(2); + struct_column->insert(tt1); + struct_column->insert(tt2); + vectorized::ColumnWithTypeAndName type_and_name(struct_column->get_ptr(), st, col_name); + block->insert(type_and_name); + } + std::shared_ptr<arrow::RecordBatch> record_batch = + CommonDataTypeSerdeTest::serialize_arrow(block); + auto assert_block = std::make_shared<Block>(block->clone_empty()); + CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch); + CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block); +} + +} // namespace doris::vectorized diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp index eb960abdfc1..f4caa35069f 100644 --- a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp @@ -1,4 +1,3 @@ - // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. 
See the NOTICE file // distributed with this work for additional information @@ -31,11 +30,10 @@ #include <gen_cpp/types.pb.h> #include <gtest/gtest-message.h> #include <gtest/gtest-test-part.h> -#include <math.h> -#include <stdint.h> -#include <stdlib.h> -#include <time.h> +#include <gtest/gtest.h> +#include <cmath> +#include <cstdint> #include <iostream> #include <memory> #include <string> @@ -43,26 +41,21 @@ #include <utility> #include <vector> -#include "gtest/gtest_pred_impl.h" #include "olap/hll.h" #include "runtime/descriptors.cpp" -#include "runtime/descriptors.h" #include "util/arrow/block_convertor.h" #include "util/arrow/row_batch.h" -#include "util/bitmap_value.h" -#include "util/quantile_state.h" #include "util/string_parser.hpp" #include "vec/columns/column.h" -#include "vec/columns/column_array.h" #include "vec/columns/column_complex.h" #include "vec/columns/column_decimal.h" -#include "vec/columns/column_map.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" #include "vec/core/block.h" #include "vec/core/field.h" #include "vec/core/types.h" +#include "vec/data_types/common_data_type_serder_test.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_bitmap.h" @@ -79,62 +72,29 @@ #include "vec/data_types/data_type_string.h" #include "vec/data_types/data_type_struct.h" #include "vec/data_types/data_type_time_v2.h" -#include "vec/io/io_helper.h" #include "vec/runtime/vdatetime_value.h" #include "vec/utils/arrow_column_to_doris_column.h" namespace doris::vectorized { -template <bool is_scalar> -void serialize_and_deserialize_arrow_test() { - vectorized::Block block; - std::vector<std::tuple<std::string, FieldType, int, PrimitiveType, bool>> cols; - if constexpr (is_scalar) { - cols = { - {"k1", FieldType::OLAP_FIELD_TYPE_INT, 1, TYPE_INT, false}, - {"k7", FieldType::OLAP_FIELD_TYPE_INT, 7, TYPE_INT, true}, - {"k2", 
FieldType::OLAP_FIELD_TYPE_STRING, 2, TYPE_STRING, false}, - {"k3", FieldType::OLAP_FIELD_TYPE_DECIMAL128I, 3, TYPE_DECIMAL128I, false}, - {"k11", FieldType::OLAP_FIELD_TYPE_DATETIME, 11, TYPE_DATETIME, false}, - {"k4", FieldType::OLAP_FIELD_TYPE_BOOL, 4, TYPE_BOOLEAN, false}, - {"k5", FieldType::OLAP_FIELD_TYPE_DECIMAL32, 5, TYPE_DECIMAL32, false}, - {"k6", FieldType::OLAP_FIELD_TYPE_DECIMAL64, 6, TYPE_DECIMAL64, false}, - {"k12", FieldType::OLAP_FIELD_TYPE_DATETIMEV2, 12, TYPE_DATETIMEV2, false}, - {"k8", FieldType::OLAP_FIELD_TYPE_IPV4, 8, TYPE_IPV4, false}, - {"k9", FieldType::OLAP_FIELD_TYPE_IPV6, 9, TYPE_IPV6, false}, - }; - } else { - cols = {{"a", FieldType::OLAP_FIELD_TYPE_ARRAY, 6, TYPE_ARRAY, true}, - {"m", FieldType::OLAP_FIELD_TYPE_MAP, 8, TYPE_MAP, true}, - {"s", FieldType::OLAP_FIELD_TYPE_STRUCT, 5, TYPE_STRUCT, true}}; - } - - int row_num = 7; - // make desc and generate block - TupleDescriptor tuple_desc(PTupleDescriptor(), true); - for (auto t : cols) { - TSlotDescriptor tslot; - std::string col_name = std::get<0>(t); - tslot.__set_colName(col_name); - TypeDescriptor type_desc(std::get<3>(t)); - bool is_nullable(std::get<4>(t)); - switch (std::get<3>(t)) { - case TYPE_BOOLEAN: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto vec = vectorized::ColumnVector<UInt8>::create(); - auto& data = vec->get_data(); - for (int i = 0; i < row_num; ++i) { - data.push_back(i % 2); - } - vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeUInt8>()); - vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, - col_name); - block.insert(std::move(type_and_name)); +void serialize_and_deserialize_arrow_test(std::vector<PrimitiveType> cols, int row_num, + bool is_nullable) { + auto block = std::make_shared<Block>(); + for (int i = 0; i < cols.size(); i++) { + std::string col_name = std::to_string(i); + TypeDescriptor type_desc(cols[i]); + switch (cols[i]) { + case TYPE_BOOLEAN: { + auto vec = 
vectorized::ColumnVector<UInt8>::create(); + auto& data = vec->get_data(); + for (int i = 0; i < row_num; ++i) { + data.push_back(i % 2); } - break; + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeUInt8>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, col_name); + block->insert(std::move(type_and_name)); + } break; case TYPE_INT: - tslot.__set_slotType(type_desc.to_thrift()); if (is_nullable) { { auto column_vector_int32 = vectorized::ColumnVector<Int32>::create(); @@ -152,7 +112,7 @@ void serialize_and_deserialize_arrow_test() { std::make_shared<vectorized::DataTypeInt32>()); vectorized::ColumnWithTypeAndName type_and_name( mutable_nullable_vector->get_ptr(), data_type, col_name); - block.insert(type_and_name); + block->insert(type_and_name); } } else { auto vec = vectorized::ColumnVector<Int32>::create(); @@ -163,13 +123,12 @@ void serialize_and_deserialize_arrow_test() { vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>()); vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, col_name); - block.insert(std::move(type_and_name)); + block->insert(std::move(type_and_name)); } break; case TYPE_DECIMAL32: type_desc.precision = 9; type_desc.scale = 2; - tslot.__set_slotType(type_desc.to_thrift()); { vectorized::DataTypePtr decimal_data_type = std::make_shared<DataTypeDecimal<Decimal32>>(type_desc.precision, @@ -197,13 +156,12 @@ void serialize_and_deserialize_arrow_test() { vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(), decimal_data_type, col_name); - block.insert(type_and_name); + block->insert(type_and_name); } break; case TYPE_DECIMAL64: type_desc.precision = 18; type_desc.scale = 6; - tslot.__set_slotType(type_desc.to_thrift()); { vectorized::DataTypePtr decimal_data_type = std::make_shared<DataTypeDecimal<Decimal64>>(type_desc.precision, @@ -229,13 +187,12 @@ void serialize_and_deserialize_arrow_test() { } 
vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(), decimal_data_type, col_name); - block.insert(type_and_name); + block->insert(type_and_name); } break; case TYPE_DECIMAL128I: type_desc.precision = 27; type_desc.scale = 9; - tslot.__set_slotType(type_desc.to_thrift()); { vectorized::DataTypePtr decimal_data_type( doris::vectorized::create_decimal(27, 9, true)); @@ -244,146 +201,125 @@ void serialize_and_deserialize_arrow_test() { decimal_column.get()) ->get_data(); for (int i = 0; i < row_num; ++i) { - __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10, 8)); + auto value = __int128_t(i * pow(10, 9) + i * pow(10, 8)); data.push_back(value); } vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(), decimal_data_type, col_name); - block.insert(type_and_name); + block->insert(type_and_name); } break; - case TYPE_STRING: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto strcol = vectorized::ColumnString::create(); - for (int i = 0; i < row_num; ++i) { - std::string is = std::to_string(i); - strcol->insert_data(is.c_str(), is.size()); - } - vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>()); - vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type, - col_name); - block.insert(type_and_name); + case TYPE_STRING: { + auto strcol = vectorized::ColumnString::create(); + for (int i = 0; i < row_num; ++i) { + std::string is = std::to_string(i); + strcol->insert_data(is.c_str(), is.size()); } - break; - case TYPE_HLL: - tslot.__set_slotType(type_desc.to_thrift()); - { - vectorized::DataTypePtr hll_data_type(std::make_shared<vectorized::DataTypeHLL>()); - auto hll_column = hll_data_type->create_column(); - std::vector<HyperLogLog>& container = - ((vectorized::ColumnHLL*)hll_column.get())->get_data(); - for (int i = 0; i < row_num; ++i) { - HyperLogLog hll; - hll.update(i); - container.push_back(hll); - } - vectorized::ColumnWithTypeAndName 
type_and_name(hll_column->get_ptr(), - hll_data_type, col_name); - - block.insert(type_and_name); + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>()); + vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type, col_name); + block->insert(type_and_name); + } break; + case TYPE_HLL: { + vectorized::DataTypePtr hll_data_type(std::make_shared<vectorized::DataTypeHLL>()); + auto hll_column = hll_data_type->create_column(); + std::vector<HyperLogLog>& container = + ((vectorized::ColumnHLL*)hll_column.get())->get_data(); + for (int i = 0; i < row_num; ++i) { + HyperLogLog hll; + hll.update(i); + container.push_back(hll); } - break; - case TYPE_DATEV2: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto column_vector_date_v2 = vectorized::ColumnVector<vectorized::UInt32>::create(); - auto& date_v2_data = column_vector_date_v2->get_data(); - for (int i = 0; i < row_num; ++i) { - DateV2Value<DateV2ValueType> value; - value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6)); - date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); - } - vectorized::DataTypePtr date_v2_type( - std::make_shared<vectorized::DataTypeDateV2>()); - vectorized::ColumnWithTypeAndName test_date_v2(column_vector_date_v2->get_ptr(), - date_v2_type, col_name); - block.insert(test_date_v2); + vectorized::ColumnWithTypeAndName type_and_name(hll_column->get_ptr(), hll_data_type, + col_name); + + block->insert(type_and_name); + } break; + case TYPE_DATEV2: { + auto column_vector_date_v2 = vectorized::ColumnVector<vectorized::UInt32>::create(); + auto& date_v2_data = column_vector_date_v2->get_data(); + for (int i = 0; i < row_num; ++i) { + DateV2Value<DateV2ValueType> value; + value.from_date_int64(20210501); + date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); } - break; + vectorized::DataTypePtr date_v2_type(std::make_shared<vectorized::DataTypeDateV2>()); + vectorized::ColumnWithTypeAndName 
test_date_v2(column_vector_date_v2->get_ptr(), + date_v2_type, col_name); + block->insert(test_date_v2); + } break; case TYPE_DATE: // int64 - tslot.__set_slotType(type_desc.to_thrift()); - { - auto column_vector_date = vectorized::ColumnVector<vectorized::Int64>::create(); - auto& date_data = column_vector_date->get_data(); - for (int i = 0; i < row_num; ++i) { - VecDateTimeValue value; - value.from_date_int64(20210501); - date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value)); - } - vectorized::DataTypePtr date_type(std::make_shared<vectorized::DataTypeDate>()); - vectorized::ColumnWithTypeAndName test_date(column_vector_date->get_ptr(), - date_type, col_name); - block.insert(test_date); + { + auto column_vector_date = vectorized::ColumnVector<vectorized::Int64>::create(); + auto& date_data = column_vector_date->get_data(); + for (int i = 0; i < row_num; ++i) { + VecDateTimeValue value; + value.from_date_int64(20210501); + date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value)); } - break; + vectorized::DataTypePtr date_type(std::make_shared<vectorized::DataTypeDate>()); + vectorized::ColumnWithTypeAndName test_date(column_vector_date->get_ptr(), date_type, + col_name); + block->insert(test_date); + } break; case TYPE_DATETIME: // int64 - tslot.__set_slotType(type_desc.to_thrift()); - { - auto column_vector_datetime = vectorized::ColumnVector<vectorized::Int64>::create(); - auto& datetime_data = column_vector_datetime->get_data(); - for (int i = 0; i < row_num; ++i) { - VecDateTimeValue value; - value.from_date_int64(20210501080910); - datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value)); - } - vectorized::DataTypePtr datetime_type( - std::make_shared<vectorized::DataTypeDateTime>()); - vectorized::ColumnWithTypeAndName test_datetime(column_vector_datetime->get_ptr(), - datetime_type, col_name); - block.insert(test_datetime); + { + auto column_vector_datetime = vectorized::ColumnVector<vectorized::Int64>::create(); + auto& 
datetime_data = column_vector_datetime->get_data(); + for (int i = 0; i < row_num; ++i) { + VecDateTimeValue value; + value.from_date_int64(20210501080910); + datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value)); } - break; + vectorized::DataTypePtr datetime_type(std::make_shared<vectorized::DataTypeDateTime>()); + vectorized::ColumnWithTypeAndName test_datetime(column_vector_datetime->get_ptr(), + datetime_type, col_name); + block->insert(test_datetime); + } break; case TYPE_DATETIMEV2: // uint64 - tslot.__set_slotType(type_desc.to_thrift()); - { - // 2022-01-01 11:11:11.111 - auto column_vector_datetimev2 = - vectorized::ColumnVector<vectorized::UInt64>::create(); - // auto& datetimev2_data = column_vector_datetimev2->get_data(); - DateV2Value<DateTimeV2ValueType> value; - string date_literal = "2022-01-01 11:11:11.111"; - value.from_date_str(date_literal.c_str(), date_literal.size()); - char to[64] = {}; - std::cout << "value: " << value.to_string(to) << std::endl; - for (int i = 0; i < row_num; ++i) { - column_vector_datetimev2->insert(value.to_date_int_val()); - } - vectorized::DataTypePtr datetimev2_type( - std::make_shared<vectorized::DataTypeDateTimeV2>()); - vectorized::ColumnWithTypeAndName test_datetimev2( - column_vector_datetimev2->get_ptr(), datetimev2_type, col_name); - block.insert(test_datetimev2); + { + auto column_vector_datetimev2 = vectorized::ColumnVector<vectorized::UInt64>::create(); + DateV2Value<DateTimeV2ValueType> value; + string date_literal = "2022-01-01 11:11:11.111"; + cctz::time_zone ctz; + TimezoneUtils::find_cctz_time_zone("UTC", ctz); + EXPECT_TRUE(value.from_date_str(date_literal.c_str(), date_literal.size(), ctz, 3)); + char to[64] = {}; + std::cout << "value: " << value.to_string(to) << std::endl; + for (int i = 0; i < row_num; ++i) { + column_vector_datetimev2->insert(value.to_date_int_val()); } - break; + vectorized::DataTypePtr datetimev2_type( + std::make_shared<vectorized::DataTypeDateTimeV2>(3)); + 
vectorized::ColumnWithTypeAndName test_datetimev2(column_vector_datetimev2->get_ptr(), + datetimev2_type, col_name); + block->insert(test_datetimev2); + } break; case TYPE_ARRAY: // array type_desc.add_sub_type(TYPE_STRING, true); - tslot.__set_slotType(type_desc.to_thrift()); { DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); DataTypePtr au = std::make_shared<DataTypeArray>(s); Array a1, a2; - a1.push_back(Field(String("sss"))); + a1.push_back(Field("sss")); a1.push_back(Null()); - a1.push_back(Field(String("clever amory"))); - a2.push_back(Field(String("hello amory"))); + a1.push_back(Field("clever amory")); + a2.push_back(Field("hello amory")); a2.push_back(Null()); - a2.push_back(Field(String("cute amory"))); - a2.push_back(Field(String("sf"))); + a2.push_back(Field("cute amory")); + a2.push_back(Field("sf")); MutableColumnPtr array_column = au->create_column(); array_column->reserve(2); array_column->insert(a1); array_column->insert(a2); vectorized::ColumnWithTypeAndName type_and_name(array_column->get_ptr(), au, col_name); - block.insert(type_and_name); + block->insert(type_and_name); } break; case TYPE_MAP: type_desc.add_sub_type(TYPE_STRING, true); type_desc.add_sub_type(TYPE_STRING, true); - tslot.__set_slotType(type_desc.to_thrift()); { DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); @@ -416,14 +352,13 @@ void serialize_and_deserialize_arrow_test() { map_column->insert(m1); map_column->insert(m2); vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), m, col_name); - block.insert(type_and_name); + block->insert(type_and_name); } break; case TYPE_STRUCT: type_desc.add_sub_type(TYPE_STRING, "name", true); type_desc.add_sub_type(TYPE_LARGEINT, "age", true); type_desc.add_sub_type(TYPE_BOOLEAN, "is", true); - tslot.__set_slotType(type_desc.to_thrift()); { DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); @@ -434,10 +369,10 @@ 
void serialize_and_deserialize_arrow_test() { DataTypePtr st = std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {s, d, m}); Tuple t1, t2; - t1.push_back(Field(String("amory cute"))); + t1.push_back(Field("amory cute")); t1.push_back(__int128_t(37)); t1.push_back(true); - t2.push_back(Field("null")); + t2.push_back("null"); t2.push_back(__int128_t(26)); t2.push_back(false); MutableColumnPtr struct_column = st->create_column(); @@ -446,144 +381,61 @@ void serialize_and_deserialize_arrow_test() { struct_column->insert(t2); vectorized::ColumnWithTypeAndName type_and_name(struct_column->get_ptr(), st, col_name); - block.insert(type_and_name); + block->insert(type_and_name); } break; - case TYPE_IPV4: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto vec = vectorized::ColumnIPv4::create(); - auto& data = vec->get_data(); - for (int i = 0; i < row_num; ++i) { - data.push_back(i); - } - vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeIPv4>()); - vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, - col_name); - block.insert(std::move(type_and_name)); + case TYPE_IPV4: { + auto vec = vectorized::ColumnIPv4::create(); + auto& data = vec->get_data(); + for (int i = 0; i < row_num; ++i) { + data.push_back(i); } - break; - case TYPE_IPV6: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto vec = vectorized::ColumnIPv6::create(); - auto& data = vec->get_data(); - for (int i = 0; i < row_num; ++i) { - data.push_back(i); - } - vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeIPv6>()); - vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, - col_name); - block.insert(std::move(type_and_name)); + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeIPv4>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, col_name); + block->insert(std::move(type_and_name)); + } break; + case TYPE_IPV6: { + auto vec = 
vectorized::ColumnIPv6::create(); + auto& data = vec->get_data(); + for (int i = 0; i < row_num; ++i) { + data.push_back(i); } - break; + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeIPv6>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, col_name); + block->insert(std::move(type_and_name)); + } break; default: - break; + LOG(FATAL) << "error column type"; } - - tslot.__set_col_unique_id(std::get<2>(t)); - SlotDescriptor* slot = new SlotDescriptor(tslot); - tuple_desc.add_slot(slot); } - - RowDescriptor row_desc(&tuple_desc, true); - // arrow schema - std::shared_ptr<arrow::Schema> _arrow_schema; - EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); - - // serialize - std::shared_ptr<arrow::RecordBatch> result; - std::cout << "block data: " << block.dump_data(0, row_num) << std::endl; - std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) << std::endl; - - cctz::time_zone timezone_obj; - TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, timezone_obj); - static_cast<void>(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), - &result, timezone_obj)); - Block new_block = block.clone_empty(); - EXPECT_TRUE(result != nullptr); - std::cout << "result: " << result->ToString() << std::endl; - // deserialize - for (auto t : cols) { - std::string real_column_name = std::get<0>(t); - auto* array = result->GetColumnByName(real_column_name).get(); - auto& column_with_type_and_name = new_block.get_by_name(real_column_name); - if (std::get<3>(t) == PrimitiveType::TYPE_DATE || - std::get<3>(t) == PrimitiveType::TYPE_DATETIME) { - { - auto strcol = vectorized::ColumnString::create(); - vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>()); - vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type, - real_column_name); - static_cast<void>(arrow_column_to_doris_column( - array, 0, 
type_and_name.column, type_and_name.type, block.rows(), "UTC")); - { - auto& col = column_with_type_and_name.column.get()->assume_mutable_ref(); - auto& date_data = static_cast<ColumnVector<Int64>&>(col).get_data(); - for (int i = 0; i < strcol->size(); ++i) { - StringRef str = strcol->get_data_at(i); - VecDateTimeValue value; - value.from_date_str(str.data, str.size); - date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value)); - } - } - } - continue; - } else if (std::get<3>(t) == PrimitiveType::TYPE_DATEV2) { - auto strcol = vectorized::ColumnString::create(); - vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>()); - vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type, - real_column_name); - static_cast<void>(arrow_column_to_doris_column( - array, 0, type_and_name.column, type_and_name.type, block.rows(), "UTC")); - { - auto& col = column_with_type_and_name.column.get()->assume_mutable_ref(); - auto& date_data = static_cast<ColumnVector<UInt32>&>(col).get_data(); - for (int i = 0; i < strcol->size(); ++i) { - StringRef str = strcol->get_data_at(i); - DateV2Value<DateV2ValueType> value; - value.from_date_str(str.data, str.size); - date_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); - } - } - continue; - } else if (std::get<3>(t) == PrimitiveType::TYPE_DATETIMEV2) { - // now we only support read doris datetimev2 to arrow - block.erase(real_column_name); - new_block.erase(real_column_name); - continue; - } - static_cast<void>(arrow_column_to_doris_column(array, 0, column_with_type_and_name.column, - column_with_type_and_name.type, block.rows(), - "UTC")); - } - - std::cout << block.dump_data() << std::endl; - std::cout << new_block.dump_data() << std::endl; - EXPECT_EQ(block.dump_data(), new_block.dump_data()); + std::shared_ptr<arrow::RecordBatch> record_batch = + CommonDataTypeSerdeTest::serialize_arrow(block); + auto assert_block = std::make_shared<Block>(block->clone_empty()); + 
CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch); + CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block); } TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) { - serialize_and_deserialize_arrow_test<true>(); + std::vector<PrimitiveType> cols = { + TYPE_INT, TYPE_INT, TYPE_STRING, TYPE_DECIMAL128I, TYPE_BOOLEAN, + TYPE_DECIMAL32, TYPE_DECIMAL64, TYPE_IPV4, TYPE_IPV6, TYPE_DATETIME, + TYPE_DATETIMEV2, TYPE_DATE, TYPE_DATEV2, + }; + serialize_and_deserialize_arrow_test(cols, 7, true); + serialize_and_deserialize_arrow_test(cols, 7, false); } TEST(DataTypeSerDeArrowTest, DataTypeCollectionSerDeTest) { - serialize_and_deserialize_arrow_test<false>(); + std::vector<PrimitiveType> cols = {TYPE_ARRAY, TYPE_MAP, TYPE_STRUCT}; + serialize_and_deserialize_arrow_test(cols, 7, true); + serialize_and_deserialize_arrow_test(cols, 7, false); } TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) { - TupleDescriptor tuple_desc(PTupleDescriptor(), true); - TSlotDescriptor tslot; std::string col_name = "map_null_key"; - tslot.__set_colName(col_name); - TypeDescriptor type_desc(TYPE_MAP); - type_desc.add_sub_type(TYPE_STRING, true); - type_desc.add_sub_type(TYPE_INT, true); - tslot.__set_slotType(type_desc.to_thrift()); - vectorized::Block block; + auto block = std::make_shared<Block>(); { DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); - ; DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()); DataTypePtr m = std::make_shared<DataTypeMap>(s, d); Array k1, k2, v1, v2, k3, v3; @@ -614,41 +466,14 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) { map_column->insert(m2); map_column->insert(m3); vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), m, col_name); - block.insert(type_and_name); + block->insert(type_and_name); } - tslot.__set_col_unique_id(1); - SlotDescriptor* slot = new SlotDescriptor(tslot); - tuple_desc.add_slot(slot); - 
RowDescriptor row_desc(&tuple_desc, true); - // arrow schema - std::shared_ptr<arrow::Schema> _arrow_schema; - EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); - - // serialize - std::shared_ptr<arrow::RecordBatch> result; - std::cout << "block structure: " << block.dump_structure() << std::endl; - std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) << std::endl; - - cctz::time_zone timezone_obj; - TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, timezone_obj); - static_cast<void>(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), - &result, timezone_obj)); - Block new_block = block.clone_empty(); - EXPECT_TRUE(result != nullptr); - std::cout << "result: " << result->ToString() << std::endl; - // deserialize - auto* array = result->GetColumnByName(col_name).get(); - auto& column_with_type_and_name = new_block.get_by_name(col_name); - static_cast<void>(arrow_column_to_doris_column(array, 0, column_with_type_and_name.column, - column_with_type_and_name.type, block.rows(), - "UTC")); - std::cout << block.dump_data() << std::endl; - std::cout << new_block.dump_data() << std::endl; - // new block row_index 0, 2 which row has key null will be filter - EXPECT_EQ(new_block.dump_one_line(0, 1), "{\"doris\":null, \"clever amory\":30}"); - EXPECT_EQ(new_block.dump_one_line(2, 1), "{\"test\":11}"); - EXPECT_EQ(block.dump_data(1, 1), new_block.dump_data(1, 1)); + std::shared_ptr<arrow::RecordBatch> record_batch = + CommonDataTypeSerdeTest::serialize_arrow(block); + auto assert_block = std::make_shared<Block>(block->clone_empty()); + CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch); + CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block); } } // namespace doris::vectorized diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 19b21c16a45..be8c3dfd201 100644 --- 
a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -294,8 +294,10 @@ static doris::TupleDescriptor* create_tuple_desc( t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27, 9).to_thrift()); } else { TypeDescriptor descriptor(column_descs[i].type); - if (column_descs[i].precision >= 0 && column_descs[i].scale >= 0) { + if (column_descs[i].precision >= 0) { descriptor.precision = column_descs[i].precision; + } + if (column_descs[i].scale >= 0) { descriptor.scale = column_descs[i].scale; } t_slot_desc.__set_slotType(descriptor.to_thrift()); diff --git a/be/test/vec/exprs/vexpr_test.cpp b/be/test/vec/exprs/vexpr_test.cpp index 4c075cba848..76982d399fb 100644 --- a/be/test/vec/exprs/vexpr_test.cpp +++ b/be/test/vec/exprs/vexpr_test.cpp @@ -93,8 +93,10 @@ static doris::TupleDescriptor* create_tuple_desc( t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27, 9).to_thrift()); } else { TypeDescriptor descriptor(column_descs[i].type); - if (column_descs[i].precision >= 0 && column_descs[i].scale >= 0) { + if (column_descs[i].precision >= 0) { descriptor.precision = column_descs[i].precision; + } + if (column_descs[i].scale >= 0) { descriptor.scale = column_descs[i].scale; } t_slot_desc.__set_slotType(descriptor.to_thrift()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org