This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 84c6e213c3b branch-3.0: [fix](arrow) Fix Arrow serialization and 
deserialization of Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types 
(#49643)
84c6e213c3b is described below

commit 84c6e213c3bac218b4d307a6556c169b9a1c22ee
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue May 6 10:08:52 2025 +0800

    branch-3.0: [fix](arrow) Fix Arrow serialization and deserialization of 
Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types (#49643)
    
    pick #48944 [fix](arrow) Fix UT DataTypeSerDeArrowTest of
    Array/Map/Struct/Bitmap/HLL/Decimal256 types
    pick #48398  [fix](arrow) Fix UT DataTypeSerDeArrowTest of Date type
---
 be/src/runtime/types.cpp                           |  20 +-
 be/src/runtime/types.h                             |   3 +-
 be/src/util/arrow/block_convertor.cpp              | 272 +-----------
 be/src/util/arrow/row_batch.cpp                    |   2 -
 be/src/vec/columns/column_array.cpp                |   1 +
 be/src/vec/columns/column_map.cpp                  |   1 +
 be/src/vec/columns/column_string.h                 |   4 +
 be/src/vec/columns/column_struct.cpp               |   1 +
 be/src/vec/columns/column_vector.h                 |   5 +-
 be/src/vec/data_types/data_type_time_v2.h          |   4 +-
 .../data_types/serde/data_type_date64_serde.cpp    |  33 +-
 .../vec/data_types/serde/data_type_date64_serde.h  |   7 +
 .../serde/data_type_datetimev2_serde.cpp           |   5 +-
 .../data_types/serde/data_type_datev2_serde.cpp    |  11 +-
 .../data_types/serde/data_type_decimal_serde.cpp   |  45 +-
 .../vec/data_types/serde/data_type_ipv6_serde.cpp  |  16 +-
 .../data_types/serde/data_type_number_serde.cpp    |  20 +-
 .../vec/data_types/common_data_type_serder_test.h  | 144 +++++--
 be/test/vec/data_types/data_type_ip_test.cpp       |   2 +-
 be/test/vec/data_types/data_type_map_test.cpp      | 178 ++++++++
 be/test/vec/data_types/data_type_struct_test.cpp   | 115 +++++
 .../serde/data_type_serde_arrow_test.cpp           | 479 +++++++++++++++++++++
 be/test/vec/exprs/vexpr_test.cpp                   |   4 +-
 23 files changed, 1013 insertions(+), 359 deletions(-)

diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 7b7154fb38a..f0112ebd2bc 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -46,11 +46,14 @@ TypeDescriptor::TypeDescriptor(const 
std::vector<TTypeNode>& types, int* idx)
             DCHECK(scalar_type.__isset.len);
             len = scalar_type.len;
         } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == 
TYPE_DECIMAL64 ||
-                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type 
== TYPE_DATETIMEV2) {
+                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
             DCHECK(scalar_type.__isset.precision);
             DCHECK(scalar_type.__isset.scale);
             precision = scalar_type.precision;
             scale = scalar_type.scale;
+        } else if (type == TYPE_DATETIMEV2) {
+            DCHECK(scalar_type.__isset.scale);
+            scale = scalar_type.scale;
         } else if (type == TYPE_TIMEV2) {
             if (scalar_type.__isset.scale) {
                 scale = scalar_type.scale;
@@ -157,11 +160,14 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type) 
const {
             // DCHECK_NE(len, -1);
             scalar_type.__set_len(len);
         } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == 
TYPE_DECIMAL64 ||
-                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type 
== TYPE_DATETIMEV2) {
+                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
             DCHECK_NE(precision, -1);
             DCHECK_NE(scale, -1);
             scalar_type.__set_precision(precision);
             scalar_type.__set_scale(scale);
+        } else if (type == TYPE_DATETIMEV2) {
+            DCHECK_NE(scale, -1);
+            scalar_type.__set_scale(scale);
         }
     }
 }
