This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 84c6e213c3b branch-3.0: [fix](arrow) Fix Arrow serialization and
deserialization of Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types
(#49643)
84c6e213c3b is described below
commit 84c6e213c3bac218b4d307a6556c169b9a1c22ee
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue May 6 10:08:52 2025 +0800
branch-3.0: [fix](arrow) Fix Arrow serialization and deserialization of
Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types (#49643)
pick #48944 [fix](arrow) Fix UT DataTypeSerDeArrowTest of
Array/Map/Struct/Bitmap/HLL/Decimal256 types
pick #48398 [fix](arrow) Fix UT DataTypeSerDeArrowTest of Date type
---
be/src/runtime/types.cpp | 20 +-
be/src/runtime/types.h | 3 +-
be/src/util/arrow/block_convertor.cpp | 272 +-----------
be/src/util/arrow/row_batch.cpp | 2 -
be/src/vec/columns/column_array.cpp | 1 +
be/src/vec/columns/column_map.cpp | 1 +
be/src/vec/columns/column_string.h | 4 +
be/src/vec/columns/column_struct.cpp | 1 +
be/src/vec/columns/column_vector.h | 5 +-
be/src/vec/data_types/data_type_time_v2.h | 4 +-
.../data_types/serde/data_type_date64_serde.cpp | 33 +-
.../vec/data_types/serde/data_type_date64_serde.h | 7 +
.../serde/data_type_datetimev2_serde.cpp | 5 +-
.../data_types/serde/data_type_datev2_serde.cpp | 11 +-
.../data_types/serde/data_type_decimal_serde.cpp | 45 +-
.../vec/data_types/serde/data_type_ipv6_serde.cpp | 16 +-
.../data_types/serde/data_type_number_serde.cpp | 20 +-
.../vec/data_types/common_data_type_serder_test.h | 144 +++++--
be/test/vec/data_types/data_type_ip_test.cpp | 2 +-
be/test/vec/data_types/data_type_map_test.cpp | 178 ++++++++
be/test/vec/data_types/data_type_struct_test.cpp | 115 +++++
.../serde/data_type_serde_arrow_test.cpp | 479 +++++++++++++++++++++
be/test/vec/exprs/vexpr_test.cpp | 4 +-
23 files changed, 1013 insertions(+), 359 deletions(-)
diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 7b7154fb38a..f0112ebd2bc 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -46,11 +46,14 @@ TypeDescriptor::TypeDescriptor(const
std::vector<TTypeNode>& types, int* idx)
DCHECK(scalar_type.__isset.len);
len = scalar_type.len;
} else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type ==
TYPE_DECIMAL64 ||
- type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type
== TYPE_DATETIMEV2) {
+ type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
DCHECK(scalar_type.__isset.precision);
DCHECK(scalar_type.__isset.scale);
precision = scalar_type.precision;
scale = scalar_type.scale;
+ } else if (type == TYPE_DATETIMEV2) {
+ DCHECK(scalar_type.__isset.scale);
+ scale = scalar_type.scale;
} else if (type == TYPE_TIMEV2) {
if (scalar_type.__isset.scale) {
scale = scalar_type.scale;
@@ -157,11 +160,14 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type)
const {
// DCHECK_NE(len, -1);
scalar_type.__set_len(len);
} else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type ==
TYPE_DECIMAL64 ||
- type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type
== TYPE_DATETIMEV2) {
+ type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
DCHECK_NE(precision, -1);
DCHECK_NE(scale, -1);
scalar_type.__set_precision(precision);
scalar_type.__set_scale(scale);
+ } else if (type == TYPE_DATETIMEV2) {
+ DCHECK_NE(scale, -1);
+ scalar_type.__set_scale(scale);
}
}
}
@@ -174,11 +180,14 @@ void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const {
if (type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_HLL || type
== TYPE_STRING) {
scalar_type->set_len(len);
} else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type ==
TYPE_DECIMAL64 ||
- type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type ==
TYPE_DATETIMEV2) {
+ type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
DCHECK_NE(precision, -1);
DCHECK_NE(scale, -1);
scalar_type->set_precision(precision);
scalar_type->set_scale(scale);
+ } else if (type == TYPE_DATETIMEV2) {
+ DCHECK_NE(scale, -1);
+ scalar_type->set_scale(scale);
} else if (type == TYPE_ARRAY) {
node->set_type(TTypeNodeType::ARRAY);
node->set_contains_null(contains_nulls[0]);
@@ -224,11 +233,14 @@ TypeDescriptor::TypeDescriptor(const
google::protobuf::RepeatedPtrField<PTypeNod
DCHECK(scalar_type.has_len());
len = scalar_type.len();
} else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type ==
TYPE_DECIMAL64 ||
- type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type
== TYPE_DATETIMEV2) {
+ type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
DCHECK(scalar_type.has_precision());
DCHECK(scalar_type.has_scale());
precision = scalar_type.precision();
scale = scalar_type.scale();
+ } else if (type == TYPE_DATETIMEV2) {
+ DCHECK(scalar_type.has_scale());
+ scale = scalar_type.scale();
} else if (type == TYPE_STRING) {
if (scalar_type.has_len()) {
len = scalar_type.len();
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 5d2f83c1bd6..f38a0f0ed07 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -48,6 +48,7 @@ struct TypeDescriptor {
/// Only set if type == TYPE_DECIMAL
int precision;
+ /// Only set if type == TYPE_DECIMAL or type = TYPE_DATETIMEV2
int scale;
std::vector<TypeDescriptor> children;
@@ -69,11 +70,11 @@ struct TypeDescriptor {
// explicit TypeDescriptor(PrimitiveType type) :
TypeDescriptor(PrimitiveType type) : type(type), len(-1), precision(-1),
scale(-1) {
+ // TODO, should not initialize default values, force initialization by
parameters or external.
if (type == TYPE_DECIMALV2) {
precision = 27;
scale = 9;
} else if (type == TYPE_DATETIMEV2) {
- precision = 18;
scale = 6;
}
}
diff --git a/be/src/util/arrow/block_convertor.cpp
b/be/src/util/arrow/block_convertor.cpp
index 817231e02ba..4db60144ea5 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -66,10 +66,7 @@ class Array;
namespace doris {
-// Convert Block to an Arrow::Array
-// We should keep this function to keep compatible with arrow's type visitor
-// Now we inherit TypeVisitor to use default Visit implementation
-class FromBlockConverter : public arrow::TypeVisitor {
+class FromBlockConverter {
public:
FromBlockConverter(const vectorized::Block& block, const
std::shared_ptr<arrow::Schema>& schema,
arrow::MemoryPool* pool, const cctz::time_zone&
timezone_obj)
@@ -79,276 +76,11 @@ public:
_cur_field_idx(-1),
_timezone_obj(timezone_obj) {}
- ~FromBlockConverter() override = default;
-
- // Use base class function
- using arrow::TypeVisitor::Visit;
-
-#define PRIMITIVE_VISIT(TYPE) \
- arrow::Status Visit(const arrow::TYPE& type) override { return
_visit(type); }
-
- PRIMITIVE_VISIT(Int8Type)
- PRIMITIVE_VISIT(Int16Type)
- PRIMITIVE_VISIT(Int32Type)
- PRIMITIVE_VISIT(Int64Type)
- PRIMITIVE_VISIT(FloatType)
- PRIMITIVE_VISIT(DoubleType)
-
-#undef PRIMITIVE_VISIT
-
- // process string-transformable field
- arrow::Status Visit(const arrow::StringType& type) override {
- auto& builder = assert_cast<arrow::StringBuilder&>(*_cur_builder);
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = _cur_col->get_data_at(i);
- vectorized::TypeIndex type_idx =
vectorized::remove_nullable(_cur_type)->get_type_id();
- switch (type_idx) {
- case vectorized::TypeIndex::String:
- case vectorized::TypeIndex::FixedString:
- case vectorized::TypeIndex::HLL: {
- if (data_ref.size == 0) {
- // 0x01 is a magic num, not useful actually, just for
present ""
- //char* tmp_val = reinterpret_cast<char*>(0x01);
- ARROW_RETURN_NOT_OK(builder.Append(""));
- } else {
- ARROW_RETURN_NOT_OK(builder.Append(data_ref.data,
data_ref.size));
- }
- break;
- }
- case vectorized::TypeIndex::Date:
- case vectorized::TypeIndex::DateTime: {
- char buf[64];
- const VecDateTimeValue* time_val = (const
VecDateTimeValue*)(data_ref.data);
- int len = time_val->to_buffer(buf);
- ARROW_RETURN_NOT_OK(builder.Append(buf, len));
- break;
- }
- case vectorized::TypeIndex::DateV2: {
- char buf[64];
- const DateV2Value<DateV2ValueType>* time_val =
- (const DateV2Value<DateV2ValueType>*)(data_ref.data);
- int len = time_val->to_buffer(buf);
- ARROW_RETURN_NOT_OK(builder.Append(buf, len));
- break;
- }
- case vectorized::TypeIndex::DateTimeV2: {
- char buf[64];
- const DateV2Value<DateTimeV2ValueType>* time_val =
- (const
DateV2Value<DateTimeV2ValueType>*)(data_ref.data);
- int len = time_val->to_buffer(buf);
- ARROW_RETURN_NOT_OK(builder.Append(buf, len));
- break;
- }
- case vectorized::TypeIndex::Int128: {
- auto string_temp = LargeIntValue::to_string(
- reinterpret_cast<const
PackedInt128*>(data_ref.data)->value);
- ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(),
string_temp.size()));
- break;
- }
- case vectorized::TypeIndex::JSONB: {
- std::string string_temp =
- JsonbToJson::jsonb_to_json_string(data_ref.data,
data_ref.size);
- ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(),
string_temp.size()));
- break;
- }
- default: {
- LOG(WARNING) << "can't convert this type = " <<
vectorized::getTypeName(type_idx)
- << " to arrow type";
- return arrow::Status::TypeError("unsupported column type");
- }
- }
- }
- return arrow::Status::OK();
- }
-
- // process doris Decimal
- arrow::Status Visit(const arrow::Decimal128Type& type) override {
- auto& builder = assert_cast<arrow::Decimal128Builder&>(*_cur_builder);
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
- if (auto* decimalv2_column = vectorized::check_and_get_column<
- vectorized::ColumnDecimal<vectorized::Decimal128V2>>(
- *vectorized::remove_nullable(_cur_col))) {
- std::shared_ptr<arrow::DataType> s_decimal_ptr =
- std::make_shared<arrow::Decimal128Type>(27, 9);
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = decimalv2_column->get_data_at(i);
- const PackedInt128* p_value = reinterpret_cast<const
PackedInt128*>(data_ref.data);
- int64_t high = (p_value->value) >> 64;
- uint64 low = p_value->value;
- arrow::Decimal128 value(high, low);
- ARROW_RETURN_NOT_OK(builder.Append(value));
- }
- return arrow::Status::OK();
- } else if (auto* decimal128_column = vectorized::check_and_get_column<
-
vectorized::ColumnDecimal<vectorized::Decimal128V3>>(
- *vectorized::remove_nullable(_cur_col))) {
- std::shared_ptr<arrow::DataType> s_decimal_ptr =
- std::make_shared<arrow::Decimal128Type>(38,
decimal128_column->get_scale());
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = decimal128_column->get_data_at(i);
- const PackedInt128* p_value = reinterpret_cast<const
PackedInt128*>(data_ref.data);
- int64_t high = (p_value->value) >> 64;
- uint64 low = p_value->value;
- arrow::Decimal128 value(high, low);
- ARROW_RETURN_NOT_OK(builder.Append(value));
- }
- return arrow::Status::OK();
- } else if (auto* decimal32_column = vectorized::check_and_get_column<
- vectorized::ColumnDecimal<vectorized::Decimal32>>(
- *vectorized::remove_nullable(_cur_col))) {
- std::shared_ptr<arrow::DataType> s_decimal_ptr =
- std::make_shared<arrow::Decimal128Type>(8,
decimal32_column->get_scale());
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = decimal32_column->get_data_at(i);
- const int32_t* p_value = reinterpret_cast<const
int32_t*>(data_ref.data);
- int64_t high = *p_value > 0 ? 0 : 1UL << 63;
- arrow::Decimal128 value(high, *p_value > 0 ? *p_value :
-*p_value);
- ARROW_RETURN_NOT_OK(builder.Append(value));
- }
- return arrow::Status::OK();
- } else if (auto* decimal64_column = vectorized::check_and_get_column<
- vectorized::ColumnDecimal<vectorized::Decimal64>>(
- *vectorized::remove_nullable(_cur_col))) {
- std::shared_ptr<arrow::DataType> s_decimal_ptr =
- std::make_shared<arrow::Decimal128Type>(18,
decimal64_column->get_scale());
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = decimal64_column->get_data_at(i);
- const int64_t* p_value = reinterpret_cast<const
int64_t*>(data_ref.data);
- int64_t high = *p_value > 0 ? 0 : 1UL << 63;
- arrow::Decimal128 value(high, *p_value > 0 ? *p_value :
-*p_value);
- ARROW_RETURN_NOT_OK(builder.Append(value));
- }
- return arrow::Status::OK();
- } else {
- return arrow::Status::TypeError("Unsupported column:" +
_cur_col->get_name());
- }
- }
- // process boolean
- arrow::Status Visit(const arrow::BooleanType& type) override {
- auto& builder = assert_cast<arrow::BooleanBuilder&>(*_cur_builder);
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = _cur_col->get_data_at(i);
- ARROW_RETURN_NOT_OK(builder.Append(*(const bool*)data_ref.data));
- }
- return arrow::Status::OK();
- }
-
- // process array type
- arrow::Status Visit(const arrow::ListType& type) override {
- auto& builder = assert_cast<arrow::ListBuilder&>(*_cur_builder);
- auto orignal_col = _cur_col;
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
-
- const vectorized::ColumnArray* array_column = nullptr;
- if (orignal_col->is_nullable()) {
- auto nullable_column =
- assert_cast<const
vectorized::ColumnNullable*>(orignal_col.get());
- array_column = assert_cast<const vectorized::ColumnArray*>(
- &nullable_column->get_nested_column());
- } else {
- array_column = assert_cast<const
vectorized::ColumnArray*>(orignal_col.get());
- }
- const auto& offsets = array_column->get_offsets();
- vectorized::ColumnPtr nested_column = array_column->get_data_ptr();
-
- // set current col/type/builder to nested
- _cur_col = nested_column;
- if (_cur_type->is_nullable()) {
- auto nullable_type = assert_cast<const
vectorized::DataTypeNullable*>(_cur_type.get());
- _cur_type = assert_cast<const vectorized::DataTypeArray*>(
- nullable_type->get_nested_type().get())
- ->get_nested_type();
- } else {
- _cur_type = assert_cast<const
vectorized::DataTypeArray*>(_cur_type.get())
- ->get_nested_type();
- }
- _cur_builder = builder.value_builder();
-
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = orignal_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- // append array elements in row i
- ARROW_RETURN_NOT_OK(builder.Append());
- _cur_start = offsets[i - 1];
- _cur_rows = offsets[i] - offsets[i - 1];
- ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*type.value_type(),
this));
- }
-
- return arrow::Status::OK();
- }
+ ~FromBlockConverter() = default;
Status convert(std::shared_ptr<arrow::RecordBatch>* out);
private:
- template <typename T>
- arrow::Status _visit(const T& type) {
- auto& builder = assert_cast<arrow::NumericBuilder<T>&>(*_cur_builder);
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- if (_cur_col->is_nullable()) {
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = _cur_col->get_data_at(i);
- ARROW_RETURN_NOT_OK(builder.Append(*(const typename
T::c_type*)data_ref.data));
- }
- } else {
- ARROW_RETURN_NOT_OK(builder.AppendValues(
- (const typename
T::c_type*)_cur_col->get_data_at(start).data, num_rows));
- }
- return arrow::Status::OK();
- }
-
const vectorized::Block& _block;
const std::shared_ptr<arrow::Schema>& _schema;
arrow::MemoryPool* _pool;
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index a0cd77aee41..9bac79ff258 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -104,8 +104,6 @@ Status convert_to_arrow_type(const TypeDescriptor& type,
std::shared_ptr<arrow::
}
break;
case TYPE_DECIMALV2:
- *result = std::make_shared<arrow::Decimal128Type>(27, 9);
- break;
case TYPE_DECIMAL32:
case TYPE_DECIMAL64:
case TYPE_DECIMAL128I:
diff --git a/be/src/vec/columns/column_array.cpp
b/be/src/vec/columns/column_array.cpp
index e66e016381e..c6cf889b5a0 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -368,6 +368,7 @@ void ColumnArray::update_crcs_with_value(uint32_t*
__restrict hash, PrimitiveTyp
}
void ColumnArray::insert(const Field& x) {
+ DCHECK_EQ(x.get_type(), Field::Types::Array);
if (x.is_null()) {
get_data().insert(Null());
get_offsets().push_back(get_offsets().back() + 1);
diff --git a/be/src/vec/columns/column_map.cpp
b/be/src/vec/columns/column_map.cpp
index 85964ca967b..355866bd868 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -137,6 +137,7 @@ void ColumnMap::insert_data(const char*, size_t) {
}
void ColumnMap::insert(const Field& x) {
+ DCHECK_EQ(x.get_type(), Field::Types::Map);
const auto& map = doris::vectorized::get<const Map&>(x);
CHECK_EQ(map.size(), 2);
const auto& k_f = doris::vectorized::get<const Array&>(map[0]);
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index 37b9218ed7b..94c835f600b 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -151,6 +151,10 @@ public:
const auto& real_field = vectorized::get<const JsonbField&>(x);
s = StringRef(real_field.get_value(), real_field.get_size());
} else {
+ DCHECK_EQ(x.get_type(), Field::Types::String);
+ // If `x.get_type()` is not String, such as UInt64, may get the
error
+ // `string column length is too large:
total_length=13744632839234567870`
+ // because `<String>(x).size() = 13744632839234567870`
s.data = vectorized::get<const String&>(x).data();
s.size = vectorized::get<const String&>(x).size();
}
diff --git a/be/src/vec/columns/column_struct.cpp
b/be/src/vec/columns/column_struct.cpp
index ec0c5e6a687..40cbefa85ed 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -124,6 +124,7 @@ void ColumnStruct::get(size_t n, Field& res) const {
}
void ColumnStruct::insert(const Field& x) {
+ DCHECK_EQ(x.get_type(), Field::Types::Tuple);
const auto& tuple = x.get<const Tuple&>();
const size_t tuple_size = columns.size();
if (tuple.size() != tuple_size) {
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index df99e519f2a..f3cdb7d2dbd 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -353,8 +353,9 @@ public:
// For example, during create column_const(1, uint8), will use
NearestFieldType
// to cast a uint8 to int64, so that the Field is int64, but the column is
created
- // using data_type, so that T == uint8. After the field is created, it
will be inserted
- // into the column, but its type is different from column's data type, so
that during column
+ // using data_type, so that T == uint8, NearestFieldType<T> == uint64.
+ // After the field is created, it will be inserted into the column,
+ // but its type is different from column's data type (int64 vs uint64), so
that during column
// insert method, should use NearestFieldType<T> to get the Field and get
it actual
// uint8 value and then insert into column.
void insert(const Field& x) override {
diff --git a/be/src/vec/data_types/data_type_time_v2.h
b/be/src/vec/data_types/data_type_time_v2.h
index 7688a04a9a8..739328d564f 100644
--- a/be/src/vec/data_types/data_type_time_v2.h
+++ b/be/src/vec/data_types/data_type_time_v2.h
@@ -121,7 +121,9 @@ public:
DataTypeDateTimeV2(const DataTypeDateTimeV2& rhs) : _scale(rhs._scale) {}
TypeIndex get_type_id() const override { return TypeIndex::DateTimeV2; }
TypeDescriptor get_type_as_type_descriptor() const override {
- return TypeDescriptor(TYPE_DATETIMEV2);
+ auto desc = TypeDescriptor(TYPE_DATETIMEV2);
+ desc.scale = _scale;
+ return desc;
}
doris::FieldType get_storage_field_type() const override {
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
index 0624717d1dd..1ce0ff54ac8 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -166,6 +166,12 @@ Status
DataTypeDateTimeSerDe::deserialize_one_cell_from_json(IColumn& column, Sl
return Status::OK();
}
+void DataTypeDateTimeSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
+ _read_column_from_arrow<false>(column, arrow_array, start, end, ctz);
+}
+
void DataTypeDate64SerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
arrow::ArrayBuilder*
array_builder, int start,
int end, const
cctz::time_zone& ctz) const {
@@ -205,9 +211,10 @@ static int64_t time_unit_divisor(arrow::TimeUnit::type
unit) {
}
}
-void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int start, int end,
- const cctz::time_zone& ctz)
const {
+template <bool is_date>
+void DataTypeDate64SerDe::_read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data();
int64_t divisor = 1;
int64_t multiplier = 1;
@@ -246,13 +253,15 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn&
column, const arrow::A
}
} else if (arrow_array->type()->id() == arrow::Type::STRING) {
// to be compatible with old version, we use string type for date.
- auto concrete_array = dynamic_cast<const
arrow::StringArray*>(arrow_array);
- for (size_t value_i = start; value_i < end; ++value_i) {
- Int64 val = 0;
+ const auto* concrete_array = dynamic_cast<const
arrow::StringArray*>(arrow_array);
+ for (auto value_i = start; value_i < end; ++value_i) {
auto val_str = concrete_array->GetString(value_i);
- ReadBuffer rb(val_str.data(), val_str.size());
- read_datetime_text_impl(val, rb, ctz);
- col_data.emplace_back(val);
+ VecDateTimeValue v;
+ v.from_date_str(val_str.c_str(), val_str.length(), ctz);
+ if constexpr (is_date) {
+ v.cast_to_date();
+ }
+ col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
}
} else {
throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
@@ -260,6 +269,12 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn&
column, const arrow::A
}
}
+void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
+ _read_column_from_arrow<true>(column, arrow_array, start, end, ctz);
+}
+
template <bool is_binary_format>
Status DataTypeDate64SerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.h
b/be/src/vec/data_types/serde/data_type_date64_serde.h
index 497ac2aeff4..5f5fc4f1c38 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.h
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.h
@@ -73,6 +73,11 @@ public:
int start, int end,
std::vector<StringRef>& buffer_list) const
override;
+protected:
+ template <bool is_date>
+ void _read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const;
+
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
@@ -94,6 +99,8 @@ public:
Status deserialize_column_from_json_vector(IColumn& column,
std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options)
const override;
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
index f073ac6decf..3b4142a2c01 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -170,7 +170,10 @@ void
DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
// convert second
v.from_unixtime(utc_epoch / divisor, ctz);
// get rest time
- v.set_microsecond(utc_epoch % divisor);
+ // add 0 on the right to make it 6 digits. DateTimeV2Value
microsecond is 6 digits,
+ // the scale decides to keep the first few digits, so the valid
digits should be kept at the front.
+ // "2022-01-01 11:11:11.111", utc_epoch = 1641035471111, divisor =
1000, set_microsecond(111000)
+ v.set_microsecond((utc_epoch % divisor) * DIVISOR_FOR_MICRO /
divisor);
col_data.emplace_back(binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(v));
}
} else {
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
index 53427e8c69c..ba575d43abf 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
@@ -111,15 +111,10 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn&
column, const arrow::A
int start, int end,
const cctz::time_zone& ctz)
const {
auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data();
- auto concrete_array = dynamic_cast<const arrow::Date32Array*>(arrow_array);
- int64_t divisor = 1;
- int64_t multiplier = 1;
-
- multiplier = 24 * 60 * 60; // day => secs
- for (size_t value_i = start; value_i < end; ++value_i) {
+ const auto* concrete_array = dynamic_cast<const
arrow::Date32Array*>(arrow_array);
+ for (auto value_i = start; value_i < end; ++value_i) {
DateV2Value<DateV2ValueType> v;
- v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) /
divisor * multiplier,
- ctz);
+ v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold);
col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>,
UInt32>(v));
}
}
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
index d98f6cae2b0..92b69dbfca9 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
@@ -88,8 +88,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
arrow::ArrayBuilder*
array_builder, int start,
int end, const
cctz::time_zone& ctz) const {
auto& col = reinterpret_cast<const ColumnDecimal<T>&>(column);
- auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(27, 9);
for (size_t i = start; i < end; ++i) {
@@ -108,6 +108,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
}
// TODO: decimal256
} else if constexpr (std::is_same_v<T, Decimal128V3>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(38, col.get_scale());
for (size_t i = start; i < end; ++i) {
@@ -125,6 +126,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
array_builder->type()->name());
}
} else if constexpr (std::is_same_v<T, Decimal<Int32>>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(8, col.get_scale());
for (size_t i = start; i < end; ++i) {
@@ -139,6 +141,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
array_builder->type()->name());
}
} else if constexpr (std::is_same_v<T, Decimal<Int64>>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(18, col.get_scale());
for (size_t i = start; i < end; ++i) {
@@ -152,6 +155,28 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
checkArrowStatus(builder.Append(value), column.get_name(),
array_builder->type()->name());
}
+ } else if constexpr (std::is_same_v<T, Decimal256>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal256Builder&>(*array_builder);
+ std::shared_ptr<arrow::DataType> s_decimal_ptr =
+ std::make_shared<arrow::Decimal256Type>(76, col.get_scale());
+ for (size_t i = start; i < end; ++i) {
+ if (null_map && (*null_map)[i]) {
+ checkArrowStatus(builder.AppendNull(), column.get_name(),
+ array_builder->type()->name());
+ continue;
+ }
+ auto p_value = wide::Int256(col.get_element(i));
+ using half_type = wide::Int256::base_type; // uint64_t
+ half_type a0 = p_value.items[wide::Int256::_impl::little(0)];
+ half_type a1 = p_value.items[wide::Int256::_impl::little(1)];
+ half_type a2 = p_value.items[wide::Int256::_impl::little(2)];
+ half_type a3 = p_value.items[wide::Int256::_impl::little(3)];
+
+ std::array<uint64_t, 4> word_array = {a0, a1, a2, a3};
+ arrow::Decimal256 value(arrow::Decimal256::LittleEndianArray,
word_array);
+ checkArrowStatus(builder.Append(value), column.get_name(),
+ array_builder->type()->name());
+ }
} else {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_arrow with type " +
column.get_name());
@@ -162,14 +187,14 @@ template <typename T>
void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
const arrow::Array*
arrow_array, int start,
int end, const
cctz::time_zone& ctz) const {
- auto concrete_array = dynamic_cast<const
arrow::DecimalArray*>(arrow_array);
- const auto* arrow_decimal_type =
- static_cast<const arrow::DecimalType*>(arrow_array->type().get());
- const auto arrow_scale = arrow_decimal_type->scale();
auto& column_data = static_cast<ColumnDecimal<T>&>(column).get_data();
// Decimal<Int128> for decimalv2
// Decimal<Int128I> for deicmalv3
if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+ const auto* concrete_array = dynamic_cast<const
arrow::DecimalArray*>(arrow_array);
+ const auto* arrow_decimal_type =
+ static_cast<const
arrow::DecimalType*>(arrow_array->type().get());
+ const auto arrow_scale = arrow_decimal_type->scale();
// TODO check precision
for (size_t value_i = start; value_i < end; ++value_i) {
auto value = *reinterpret_cast<const vectorized::Decimal128V2*>(
@@ -195,7 +220,13 @@ void
DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
}
} else if constexpr (std::is_same_v<T, Decimal128V3> || std::is_same_v<T,
Decimal64> ||
std::is_same_v<T, Decimal32>) {
- for (size_t value_i = start; value_i < end; ++value_i) {
+ const auto* concrete_array = dynamic_cast<const
arrow::DecimalArray*>(arrow_array);
+ for (auto value_i = start; value_i < end; ++value_i) {
+ column_data.emplace_back(*reinterpret_cast<const
T*>(concrete_array->Value(value_i)));
+ }
+ } else if constexpr (std::is_same_v<T, Decimal256>) {
+ const auto* concrete_array = dynamic_cast<const
arrow::Decimal256Array*>(arrow_array);
+ for (auto value_i = start; value_i < end; ++value_i) {
column_data.emplace_back(*reinterpret_cast<const
T*>(concrete_array->Value(value_i)));
}
} else {
@@ -257,7 +288,7 @@ Status DataTypeDecimalSerDe<T>::write_column_to_orc(const
std::string& timezone,
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
auto& v = col_data[row_id];
- orc::Int128 value(v >> 64, (uint64_t)v);
+ orc::Int128 value(v >> 64, (uint64_t)v); // TODO, Decimal256
will lose precision
cur_batch->values[row_id] = value;
}
}
diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
index 9c31c74dee5..83acbf9cda1 100644
--- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
@@ -174,13 +174,17 @@ void DataTypeIPv6SerDe::read_column_from_arrow(IColumn&
column, const arrow::Arr
buffer->data() + concrete_array->value_offset(offset_i));
const auto raw_data_len = concrete_array->value_length(offset_i);
- IPv6 ipv6_val;
- if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len)) {
- throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
- "parse number fail, string: '{}'",
- std::string(raw_data,
raw_data_len).c_str());
+ if (raw_data_len == 0) {
+ col_data.emplace_back(0);
+ } else {
+ IPv6 ipv6_val;
+ if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len))
{
+ throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+ "parse number fail, string: '{}'",
+ std::string(raw_data,
raw_data_len).c_str());
+ }
+ col_data.emplace_back(ipv6_val);
}
- col_data.emplace_back(ipv6_val);
} else {
col_data.emplace_back(0);
}
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index e81343c9ede..20ce78bf9c2 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -215,14 +215,20 @@ void
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
const auto* raw_data = buffer->data() +
concrete_array->value_offset(offset_i);
const auto raw_data_len =
concrete_array->value_length(offset_i);
- Int128 val = 0;
- ReadBuffer rb(raw_data, raw_data_len);
- if (!read_int_text_impl(val, rb)) {
- throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
- "parse number fail, string: '{}'",
- std::string(rb.position(),
rb.count()).c_str());
+ if (raw_data_len == 0) {
+ col_data.emplace_back(Int128()); // Int128() is NULL
+ } else {
+ Int128 val = 0;
+ ReadBuffer rb(raw_data, raw_data_len);
+ if (!read_int_text_impl(val, rb)) {
+ throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+ "parse number fail, string:
'{}'",
+ std::string(rb.position(),
rb.count()).c_str());
+ }
+ col_data.emplace_back(val);
}
- col_data.emplace_back(val);
+ } else {
+ col_data.emplace_back(Int128()); // Int128() is NULL
}
}
return;
diff --git a/be/test/vec/data_types/common_data_type_serder_test.h
b/be/test/vec/data_types/common_data_type_serder_test.h
index 4a01436c8ef..ef8d07323df 100644
--- a/be/test/vec/data_types/common_data_type_serder_test.h
+++ b/be/test/vec/data_types/common_data_type_serder_test.h
@@ -23,25 +23,18 @@
#include <filesystem>
#include <fstream>
#include <iostream>
+#include <memory>
#include "arrow/array/array_base.h"
#include "arrow/type.h"
-#include "olap/schema.h"
-#include "runtime/descriptors.cpp"
#include "runtime/descriptors.h"
#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
-#include "vec/columns/column_map.h"
-#include "vec/columns/columns_number.h"
#include "vec/core/field.h"
-#include "vec/core/sort_block.h"
-#include "vec/core/sort_description.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
-#include "vec/data_types/data_type_array.h"
-#include "vec/data_types/data_type_map.h"
#include "vec/runtime/ipv6_value.h"
#include "vec/utils/arrow_column_to_doris_column.h"
@@ -55,9 +48,9 @@
// serialize_one_cell_to_json (const IColumn &column, int row_num,
BufferWritable &bw, FormatOptions &options) const =0
// serialize_column_to_json (const IColumn &column, int start_idx, int
end_idx, BufferWritable &bw, FormatOptions &options) const =0
// deserialize_one_cell_from_json (IColumn &column, Slice &slice, const
FormatOptions &options) const =0
-// deserialize_column_from_json_vector (IColumn &column, std::vector< Slice >
&slices, int *num_deserialized, const FormatOptions &options) const =0
-// deserialize_column_from_fixed_json (IColumn &column, Slice &slice, int
rows, int *num_deserialized, const FormatOptions &options) const
-// insert_column_last_value_multiple_times (IColumn &column, int times) const
+// deserialize_column_from_json_vector (IColumn &column, std::vector< Slice >
&slices, uint64_t *num_deserialized, const FormatOptions &options) const =0
+// deserialize_column_from_fixed_json (IColumn &column, Slice &slice,
uint64_t rows, uint64_t *num_deserialized, const FormatOptions &options) const
+// insert_column_last_value_multiple_times (IColumn &column, uint64_t times)
const
// 3. fe|be protobuffer ser-deserialize
// write_column_to_pb (const IColumn &column, PValues &result, int start, int
end) const =0
// read_column_from_pb (IColumn &column, const PValues &arg) const =0
@@ -154,16 +147,14 @@ public:
while (std::getline(file, line)) {
std::stringstream lineStream(line);
- // std::cout << "whole : " << lineStream.str() <<
std::endl;
std::string value;
int l_idx = 0;
int c_idx = 0;
- std::vector<string> row;
+ std::vector<std::string> row;
while (std::getline(lineStream, value, spliter)) {
- if (idxes.contains(l_idx)) {
+ if (!value.starts_with("//") && idxes.contains(l_idx)) {
// load csv data
Slice string_slice(value.data(), value.size());
- std::cout << "origin : " << string_slice << std::endl;
Status st;
// deserialize data
if constexpr (is_hive_format) {
@@ -213,7 +204,7 @@ public:
if (generate_res_file) {
// generate res
auto pos = file_path.find_last_of(".");
- string hive_format = is_hive_format ? "_hive" : "";
+ std::string hive_format = is_hive_format ? "_hive" : "";
std::string res_file = file_path.substr(0, pos) + hive_format +
"_serde_res.csv";
std::ofstream res_f(res_file);
if (!res_f.is_open()) {
@@ -221,8 +212,6 @@ public:
}
for (size_t r = 0; r < assert_str_cols[0]->size(); ++r) {
for (size_t c = 0; c < assert_str_cols.size(); ++c) {
- std::cout <<
assert_str_cols[c]->get_data_at(r).to_string() << spliter
- << std::endl;
res_f << assert_str_cols[c]->get_data_at(r).to_string() <<
spliter;
}
res_f << std::endl;
@@ -233,6 +222,8 @@ public:
}
// standard hive text ser-deserialize assert function
+ // pb serde now is only used RPCFncall and fold_constant_executor which
just write column data to pb value means
+ // just call write_column_to_pb
static void assert_pb_format(MutableColumns& load_cols, DataTypeSerDeSPtrs
serders) {
for (size_t i = 0; i < load_cols.size(); ++i) {
auto& col = load_cols[i];
@@ -266,6 +257,21 @@ public:
static void assert_jsonb_format(MutableColumns& load_cols,
DataTypeSerDeSPtrs serders) {
Arena pool;
auto jsonb_column = ColumnString::create(); // jsonb column
+ // maybe these load_cols has different size, so we keep it same
+ size_t max_row_size = load_cols[0]->size();
+ for (size_t i = 1; i < load_cols.size(); ++i) {
+ if (load_cols[i]->size() > max_row_size) {
+ max_row_size = load_cols[i]->size();
+ }
+ }
+ // keep same rows
+ for (size_t i = 0; i < load_cols.size(); ++i) {
+ if (load_cols[i]->size() < max_row_size) {
+ load_cols[i]->insert_many_defaults(max_row_size -
load_cols[i]->size());
+ } else if (load_cols[i]->size() > max_row_size) {
+ load_cols[i]->resize(max_row_size);
+ }
+ }
jsonb_column->reserve(load_cols[0]->size());
MutableColumns assert_cols;
for (size_t i = 0; i < load_cols.size(); ++i) {
@@ -321,54 +327,114 @@ public:
}
// assert arrow serialize
- static void assert_arrow_format(MutableColumns& load_cols,
DataTypeSerDeSPtrs serders,
- DataTypes types) {
+ static void assert_arrow_format(MutableColumns& load_cols, DataTypes
types) {
// make a block to write to arrow
auto block = std::make_shared<Block>();
+ build_block(block, load_cols, types);
+ auto record_batch = serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ deserialize_arrow(assert_block, record_batch);
+ compare_two_blocks(block, assert_block);
+ }
+
+ static void build_block(const std::shared_ptr<Block>& block,
MutableColumns& load_cols,
+ DataTypes types) {
+ // maybe these load_cols has different size, so we keep it same
+ size_t max_row_size = load_cols[0]->size();
+ for (size_t i = 1; i < load_cols.size(); ++i) {
+ if (load_cols[i]->size() > max_row_size) {
+ max_row_size = load_cols[i]->size();
+ }
+ }
+ // keep same rows
+ for (auto& load_col : load_cols) {
+ if (load_col->size() < max_row_size) {
+ load_col->insert_many_defaults(max_row_size -
load_col->size());
+ } else if (load_col->size() > max_row_size) {
+ load_col->resize(max_row_size);
+ }
+ }
for (size_t i = 0; i < load_cols.size(); ++i) {
auto& col = load_cols[i];
block->insert(ColumnWithTypeAndName(std::move(col), types[i],
types[i]->get_name()));
}
// print block
- std::cout << "block: " << block->dump_structure() << std::endl;
+ std::cout << "build block structure: " << block->dump_structure() <<
std::endl;
+ std::cout << "build block data: "
+ << block->dump_data(0, std::min(max_row_size,
static_cast<size_t>(5)))
+ << std::endl;
+ for (int i = 0; i < block->columns(); i++) {
+ auto col = block->get_by_position(i);
+ std::cout << "col: " << i << ", " << col.column->get_name() << ", "
+ << col.type->get_name() << ", " << col.to_string(0) <<
std::endl;
+ }
+ }
+
+ static std::shared_ptr<arrow::RecordBatch> serialize_arrow(
+ const std::shared_ptr<Block>& block) {
std::shared_ptr<arrow::Schema> block_arrow_schema;
EXPECT_EQ(get_arrow_schema_from_block(*block, &block_arrow_schema,
"UTC"), Status::OK());
+ std::cout << "schema: " << block_arrow_schema->ToString(true) <<
std::endl;
// convert block to arrow
std::shared_ptr<arrow::RecordBatch> result;
cctz::time_zone _timezone_obj; //default UTC
Status stt = convert_to_arrow_batch(*block, block_arrow_schema,
arrow::default_memory_pool(),
&result, _timezone_obj);
EXPECT_EQ(Status::OK(), stt) << "convert block to arrow failed" <<
stt.to_string();
+ std::cout << "arrow serialize result: " << result->num_columns() << ",
"
+ << result->num_rows() << std::endl;
+ return result;
+ }
+ static void deserialize_arrow(const std::shared_ptr<Block>& new_block,
+ std::shared_ptr<arrow::RecordBatch>
record_batch) {
// deserialize arrow to block
- auto assert_block = block->clone_empty();
- auto rows = block->rows();
- for (size_t i = 0; i < load_cols.size(); ++i) {
- auto array = result->column(i);
- std::cout << array.get()->ToString() << std::endl;
- auto& column_with_type_and_name = assert_block.get_by_position(i);
+ auto rows = record_batch->num_rows();
+ for (size_t i = 0; i < record_batch->num_columns(); ++i) {
+ auto array = record_batch->column(i);
+ std::cout << "arrow record_batch pos: " << i << ", array: " <<
array->ToString()
+ << std::endl;
+ auto& column_with_type_and_name = new_block->get_by_position(i);
std::cout << "now we are testing column: "
- << column_with_type_and_name.column->get_name() <<
std::endl;
- auto ret = arrow_column_to_doris_column(
- array.get(), 0, column_with_type_and_name.column,
- column_with_type_and_name.type, rows, _timezone_obj);
- // do check data
- std::cout << "arrow_column_to_doris_column done: "
<< column_with_type_and_name.column->get_name()
- << " with column size: " <<
column_with_type_and_name.column->size()
- << std::endl;
+ << ", type: " <<
column_with_type_and_name.type->get_name() << std::endl;
+ auto ret =
+ arrow_column_to_doris_column(array.get(), 0,
column_with_type_and_name.column,
+
column_with_type_and_name.type, rows, "UTC");
+ // do check data
+ std::cout << "arrow_column_to_doris_column done, column data: "
+ << column_with_type_and_name.to_string(0)
+ << ", column size: " <<
column_with_type_and_name.column->size() << std::endl;
EXPECT_EQ(Status::OK(), ret) << "convert arrow to block failed" <<
ret.to_string();
- auto& col = block->get_by_position(i).column;
- auto& assert_col = column_with_type_and_name.column;
- std::cout << "column: " << col->get_name() << " size: " <<
col->size()
- << " assert size: " << assert_col->size() << std::endl;
+ }
+ std::cout << "arrow deserialize block structure: " <<
new_block->dump_structure()
+ << std::endl;
+ std::cout << "arrow deserialize block data: "
+ << new_block->dump_data(
+ 0, std::min(static_cast<size_t>(rows),
static_cast<size_t>(5)))
+ << std::endl;
+ }
+
+ static void compare_two_blocks(const std::shared_ptr<Block>& frist_block,
+ const std::shared_ptr<Block>& second_block)
{
+ for (size_t i = 0; i < frist_block->columns(); ++i) {
+ EXPECT_EQ(frist_block->get_by_position(i).type,
second_block->get_by_position(i).type);
+ auto& col = frist_block->get_by_position(i).column;
+ auto& assert_col = second_block->get_by_position(i).column;
+ std::cout << "compare_two_blocks, column: " << col->get_name()
+ << ", type: " <<
frist_block->get_by_position(i).type->get_name()
+ << ", size: " << col->size() << ", assert size: " <<
assert_col->size()
+ << std::endl;
EXPECT_EQ(assert_col->size(), col->size());
for (size_t j = 0; j < assert_col->size(); ++j) {
+ EXPECT_EQ(frist_block->get_by_position(i).to_string(j),
+ second_block->get_by_position(i).to_string(j));
auto cell = col->operator[](j);
auto assert_cell = assert_col->operator[](j);
EXPECT_EQ(cell, assert_cell) << "column: " << col->get_name()
<< " row: " << j;
}
}
+ EXPECT_EQ(frist_block->dump_data(), second_block->dump_data());
}
// assert rapidjson format
diff --git a/be/test/vec/data_types/data_type_ip_test.cpp
b/be/test/vec/data_types/data_type_ip_test.cpp
index 3bd91102d13..c4a63b77b69 100644
--- a/be/test/vec/data_types/data_type_ip_test.cpp
+++ b/be/test/vec/data_types/data_type_ip_test.cpp
@@ -264,7 +264,7 @@ TEST_F(DataTypeIPTest, SerdeMysqlAndArrowTest) {
CommonDataTypeSerdeTest::check_data(ip_cols, serde, ';', {1, 2},
data_files[0],
CommonDataTypeSerdeTest::assert_mysql_format);
- CommonDataTypeSerdeTest::assert_arrow_format(ip_cols, serde, {dt_ipv4,
dt_ipv6});
+ CommonDataTypeSerdeTest::assert_arrow_format(ip_cols, {dt_ipv4, dt_ipv6});
}
TEST_F(DataTypeIPTest, SerdeTOJsonInComplex) {
diff --git a/be/test/vec/data_types/data_type_map_test.cpp
b/be/test/vec/data_types/data_type_map_test.cpp
new file mode 100644
index 00000000000..548802b0d91
--- /dev/null
+++ b/be/test/vec/data_types/data_type_map_test.cpp
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_map.h"
+
+#include <execinfo.h> // for backtrace on Linux
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+
+#include "common/exception.h"
+#include "vec/columns/column.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/function/function_test_util.h"
+
+namespace doris::vectorized {
+
+// TODO `DataTypeMapSerDe::deserialize_one_cell_from_json` has a bug,
+// `SerdeArrowTest` cannot test Map type nested Array and Struct and Map,
+// so manually construct data to test them.
+// Expect to delete this TEST after `deserialize_one_cell_from_json` is fixed.
+TEST(DataTypeMapTest, SerdeNestedTypeArrowTest) {
+ auto block = std::make_shared<Block>();
+ {
+ std::string col_name = "map_nesting_array";
+ DataTypePtr f1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+ DataTypePtr dt1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f1));
+ DataTypePtr dt2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f2));
+ DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2);
+
+ Array a1, a2, a3, a4;
+ a1.push_back(Field("cute"));
+ a1.push_back(Null());
+ a2.push_back(Field("clever"));
+ a1.push_back(Field("hello"));
+ a3.push_back(1);
+ a3.push_back(2);
+ a4.push_back(11);
+ a4.push_back(22);
+
+ Array k1, v1;
+ k1.push_back(a1);
+ k1.push_back(a2);
+ v1.push_back(a3);
+ v1.push_back(a4);
+
+ Map m1;
+ m1.push_back(k1);
+ m1.push_back(v1);
+
+ MutableColumnPtr map_column = ma->create_column();
+ map_column->reserve(1);
+ map_column->insert(m1);
+ vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(),
ma, col_name);
+ block->insert(type_and_name);
+ }
+ {
+ std::string col_name = "map_nesting_struct";
+ DataTypePtr f1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+ DataTypePtr f3 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+ DataTypePtr f4 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr dt1 = std::make_shared<DataTypeNullable>(
+ std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {f1,
f2, f3}));
+ DataTypePtr dt2 = std::make_shared<DataTypeNullable>(
+ std::make_shared<DataTypeStruct>(std::vector<DataTypePtr>
{f4}));
+ DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2);
+
+ Tuple t1, t2, t3, t4;
+ t1.push_back(Field("clever"));
+ t1.push_back(__int128_t(37));
+ t1.push_back(true);
+ t2.push_back("null");
+ t2.push_back(__int128_t(26));
+ t2.push_back(false);
+ t3.push_back(Field("cute"));
+ t4.push_back("null");
+
+ Array k1, v1;
+ k1.push_back(t1);
+ k1.push_back(t2);
+ v1.push_back(t3);
+ v1.push_back(t4);
+
+ Map m1;
+ m1.push_back(k1);
+ m1.push_back(v1);
+
+ MutableColumnPtr map_column = ma->create_column();
+ map_column->reserve(1);
+ map_column->insert(m1);
+ vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(),
ma, col_name);
+ block->insert(type_and_name);
+ }
+ {
+ std::string col_name = "map_nesting_map";
+ DataTypePtr f1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+ DataTypePtr f2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f3 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+ DataTypePtr f4 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+ DataTypePtr dt1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f1, f2));
+ DataTypePtr dt2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f3, f4));
+ DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2);
+
+ Array k1, k2, k3, k4, v1, v2, v3, v4;
+ k1.push_back(1);
+ k1.push_back(2);
+ k2.push_back(11);
+ k2.push_back(22);
+ v1.push_back(Field("map"));
+ v1.push_back(Null());
+ v2.push_back(Field("clever map"));
+ v2.push_back(Field("hello map"));
+ k3.push_back(__int128_t(37));
+ k3.push_back(__int128_t(26));
+ k4.push_back(__int128_t(1111));
+ k4.push_back(__int128_t(432535423));
+ v3.push_back(true);
+ v3.push_back(false);
+ v4.push_back(false);
+ v4.push_back(true);
+
+ Map m11, m12, m21, m22;
+ m11.push_back(k1);
+ m11.push_back(v1);
+ m12.push_back(k2);
+ m12.push_back(v2);
+ m21.push_back(k3);
+ m21.push_back(v3);
+ m22.push_back(k4);
+ m22.push_back(v4);
+
+ Array kk1, vv1;
+ kk1.push_back(m11);
+ kk1.push_back(m12);
+ vv1.push_back(m21);
+ vv1.push_back(m22);
+
+ Map m1;
+ m1.push_back(kk1);
+ m1.push_back(vv1);
+
+ MutableColumnPtr map_column = ma->create_column();
+ map_column->reserve(1);
+ map_column->insert(m1);
+ vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(),
ma, col_name);
+ block->insert(type_and_name);
+ }
+ std::shared_ptr<arrow::RecordBatch> record_batch =
+ CommonDataTypeSerdeTest::serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+ CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/data_type_struct_test.cpp
b/be/test/vec/data_types/data_type_struct_test.cpp
new file mode 100644
index 00000000000..1fc8aac0312
--- /dev/null
+++ b/be/test/vec/data_types/data_type_struct_test.cpp
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_struct.h"
+
+#include <execinfo.h> // for backtrace on Linux
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+
+#include "vec/columns/column.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/function/function_test_util.h"
+
+namespace doris::vectorized {
+
+// TODO `DataTypeStructSerDe::deserialize_one_cell_from_json` has a bug,
+// `SerdeArrowTest` cannot test Struct type nested Array and Map and Struct,
+// so manually construct data to test them.
+// Expect to delete this TEST after `deserialize_one_cell_from_json` is fixed.
+TEST(DataTypeStructTest, SerdeNestedTypeArrowTest) {
+ auto block = std::make_shared<Block>();
+ {
+ std::string col_name = "struct_nesting_array_map_struct";
+ DataTypePtr f1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+ DataTypePtr f3 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f4 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f5 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+ DataTypePtr f6 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+ DataTypePtr dt1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f1));
+ DataTypePtr dt2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f2, f3));
+ DataTypePtr dt3 = std::make_shared<DataTypeNullable>(
+ std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {f4,
f5, f6}));
+ DataTypePtr st =
std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {dt1, dt2, dt3});
+
+ // nested Array
+ Array a1, a2;
+ a1.push_back(Field("array"));
+ a1.push_back(Null());
+ a2.push_back(Field("lucky array"));
+ a2.push_back(Field("cute array"));
+
+ // nested Map
+ Array k1, k2, v1, v2;
+ k1.push_back(1);
+ k1.push_back(2);
+ k2.push_back(11);
+ k2.push_back(22);
+ v1.push_back(Field("map"));
+ v1.push_back(Null());
+ v2.push_back(Field("clever map"));
+ v2.push_back(Field("hello map"));
+
+ Map m1, m2;
+ m1.push_back(k1);
+ m1.push_back(v1);
+ m2.push_back(k2);
+ m2.push_back(v2);
+
+ // nested Struct
+ Tuple t1, t2;
+ t1.push_back(Field("clever"));
+ t1.push_back(__int128_t(37));
+ t1.push_back(true);
+ t2.push_back("null");
+ t2.push_back(__int128_t(26));
+ t2.push_back(false);
+
+ // Struct
+ Tuple tt1, tt2;
+ tt1.push_back(a1);
+ tt1.push_back(m1);
+ tt1.push_back(t1);
+ tt2.push_back(a2);
+ tt2.push_back(m2);
+ tt2.push_back(t2);
+
+ MutableColumnPtr struct_column = st->create_column();
+ struct_column->reserve(2);
+ struct_column->insert(tt1);
+ struct_column->insert(tt2);
+ vectorized::ColumnWithTypeAndName
type_and_name(struct_column->get_ptr(), st, col_name);
+ block->insert(type_and_name);
+ }
+ std::shared_ptr<arrow::RecordBatch> record_batch =
+ CommonDataTypeSerdeTest::serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+ CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
new file mode 100644
index 00000000000..f4caa35069f
--- /dev/null
+++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -0,0 +1,479 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/array/builder_base.h>
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_decimal.h>
+#include <arrow/array/builder_nested.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/record_batch.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+#include <arrow/visit_type_inline.h>
+#include <arrow/visitor.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/types.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <cmath>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "olap/hll.h"
+#include "runtime/descriptors.cpp"
+#include "util/arrow/block_convertor.h"
+#include "util/arrow/row_batch.h"
+#include "util/string_parser.hpp"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/block.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
+#include "vec/data_types/data_type_ipv4.h"
+#include "vec/data_types/data_type_ipv6.h"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/data_types/data_type_time_v2.h"
+#include "vec/runtime/vdatetime_value.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+void serialize_and_deserialize_arrow_test(std::vector<PrimitiveType> cols, int
row_num,
+ bool is_nullable) {
+ auto block = std::make_shared<Block>();
+ for (int i = 0; i < cols.size(); i++) {
+ std::string col_name = std::to_string(i);
+ TypeDescriptor type_desc(cols[i]);
+ switch (cols[i]) {
+ case TYPE_BOOLEAN: {
+ auto vec = vectorized::ColumnVector<UInt8>::create();
+ auto& data = vec->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ data.push_back(i % 2);
+ }
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeUInt8>());
+ vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(),
data_type, col_name);
+ block->insert(std::move(type_and_name));
+ } break;
+ case TYPE_INT:
+ if (is_nullable) {
+ {
+ auto column_vector_int32 =
vectorized::ColumnVector<Int32>::create();
+ auto column_nullable_vector =
+
vectorized::make_nullable(std::move(column_vector_int32));
+ auto mutable_nullable_vector =
std::move(*column_nullable_vector).mutate();
+ for (int i = 0; i < row_num; i++) {
+ if (i % 2 == 0) {
+ mutable_nullable_vector->insert_default();
+ } else {
+ mutable_nullable_vector->insert(int32(i));
+ }
+ }
+ auto data_type = vectorized::make_nullable(
+ std::make_shared<vectorized::DataTypeInt32>());
+ vectorized::ColumnWithTypeAndName type_and_name(
+ mutable_nullable_vector->get_ptr(), data_type,
col_name);
+ block->insert(type_and_name);
+ }
+ } else {
+ auto vec = vectorized::ColumnVector<Int32>::create();
+ auto& data = vec->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ data.push_back(i);
+ }
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeInt32>());
+ vectorized::ColumnWithTypeAndName
type_and_name(vec->get_ptr(), data_type,
+ col_name);
+ block->insert(std::move(type_and_name));
+ }
+ break;
+ case TYPE_DECIMAL32:
+ type_desc.precision = 9;
+ type_desc.scale = 2;
+ {
+ vectorized::DataTypePtr decimal_data_type =
+
std::make_shared<DataTypeDecimal<Decimal32>>(type_desc.precision,
+
type_desc.scale);
+ auto decimal_column = decimal_data_type->create_column();
+ auto& data =
((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int32>>*)
+ decimal_column.get())
+ ->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ if (i == 0) {
+ data.push_back(Int32(0));
+ continue;
+ }
+ Int32 val;
+ StringParser::ParseResult result =
StringParser::PARSE_SUCCESS;
+ i % 2 == 0 ? val =
StringParser::string_to_decimal<TYPE_DECIMAL32>(
+ "1234567.56", 11,
type_desc.precision, type_desc.scale,
+ &result)
+ : val =
StringParser::string_to_decimal<TYPE_DECIMAL32>(
+ "-1234567.56", 12,
type_desc.precision, type_desc.scale,
+ &result);
+ EXPECT_TRUE(result == StringParser::PARSE_SUCCESS);
+ data.push_back(val);
+ }
+
+ vectorized::ColumnWithTypeAndName
type_and_name(decimal_column->get_ptr(),
+
decimal_data_type, col_name);
+ block->insert(type_and_name);
+ }
+ break;
+ case TYPE_DECIMAL64:
+ type_desc.precision = 18;
+ type_desc.scale = 6;
+ {
+ vectorized::DataTypePtr decimal_data_type =
+
std::make_shared<DataTypeDecimal<Decimal64>>(type_desc.precision,
+
type_desc.scale);
+ auto decimal_column = decimal_data_type->create_column();
+ auto& data =
((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int64>>*)
+ decimal_column.get())
+ ->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ if (i == 0) {
+ data.push_back(Int64(0));
+ continue;
+ }
+ Int64 val;
+ StringParser::ParseResult result =
StringParser::PARSE_SUCCESS;
+ std::string decimal_string =
+ i % 2 == 0 ? "-123456789012.123456" :
"123456789012.123456";
+ val = StringParser::string_to_decimal<TYPE_DECIMAL64>(
+ decimal_string.c_str(), decimal_string.size(),
type_desc.precision,
+ type_desc.scale, &result);
+ EXPECT_TRUE(result == StringParser::PARSE_SUCCESS);
+ data.push_back(val);
+ }
+ vectorized::ColumnWithTypeAndName
type_and_name(decimal_column->get_ptr(),
+
decimal_data_type, col_name);
+ block->insert(type_and_name);
+ }
+ break;
+ case TYPE_DECIMAL128I:
+ type_desc.precision = 27;
+ type_desc.scale = 9;
+ {
+ vectorized::DataTypePtr decimal_data_type(
+ doris::vectorized::create_decimal(27, 9, true));
+ auto decimal_column = decimal_data_type->create_column();
+ auto& data =
((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)
+ decimal_column.get())
+ ->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ auto value = __int128_t(i * pow(10, 9) + i * pow(10, 8));
+ data.push_back(value);
+ }
+ vectorized::ColumnWithTypeAndName
type_and_name(decimal_column->get_ptr(),
+
decimal_data_type, col_name);
+ block->insert(type_and_name);
+ }
+ break;
+ case TYPE_STRING: {
+ auto strcol = vectorized::ColumnString::create();
+ for (int i = 0; i < row_num; ++i) {
+ std::string is = std::to_string(i);
+ strcol->insert_data(is.c_str(), is.size());
+ }
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeString>());
+ vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(),
data_type, col_name);
+ block->insert(type_and_name);
+ } break;
+ case TYPE_HLL: {
+ vectorized::DataTypePtr
hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+ auto hll_column = hll_data_type->create_column();
+ std::vector<HyperLogLog>& container =
+ ((vectorized::ColumnHLL*)hll_column.get())->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ HyperLogLog hll;
+ hll.update(i);
+ container.push_back(hll);
+ }
+ vectorized::ColumnWithTypeAndName
type_and_name(hll_column->get_ptr(), hll_data_type,
+ col_name);
+
+ block->insert(type_and_name);
+ } break;
+ case TYPE_DATEV2: {
+ auto column_vector_date_v2 =
vectorized::ColumnVector<vectorized::UInt32>::create();
+ auto& date_v2_data = column_vector_date_v2->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ DateV2Value<DateV2ValueType> value;
+ value.from_date_int64(20210501);
+
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+ }
+ vectorized::DataTypePtr
date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
+ vectorized::ColumnWithTypeAndName
test_date_v2(column_vector_date_v2->get_ptr(),
+ date_v2_type,
col_name);
+ block->insert(test_date_v2);
+ } break;
+ case TYPE_DATE: // int64
+ {
+ auto column_vector_date =
vectorized::ColumnVector<vectorized::Int64>::create();
+ auto& date_data = column_vector_date->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ VecDateTimeValue value;
+ value.from_date_int64(20210501);
+
date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+ }
+ vectorized::DataTypePtr
date_type(std::make_shared<vectorized::DataTypeDate>());
+ vectorized::ColumnWithTypeAndName
test_date(column_vector_date->get_ptr(), date_type,
+ col_name);
+ block->insert(test_date);
+ } break;
+ case TYPE_DATETIME: // int64
+ {
+ auto column_vector_datetime =
vectorized::ColumnVector<vectorized::Int64>::create();
+ auto& datetime_data = column_vector_datetime->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ VecDateTimeValue value;
+ value.from_date_int64(20210501080910);
+
datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+ }
+ vectorized::DataTypePtr
datetime_type(std::make_shared<vectorized::DataTypeDateTime>());
+ vectorized::ColumnWithTypeAndName
test_datetime(column_vector_datetime->get_ptr(),
+ datetime_type,
col_name);
+ block->insert(test_datetime);
+ } break;
+ case TYPE_DATETIMEV2: // uint64
+ {
+ auto column_vector_datetimev2 =
vectorized::ColumnVector<vectorized::UInt64>::create();
+ DateV2Value<DateTimeV2ValueType> value;
+ string date_literal = "2022-01-01 11:11:11.111";
+ cctz::time_zone ctz;
+ TimezoneUtils::find_cctz_time_zone("UTC", ctz);
+ EXPECT_TRUE(value.from_date_str(date_literal.c_str(),
date_literal.size(), ctz, 3));
+ char to[64] = {};
+ std::cout << "value: " << value.to_string(to) << std::endl;
+ for (int i = 0; i < row_num; ++i) {
+ column_vector_datetimev2->insert(value.to_date_int_val());
+ }
+ vectorized::DataTypePtr datetimev2_type(
+ std::make_shared<vectorized::DataTypeDateTimeV2>(3));
+ vectorized::ColumnWithTypeAndName
test_datetimev2(column_vector_datetimev2->get_ptr(),
+ datetimev2_type,
col_name);
+ block->insert(test_datetimev2);
+ } break;
+ case TYPE_ARRAY: // array
+ type_desc.add_sub_type(TYPE_STRING, true);
+ {
+ DataTypePtr s =
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr au = std::make_shared<DataTypeArray>(s);
+ Array a1, a2;
+ a1.push_back(Field("sss"));
+ a1.push_back(Null());
+ a1.push_back(Field("clever amory"));
+ a2.push_back(Field("hello amory"));
+ a2.push_back(Null());
+ a2.push_back(Field("cute amory"));
+ a2.push_back(Field("sf"));
+ MutableColumnPtr array_column = au->create_column();
+ array_column->reserve(2);
+ array_column->insert(a1);
+ array_column->insert(a2);
+ vectorized::ColumnWithTypeAndName
type_and_name(array_column->get_ptr(), au,
+ col_name);
+ block->insert(type_and_name);
+ }
+ break;
+ case TYPE_MAP:
+ type_desc.add_sub_type(TYPE_STRING, true);
+ type_desc.add_sub_type(TYPE_STRING, true);
+ {
+ DataTypePtr s =
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ ;
+ DataTypePtr d =
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr m = std::make_shared<DataTypeMap>(s, d);
+ Array k1, k2, v1, v2;
+ k1.push_back("null");
+ k1.push_back("doris");
+ k1.push_back("clever amory");
+ v1.push_back("ss");
+ v1.push_back(Null());
+ v1.push_back("NULL");
+ k2.push_back("hello amory");
+ k2.push_back("NULL");
+ k2.push_back("cute amory");
+ k2.push_back("doris");
+ v2.push_back("s");
+ v2.push_back("0");
+ v2.push_back("sf");
+ v2.push_back(Null());
+ Map m1, m2;
+ m1.push_back(k1);
+ m1.push_back(v1);
+ m2.push_back(k2);
+ m2.push_back(v2);
+ MutableColumnPtr map_column = m->create_column();
+ map_column->reserve(2);
+ map_column->insert(m1);
+ map_column->insert(m2);
+ vectorized::ColumnWithTypeAndName
type_and_name(map_column->get_ptr(), m, col_name);
+ block->insert(type_and_name);
+ }
+ break;
+ case TYPE_STRUCT:
+ type_desc.add_sub_type(TYPE_STRING, "name", true);
+ type_desc.add_sub_type(TYPE_LARGEINT, "age", true);
+ type_desc.add_sub_type(TYPE_BOOLEAN, "is", true);
+ {
+ DataTypePtr s =
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr d =
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+ DataTypePtr m =
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+ DataTypePtr st =
+
std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {s, d, m});
+ Tuple t1, t2;
+ t1.push_back(Field("amory cute"));
+ t1.push_back(__int128_t(37));
+ t1.push_back(true);
+ t2.push_back("null");
+ t2.push_back(__int128_t(26));
+ t2.push_back(false);
+ MutableColumnPtr struct_column = st->create_column();
+ struct_column->reserve(2);
+ struct_column->insert(t1);
+ struct_column->insert(t2);
+ vectorized::ColumnWithTypeAndName
type_and_name(struct_column->get_ptr(), st,
+ col_name);
+ block->insert(type_and_name);
+ }
+ break;
+ case TYPE_IPV4: {
+ auto vec = vectorized::ColumnIPv4::create();
+ auto& data = vec->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ data.push_back(i);
+ }
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeIPv4>());
+ vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(),
data_type, col_name);
+ block->insert(std::move(type_and_name));
+ } break;
+ case TYPE_IPV6: {
+ auto vec = vectorized::ColumnIPv6::create();
+ auto& data = vec->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ data.push_back(i);
+ }
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeIPv6>());
+ vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(),
data_type, col_name);
+ block->insert(std::move(type_and_name));
+ } break;
+ default:
+ LOG(FATAL) << "error column type";
+ }
+ }
+ std::shared_ptr<arrow::RecordBatch> record_batch =
+ CommonDataTypeSerdeTest::serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+ CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) {
+ std::vector<PrimitiveType> cols = {
+ TYPE_INT, TYPE_INT, TYPE_STRING, TYPE_DECIMAL128I,
TYPE_BOOLEAN,
+ TYPE_DECIMAL32, TYPE_DECIMAL64, TYPE_IPV4, TYPE_IPV6,
TYPE_DATETIME,
+ TYPE_DATETIMEV2, TYPE_DATE, TYPE_DATEV2,
+ };
+ serialize_and_deserialize_arrow_test(cols, 7, true);
+ serialize_and_deserialize_arrow_test(cols, 7, false);
+}
+
+TEST(DataTypeSerDeArrowTest, DataTypeCollectionSerDeTest) {
+ std::vector<PrimitiveType> cols = {TYPE_ARRAY, TYPE_MAP, TYPE_STRUCT};
+ serialize_and_deserialize_arrow_test(cols, 7, true);
+ serialize_and_deserialize_arrow_test(cols, 7, false);
+}
+
+TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
+ std::string col_name = "map_null_key";
+ auto block = std::make_shared<Block>();
+ {
+ DataTypePtr s =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr d =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+ DataTypePtr m = std::make_shared<DataTypeMap>(s, d);
+ Array k1, k2, v1, v2, k3, v3;
+ k1.push_back("doris");
+ k1.push_back("clever amory");
+ v1.push_back(Null());
+ v1.push_back(30);
+ k2.push_back("hello amory");
+ k2.push_back("NULL");
+ k2.push_back("cute amory");
+ k2.push_back("doris");
+ v2.push_back(26);
+ v2.push_back(Null());
+ v2.push_back(6);
+ v2.push_back(7);
+ k3.push_back("test");
+ v3.push_back(11);
+ Map m1, m2, m3;
+ m1.push_back(k1);
+ m1.push_back(v1);
+ m2.push_back(k2);
+ m2.push_back(v2);
+ m3.push_back(k3);
+ m3.push_back(v3);
+ MutableColumnPtr map_column = m->create_column();
+ map_column->reserve(3);
+ map_column->insert(m1);
+ map_column->insert(m2);
+ map_column->insert(m3);
+ vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(),
m, col_name);
+ block->insert(type_and_name);
+ }
+
+ std::shared_ptr<arrow::RecordBatch> record_batch =
+ CommonDataTypeSerdeTest::serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+ CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/exprs/vexpr_test.cpp b/be/test/vec/exprs/vexpr_test.cpp
index 1bd9e478b89..7a64d6e7fa2 100644
--- a/be/test/vec/exprs/vexpr_test.cpp
+++ b/be/test/vec/exprs/vexpr_test.cpp
@@ -94,8 +94,10 @@ static doris::TupleDescriptor* create_tuple_desc(
t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27,
9).to_thrift());
} else {
TypeDescriptor descriptor(column_descs[i].type);
- if (column_descs[i].precision >= 0 && column_descs[i].scale >= 0) {
+ if (column_descs[i].precision >= 0) {
descriptor.precision = column_descs[i].precision;
+ }
+ if (column_descs[i].scale >= 0) {
descriptor.scale = column_descs[i].scale;
}
t_slot_desc.__set_slotType(descriptor.to_thrift());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]