@@ -174,11 +180,14 @@ void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const {
     if (type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_HLL || type 
== TYPE_STRING) {
         scalar_type->set_len(len);
     } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == 
TYPE_DECIMAL64 ||
-               type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type == 
TYPE_DATETIMEV2) {
+               type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
         DCHECK_NE(precision, -1);
         DCHECK_NE(scale, -1);
         scalar_type->set_precision(precision);
         scalar_type->set_scale(scale);
+    } else if (type == TYPE_DATETIMEV2) {
+        DCHECK_NE(scale, -1);
+        scalar_type->set_scale(scale);
     } else if (type == TYPE_ARRAY) {
         node->set_type(TTypeNodeType::ARRAY);
         node->set_contains_null(contains_nulls[0]);
@@ -224,11 +233,14 @@ TypeDescriptor::TypeDescriptor(const 
google::protobuf::RepeatedPtrField<PTypeNod
             DCHECK(scalar_type.has_len());
             len = scalar_type.len();
         } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == 
TYPE_DECIMAL64 ||
-                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type 
== TYPE_DATETIMEV2) {
+                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
             DCHECK(scalar_type.has_precision());
             DCHECK(scalar_type.has_scale());
             precision = scalar_type.precision();
             scale = scalar_type.scale();
+        } else if (type == TYPE_DATETIMEV2) {
+            DCHECK(scalar_type.has_scale());
+            scale = scalar_type.scale();
         } else if (type == TYPE_STRING) {
             if (scalar_type.has_len()) {
                 len = scalar_type.len();
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 5d2f83c1bd6..f38a0f0ed07 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -48,6 +48,7 @@ struct TypeDescriptor {
 
     /// Only set if type == TYPE_DECIMAL
     int precision;
+    /// Only set if type == TYPE_DECIMAL or type == TYPE_DATETIMEV2
     int scale;
 
     std::vector<TypeDescriptor> children;
@@ -69,11 +70,11 @@ struct TypeDescriptor {
 
     // explicit TypeDescriptor(PrimitiveType type) :
     TypeDescriptor(PrimitiveType type) : type(type), len(-1), precision(-1), 
scale(-1) {
+        // TODO: should not initialize default values here; force initialization via 
parameters or by external code.
         if (type == TYPE_DECIMALV2) {
             precision = 27;
             scale = 9;
         } else if (type == TYPE_DATETIMEV2) {
-            precision = 18;
             scale = 6;
         }
     }
diff --git a/be/src/util/arrow/block_convertor.cpp 
b/be/src/util/arrow/block_convertor.cpp
index 817231e02ba..4db60144ea5 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -66,10 +66,7 @@ class Array;
 
 namespace doris {
 
-// Convert Block to an Arrow::Array
-// We should keep this function to keep compatible with arrow's type visitor
-// Now we inherit TypeVisitor to use default Visit implementation
-class FromBlockConverter : public arrow::TypeVisitor {
+class FromBlockConverter {
 public:
     FromBlockConverter(const vectorized::Block& block, const 
std::shared_ptr<arrow::Schema>& schema,
                        arrow::MemoryPool* pool, const cctz::time_zone& 
timezone_obj)
@@ -79,276 +76,11 @@ public:
               _cur_field_idx(-1),
               _timezone_obj(timezone_obj) {}
 
-    ~FromBlockConverter() override = default;
-
-    // Use base class function
-    using arrow::TypeVisitor::Visit;
-
-#define PRIMITIVE_VISIT(TYPE) \
-    arrow::Status Visit(const arrow::TYPE& type) override { return 
_visit(type); }
-
-    PRIMITIVE_VISIT(Int8Type)
-    PRIMITIVE_VISIT(Int16Type)
-    PRIMITIVE_VISIT(Int32Type)
-    PRIMITIVE_VISIT(Int64Type)
-    PRIMITIVE_VISIT(FloatType)
-    PRIMITIVE_VISIT(DoubleType)
-
-#undef PRIMITIVE_VISIT
-
-    // process string-transformable field
-    arrow::Status Visit(const arrow::StringType& type) override {
-        auto& builder = assert_cast<arrow::StringBuilder&>(*_cur_builder);
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = start; i < start + num_rows; ++i) {
-            bool is_null = _cur_col->is_null_at(i);
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            const auto& data_ref = _cur_col->get_data_at(i);
-            vectorized::TypeIndex type_idx = 
vectorized::remove_nullable(_cur_type)->get_type_id();
-            switch (type_idx) {
-            case vectorized::TypeIndex::String:
-            case vectorized::TypeIndex::FixedString:
-            case vectorized::TypeIndex::HLL: {
-                if (data_ref.size == 0) {
-                    // 0x01 is a magic num, not useful actually, just for 
present ""
-                    //char* tmp_val = reinterpret_cast<char*>(0x01);
-                    ARROW_RETURN_NOT_OK(builder.Append(""));
-                } else {
-                    ARROW_RETURN_NOT_OK(builder.Append(data_ref.data, 
data_ref.size));
-                }
-                break;
-            }
-            case vectorized::TypeIndex::Date:
-            case vectorized::TypeIndex::DateTime: {
-                char buf[64];
-                const VecDateTimeValue* time_val = (const 
VecDateTimeValue*)(data_ref.data);
-                int len = time_val->to_buffer(buf);
-                ARROW_RETURN_NOT_OK(builder.Append(buf, len));
-                break;
-            }
-            case vectorized::TypeIndex::DateV2: {
-                char buf[64];
-                const DateV2Value<DateV2ValueType>* time_val =
-                        (const DateV2Value<DateV2ValueType>*)(data_ref.data);
-                int len = time_val->to_buffer(buf);
-                ARROW_RETURN_NOT_OK(builder.Append(buf, len));
-                break;
-            }
-            case vectorized::TypeIndex::DateTimeV2: {
-                char buf[64];
-                const DateV2Value<DateTimeV2ValueType>* time_val =
-                        (const 
DateV2Value<DateTimeV2ValueType>*)(data_ref.data);
-                int len = time_val->to_buffer(buf);
-                ARROW_RETURN_NOT_OK(builder.Append(buf, len));
-                break;
-            }
-            case vectorized::TypeIndex::Int128: {
-                auto string_temp = LargeIntValue::to_string(
-                        reinterpret_cast<const 
PackedInt128*>(data_ref.data)->value);
-                ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(), 
string_temp.size()));
-                break;
-            }
-            case vectorized::TypeIndex::JSONB: {
-                std::string string_temp =
-                        JsonbToJson::jsonb_to_json_string(data_ref.data, 
data_ref.size);
-                ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(), 
string_temp.size()));
-                break;
-            }
-            default: {
-                LOG(WARNING) << "can't convert this type = " << 
vectorized::getTypeName(type_idx)
-                             << " to arrow type";
-                return arrow::Status::TypeError("unsupported column type");
-            }
-            }
-        }
-        return arrow::Status::OK();
-    }
-
-    // process doris Decimal
-    arrow::Status Visit(const arrow::Decimal128Type& type) override {
-        auto& builder = assert_cast<arrow::Decimal128Builder&>(*_cur_builder);
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-        if (auto* decimalv2_column = vectorized::check_and_get_column<
-                    vectorized::ColumnDecimal<vectorized::Decimal128V2>>(
-                    *vectorized::remove_nullable(_cur_col))) {
-            std::shared_ptr<arrow::DataType> s_decimal_ptr =
-                    std::make_shared<arrow::Decimal128Type>(27, 9);
-            ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = decimalv2_column->get_data_at(i);
-                const PackedInt128* p_value = reinterpret_cast<const 
PackedInt128*>(data_ref.data);
-                int64_t high = (p_value->value) >> 64;
-                uint64 low = p_value->value;
-                arrow::Decimal128 value(high, low);
-                ARROW_RETURN_NOT_OK(builder.Append(value));
-            }
-            return arrow::Status::OK();
-        } else if (auto* decimal128_column = vectorized::check_and_get_column<
-                           
vectorized::ColumnDecimal<vectorized::Decimal128V3>>(
-                           *vectorized::remove_nullable(_cur_col))) {
-            std::shared_ptr<arrow::DataType> s_decimal_ptr =
-                    std::make_shared<arrow::Decimal128Type>(38, 
decimal128_column->get_scale());
-            ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = decimal128_column->get_data_at(i);
-                const PackedInt128* p_value = reinterpret_cast<const 
PackedInt128*>(data_ref.data);
-                int64_t high = (p_value->value) >> 64;
-                uint64 low = p_value->value;
-                arrow::Decimal128 value(high, low);
-                ARROW_RETURN_NOT_OK(builder.Append(value));
-            }
-            return arrow::Status::OK();
-        } else if (auto* decimal32_column = vectorized::check_and_get_column<
-                           vectorized::ColumnDecimal<vectorized::Decimal32>>(
-                           *vectorized::remove_nullable(_cur_col))) {
-            std::shared_ptr<arrow::DataType> s_decimal_ptr =
-                    std::make_shared<arrow::Decimal128Type>(8, 
decimal32_column->get_scale());
-            ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = decimal32_column->get_data_at(i);
-                const int32_t* p_value = reinterpret_cast<const 
int32_t*>(data_ref.data);
-                int64_t high = *p_value > 0 ? 0 : 1UL << 63;
-                arrow::Decimal128 value(high, *p_value > 0 ? *p_value : 
-*p_value);
-                ARROW_RETURN_NOT_OK(builder.Append(value));
-            }
-            return arrow::Status::OK();
-        } else if (auto* decimal64_column = vectorized::check_and_get_column<
-                           vectorized::ColumnDecimal<vectorized::Decimal64>>(
-                           *vectorized::remove_nullable(_cur_col))) {
-            std::shared_ptr<arrow::DataType> s_decimal_ptr =
-                    std::make_shared<arrow::Decimal128Type>(18, 
decimal64_column->get_scale());
-            ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = decimal64_column->get_data_at(i);
-                const int64_t* p_value = reinterpret_cast<const 
int64_t*>(data_ref.data);
-                int64_t high = *p_value > 0 ? 0 : 1UL << 63;
-                arrow::Decimal128 value(high, *p_value > 0 ? *p_value : 
-*p_value);
-                ARROW_RETURN_NOT_OK(builder.Append(value));
-            }
-            return arrow::Status::OK();
-        } else {
-            return arrow::Status::TypeError("Unsupported column:" + 
_cur_col->get_name());
-        }
-    }
-    // process boolean
-    arrow::Status Visit(const arrow::BooleanType& type) override {
-        auto& builder = assert_cast<arrow::BooleanBuilder&>(*_cur_builder);
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = start; i < start + num_rows; ++i) {
-            bool is_null = _cur_col->is_null_at(i);
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            const auto& data_ref = _cur_col->get_data_at(i);
-            ARROW_RETURN_NOT_OK(builder.Append(*(const bool*)data_ref.data));
-        }
-        return arrow::Status::OK();
-    }
-
-    // process array type
-    arrow::Status Visit(const arrow::ListType& type) override {
-        auto& builder = assert_cast<arrow::ListBuilder&>(*_cur_builder);
-        auto orignal_col = _cur_col;
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-
-        const vectorized::ColumnArray* array_column = nullptr;
-        if (orignal_col->is_nullable()) {
-            auto nullable_column =
-                    assert_cast<const 
vectorized::ColumnNullable*>(orignal_col.get());
-            array_column = assert_cast<const vectorized::ColumnArray*>(
-                    &nullable_column->get_nested_column());
-        } else {
-            array_column = assert_cast<const 
vectorized::ColumnArray*>(orignal_col.get());
-        }
-        const auto& offsets = array_column->get_offsets();
-        vectorized::ColumnPtr nested_column = array_column->get_data_ptr();
-
-        // set current col/type/builder to nested
-        _cur_col = nested_column;
-        if (_cur_type->is_nullable()) {
-            auto nullable_type = assert_cast<const 
vectorized::DataTypeNullable*>(_cur_type.get());
-            _cur_type = assert_cast<const vectorized::DataTypeArray*>(
-                                nullable_type->get_nested_type().get())
-                                ->get_nested_type();
-        } else {
-            _cur_type = assert_cast<const 
vectorized::DataTypeArray*>(_cur_type.get())
-                                ->get_nested_type();
-        }
-        _cur_builder = builder.value_builder();
-
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = start; i < start + num_rows; ++i) {
-            bool is_null = orignal_col->is_null_at(i);
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            // append array elements in row i
-            ARROW_RETURN_NOT_OK(builder.Append());
-            _cur_start = offsets[i - 1];
-            _cur_rows = offsets[i] - offsets[i - 1];
-            ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*type.value_type(), 
this));
-        }
-
-        return arrow::Status::OK();
-    }
+    ~FromBlockConverter() = default;
 
     Status convert(std::shared_ptr<arrow::RecordBatch>* out);
 
 private:
-    template <typename T>
-    arrow::Status _visit(const T& type) {
-        auto& builder = assert_cast<arrow::NumericBuilder<T>&>(*_cur_builder);
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        if (_cur_col->is_nullable()) {
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = _cur_col->get_data_at(i);
-                ARROW_RETURN_NOT_OK(builder.Append(*(const typename 
T::c_type*)data_ref.data));
-            }
-        } else {
-            ARROW_RETURN_NOT_OK(builder.AppendValues(
-                    (const typename 
T::c_type*)_cur_col->get_data_at(start).data, num_rows));
-        }
-        return arrow::Status::OK();
-    }
-
     const vectorized::Block& _block;
     const std::shared_ptr<arrow::Schema>& _schema;
     arrow::MemoryPool* _pool;
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index a0cd77aee41..9bac79ff258 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -104,8 +104,6 @@ Status convert_to_arrow_type(const TypeDescriptor& type, 
std::shared_ptr<arrow::
         }
         break;
     case TYPE_DECIMALV2:
-        *result = std::make_shared<arrow::Decimal128Type>(27, 9);
-        break;
     case TYPE_DECIMAL32:
     case TYPE_DECIMAL64:
     case TYPE_DECIMAL128I:
diff --git a/be/src/vec/columns/column_array.cpp 
b/be/src/vec/columns/column_array.cpp
index e66e016381e..c6cf889b5a0 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -368,6 +368,7 @@ void ColumnArray::update_crcs_with_value(uint32_t* 
__restrict hash, PrimitiveTyp
 }
 
 void ColumnArray::insert(const Field& x) {
+    DCHECK_EQ(x.get_type(), Field::Types::Array);
     if (x.is_null()) {
         get_data().insert(Null());
         get_offsets().push_back(get_offsets().back() + 1);
diff --git a/be/src/vec/columns/column_map.cpp 
b/be/src/vec/columns/column_map.cpp
index 85964ca967b..355866bd868 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -137,6 +137,7 @@ void ColumnMap::insert_data(const char*, size_t) {
 }
 
 void ColumnMap::insert(const Field& x) {
+    DCHECK_EQ(x.get_type(), Field::Types::Map);
     const auto& map = doris::vectorized::get<const Map&>(x);
     CHECK_EQ(map.size(), 2);
     const auto& k_f = doris::vectorized::get<const Array&>(map[0]);
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 37b9218ed7b..94c835f600b 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -151,6 +151,10 @@ public:
             const auto& real_field = vectorized::get<const JsonbField&>(x);
             s = StringRef(real_field.get_value(), real_field.get_size());
         } else {
+            DCHECK_EQ(x.get_type(), Field::Types::String);
+            // If `x.get_type()` is not String, such as UInt64, may get the 
error
+            // `string column length is too large: 
total_length=13744632839234567870`
+            // because `<String>(x).size() = 13744632839234567870`
             s.data = vectorized::get<const String&>(x).data();
             s.size = vectorized::get<const String&>(x).size();
         }
diff --git a/be/src/vec/columns/column_struct.cpp 
b/be/src/vec/columns/column_struct.cpp
index ec0c5e6a687..40cbefa85ed 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -124,6 +124,7 @@ void ColumnStruct::get(size_t n, Field& res) const {
 }
 
 void ColumnStruct::insert(const Field& x) {
+    DCHECK_EQ(x.get_type(), Field::Types::Tuple);
     const auto& tuple = x.get<const Tuple&>();
     const size_t tuple_size = columns.size();
     if (tuple.size() != tuple_size) {
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index df99e519f2a..f3cdb7d2dbd 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -353,8 +353,9 @@ public:
 
     // For example, during create column_const(1, uint8), will use 
NearestFieldType
     // to cast a uint8 to int64, so that the Field is int64, but the column is 
created
-    // using data_type, so that T == uint8. After the field is created, it 
will be inserted
-    // into the column, but its type is different from column's data type, so 
that during column
+    // using data_type, so that T == uint8, NearestFieldType<T> == uint64.
+    // After the field is created, it will be inserted into the column,
+    // but its type is different from column's data type (int64 vs uint64), so 
that during column
     // insert method, should use NearestFieldType<T> to get the Field and get 
it actual
     // uint8 value and then insert into column.
     void insert(const Field& x) override {
diff --git a/be/src/vec/data_types/data_type_time_v2.h 
b/be/src/vec/data_types/data_type_time_v2.h
index 7688a04a9a8..739328d564f 100644
--- a/be/src/vec/data_types/data_type_time_v2.h
+++ b/be/src/vec/data_types/data_type_time_v2.h
@@ -121,7 +121,9 @@ public:
     DataTypeDateTimeV2(const DataTypeDateTimeV2& rhs) : _scale(rhs._scale) {}
     TypeIndex get_type_id() const override { return TypeIndex::DateTimeV2; }
     TypeDescriptor get_type_as_type_descriptor() const override {
-        return TypeDescriptor(TYPE_DATETIMEV2);
+        auto desc = TypeDescriptor(TYPE_DATETIMEV2);
+        desc.scale = _scale;
+        return desc;
     }
 
     doris::FieldType get_storage_field_type() const override {
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp 
b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
index 0624717d1dd..1ce0ff54ac8 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -166,6 +166,12 @@ Status 
DataTypeDateTimeSerDe::deserialize_one_cell_from_json(IColumn& column, Sl
     return Status::OK();
 }
 
+void DataTypeDateTimeSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                   int start, int end,
+                                                   const cctz::time_zone& ctz) 
const {
+    _read_column_from_arrow<false>(column, arrow_array, start, end, ctz);
+}
+
 void DataTypeDate64SerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
                                                 arrow::ArrayBuilder* 
array_builder, int start,
                                                 int end, const 
cctz::time_zone& ctz) const {
@@ -205,9 +211,10 @@ static int64_t time_unit_divisor(arrow::TimeUnit::type 
unit) {
     }
 }
 
-void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                                 int start, int end,
-                                                 const cctz::time_zone& ctz) 
const {
+template <bool is_date>
+void DataTypeDate64SerDe::_read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                  int start, int end,
+                                                  const cctz::time_zone& ctz) 
const {
     auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data();
     int64_t divisor = 1;
     int64_t multiplier = 1;
@@ -246,13 +253,15 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn& 
column, const arrow::A
         }
     } else if (arrow_array->type()->id() == arrow::Type::STRING) {
         // to be compatible with old version, we use string type for date.
-        auto concrete_array = dynamic_cast<const 
arrow::StringArray*>(arrow_array);
-        for (size_t value_i = start; value_i < end; ++value_i) {
-            Int64 val = 0;
+        const auto* concrete_array = dynamic_cast<const 
arrow::StringArray*>(arrow_array);
+        for (auto value_i = start; value_i < end; ++value_i) {
             auto val_str = concrete_array->GetString(value_i);
-            ReadBuffer rb(val_str.data(), val_str.size());
-            read_datetime_text_impl(val, rb, ctz);
-            col_data.emplace_back(val);
+            VecDateTimeValue v;
+            v.from_date_str(val_str.c_str(), val_str.length(), ctz);
+            if constexpr (is_date) {
+                v.cast_to_date();
+            }
+            col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
         }
     } else {
         throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
@@ -260,6 +269,12 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn& 
column, const arrow::A
     }
 }
 
+void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                 int start, int end,
+                                                 const cctz::time_zone& ctz) 
const {
+    _read_column_from_arrow<true>(column, arrow_array, start, end, ctz);
+}
+
 template <bool is_binary_format>
 Status DataTypeDate64SerDe::_write_column_to_mysql(const IColumn& column,
                                                    
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.h 
b/be/src/vec/data_types/serde/data_type_date64_serde.h
index 497ac2aeff4..5f5fc4f1c38 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.h
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.h
@@ -73,6 +73,11 @@ public:
                                int start, int end,
                                std::vector<StringRef>& buffer_list) const 
override;
 
+protected:
+    template <bool is_date>
+    void _read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                 int end, const cctz::time_zone& ctz) const;
+
 private:
     template <bool is_binary_format>
     Status _write_column_to_mysql(const IColumn& column, 
MysqlRowBuffer<is_binary_format>& result,
@@ -94,6 +99,8 @@ public:
     Status deserialize_column_from_json_vector(IColumn& column, 
std::vector<Slice>& slices,
                                                int* num_deserialized,
                                                const FormatOptions& options) 
const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp 
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
index f073ac6decf..3b4142a2c01 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -170,7 +170,10 @@ void 
DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
             // convert second
             v.from_unixtime(utc_epoch / divisor, ctz);
             // get rest time
-            v.set_microsecond(utc_epoch % divisor);
+            // Pad with zeros on the right to make it 6 digits. The DateTimeV2Value 
microsecond field has 6 digits,
+            // and the scale determines how many leading digits are kept, so the valid 
digits must be placed at the front.
+            // e.g. "2022-01-01 11:11:11.111": utc_epoch = 1641035471111, divisor = 
1000, set_microsecond(111000)
+            v.set_microsecond((utc_epoch % divisor) * DIVISOR_FOR_MICRO / 
divisor);
             
col_data.emplace_back(binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(v));
         }
     } else {
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp 
b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
index 53427e8c69c..ba575d43abf 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
@@ -111,15 +111,10 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& 
column, const arrow::A
                                                  int start, int end,
                                                  const cctz::time_zone& ctz) 
const {
     auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data();
-    auto concrete_array = dynamic_cast<const arrow::Date32Array*>(arrow_array);
-    int64_t divisor = 1;
-    int64_t multiplier = 1;
-
-    multiplier = 24 * 60 * 60; // day => secs
-    for (size_t value_i = start; value_i < end; ++value_i) {
+    const auto* concrete_array = dynamic_cast<const 
arrow::Date32Array*>(arrow_array);
+    for (auto value_i = start; value_i < end; ++value_i) {
         DateV2Value<DateV2ValueType> v;
-        v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier,
-                        ctz);
+        v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold);
         col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, 
UInt32>(v));
     }
 }
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp 
b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
index d98f6cae2b0..92b69dbfca9 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
@@ -88,8 +88,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                                                     arrow::ArrayBuilder* 
array_builder, int start,
                                                     int end, const 
cctz::time_zone& ctz) const {
     auto& col = reinterpret_cast<const ColumnDecimal<T>&>(column);
-    auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
     if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
         std::shared_ptr<arrow::DataType> s_decimal_ptr =
                 std::make_shared<arrow::Decimal128Type>(27, 9);
         for (size_t i = start; i < end; ++i) {
@@ -108,6 +108,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
         }
         // TODO: decimal256
     } else if constexpr (std::is_same_v<T, Decimal128V3>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
         std::shared_ptr<arrow::DataType> s_decimal_ptr =
                 std::make_shared<arrow::Decimal128Type>(38, col.get_scale());
         for (size_t i = start; i < end; ++i) {
@@ -125,6 +126,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                              array_builder->type()->name());
         }
     } else if constexpr (std::is_same_v<T, Decimal<Int32>>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
         std::shared_ptr<arrow::DataType> s_decimal_ptr =
                 std::make_shared<arrow::Decimal128Type>(8, col.get_scale());
         for (size_t i = start; i < end; ++i) {
@@ -139,6 +141,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                              array_builder->type()->name());
         }
     } else if constexpr (std::is_same_v<T, Decimal<Int64>>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
         std::shared_ptr<arrow::DataType> s_decimal_ptr =
                 std::make_shared<arrow::Decimal128Type>(18, col.get_scale());
         for (size_t i = start; i < end; ++i) {
@@ -152,6 +155,28 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
             checkArrowStatus(builder.Append(value), column.get_name(),
                              array_builder->type()->name());
         }
+    } else if constexpr (std::is_same_v<T, Decimal256>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal256Builder&>(*array_builder);
+        std::shared_ptr<arrow::DataType> s_decimal_ptr =
+                std::make_shared<arrow::Decimal256Type>(76, col.get_scale());
+        for (size_t i = start; i < end; ++i) {
+            if (null_map && (*null_map)[i]) {
+                checkArrowStatus(builder.AppendNull(), column.get_name(),
+                                 array_builder->type()->name());
+                continue;
+            }
+            auto p_value = wide::Int256(col.get_element(i));
+            using half_type = wide::Int256::base_type; // uint64_t
+            half_type a0 = p_value.items[wide::Int256::_impl::little(0)];
+            half_type a1 = p_value.items[wide::Int256::_impl::little(1)];
+            half_type a2 = p_value.items[wide::Int256::_impl::little(2)];
+            half_type a3 = p_value.items[wide::Int256::_impl::little(3)];
+
+            std::array<uint64_t, 4> word_array = {a0, a1, a2, a3};
+            arrow::Decimal256 value(arrow::Decimal256::LittleEndianArray, 
word_array);
+            checkArrowStatus(builder.Append(value), column.get_name(),
+                             array_builder->type()->name());
+        }
     } else {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "write_column_to_arrow with type " + 
column.get_name());
@@ -162,14 +187,14 @@ template <typename T>
 void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
                                                      const arrow::Array* 
arrow_array, int start,
                                                      int end, const 
cctz::time_zone& ctz) const {
-    auto concrete_array = dynamic_cast<const 
arrow::DecimalArray*>(arrow_array);
-    const auto* arrow_decimal_type =
-            static_cast<const arrow::DecimalType*>(arrow_array->type().get());
-    const auto arrow_scale = arrow_decimal_type->scale();
     auto& column_data = static_cast<ColumnDecimal<T>&>(column).get_data();
     // Decimal<Int128> for decimalv2
     // Decimal<Int128I> for deicmalv3
     if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+        const auto* concrete_array = dynamic_cast<const 
arrow::DecimalArray*>(arrow_array);
+        const auto* arrow_decimal_type =
+                static_cast<const 
arrow::DecimalType*>(arrow_array->type().get());
+        const auto arrow_scale = arrow_decimal_type->scale();
         // TODO check precision
         for (size_t value_i = start; value_i < end; ++value_i) {
             auto value = *reinterpret_cast<const vectorized::Decimal128V2*>(
@@ -195,7 +220,13 @@ void 
DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
         }
     } else if constexpr (std::is_same_v<T, Decimal128V3> || std::is_same_v<T, 
Decimal64> ||
                          std::is_same_v<T, Decimal32>) {
-        for (size_t value_i = start; value_i < end; ++value_i) {
+        const auto* concrete_array = dynamic_cast<const 
arrow::DecimalArray*>(arrow_array);
+        for (auto value_i = start; value_i < end; ++value_i) {
+            column_data.emplace_back(*reinterpret_cast<const 
T*>(concrete_array->Value(value_i)));
+        }
+    } else if constexpr (std::is_same_v<T, Decimal256>) {
+        const auto* concrete_array = dynamic_cast<const 
arrow::Decimal256Array*>(arrow_array);
+        for (auto value_i = start; value_i < end; ++value_i) {
             column_data.emplace_back(*reinterpret_cast<const 
T*>(concrete_array->Value(value_i)));
         }
     } else {
@@ -257,7 +288,7 @@ Status DataTypeDecimalSerDe<T>::write_column_to_orc(const 
std::string& timezone,
         for (size_t row_id = start; row_id < end; row_id++) {
             if (cur_batch->notNull[row_id] == 1) {
                 auto& v = col_data[row_id];
-                orc::Int128 value(v >> 64, (uint64_t)v);
+                orc::Int128 value(v >> 64, (uint64_t)v); // TODO, Decimal256 
will lose precision
                 cur_batch->values[row_id] = value;
             }
         }
diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp 
b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
index 9c31c74dee5..83acbf9cda1 100644
--- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
@@ -174,13 +174,17 @@ void DataTypeIPv6SerDe::read_column_from_arrow(IColumn& 
column, const arrow::Arr
                     buffer->data() + concrete_array->value_offset(offset_i));
             const auto raw_data_len = concrete_array->value_length(offset_i);
 
-            IPv6 ipv6_val;
-            if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len)) {
-                throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
-                                       "parse number fail, string: '{}'",
-                                       std::string(raw_data, 
raw_data_len).c_str());
+            if (raw_data_len == 0) {
+                col_data.emplace_back(0);
+            } else {
+                IPv6 ipv6_val;
+                if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len)) 
{
+                    throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                                           "parse number fail, string: '{}'",
+                                           std::string(raw_data, 
raw_data_len).c_str());
+                }
+                col_data.emplace_back(ipv6_val);
             }
-            col_data.emplace_back(ipv6_val);
         } else {
             col_data.emplace_back(0);
         }
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp 
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index e81343c9ede..20ce78bf9c2 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -215,14 +215,20 @@ void 
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
                 const auto* raw_data = buffer->data() + 
concrete_array->value_offset(offset_i);
                 const auto raw_data_len = 
concrete_array->value_length(offset_i);
 
-                Int128 val = 0;
-                ReadBuffer rb(raw_data, raw_data_len);
-                if (!read_int_text_impl(val, rb)) {
-                    throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
-                                           "parse number fail, string: '{}'",
-                                           std::string(rb.position(), 
rb.count()).c_str());
+                if (raw_data_len == 0) {
+                    col_data.emplace_back(Int128()); // Int128() is NULL
+                } else {
+                    Int128 val = 0;
+                    ReadBuffer rb(raw_data, raw_data_len);
+                    if (!read_int_text_impl(val, rb)) {
+                        throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                                               "parse number fail, string: 
'{}'",
+                                               std::string(rb.position(), 
rb.count()).c_str());
+                    }
+                    col_data.emplace_back(val);
                 }
-                col_data.emplace_back(val);
+            } else {
+                col_data.emplace_back(Int128()); // Int128() is NULL
             }
         }
         return;
diff --git a/be/test/vec/data_types/common_data_type_serder_test.h 
b/be/test/vec/data_types/common_data_type_serder_test.h
index 4a01436c8ef..ef8d07323df 100644
--- a/be/test/vec/data_types/common_data_type_serder_test.h
+++ b/be/test/vec/data_types/common_data_type_serder_test.h
@@ -23,25 +23,18 @@
 #include <filesystem>
 #include <fstream>
 #include <iostream>
+#include <memory>
 
 #include "arrow/array/array_base.h"
 #include "arrow/type.h"
-#include "olap/schema.h"
-#include "runtime/descriptors.cpp"
 #include "runtime/descriptors.h"
 #include "util/arrow/block_convertor.h"
 #include "util/arrow/row_batch.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_array.h"
-#include "vec/columns/column_map.h"
-#include "vec/columns/columns_number.h"
 #include "vec/core/field.h"
-#include "vec/core/sort_block.h"
-#include "vec/core/sort_description.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
-#include "vec/data_types/data_type_array.h"
-#include "vec/data_types/data_type_map.h"
 #include "vec/runtime/ipv6_value.h"
 #include "vec/utils/arrow_column_to_doris_column.h"
 
@@ -55,9 +48,9 @@
 //  serialize_one_cell_to_json (const IColumn &column, int row_num, 
BufferWritable &bw, FormatOptions &options) const =0
 //  serialize_column_to_json (const IColumn &column, int start_idx, int 
end_idx, BufferWritable &bw, FormatOptions &options) const =0
 //  deserialize_one_cell_from_json (IColumn &column, Slice &slice, const 
FormatOptions &options) const =0
-//  deserialize_column_from_json_vector (IColumn &column, std::vector< Slice > 
&slices, int *num_deserialized, const FormatOptions &options) const =0
-//  deserialize_column_from_fixed_json (IColumn &column, Slice &slice, int 
rows, int *num_deserialized, const FormatOptions &options) const
-//  insert_column_last_value_multiple_times (IColumn &column, int times) const
+//  deserialize_column_from_json_vector (IColumn &column, std::vector< Slice > 
&slices, uint64_t *num_deserialized, const FormatOptions &options) const =0
+//  deserialize_column_from_fixed_json (IColumn &column, Slice &slice, 
uint64_t rows, uint64_t *num_deserialized, const FormatOptions &options) const
+//  insert_column_last_value_multiple_times (IColumn &column, uint64_t times) 
const
 // 3. fe|be protobuffer ser-deserialize
 //  write_column_to_pb (const IColumn &column, PValues &result, int start, int 
end) const =0
 //  read_column_from_pb (IColumn &column, const PValues &arg) const =0
@@ -154,16 +147,14 @@ public:
 
         while (std::getline(file, line)) {
             std::stringstream lineStream(line);
-            //            std::cout << "whole : " << lineStream.str() << 
std::endl;
             std::string value;
             int l_idx = 0;
             int c_idx = 0;
-            std::vector<string> row;
+            std::vector<std::string> row;
             while (std::getline(lineStream, value, spliter)) {
-                if (idxes.contains(l_idx)) {
+                if (!value.starts_with("//") && idxes.contains(l_idx)) {
                     // load csv data
                     Slice string_slice(value.data(), value.size());
-                    std::cout << "origin : " << string_slice << std::endl;
                     Status st;
                     // deserialize data
                     if constexpr (is_hive_format) {
@@ -213,7 +204,7 @@ public:
         if (generate_res_file) {
             // generate res
             auto pos = file_path.find_last_of(".");
-            string hive_format = is_hive_format ? "_hive" : "";
+            std::string hive_format = is_hive_format ? "_hive" : "";
             std::string res_file = file_path.substr(0, pos) + hive_format + 
"_serde_res.csv";
             std::ofstream res_f(res_file);
             if (!res_f.is_open()) {
@@ -221,8 +212,6 @@ public:
             }
             for (size_t r = 0; r < assert_str_cols[0]->size(); ++r) {
                 for (size_t c = 0; c < assert_str_cols.size(); ++c) {
-                    std::cout << 
assert_str_cols[c]->get_data_at(r).to_string() << spliter
-                              << std::endl;
                     res_f << assert_str_cols[c]->get_data_at(r).to_string() << 
spliter;
                 }
                 res_f << std::endl;
@@ -233,6 +222,8 @@ public:
     }
 
     // standard hive text ser-deserialize assert function
+    // pb serde is currently only used by RPCFncall and fold_constant_executor, which 
only write column data to pb values,
+    // i.e. they just call write_column_to_pb
     static void assert_pb_format(MutableColumns& load_cols, DataTypeSerDeSPtrs 
serders) {
         for (size_t i = 0; i < load_cols.size(); ++i) {
             auto& col = load_cols[i];
@@ -266,6 +257,21 @@ public:
     static void assert_jsonb_format(MutableColumns& load_cols, 
DataTypeSerDeSPtrs serders) {
         Arena pool;
         auto jsonb_column = ColumnString::create(); // jsonb column
+        // these load_cols may have different sizes, so pad them all to the same row count
+        size_t max_row_size = load_cols[0]->size();
+        for (size_t i = 1; i < load_cols.size(); ++i) {
+            if (load_cols[i]->size() > max_row_size) {
+                max_row_size = load_cols[i]->size();
+            }
+        }
+        // keep same rows
+        for (size_t i = 0; i < load_cols.size(); ++i) {
+            if (load_cols[i]->size() < max_row_size) {
+                load_cols[i]->insert_many_defaults(max_row_size - 
load_cols[i]->size());
+            } else if (load_cols[i]->size() > max_row_size) {
+                load_cols[i]->resize(max_row_size);
+            }
+        }
         jsonb_column->reserve(load_cols[0]->size());
         MutableColumns assert_cols;
         for (size_t i = 0; i < load_cols.size(); ++i) {
@@ -321,54 +327,114 @@ public:
     }
 
     // assert arrow serialize
-    static void assert_arrow_format(MutableColumns& load_cols, 
DataTypeSerDeSPtrs serders,
-                                    DataTypes types) {
+    static void assert_arrow_format(MutableColumns& load_cols, DataTypes 
types) {
         // make a block to write to arrow
         auto block = std::make_shared<Block>();
+        build_block(block, load_cols, types);
+        auto record_batch = serialize_arrow(block);
+        auto assert_block = std::make_shared<Block>(block->clone_empty());
+        deserialize_arrow(assert_block, record_batch);
+        compare_two_blocks(block, assert_block);
+    }
+
+    static void build_block(const std::shared_ptr<Block>& block, 
MutableColumns& load_cols,
+                            DataTypes types) {
+        // these load_cols may have different sizes, so pad them all to the same row count
+        size_t max_row_size = load_cols[0]->size();
+        for (size_t i = 1; i < load_cols.size(); ++i) {
+            if (load_cols[i]->size() > max_row_size) {
+                max_row_size = load_cols[i]->size();
+            }
+        }
+        // keep same rows
+        for (auto& load_col : load_cols) {
+            if (load_col->size() < max_row_size) {
+                load_col->insert_many_defaults(max_row_size - 
load_col->size());
+            } else if (load_col->size() > max_row_size) {
+                load_col->resize(max_row_size);
+            }
+        }
         for (size_t i = 0; i < load_cols.size(); ++i) {
             auto& col = load_cols[i];
             block->insert(ColumnWithTypeAndName(std::move(col), types[i], 
types[i]->get_name()));
         }
         // print block
-        std::cout << "block: " << block->dump_structure() << std::endl;
+        std::cout << "build block structure: " << block->dump_structure() << 
std::endl;
+        std::cout << "build block data: "
+                  << block->dump_data(0, std::min(max_row_size, 
static_cast<size_t>(5)))
+                  << std::endl;
+        for (int i = 0; i < block->columns(); i++) {
+            auto col = block->get_by_position(i);
+            std::cout << "col: " << i << ", " << col.column->get_name() << ", "
+                      << col.type->get_name() << ", " << col.to_string(0) << 
std::endl;
+        }
+    }
+
+    static std::shared_ptr<arrow::RecordBatch> serialize_arrow(
+            const std::shared_ptr<Block>& block) {
         std::shared_ptr<arrow::Schema> block_arrow_schema;
         EXPECT_EQ(get_arrow_schema_from_block(*block, &block_arrow_schema, 
"UTC"), Status::OK());
+        std::cout << "schema: " << block_arrow_schema->ToString(true) << 
std::endl;
         // convert block to arrow
         std::shared_ptr<arrow::RecordBatch> result;
         cctz::time_zone _timezone_obj; //default UTC
         Status stt = convert_to_arrow_batch(*block, block_arrow_schema,
                                             arrow::default_memory_pool(), 
&result, _timezone_obj);
         EXPECT_EQ(Status::OK(), stt) << "convert block to arrow failed" << 
stt.to_string();
+        std::cout << "arrow serialize result: " << result->num_columns() << ", 
"
+                  << result->num_rows() << std::endl;
+        return result;
+    }
 
+    static void deserialize_arrow(const std::shared_ptr<Block>& new_block,
+                                  std::shared_ptr<arrow::RecordBatch> 
record_batch) {
         // deserialize arrow to block
-        auto assert_block = block->clone_empty();
-        auto rows = block->rows();
-        for (size_t i = 0; i < load_cols.size(); ++i) {
-            auto array = result->column(i);
-            std::cout << array.get()->ToString() << std::endl;
-            auto& column_with_type_and_name = assert_block.get_by_position(i);
+        auto rows = record_batch->num_rows();
+        for (size_t i = 0; i < record_batch->num_columns(); ++i) {
+            auto array = record_batch->column(i);
+            std::cout << "arrow record_batch pos: " << i << ", array: " << 
array->ToString()
+                      << std::endl;
+            auto& column_with_type_and_name = new_block->get_by_position(i);
             std::cout << "now we are testing column: "
-                      << column_with_type_and_name.column->get_name() << 
std::endl;
-            auto ret = arrow_column_to_doris_column(
-                    array.get(), 0, column_with_type_and_name.column,
-                    column_with_type_and_name.type, rows, _timezone_obj);
-            // do check data
-            std::cout << "arrow_column_to_doris_column done: "
                       << column_with_type_and_name.column->get_name()
-                      << " with column size: " << 
column_with_type_and_name.column->size()
-                      << std::endl;
+                      << ", type: " << 
column_with_type_and_name.type->get_name() << std::endl;
+            auto ret =
+                    arrow_column_to_doris_column(array.get(), 0, 
column_with_type_and_name.column,
+                                                 
column_with_type_and_name.type, rows, "UTC");
+            // do check data
+            std::cout << "arrow_column_to_doris_column done, column data: "
+                      << column_with_type_and_name.to_string(0)
+                      << ", column size: " << 
column_with_type_and_name.column->size() << std::endl;
             EXPECT_EQ(Status::OK(), ret) << "convert arrow to block failed" << 
ret.to_string();
-            auto& col = block->get_by_position(i).column;
-            auto& assert_col = column_with_type_and_name.column;
-            std::cout << "column: " << col->get_name() << " size: " << 
col->size()
-                      << " assert size: " << assert_col->size() << std::endl;
+        }
+        std::cout << "arrow deserialize block structure: " << 
new_block->dump_structure()
+                  << std::endl;
+        std::cout << "arrow deserialize block data: "
+                  << new_block->dump_data(
+                             0, std::min(static_cast<size_t>(rows), 
static_cast<size_t>(5)))
+                  << std::endl;
+    }
+
+    static void compare_two_blocks(const std::shared_ptr<Block>& frist_block,
+                                   const std::shared_ptr<Block>& second_block) 
{
+        for (size_t i = 0; i < frist_block->columns(); ++i) {
+            EXPECT_EQ(frist_block->get_by_position(i).type, 
second_block->get_by_position(i).type);
+            auto& col = frist_block->get_by_position(i).column;
+            auto& assert_col = second_block->get_by_position(i).column;
+            std::cout << "compare_two_blocks, column: " << col->get_name()
+                      << ", type: " << 
frist_block->get_by_position(i).type->get_name()
+                      << ", size: " << col->size() << ", assert size: " << 
assert_col->size()
+                      << std::endl;
             EXPECT_EQ(assert_col->size(), col->size());
             for (size_t j = 0; j < assert_col->size(); ++j) {
+                EXPECT_EQ(frist_block->get_by_position(i).to_string(j),
+                          second_block->get_by_position(i).to_string(j));
                 auto cell = col->operator[](j);
                 auto assert_cell = assert_col->operator[](j);
                 EXPECT_EQ(cell, assert_cell) << "column: " << col->get_name() 
<< " row: " << j;
             }
         }
+        EXPECT_EQ(frist_block->dump_data(), second_block->dump_data());
     }
 
     // assert rapidjson format
diff --git a/be/test/vec/data_types/data_type_ip_test.cpp 
b/be/test/vec/data_types/data_type_ip_test.cpp
index 3bd91102d13..c4a63b77b69 100644
--- a/be/test/vec/data_types/data_type_ip_test.cpp
+++ b/be/test/vec/data_types/data_type_ip_test.cpp
@@ -264,7 +264,7 @@ TEST_F(DataTypeIPTest, SerdeMysqlAndArrowTest) {
     CommonDataTypeSerdeTest::check_data(ip_cols, serde, ';', {1, 2}, 
data_files[0],
                                         
CommonDataTypeSerdeTest::assert_mysql_format);
 
-    CommonDataTypeSerdeTest::assert_arrow_format(ip_cols, serde, {dt_ipv4, 
dt_ipv6});
+    CommonDataTypeSerdeTest::assert_arrow_format(ip_cols, {dt_ipv4, dt_ipv6});
 }
 
 TEST_F(DataTypeIPTest, SerdeTOJsonInComplex) {
diff --git a/be/test/vec/data_types/data_type_map_test.cpp 
b/be/test/vec/data_types/data_type_map_test.cpp
new file mode 100644
index 00000000000..548802b0d91
--- /dev/null
+++ b/be/test/vec/data_types/data_type_map_test.cpp
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_map.h"
+
+#include <execinfo.h> // for backtrace on Linux
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+
+#include "common/exception.h"
+#include "vec/columns/column.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/function/function_test_util.h"
+
+namespace doris::vectorized {
+
+// TODO: `DataTypeMapSerDe::deserialize_one_cell_from_json` has a bug, so
+// `SerdeArrowTest` cannot test a Map type nesting Array, Struct, or Map;
+// the data is therefore constructed manually to test them.
+// This TEST is expected to be deleted once `deserialize_one_cell_from_json` is fixed.
+TEST(DataTypeMapTest, SerdeNestedTypeArrowTest) {
+    auto block = std::make_shared<Block>();
+    {
+        std::string col_name = "map_nesting_array";
+        DataTypePtr f1 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr f2 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+        DataTypePtr dt1 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f1));
+        DataTypePtr dt2 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f2));
+        DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2);
+
+        Array a1, a2, a3, a4;
+        a1.push_back(Field("cute"));
+        a1.push_back(Null());
+        a2.push_back(Field("clever"));
+        a1.push_back(Field("hello"));
+        a3.push_back(1);
+        a3.push_back(2);
+        a4.push_back(11);
+        a4.push_back(22);
+
+        Array k1, v1;
+        k1.push_back(a1);
+        k1.push_back(a2);
+        v1.push_back(a3);
+        v1.push_back(a4);
+
+        Map m1;
+        m1.push_back(k1);
+        m1.push_back(v1);
+
+        MutableColumnPtr map_column = ma->create_column();
+        map_column->reserve(1);
+        map_column->insert(m1);
+        vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), 
ma, col_name);
+        block->insert(type_and_name);
+    }
+    {
+        std::string col_name = "map_nesting_struct";
+        DataTypePtr f1 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr f2 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+        DataTypePtr f3 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+        DataTypePtr f4 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr dt1 = std::make_shared<DataTypeNullable>(
+                std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {f1, 
f2, f3}));
+        DataTypePtr dt2 = std::make_shared<DataTypeNullable>(
+                std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> 
{f4}));
+        DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2);
+
+        Tuple t1, t2, t3, t4;
+        t1.push_back(Field("clever"));
+        t1.push_back(__int128_t(37));
+        t1.push_back(true);
+        t2.push_back("null");
+        t2.push_back(__int128_t(26));
+        t2.push_back(false);
+        t3.push_back(Field("cute"));
+        t4.push_back("null");
+
+        Array k1, v1;
+        k1.push_back(t1);
+        k1.push_back(t2);
+        v1.push_back(t3);
+        v1.push_back(t4);
+
+        Map m1;
+        m1.push_back(k1);
+        m1.push_back(v1);
+
+        MutableColumnPtr map_column = ma->create_column();
+        map_column->reserve(1);
+        map_column->insert(m1);
+        vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), 
ma, col_name);
+        block->insert(type_and_name);
+    }
+    {
+        std::string col_name = "map_nesting_map";
+        DataTypePtr f1 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+        DataTypePtr f2 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr f3 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+        DataTypePtr f4 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+        DataTypePtr dt1 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f1, f2));
+        DataTypePtr dt2 = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f3, f4));
+        DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2);
+
+        Array k1, k2, k3, k4, v1, v2, v3, v4;
+        k1.push_back(1);
+        k1.push_back(2);
+        k2.push_back(11);
+        k2.push_back(22);
+        v1.push_back(Field("map"));
+        v1.push_back(Null());
+        v2.push_back(Field("clever map"));
+        v2.push_back(Field("hello map"));
+        k3.push_back(__int128_t(37));
+        k3.push_back(__int128_t(26));
+        k4.push_back(__int128_t(1111));
+        k4.push_back(__int128_t(432535423));
+        v3.push_back(true);
+        v3.push_back(false);
+        v4.push_back(false);
+        v4.push_back(true);
+
+        Map m11, m12, m21, m22;
+        m11.push_back(k1);
+        m11.push_back(v1);
+        m12.push_back(k2);
+        m12.push_back(v2);
+        m21.push_back(k3);
+        m21.push_back(v3);
+        m22.push_back(k4);
+        m22.push_back(v4);
+
+        Array kk1, vv1;
+        kk1.push_back(m11);
+        kk1.push_back(m12);
+        vv1.push_back(m21);
+        vv1.push_back(m22);
+
+        Map m1;
+        m1.push_back(kk1);
+        m1.push_back(vv1);
+
+        MutableColumnPtr map_column = ma->create_column();
+        map_column->reserve(1);
+        map_column->insert(m1);
+        vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), 
ma, col_name);
+        block->insert(type_and_name);
+    }
+    std::shared_ptr<arrow::RecordBatch> record_batch =
+            CommonDataTypeSerdeTest::serialize_arrow(block);
+    auto assert_block = std::make_shared<Block>(block->clone_empty());
+    CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+    CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/data_type_struct_test.cpp 
b/be/test/vec/data_types/data_type_struct_test.cpp
new file mode 100644
index 00000000000..1fc8aac0312
--- /dev/null
+++ b/be/test/vec/data_types/data_type_struct_test.cpp
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_struct.h"
+
+#include <execinfo.h> // for backtrace on Linux
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+
+#include "vec/columns/column.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/function/function_test_util.h"
+
+namespace doris::vectorized {
+
+// TODO: `DataTypeStructSerDe::deserialize_one_cell_from_json` has a bug, so
+// `SerdeArrowTest` cannot cover a Struct type that nests Array, Map and Struct.
+// The test data is therefore constructed manually here.
+// This TEST is expected to be deleted once `deserialize_one_cell_from_json` is fixed.
+TEST(DataTypeStructTest, SerdeNestedTypeArrowTest) {
+    // Builds one Struct column whose three members are a nullable Array, a nullable Map and a
+    // nullable Struct, then checks that it survives a Block -> Arrow -> Block round trip.
+    auto block = std::make_shared<Block>();
+    {
+        std::string col_name = "struct_nesting_array_map_struct";
+
+        // Leaf types; each one is wrapped in DataTypeNullable.
+        DataTypePtr array_elem_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr map_key_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+        DataTypePtr map_value_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr inner_str_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr inner_int128_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+        DataTypePtr inner_uint8_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+
+        // Members of the outer struct, followed by the outer struct type itself.
+        DataTypePtr array_type = std::make_shared<DataTypeNullable>(
+                std::make_shared<DataTypeArray>(array_elem_type));
+        DataTypePtr map_type = std::make_shared<DataTypeNullable>(
+                std::make_shared<DataTypeMap>(map_key_type, map_value_type));
+        DataTypePtr inner_struct_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeStruct>(
+                        std::vector<DataTypePtr> {inner_str_type, inner_int128_type,
+                                                  inner_uint8_type}));
+        DataTypePtr struct_type = std::make_shared<DataTypeStruct>(
+                std::vector<DataTypePtr> {array_type, map_type, inner_struct_type});
+
+        // ---- first row ----
+        Array first_array;
+        first_array.push_back(Field("array"));
+        first_array.push_back(Null());
+
+        Array first_keys, first_values;
+        first_keys.push_back(1);
+        first_keys.push_back(2);
+        first_values.push_back(Field("map"));
+        first_values.push_back(Null());
+        // A Map field is a pair of arrays: keys first, then values.
+        Map first_map;
+        first_map.push_back(first_keys);
+        first_map.push_back(first_values);
+
+        Tuple first_inner;
+        first_inner.push_back(Field("clever"));
+        first_inner.push_back(__int128_t(37));
+        first_inner.push_back(true);
+
+        Tuple first_row;
+        first_row.push_back(first_array);
+        first_row.push_back(first_map);
+        first_row.push_back(first_inner);
+
+        // ---- second row ----
+        Array second_array;
+        second_array.push_back(Field("lucky array"));
+        second_array.push_back(Field("cute array"));
+
+        Array second_keys, second_values;
+        second_keys.push_back(11);
+        second_keys.push_back(22);
+        second_values.push_back(Field("clever map"));
+        second_values.push_back(Field("hello map"));
+        Map second_map;
+        second_map.push_back(second_keys);
+        second_map.push_back(second_values);
+
+        Tuple second_inner;
+        second_inner.push_back("null");
+        second_inner.push_back(__int128_t(26));
+        second_inner.push_back(false);
+
+        Tuple second_row;
+        second_row.push_back(second_array);
+        second_row.push_back(second_map);
+        second_row.push_back(second_inner);
+
+        // Materialize both rows and register the column in the block.
+        MutableColumnPtr nested_column = struct_type->create_column();
+        nested_column->reserve(2);
+        nested_column->insert(first_row);
+        nested_column->insert(second_row);
+        vectorized::ColumnWithTypeAndName struct_entry(nested_column->get_ptr(), struct_type,
+                                                       col_name);
+        block->insert(struct_entry);
+    }
+
+    // Block -> Arrow -> Block, then compare against the source block.
+    std::shared_ptr<arrow::RecordBatch> arrow_batch =
+            CommonDataTypeSerdeTest::serialize_arrow(block);
+    auto round_trip_block = std::make_shared<Block>(block->clone_empty());
+    CommonDataTypeSerdeTest::deserialize_arrow(round_trip_block, arrow_batch);
+    CommonDataTypeSerdeTest::compare_two_blocks(block, round_trip_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp 
b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
new file mode 100644
index 00000000000..f4caa35069f
--- /dev/null
+++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -0,0 +1,479 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/array/builder_base.h>
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_decimal.h>
+#include <arrow/array/builder_nested.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/record_batch.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+#include <arrow/visit_type_inline.h>
+#include <arrow/visitor.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/types.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <cmath>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "olap/hll.h"
+#include "runtime/descriptors.cpp"
+#include "util/arrow/block_convertor.h"
+#include "util/arrow/row_batch.h"
+#include "util/string_parser.hpp"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/block.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
+#include "vec/data_types/data_type_ipv4.h"
+#include "vec/data_types/data_type_ipv6.h"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/data_types/data_type_time_v2.h"
+#include "vec/runtime/vdatetime_value.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+// Round-trips a Block through Arrow and verifies that nothing changed.
+//
+// One column is built per entry of `cols`, named after the entry's index. The Block is
+// serialized with CommonDataTypeSerdeTest::serialize_arrow, deserialized into an empty clone
+// with deserialize_arrow, and the two Blocks are checked with compare_two_blocks.
+//
+// cols:        primitive types to generate; an unsupported type ends in LOG(FATAL).
+// row_num:     rows generated for the scalar columns (ARRAY/MAP/STRUCT always get two fixed
+//              rows, independent of this value).
+// is_nullable: only consulted for TYPE_INT, which is then built as a nullable column.
+void serialize_and_deserialize_arrow_test(std::vector<PrimitiveType> cols, int row_num,
+                                          bool is_nullable) {
+    auto block = std::make_shared<Block>();
+    for (size_t col_idx = 0; col_idx < cols.size(); ++col_idx) {
+        std::string col_name = std::to_string(col_idx);
+        // Only precision/scale are read back from `type_desc` in this function (decimal cases).
+        TypeDescriptor type_desc(cols[col_idx]);
+        switch (cols[col_idx]) {
+        case TYPE_BOOLEAN: {
+            // Alternating 0/1 values.
+            auto vec = vectorized::ColumnVector<UInt8>::create();
+            auto& data = vec->get_data();
+            for (int row = 0; row < row_num; ++row) {
+                data.push_back(row % 2);
+            }
+            vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeUInt8>());
+            vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, col_name);
+            block->insert(std::move(type_and_name));
+        } break;
+        case TYPE_INT: {
+            if (is_nullable) {
+                auto column_vector_int32 = vectorized::ColumnVector<Int32>::create();
+                auto column_nullable_vector =
+                        vectorized::make_nullable(std::move(column_vector_int32));
+                auto mutable_nullable_vector = std::move(*column_nullable_vector).mutate();
+                for (int row = 0; row < row_num; ++row) {
+                    if (row % 2 == 0) {
+                        // Even rows take the column default (presumably NULL for a nullable
+                        // column -- confirm against ColumnNullable::insert_default).
+                        mutable_nullable_vector->insert_default();
+                    } else {
+                        mutable_nullable_vector->insert(int32(row));
+                    }
+                }
+                auto data_type =
+                        vectorized::make_nullable(std::make_shared<vectorized::DataTypeInt32>());
+                vectorized::ColumnWithTypeAndName type_and_name(
+                        mutable_nullable_vector->get_ptr(), data_type, col_name);
+                block->insert(type_and_name);
+            } else {
+                auto vec = vectorized::ColumnVector<Int32>::create();
+                auto& data = vec->get_data();
+                for (int row = 0; row < row_num; ++row) {
+                    data.push_back(row);
+                }
+                vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>());
+                vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type,
+                                                                col_name);
+                block->insert(std::move(type_and_name));
+            }
+        } break;
+        case TYPE_DECIMAL32: {
+            type_desc.precision = 9;
+            type_desc.scale = 2;
+            vectorized::DataTypePtr decimal_data_type =
+                    std::make_shared<DataTypeDecimal<Decimal32>>(type_desc.precision,
+                                                                 type_desc.scale);
+            auto decimal_column = decimal_data_type->create_column();
+            auto& data =
+                    static_cast<vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int32>>*>(
+                            decimal_column.get())
+                            ->get_data();
+            for (int row = 0; row < row_num; ++row) {
+                if (row == 0) {
+                    data.push_back(Int32(0));
+                    continue;
+                }
+                // Take the length from the string itself so that the NUL terminator is never
+                // handed to the parser (same pattern as the TYPE_DECIMAL64 case below).
+                const std::string decimal_string = row % 2 == 0 ? "1234567.56" : "-1234567.56";
+                StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
+                Int32 val = StringParser::string_to_decimal<TYPE_DECIMAL32>(
+                        decimal_string.c_str(), decimal_string.size(), type_desc.precision,
+                        type_desc.scale, &result);
+                EXPECT_TRUE(result == StringParser::PARSE_SUCCESS);
+                data.push_back(val);
+            }
+
+            vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(),
+                                                            decimal_data_type, col_name);
+            block->insert(type_and_name);
+        } break;
+        case TYPE_DECIMAL64: {
+            type_desc.precision = 18;
+            type_desc.scale = 6;
+            vectorized::DataTypePtr decimal_data_type =
+                    std::make_shared<DataTypeDecimal<Decimal64>>(type_desc.precision,
+                                                                 type_desc.scale);
+            auto decimal_column = decimal_data_type->create_column();
+            auto& data =
+                    static_cast<vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int64>>*>(
+                            decimal_column.get())
+                            ->get_data();
+            for (int row = 0; row < row_num; ++row) {
+                if (row == 0) {
+                    data.push_back(Int64(0));
+                    continue;
+                }
+                const std::string decimal_string =
+                        row % 2 == 0 ? "-123456789012.123456" : "123456789012.123456";
+                StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
+                Int64 val = StringParser::string_to_decimal<TYPE_DECIMAL64>(
+                        decimal_string.c_str(), decimal_string.size(), type_desc.precision,
+                        type_desc.scale, &result);
+                EXPECT_TRUE(result == StringParser::PARSE_SUCCESS);
+                data.push_back(val);
+            }
+            vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(),
+                                                            decimal_data_type, col_name);
+            block->insert(type_and_name);
+        } break;
+        case TYPE_DECIMAL128I: {
+            type_desc.precision = 27;
+            type_desc.scale = 9;
+            vectorized::DataTypePtr decimal_data_type(
+                    doris::vectorized::create_decimal(27, 9, true));
+            auto decimal_column = decimal_data_type->create_column();
+            auto& data =
+                    static_cast<
+                            vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*>(
+                            decimal_column.get())
+                            ->get_data();
+            for (int row = 0; row < row_num; ++row) {
+                // row * 10^9 + row * 10^8, kept in integer arithmetic so the value never passes
+                // through a double.
+                auto value = __int128_t(row) * 1100000000;
+                data.push_back(value);
+            }
+            vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(),
+                                                            decimal_data_type, col_name);
+            block->insert(type_and_name);
+        } break;
+        case TYPE_STRING: {
+            // Each row holds the decimal text of its own row index.
+            auto strcol = vectorized::ColumnString::create();
+            for (int row = 0; row < row_num; ++row) {
+                std::string is = std::to_string(row);
+                strcol->insert_data(is.c_str(), is.size());
+            }
+            vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>());
+            vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type, col_name);
+            block->insert(type_and_name);
+        } break;
+        case TYPE_HLL: {
+            vectorized::DataTypePtr hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+            auto hll_column = hll_data_type->create_column();
+            std::vector<HyperLogLog>& container =
+                    static_cast<vectorized::ColumnHLL*>(hll_column.get())->get_data();
+            for (int row = 0; row < row_num; ++row) {
+                HyperLogLog hll;
+                hll.update(row);
+                container.push_back(hll);
+            }
+            vectorized::ColumnWithTypeAndName type_and_name(hll_column->get_ptr(), hll_data_type,
+                                                            col_name);
+
+            block->insert(type_and_name);
+        } break;
+        case TYPE_DATEV2: {
+            auto column_vector_date_v2 = vectorized::ColumnVector<vectorized::UInt32>::create();
+            auto& date_v2_data = column_vector_date_v2->get_data();
+            // Every row carries the same date, so build it once; the column stores the object's
+            // bytes reinterpreted as UInt32.
+            DateV2Value<DateV2ValueType> value;
+            value.from_date_int64(20210501);
+            const auto raw_value = *reinterpret_cast<vectorized::UInt32*>(&value);
+            for (int row = 0; row < row_num; ++row) {
+                date_v2_data.push_back(raw_value);
+            }
+            vectorized::DataTypePtr date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
+            vectorized::ColumnWithTypeAndName test_date_v2(column_vector_date_v2->get_ptr(),
+                                                           date_v2_type, col_name);
+            block->insert(test_date_v2);
+        } break;
+        case TYPE_DATE: // int64
+        {
+            auto column_vector_date = vectorized::ColumnVector<vectorized::Int64>::create();
+            auto& date_data = column_vector_date->get_data();
+            // Same date in every row; stored as the object's bytes reinterpreted as Int64.
+            VecDateTimeValue value;
+            value.from_date_int64(20210501);
+            const auto raw_value = *reinterpret_cast<vectorized::Int64*>(&value);
+            for (int row = 0; row < row_num; ++row) {
+                date_data.push_back(raw_value);
+            }
+            vectorized::DataTypePtr date_type(std::make_shared<vectorized::DataTypeDate>());
+            vectorized::ColumnWithTypeAndName test_date(column_vector_date->get_ptr(), date_type,
+                                                        col_name);
+            block->insert(test_date);
+        } break;
+        case TYPE_DATETIME: // int64
+        {
+            auto column_vector_datetime = vectorized::ColumnVector<vectorized::Int64>::create();
+            auto& datetime_data = column_vector_datetime->get_data();
+            // Same datetime in every row; stored as the object's bytes reinterpreted as Int64.
+            VecDateTimeValue value;
+            value.from_date_int64(20210501080910);
+            const auto raw_value = *reinterpret_cast<vectorized::Int64*>(&value);
+            for (int row = 0; row < row_num; ++row) {
+                datetime_data.push_back(raw_value);
+            }
+            vectorized::DataTypePtr datetime_type(std::make_shared<vectorized::DataTypeDateTime>());
+            vectorized::ColumnWithTypeAndName test_datetime(column_vector_datetime->get_ptr(),
+                                                            datetime_type, col_name);
+            block->insert(test_datetime);
+        } break;
+        case TYPE_DATETIMEV2: // uint64
+        {
+            auto column_vector_datetimev2 = vectorized::ColumnVector<vectorized::UInt64>::create();
+            DateV2Value<DateTimeV2ValueType> value;
+            std::string date_literal = "2022-01-01 11:11:11.111";
+            cctz::time_zone ctz;
+            TimezoneUtils::find_cctz_time_zone("UTC", ctz);
+            EXPECT_TRUE(value.from_date_str(date_literal.c_str(), date_literal.size(), ctz, 3));
+            char to[64] = {};
+            std::cout << "value: " << value.to_string(to) << std::endl;
+            for (int row = 0; row < row_num; ++row) {
+                column_vector_datetimev2->insert(value.to_date_int_val());
+            }
+            vectorized::DataTypePtr datetimev2_type(
+                    std::make_shared<vectorized::DataTypeDateTimeV2>(3));
+            vectorized::ColumnWithTypeAndName test_datetimev2(column_vector_datetimev2->get_ptr(),
+                                                              datetimev2_type, col_name);
+            block->insert(test_datetimev2);
+        } break;
+        case TYPE_ARRAY: // array
+        {
+            type_desc.add_sub_type(TYPE_STRING, true);
+            // Array<Nullable(String)> with two fixed rows, each containing a NULL element.
+            DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+            DataTypePtr au = std::make_shared<DataTypeArray>(s);
+            Array a1, a2;
+            a1.push_back(Field("sss"));
+            a1.push_back(Null());
+            a1.push_back(Field("clever amory"));
+            a2.push_back(Field("hello amory"));
+            a2.push_back(Null());
+            a2.push_back(Field("cute amory"));
+            a2.push_back(Field("sf"));
+            MutableColumnPtr array_column = au->create_column();
+            array_column->reserve(2);
+            array_column->insert(a1);
+            array_column->insert(a2);
+            vectorized::ColumnWithTypeAndName type_and_name(array_column->get_ptr(), au, col_name);
+            block->insert(type_and_name);
+        } break;
+        case TYPE_MAP: {
+            type_desc.add_sub_type(TYPE_STRING, true);
+            type_desc.add_sub_type(TYPE_STRING, true);
+            // Map<Nullable(String), Nullable(String)> with two fixed rows; NULLs appear only
+            // among the values.
+            DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+            DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+            DataTypePtr m = std::make_shared<DataTypeMap>(s, d);
+            Array k1, k2, v1, v2;
+            k1.push_back("null");
+            k1.push_back("doris");
+            k1.push_back("clever amory");
+            v1.push_back("ss");
+            v1.push_back(Null());
+            v1.push_back("NULL");
+            k2.push_back("hello amory");
+            k2.push_back("NULL");
+            k2.push_back("cute amory");
+            k2.push_back("doris");
+            v2.push_back("s");
+            v2.push_back("0");
+            v2.push_back("sf");
+            v2.push_back(Null());
+            Map m1, m2;
+            m1.push_back(k1);
+            m1.push_back(v1);
+            m2.push_back(k2);
+            m2.push_back(v2);
+            MutableColumnPtr map_column = m->create_column();
+            map_column->reserve(2);
+            map_column->insert(m1);
+            map_column->insert(m2);
+            vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), m, col_name);
+            block->insert(type_and_name);
+        } break;
+        case TYPE_STRUCT: {
+            type_desc.add_sub_type(TYPE_STRING, "name", true);
+            type_desc.add_sub_type(TYPE_LARGEINT, "age", true);
+            type_desc.add_sub_type(TYPE_BOOLEAN, "is", true);
+            // Struct<Nullable(String), Nullable(Int128), Nullable(UInt8)> with two fixed rows.
+            DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+            DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+            DataTypePtr m = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+            DataTypePtr st = std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {s, d, m});
+            Tuple t1, t2;
+            t1.push_back(Field("amory cute"));
+            t1.push_back(__int128_t(37));
+            t1.push_back(true);
+            t2.push_back("null");
+            t2.push_back(__int128_t(26));
+            t2.push_back(false);
+            MutableColumnPtr struct_column = st->create_column();
+            struct_column->reserve(2);
+            struct_column->insert(t1);
+            struct_column->insert(t2);
+            vectorized::ColumnWithTypeAndName type_and_name(struct_column->get_ptr(), st,
+                                                            col_name);
+            block->insert(type_and_name);
+        } break;
+        case TYPE_IPV4: {
+            auto vec = vectorized::ColumnIPv4::create();
+            auto& data = vec->get_data();
+            for (int row = 0; row < row_num; ++row) {
+                data.push_back(row);
+            }
+            vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeIPv4>());
+            vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, col_name);
+            block->insert(std::move(type_and_name));
+        } break;
+        case TYPE_IPV6: {
+            auto vec = vectorized::ColumnIPv6::create();
+            auto& data = vec->get_data();
+            for (int row = 0; row < row_num; ++row) {
+                data.push_back(row);
+            }
+            vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeIPv6>());
+            vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, col_name);
+            block->insert(std::move(type_and_name));
+        } break;
+        default:
+            LOG(FATAL) << "error column type";
+        }
+    }
+    // Block -> Arrow RecordBatch -> Block, then compare with the source block.
+    std::shared_ptr<arrow::RecordBatch> record_batch =
+            CommonDataTypeSerdeTest::serialize_arrow(block);
+    auto assert_block = std::make_shared<Block>(block->clone_empty());
+    CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+    CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) {
+    // Scalar types to round-trip through Arrow. TYPE_INT is listed twice, so the generated
+    // block contains two INT columns.
+    const std::vector<PrimitiveType> scalar_types {
+            TYPE_INT,       TYPE_INT,  TYPE_STRING, TYPE_DECIMAL128I, TYPE_BOOLEAN,    TYPE_DECIMAL32,
+            TYPE_DECIMAL64, TYPE_IPV4, TYPE_IPV6,   TYPE_DATETIME,    TYPE_DATETIMEV2, TYPE_DATE,
+            TYPE_DATEV2,
+    };
+    const int rows_per_column = 7;
+    // First with the nullable flag set, then without it.
+    for (bool nullable : {true, false}) {
+        serialize_and_deserialize_arrow_test(scalar_types, rows_per_column, nullable);
+    }
+}
+
+TEST(DataTypeSerDeArrowTest, DataTypeCollectionSerDeTest) {
+    // Collection types to round-trip through Arrow.
+    const std::vector<PrimitiveType> collection_types {TYPE_ARRAY, TYPE_MAP, TYPE_STRUCT};
+    const int rows_per_column = 7;
+    // First with the nullable flag set, then without it.
+    for (bool nullable : {true, false}) {
+        serialize_and_deserialize_arrow_test(collection_types, rows_per_column, nullable);
+    }
+}
+
+TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
+    // Round-trips a three-row Map<Nullable(String), Nullable(Int32)> column through Arrow.
+    // NOTE(review): despite the test name, every key below is a string (one of them is the
+    // literal text "NULL"); only values are Null() -- confirm this is the intended coverage.
+    std::string col_name = "map_null_key";
+    auto block = std::make_shared<Block>();
+    {
+        DataTypePtr key_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr value_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+        DataTypePtr map_type = std::make_shared<DataTypeMap>(key_type, value_type);
+
+        // Row 1: two entries, the first value is NULL.
+        Array keys_row1, values_row1;
+        keys_row1.push_back("doris");
+        keys_row1.push_back("clever amory");
+        values_row1.push_back(Null());
+        values_row1.push_back(30);
+        Map row1;
+        row1.push_back(keys_row1);
+        row1.push_back(values_row1);
+
+        // Row 2: four entries, the value paired with the "NULL" key is NULL.
+        Array keys_row2, values_row2;
+        keys_row2.push_back("hello amory");
+        keys_row2.push_back("NULL");
+        keys_row2.push_back("cute amory");
+        keys_row2.push_back("doris");
+        values_row2.push_back(26);
+        values_row2.push_back(Null());
+        values_row2.push_back(6);
+        values_row2.push_back(7);
+        Map row2;
+        row2.push_back(keys_row2);
+        row2.push_back(values_row2);
+
+        // Row 3: a single entry.
+        Array keys_row3, values_row3;
+        keys_row3.push_back("test");
+        values_row3.push_back(11);
+        Map row3;
+        row3.push_back(keys_row3);
+        row3.push_back(values_row3);
+
+        MutableColumnPtr map_rows = map_type->create_column();
+        map_rows->reserve(3);
+        map_rows->insert(row1);
+        map_rows->insert(row2);
+        map_rows->insert(row3);
+        vectorized::ColumnWithTypeAndName map_entry(map_rows->get_ptr(), map_type, col_name);
+        block->insert(map_entry);
+    }
+
+    // Block -> Arrow -> Block, then compare against the source block.
+    std::shared_ptr<arrow::RecordBatch> arrow_batch =
+            CommonDataTypeSerdeTest::serialize_arrow(block);
+    auto round_trip_block = std::make_shared<Block>(block->clone_empty());
+    CommonDataTypeSerdeTest::deserialize_arrow(round_trip_block, arrow_batch);
+    CommonDataTypeSerdeTest::compare_two_blocks(block, round_trip_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/exprs/vexpr_test.cpp b/be/test/vec/exprs/vexpr_test.cpp
index 1bd9e478b89..7a64d6e7fa2 100644
--- a/be/test/vec/exprs/vexpr_test.cpp
+++ b/be/test/vec/exprs/vexpr_test.cpp
@@ -94,8 +94,10 @@ static doris::TupleDescriptor* create_tuple_desc(
             
t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27, 
9).to_thrift());
         } else {
             TypeDescriptor descriptor(column_descs[i].type);
-            if (column_descs[i].precision >= 0 && column_descs[i].scale >= 0) {
+            if (column_descs[i].precision >= 0) {
                 descriptor.precision = column_descs[i].precision;
+            }
+            if (column_descs[i].scale >= 0) {
                 descriptor.scale = column_descs[i].scale;
             }
             t_slot_desc.__set_slotType(descriptor.to_thrift());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to