This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8f79742f7d9 branch-2.1: [fix](arrow) Fix Arrow serialization and 
deserialization of Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types 
(#49244)
8f79742f7d9 is described below

commit 8f79742f7d98a3252abe8a9ebdc056fd2399a56e
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Thu Mar 20 09:57:04 2025 +0800

    branch-2.1: [fix](arrow) Fix Arrow serialization and deserialization of 
Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types (#49244)
    
    ### What problem does this PR solve?
    
    pick #48944 [fix](arrow) Fix UT DataTypeSerDeArrowTest of
    Array/Map/Struct/Bitmap/HLL/Decimal256 types
    pick #48398  [fix](arrow) Fix UT DataTypeSerDeArrowTest of Date type
---
 be/src/runtime/types.cpp                           |  27 +-
 be/src/runtime/types.h                             |   3 +-
 be/src/util/arrow/block_convertor.cpp              | 272 +-----------
 be/src/util/arrow/row_batch.cpp                    |   2 -
 be/src/vec/columns/column_array.cpp                |   1 +
 be/src/vec/columns/column_map.cpp                  |   1 +
 be/src/vec/columns/column_string.h                 |   4 +
 be/src/vec/columns/column_struct.cpp               |   1 +
 be/src/vec/columns/column_vector.h                 |   5 +-
 be/src/vec/data_types/data_type_time_v2.h          |   4 +-
 .../data_types/serde/data_type_date64_serde.cpp    |  33 +-
 .../vec/data_types/serde/data_type_date64_serde.h  |   7 +
 .../serde/data_type_datetimev2_serde.cpp           |   5 +-
 .../data_types/serde/data_type_datev2_serde.cpp    |  11 +-
 .../data_types/serde/data_type_decimal_serde.cpp   |  45 +-
 .../vec/data_types/serde/data_type_ipv6_serde.cpp  |  18 +-
 .../data_types/serde/data_type_number_serde.cpp    |  20 +-
 .../vec/data_types/common_data_type_serder_test.h  | 445 +++++++++++++++++++
 be/test/vec/data_types/data_type_map_test.cpp      | 178 ++++++++
 be/test/vec/data_types/data_type_struct_test.cpp   | 115 +++++
 .../serde/data_type_serde_arrow_test.cpp           | 481 +++++++--------------
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |   4 +-
 be/test/vec/exprs/vexpr_test.cpp                   |   4 +-
 23 files changed, 1037 insertions(+), 649 deletions(-)

diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 14ba4b2cebd..f0112ebd2bc 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -46,12 +46,20 @@ TypeDescriptor::TypeDescriptor(const 
std::vector<TTypeNode>& types, int* idx)
             DCHECK(scalar_type.__isset.len);
             len = scalar_type.len;
         } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == 
TYPE_DECIMAL64 ||
-                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type 
== TYPE_DATETIMEV2 ||
-                   type == TYPE_TIMEV2) {
+                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
             DCHECK(scalar_type.__isset.precision);
             DCHECK(scalar_type.__isset.scale);
             precision = scalar_type.precision;
             scale = scalar_type.scale;
+        } else if (type == TYPE_DATETIMEV2) {
+            DCHECK(scalar_type.__isset.scale);
+            scale = scalar_type.scale;
+        } else if (type == TYPE_TIMEV2) {
+            if (scalar_type.__isset.scale) {
+                scale = scalar_type.scale;
+            } else {
+                scale = 0;
+            }
         } else if (type == TYPE_STRING) {
             if (scalar_type.__isset.len) {
                 len = scalar_type.len;
@@ -152,11 +160,14 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type) 
const {
             // DCHECK_NE(len, -1);
             scalar_type.__set_len(len);
         } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == 
TYPE_DECIMAL64 ||
-                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type 
== TYPE_DATETIMEV2) {
+                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
             DCHECK_NE(precision, -1);
             DCHECK_NE(scale, -1);
             scalar_type.__set_precision(precision);
             scalar_type.__set_scale(scale);
+        } else if (type == TYPE_DATETIMEV2) {
+            DCHECK_NE(scale, -1);
+            scalar_type.__set_scale(scale);
         }
     }
 }
@@ -169,11 +180,14 @@ void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const {
     if (type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_HLL || type 
== TYPE_STRING) {
         scalar_type->set_len(len);
     } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == 
TYPE_DECIMAL64 ||
-               type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type == 
TYPE_DATETIMEV2) {
+               type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
         DCHECK_NE(precision, -1);
         DCHECK_NE(scale, -1);
         scalar_type->set_precision(precision);
         scalar_type->set_scale(scale);
+    } else if (type == TYPE_DATETIMEV2) {
+        DCHECK_NE(scale, -1);
+        scalar_type->set_scale(scale);
     } else if (type == TYPE_ARRAY) {
         node->set_type(TTypeNodeType::ARRAY);
         node->set_contains_null(contains_nulls[0]);
@@ -219,11 +233,14 @@ TypeDescriptor::TypeDescriptor(const 
google::protobuf::RepeatedPtrField<PTypeNod
             DCHECK(scalar_type.has_len());
             len = scalar_type.len();
         } else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == 
TYPE_DECIMAL64 ||
-                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type 
== TYPE_DATETIMEV2) {
+                   type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
             DCHECK(scalar_type.has_precision());
             DCHECK(scalar_type.has_scale());
             precision = scalar_type.precision();
             scale = scalar_type.scale();
+        } else if (type == TYPE_DATETIMEV2) {
+            DCHECK(scalar_type.has_scale());
+            scale = scalar_type.scale();
         } else if (type == TYPE_STRING) {
             if (scalar_type.has_len()) {
                 len = scalar_type.len();
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 4cb7d51e4b5..c96480274fc 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -49,6 +49,7 @@ struct TypeDescriptor {
 
     /// Only set if type == TYPE_DECIMAL
     int precision;
+    /// Only set if type == TYPE_DECIMAL or type == TYPE_DATETIMEV2
     int scale;
 
     std::vector<TypeDescriptor> children;
@@ -68,11 +69,11 @@ struct TypeDescriptor {
 
     // explicit TypeDescriptor(PrimitiveType type) :
     TypeDescriptor(PrimitiveType type) : type(type), len(-1), precision(-1), 
scale(-1) {
+        // TODO: should not initialize default values here; force initialization by 
parameters or externally.
         if (type == TYPE_DECIMALV2) {
             precision = 27;
             scale = 9;
         } else if (type == TYPE_DATETIMEV2) {
-            precision = 18;
             scale = 6;
         }
     }
diff --git a/be/src/util/arrow/block_convertor.cpp 
b/be/src/util/arrow/block_convertor.cpp
index 817231e02ba..4db60144ea5 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -66,10 +66,7 @@ class Array;
 
 namespace doris {
 
-// Convert Block to an Arrow::Array
-// We should keep this function to keep compatible with arrow's type visitor
-// Now we inherit TypeVisitor to use default Visit implementation
-class FromBlockConverter : public arrow::TypeVisitor {
+class FromBlockConverter {
 public:
     FromBlockConverter(const vectorized::Block& block, const 
std::shared_ptr<arrow::Schema>& schema,
                        arrow::MemoryPool* pool, const cctz::time_zone& 
timezone_obj)
@@ -79,276 +76,11 @@ public:
               _cur_field_idx(-1),
               _timezone_obj(timezone_obj) {}
 
-    ~FromBlockConverter() override = default;
-
-    // Use base class function
-    using arrow::TypeVisitor::Visit;
-
-#define PRIMITIVE_VISIT(TYPE) \
-    arrow::Status Visit(const arrow::TYPE& type) override { return 
_visit(type); }
-
-    PRIMITIVE_VISIT(Int8Type)
-    PRIMITIVE_VISIT(Int16Type)
-    PRIMITIVE_VISIT(Int32Type)
-    PRIMITIVE_VISIT(Int64Type)
-    PRIMITIVE_VISIT(FloatType)
-    PRIMITIVE_VISIT(DoubleType)
-
-#undef PRIMITIVE_VISIT
-
-    // process string-transformable field
-    arrow::Status Visit(const arrow::StringType& type) override {
-        auto& builder = assert_cast<arrow::StringBuilder&>(*_cur_builder);
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = start; i < start + num_rows; ++i) {
-            bool is_null = _cur_col->is_null_at(i);
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            const auto& data_ref = _cur_col->get_data_at(i);
-            vectorized::TypeIndex type_idx = 
vectorized::remove_nullable(_cur_type)->get_type_id();
-            switch (type_idx) {
-            case vectorized::TypeIndex::String:
-            case vectorized::TypeIndex::FixedString:
-            case vectorized::TypeIndex::HLL: {
-                if (data_ref.size == 0) {
-                    // 0x01 is a magic num, not useful actually, just for 
present ""
-                    //char* tmp_val = reinterpret_cast<char*>(0x01);
-                    ARROW_RETURN_NOT_OK(builder.Append(""));
-                } else {
-                    ARROW_RETURN_NOT_OK(builder.Append(data_ref.data, 
data_ref.size));
-                }
-                break;
-            }
-            case vectorized::TypeIndex::Date:
-            case vectorized::TypeIndex::DateTime: {
-                char buf[64];
-                const VecDateTimeValue* time_val = (const 
VecDateTimeValue*)(data_ref.data);
-                int len = time_val->to_buffer(buf);
-                ARROW_RETURN_NOT_OK(builder.Append(buf, len));
-                break;
-            }
-            case vectorized::TypeIndex::DateV2: {
-                char buf[64];
-                const DateV2Value<DateV2ValueType>* time_val =
-                        (const DateV2Value<DateV2ValueType>*)(data_ref.data);
-                int len = time_val->to_buffer(buf);
-                ARROW_RETURN_NOT_OK(builder.Append(buf, len));
-                break;
-            }
-            case vectorized::TypeIndex::DateTimeV2: {
-                char buf[64];
-                const DateV2Value<DateTimeV2ValueType>* time_val =
-                        (const 
DateV2Value<DateTimeV2ValueType>*)(data_ref.data);
-                int len = time_val->to_buffer(buf);
-                ARROW_RETURN_NOT_OK(builder.Append(buf, len));
-                break;
-            }
-            case vectorized::TypeIndex::Int128: {
-                auto string_temp = LargeIntValue::to_string(
-                        reinterpret_cast<const 
PackedInt128*>(data_ref.data)->value);
-                ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(), 
string_temp.size()));
-                break;
-            }
-            case vectorized::TypeIndex::JSONB: {
-                std::string string_temp =
-                        JsonbToJson::jsonb_to_json_string(data_ref.data, 
data_ref.size);
-                ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(), 
string_temp.size()));
-                break;
-            }
-            default: {
-                LOG(WARNING) << "can't convert this type = " << 
vectorized::getTypeName(type_idx)
-                             << " to arrow type";
-                return arrow::Status::TypeError("unsupported column type");
-            }
-            }
-        }
-        return arrow::Status::OK();
-    }
-
-    // process doris Decimal
-    arrow::Status Visit(const arrow::Decimal128Type& type) override {
-        auto& builder = assert_cast<arrow::Decimal128Builder&>(*_cur_builder);
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-        if (auto* decimalv2_column = vectorized::check_and_get_column<
-                    vectorized::ColumnDecimal<vectorized::Decimal128V2>>(
-                    *vectorized::remove_nullable(_cur_col))) {
-            std::shared_ptr<arrow::DataType> s_decimal_ptr =
-                    std::make_shared<arrow::Decimal128Type>(27, 9);
-            ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = decimalv2_column->get_data_at(i);
-                const PackedInt128* p_value = reinterpret_cast<const 
PackedInt128*>(data_ref.data);
-                int64_t high = (p_value->value) >> 64;
-                uint64 low = p_value->value;
-                arrow::Decimal128 value(high, low);
-                ARROW_RETURN_NOT_OK(builder.Append(value));
-            }
-            return arrow::Status::OK();
-        } else if (auto* decimal128_column = vectorized::check_and_get_column<
-                           
vectorized::ColumnDecimal<vectorized::Decimal128V3>>(
-                           *vectorized::remove_nullable(_cur_col))) {
-            std::shared_ptr<arrow::DataType> s_decimal_ptr =
-                    std::make_shared<arrow::Decimal128Type>(38, 
decimal128_column->get_scale());
-            ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = decimal128_column->get_data_at(i);
-                const PackedInt128* p_value = reinterpret_cast<const 
PackedInt128*>(data_ref.data);
-                int64_t high = (p_value->value) >> 64;
-                uint64 low = p_value->value;
-                arrow::Decimal128 value(high, low);
-                ARROW_RETURN_NOT_OK(builder.Append(value));
-            }
-            return arrow::Status::OK();
-        } else if (auto* decimal32_column = vectorized::check_and_get_column<
-                           vectorized::ColumnDecimal<vectorized::Decimal32>>(
-                           *vectorized::remove_nullable(_cur_col))) {
-            std::shared_ptr<arrow::DataType> s_decimal_ptr =
-                    std::make_shared<arrow::Decimal128Type>(8, 
decimal32_column->get_scale());
-            ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = decimal32_column->get_data_at(i);
-                const int32_t* p_value = reinterpret_cast<const 
int32_t*>(data_ref.data);
-                int64_t high = *p_value > 0 ? 0 : 1UL << 63;
-                arrow::Decimal128 value(high, *p_value > 0 ? *p_value : 
-*p_value);
-                ARROW_RETURN_NOT_OK(builder.Append(value));
-            }
-            return arrow::Status::OK();
-        } else if (auto* decimal64_column = vectorized::check_and_get_column<
-                           vectorized::ColumnDecimal<vectorized::Decimal64>>(
-                           *vectorized::remove_nullable(_cur_col))) {
-            std::shared_ptr<arrow::DataType> s_decimal_ptr =
-                    std::make_shared<arrow::Decimal128Type>(18, 
decimal64_column->get_scale());
-            ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = decimal64_column->get_data_at(i);
-                const int64_t* p_value = reinterpret_cast<const 
int64_t*>(data_ref.data);
-                int64_t high = *p_value > 0 ? 0 : 1UL << 63;
-                arrow::Decimal128 value(high, *p_value > 0 ? *p_value : 
-*p_value);
-                ARROW_RETURN_NOT_OK(builder.Append(value));
-            }
-            return arrow::Status::OK();
-        } else {
-            return arrow::Status::TypeError("Unsupported column:" + 
_cur_col->get_name());
-        }
-    }
-    // process boolean
-    arrow::Status Visit(const arrow::BooleanType& type) override {
-        auto& builder = assert_cast<arrow::BooleanBuilder&>(*_cur_builder);
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = start; i < start + num_rows; ++i) {
-            bool is_null = _cur_col->is_null_at(i);
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            const auto& data_ref = _cur_col->get_data_at(i);
-            ARROW_RETURN_NOT_OK(builder.Append(*(const bool*)data_ref.data));
-        }
-        return arrow::Status::OK();
-    }
-
-    // process array type
-    arrow::Status Visit(const arrow::ListType& type) override {
-        auto& builder = assert_cast<arrow::ListBuilder&>(*_cur_builder);
-        auto orignal_col = _cur_col;
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-
-        const vectorized::ColumnArray* array_column = nullptr;
-        if (orignal_col->is_nullable()) {
-            auto nullable_column =
-                    assert_cast<const 
vectorized::ColumnNullable*>(orignal_col.get());
-            array_column = assert_cast<const vectorized::ColumnArray*>(
-                    &nullable_column->get_nested_column());
-        } else {
-            array_column = assert_cast<const 
vectorized::ColumnArray*>(orignal_col.get());
-        }
-        const auto& offsets = array_column->get_offsets();
-        vectorized::ColumnPtr nested_column = array_column->get_data_ptr();
-
-        // set current col/type/builder to nested
-        _cur_col = nested_column;
-        if (_cur_type->is_nullable()) {
-            auto nullable_type = assert_cast<const 
vectorized::DataTypeNullable*>(_cur_type.get());
-            _cur_type = assert_cast<const vectorized::DataTypeArray*>(
-                                nullable_type->get_nested_type().get())
-                                ->get_nested_type();
-        } else {
-            _cur_type = assert_cast<const 
vectorized::DataTypeArray*>(_cur_type.get())
-                                ->get_nested_type();
-        }
-        _cur_builder = builder.value_builder();
-
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = start; i < start + num_rows; ++i) {
-            bool is_null = orignal_col->is_null_at(i);
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            // append array elements in row i
-            ARROW_RETURN_NOT_OK(builder.Append());
-            _cur_start = offsets[i - 1];
-            _cur_rows = offsets[i] - offsets[i - 1];
-            ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*type.value_type(), 
this));
-        }
-
-        return arrow::Status::OK();
-    }
+    ~FromBlockConverter() = default;
 
     Status convert(std::shared_ptr<arrow::RecordBatch>* out);
 
 private:
-    template <typename T>
-    arrow::Status _visit(const T& type) {
-        auto& builder = assert_cast<arrow::NumericBuilder<T>&>(*_cur_builder);
-        size_t start = _cur_start;
-        size_t num_rows = _cur_rows;
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        if (_cur_col->is_nullable()) {
-            for (size_t i = start; i < start + num_rows; ++i) {
-                bool is_null = _cur_col->is_null_at(i);
-                if (is_null) {
-                    ARROW_RETURN_NOT_OK(builder.AppendNull());
-                    continue;
-                }
-                const auto& data_ref = _cur_col->get_data_at(i);
-                ARROW_RETURN_NOT_OK(builder.Append(*(const typename 
T::c_type*)data_ref.data));
-            }
-        } else {
-            ARROW_RETURN_NOT_OK(builder.AppendValues(
-                    (const typename 
T::c_type*)_cur_col->get_data_at(start).data, num_rows));
-        }
-        return arrow::Status::OK();
-    }
-
     const vectorized::Block& _block;
     const std::shared_ptr<arrow::Schema>& _schema;
     arrow::MemoryPool* _pool;
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index ecdd733e76b..38dd40ca4c4 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -105,8 +105,6 @@ Status convert_to_arrow_type(const TypeDescriptor& type, 
std::shared_ptr<arrow::
         }
         break;
     case TYPE_DECIMALV2:
-        *result = std::make_shared<arrow::Decimal128Type>(27, 9);
-        break;
     case TYPE_DECIMAL32:
     case TYPE_DECIMAL64:
     case TYPE_DECIMAL128I:
diff --git a/be/src/vec/columns/column_array.cpp 
b/be/src/vec/columns/column_array.cpp
index 9c40676af47..949e992af1d 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -410,6 +410,7 @@ void ColumnArray::update_crcs_with_value(uint32_t* 
__restrict hash, PrimitiveTyp
 }
 
 void ColumnArray::insert(const Field& x) {
+    DCHECK_EQ(x.get_type(), Field::Types::Array);
     if (x.is_null()) {
         get_data().insert(Null());
         get_offsets().push_back(get_offsets().back() + 1);
diff --git a/be/src/vec/columns/column_map.cpp 
b/be/src/vec/columns/column_map.cpp
index 694d2e39f5c..46df3c4b59e 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -142,6 +142,7 @@ void ColumnMap::insert_data(const char*, size_t) {
 }
 
 void ColumnMap::insert(const Field& x) {
+    DCHECK_EQ(x.get_type(), Field::Types::Map);
     const auto& map = doris::vectorized::get<const Map&>(x);
     CHECK_EQ(map.size(), 2);
     const auto& k_f = doris::vectorized::get<const Array&>(map[0]);
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 81a495eabd1..b9d684eb53a 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -151,6 +151,10 @@ public:
             const auto& real_field = vectorized::get<const JsonbField&>(x);
             s = StringRef(real_field.get_value(), real_field.get_size());
         } else {
+            DCHECK_EQ(x.get_type(), Field::Types::String);
+            // If `x.get_type()` is not String (e.g. UInt64), this may fail with the 
error
+            // `string column length is too large: 
total_length=13744632839234567870`
+            // because `<String>(x).size() = 13744632839234567870`
             s.data = vectorized::get<const String&>(x).data();
             s.size = vectorized::get<const String&>(x).size();
         }
diff --git a/be/src/vec/columns/column_struct.cpp 
b/be/src/vec/columns/column_struct.cpp
index be0f4e7adde..8cd71d822b2 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -134,6 +134,7 @@ bool ColumnStruct::is_default_at(size_t n) const {
 }
 
 void ColumnStruct::insert(const Field& x) {
+    DCHECK_EQ(x.get_type(), Field::Types::Tuple);
     const auto& tuple = x.get<const Tuple&>();
     const size_t tuple_size = columns.size();
     if (tuple.size() != tuple_size) {
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 444603c1d87..a7c1ddc7ccb 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -356,8 +356,9 @@ public:
 
     // For example, during create column_const(1, uint8), will use 
NearestFieldType
     // to cast a uint8 to int64, so that the Field is int64, but the column is 
created
-    // using data_type, so that T == uint8. After the field is created, it 
will be inserted
-    // into the column, but its type is different from column's data type, so 
that during column
+    // using data_type, so that T == uint8, NearestFieldType<T> == uint64.
+    // After the field is created, it will be inserted into the column,
+    // but its type is different from column's data type (int64 vs uint64), so 
that during column
     // insert method, should use NearestFieldType<T> to get the Field and get 
it actual
     // uint8 value and then insert into column.
     void insert(const Field& x) override {
diff --git a/be/src/vec/data_types/data_type_time_v2.h 
b/be/src/vec/data_types/data_type_time_v2.h
index 86ce7836c56..28360f7e4de 100644
--- a/be/src/vec/data_types/data_type_time_v2.h
+++ b/be/src/vec/data_types/data_type_time_v2.h
@@ -114,7 +114,9 @@ public:
     DataTypeDateTimeV2(const DataTypeDateTimeV2& rhs) : _scale(rhs._scale) {}
     TypeIndex get_type_id() const override { return TypeIndex::DateTimeV2; }
     TypeDescriptor get_type_as_type_descriptor() const override {
-        return TypeDescriptor(TYPE_DATETIMEV2);
+        auto desc = TypeDescriptor(TYPE_DATETIMEV2);
+        desc.scale = _scale;
+        return desc;
     }
 
     doris::FieldType get_storage_field_type() const override {
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp 
b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
index 7077229d861..d5b2e1e70d7 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -157,6 +157,12 @@ Status 
DataTypeDateTimeSerDe::deserialize_one_cell_from_json(IColumn& column, Sl
     return Status::OK();
 }
 
+void DataTypeDateTimeSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                   int start, int end,
+                                                   const cctz::time_zone& ctz) 
const {
+    _read_column_from_arrow<false>(column, arrow_array, start, end, ctz);
+}
+
 void DataTypeDate64SerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
                                                 arrow::ArrayBuilder* 
array_builder, int start,
                                                 int end, const 
cctz::time_zone& ctz) const {
@@ -196,9 +202,10 @@ static int64_t time_unit_divisor(arrow::TimeUnit::type 
unit) {
     }
 }
 
-void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                                 int start, int end,
-                                                 const cctz::time_zone& ctz) 
const {
+template <bool is_date>
+void DataTypeDate64SerDe::_read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                  int start, int end,
+                                                  const cctz::time_zone& ctz) 
const {
     auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data();
     int64_t divisor = 1;
     int64_t multiplier = 1;
@@ -237,13 +244,15 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn& 
column, const arrow::A
         }
     } else if (arrow_array->type()->id() == arrow::Type::STRING) {
         // to be compatible with old version, we use string type for date.
-        auto concrete_array = dynamic_cast<const 
arrow::StringArray*>(arrow_array);
-        for (size_t value_i = start; value_i < end; ++value_i) {
-            Int64 val = 0;
+        const auto* concrete_array = dynamic_cast<const 
arrow::StringArray*>(arrow_array);
+        for (auto value_i = start; value_i < end; ++value_i) {
             auto val_str = concrete_array->GetString(value_i);
-            ReadBuffer rb(val_str.data(), val_str.size());
-            read_datetime_text_impl(val, rb, ctz);
-            col_data.emplace_back(val);
+            VecDateTimeValue v;
+            v.from_date_str(val_str.c_str(), val_str.length(), ctz);
+            if constexpr (is_date) {
+                v.cast_to_date();
+            }
+            col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
         }
     } else {
         throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
@@ -251,6 +260,12 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn& 
column, const arrow::A
     }
 }
 
+void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                 int start, int end,
+                                                 const cctz::time_zone& ctz) 
const {
+    _read_column_from_arrow<true>(column, arrow_array, start, end, ctz);
+}
+
 template <bool is_binary_format>
 Status DataTypeDate64SerDe::_write_column_to_mysql(const IColumn& column,
                                                    
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.h 
b/be/src/vec/data_types/serde/data_type_date64_serde.h
index 497ac2aeff4..5f5fc4f1c38 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.h
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.h
@@ -73,6 +73,11 @@ public:
                                int start, int end,
                                std::vector<StringRef>& buffer_list) const 
override;
 
+protected:
+    template <bool is_date>
+    void _read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                 int end, const cctz::time_zone& ctz) const;
+
 private:
     template <bool is_binary_format>
     Status _write_column_to_mysql(const IColumn& column, 
MysqlRowBuffer<is_binary_format>& result,
@@ -94,6 +99,8 @@ public:
     Status deserialize_column_from_json_vector(IColumn& column, 
std::vector<Slice>& slices,
                                                int* num_deserialized,
                                                const FormatOptions& options) 
const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp 
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
index 12d30961e3e..e8dd2274765 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -160,7 +160,10 @@ void 
DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
             // convert second
             v.from_unixtime(utc_epoch / divisor, ctz);
             // get rest time
-            v.set_microsecond(utc_epoch % divisor);
+            // Pad with zeros on the right to make it 6 digits. DateTimeV2Value 
microsecond is 6 digits,
+            // and the scale determines how many leading digits are kept, so the valid 
digits must be kept at the front.
+            // "2022-01-01 11:11:11.111", utc_epoch = 1641035471111, divisor = 
1000, set_microsecond(111000)
+            v.set_microsecond((utc_epoch % divisor) * DIVISOR_FOR_MICRO / 
divisor);
             
col_data.emplace_back(binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(v));
         }
     } else {
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp 
b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
index 95109ee408c..f07c449851e 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
@@ -102,15 +102,10 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& 
column, const arrow::A
                                                  int start, int end,
                                                  const cctz::time_zone& ctz) 
const {
     auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data();
-    auto concrete_array = dynamic_cast<const arrow::Date32Array*>(arrow_array);
-    int64_t divisor = 1;
-    int64_t multiplier = 1;
-
-    multiplier = 24 * 60 * 60; // day => secs
-    for (size_t value_i = start; value_i < end; ++value_i) {
+    const auto* concrete_array = dynamic_cast<const 
arrow::Date32Array*>(arrow_array);
+    for (auto value_i = start; value_i < end; ++value_i) {
         DateV2Value<DateV2ValueType> v;
-        v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier,
-                        ctz);
+        v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold);
         col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, 
UInt32>(v));
     }
 }
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp 
b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
index d98f6cae2b0..92b69dbfca9 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
@@ -88,8 +88,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                                                     arrow::ArrayBuilder* 
array_builder, int start,
                                                     int end, const 
cctz::time_zone& ctz) const {
     auto& col = reinterpret_cast<const ColumnDecimal<T>&>(column);
-    auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
     if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
         std::shared_ptr<arrow::DataType> s_decimal_ptr =
                 std::make_shared<arrow::Decimal128Type>(27, 9);
         for (size_t i = start; i < end; ++i) {
@@ -108,6 +108,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
         }
         // TODO: decimal256
     } else if constexpr (std::is_same_v<T, Decimal128V3>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
         std::shared_ptr<arrow::DataType> s_decimal_ptr =
                 std::make_shared<arrow::Decimal128Type>(38, col.get_scale());
         for (size_t i = start; i < end; ++i) {
@@ -125,6 +126,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                              array_builder->type()->name());
         }
     } else if constexpr (std::is_same_v<T, Decimal<Int32>>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
         std::shared_ptr<arrow::DataType> s_decimal_ptr =
                 std::make_shared<arrow::Decimal128Type>(8, col.get_scale());
         for (size_t i = start; i < end; ++i) {
@@ -139,6 +141,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                              array_builder->type()->name());
         }
     } else if constexpr (std::is_same_v<T, Decimal<Int64>>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
         std::shared_ptr<arrow::DataType> s_decimal_ptr =
                 std::make_shared<arrow::Decimal128Type>(18, col.get_scale());
         for (size_t i = start; i < end; ++i) {
@@ -152,6 +155,28 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
             checkArrowStatus(builder.Append(value), column.get_name(),
                              array_builder->type()->name());
         }
+    } else if constexpr (std::is_same_v<T, Decimal256>) {
+        auto& builder = 
reinterpret_cast<arrow::Decimal256Builder&>(*array_builder);
+        std::shared_ptr<arrow::DataType> s_decimal_ptr =
+                std::make_shared<arrow::Decimal256Type>(76, col.get_scale());
+        for (size_t i = start; i < end; ++i) {
+            if (null_map && (*null_map)[i]) {
+                checkArrowStatus(builder.AppendNull(), column.get_name(),
+                                 array_builder->type()->name());
+                continue;
+            }
+            auto p_value = wide::Int256(col.get_element(i));
+            using half_type = wide::Int256::base_type; // uint64_t
+            half_type a0 = p_value.items[wide::Int256::_impl::little(0)];
+            half_type a1 = p_value.items[wide::Int256::_impl::little(1)];
+            half_type a2 = p_value.items[wide::Int256::_impl::little(2)];
+            half_type a3 = p_value.items[wide::Int256::_impl::little(3)];
+
+            std::array<uint64_t, 4> word_array = {a0, a1, a2, a3};
+            arrow::Decimal256 value(arrow::Decimal256::LittleEndianArray, 
word_array);
+            checkArrowStatus(builder.Append(value), column.get_name(),
+                             array_builder->type()->name());
+        }
     } else {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "write_column_to_arrow with type " + 
column.get_name());
@@ -162,14 +187,14 @@ template <typename T>
 void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
                                                      const arrow::Array* 
arrow_array, int start,
                                                      int end, const 
cctz::time_zone& ctz) const {
-    auto concrete_array = dynamic_cast<const 
arrow::DecimalArray*>(arrow_array);
-    const auto* arrow_decimal_type =
-            static_cast<const arrow::DecimalType*>(arrow_array->type().get());
-    const auto arrow_scale = arrow_decimal_type->scale();
     auto& column_data = static_cast<ColumnDecimal<T>&>(column).get_data();
     // Decimal<Int128> for decimalv2
     // Decimal<Int128I> for deicmalv3
     if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+        const auto* concrete_array = dynamic_cast<const 
arrow::DecimalArray*>(arrow_array);
+        const auto* arrow_decimal_type =
+                static_cast<const 
arrow::DecimalType*>(arrow_array->type().get());
+        const auto arrow_scale = arrow_decimal_type->scale();
         // TODO check precision
         for (size_t value_i = start; value_i < end; ++value_i) {
             auto value = *reinterpret_cast<const vectorized::Decimal128V2*>(
@@ -195,7 +220,13 @@ void 
DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
         }
     } else if constexpr (std::is_same_v<T, Decimal128V3> || std::is_same_v<T, 
Decimal64> ||
                          std::is_same_v<T, Decimal32>) {
-        for (size_t value_i = start; value_i < end; ++value_i) {
+        const auto* concrete_array = dynamic_cast<const 
arrow::DecimalArray*>(arrow_array);
+        for (auto value_i = start; value_i < end; ++value_i) {
+            column_data.emplace_back(*reinterpret_cast<const 
T*>(concrete_array->Value(value_i)));
+        }
+    } else if constexpr (std::is_same_v<T, Decimal256>) {
+        const auto* concrete_array = dynamic_cast<const 
arrow::Decimal256Array*>(arrow_array);
+        for (auto value_i = start; value_i < end; ++value_i) {
             column_data.emplace_back(*reinterpret_cast<const 
T*>(concrete_array->Value(value_i)));
         }
     } else {
@@ -257,7 +288,7 @@ Status DataTypeDecimalSerDe<T>::write_column_to_orc(const 
std::string& timezone,
         for (size_t row_id = start; row_id < end; row_id++) {
             if (cur_batch->notNull[row_id] == 1) {
                 auto& v = col_data[row_id];
-                orc::Int128 value(v >> 64, (uint64_t)v);
+                orc::Int128 value(v >> 64, (uint64_t)v); // TODO, Decimal256 
will lose precision
                 cur_batch->values[row_id] = value;
             }
         }
diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp 
b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
index 612c9ce4222..f59fc712d98 100644
--- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
@@ -165,13 +165,19 @@ void DataTypeIPv6SerDe::read_column_from_arrow(IColumn& 
column, const arrow::Arr
                     buffer->data() + concrete_array->value_offset(offset_i));
             const auto raw_data_len = concrete_array->value_length(offset_i);
 
-            IPv6 ipv6_val;
-            if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len)) {
-                throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
-                                       "parse number fail, string: '{}'",
-                                       std::string(raw_data, 
raw_data_len).c_str());
+            if (raw_data_len == 0) {
+                col_data.emplace_back(0);
+            } else {
+                IPv6 ipv6_val;
+                if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len)) 
{
+                    throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                                           "parse number fail, string: '{}'",
+                                           std::string(raw_data, 
raw_data_len).c_str());
+                }
+                col_data.emplace_back(ipv6_val);
             }
-            col_data.emplace_back(ipv6_val);
+        } else {
+            col_data.emplace_back(0);
         }
     }
 }
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp 
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index f4fb6bbbb1f..522cf02c75f 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -215,14 +215,20 @@ void 
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
                 const auto* raw_data = buffer->data() + 
concrete_array->value_offset(offset_i);
                 const auto raw_data_len = 
concrete_array->value_length(offset_i);
 
-                Int128 val = 0;
-                ReadBuffer rb(raw_data, raw_data_len);
-                if (!read_int_text_impl(val, rb)) {
-                    throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
-                                           "parse number fail, string: '{}'",
-                                           std::string(rb.position(), 
rb.count()).c_str());
+                if (raw_data_len == 0) {
+                    col_data.emplace_back(Int128()); // Int128() is NULL
+                } else {
+                    Int128 val = 0;
+                    ReadBuffer rb(raw_data, raw_data_len);
+                    if (!read_int_text_impl(val, rb)) {
+                        throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                                               "parse number fail, string: 
'{}'",
+                                               std::string(rb.position(), 
rb.count()).c_str());
+                    }
+                    col_data.emplace_back(val);
                 }
-                col_data.emplace_back(val);
+            } else {
+                col_data.emplace_back(Int128()); // Int128() is NULL
             }
         }
         return;
diff --git a/be/test/vec/data_types/common_data_type_serder_test.h 
b/be/test/vec/data_types/common_data_type_serder_test.h
new file mode 100644
index 00000000000..c5db1d16f18
--- /dev/null
+++ b/be/test/vec/data_types/common_data_type_serder_test.h
@@ -0,0 +1,445 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <arrow/record_batch.h>
+#include <gen_cpp/data.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <memory>
+
+#include "arrow/array/array_base.h"
+#include "arrow/type.h"
+#include "runtime/descriptors.h"
+#include "util/arrow/block_convertor.h"
+#include "util/arrow/row_batch.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/runtime/ipv6_value.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+// this test is going to cover the data type serialize and deserialize functions
+// such as
+// 1. standard hive text ser-deserialize
+//  deserialize_one_cell_from_hive_text (IColumn &column, Slice &slice, const 
FormatOptions &options, int hive_text_complex_type_delimiter_level=1) const
+//  deserialize_column_from_hive_text_vector (IColumn &column, std::vector< 
Slice > &slices, int *num_deserialized, const FormatOptions &options, int 
hive_text_complex_type_delimiter_level=1) const
+//  serialize_one_cell_to_hive_text (const IColumn &column, int row_num, 
BufferWritable &bw, FormatOptions &options, int 
hive_text_complex_type_delimiter_level=1) const
+// 2. json format ser-deserialize which is used for tables not in the doris database
+//  serialize_one_cell_to_json (const IColumn &column, int row_num, 
BufferWritable &bw, FormatOptions &options) const =0
+//  serialize_column_to_json (const IColumn &column, int start_idx, int 
end_idx, BufferWritable &bw, FormatOptions &options) const =0
+//  deserialize_one_cell_from_json (IColumn &column, Slice &slice, const 
FormatOptions &options) const =0
+//  deserialize_column_from_json_vector (IColumn &column, std::vector< Slice > 
&slices, uint64_t *num_deserialized, const FormatOptions &options) const =0
+//  deserialize_column_from_fixed_json (IColumn &column, Slice &slice, 
uint64_t rows, uint64_t *num_deserialized, const FormatOptions &options) const
+//  insert_column_last_value_multiple_times (IColumn &column, uint64_t times) 
const
+// 3. fe|be protobuffer ser-deserialize
+//  write_column_to_pb (const IColumn &column, PValues &result, int start, int 
end) const =0
+//  read_column_from_pb (IColumn &column, const PValues &arg) const =0
+// 4. jsonb ser-deserialize which is used in the row-store situation
+//  write_one_cell_to_jsonb (const IColumn &column, JsonbWriter &result, Arena 
*mem_pool, int32_t col_id, int row_num) const =0
+//  read_one_cell_from_jsonb (IColumn &column, const JsonbValue *arg) const =0
+// 5. mysql text ser-deserialize
+//  write_column_to_mysql (const IColumn &column, MysqlRowBuffer< false > 
&row_buffer, int row_idx, bool col_const, const FormatOptions &options) const =0
+//  write_column_to_mysql (const IColumn &column, MysqlRowBuffer< true > 
&row_buffer, int row_idx, bool col_const, const FormatOptions &options) const =0
+// 6. arrow ser-deserialize which is used in the spark-flink connector
+//  write_column_to_arrow (const IColumn &column, const NullMap *null_map, 
arrow::ArrayBuilder *array_builder, int start, int end, const cctz::time_zone 
&ctz) const =0
+//  read_column_from_arrow (IColumn &column, const arrow::Array *arrow_array, 
int start, int end, const cctz::time_zone &ctz) const =0
+// 7. rapidjson ser-deserialize
+//  write_one_cell_to_json (const IColumn &column, rapidjson::Value &result, 
rapidjson::Document::AllocatorType &allocator, Arena &mem_pool, int row_num) 
const
+//  read_one_cell_from_json (IColumn &column, const rapidjson::Value &result) 
const
+//  convert_field_to_rapidjson (const vectorized::Field &field, 
rapidjson::Value &target, rapidjson::Document::AllocatorType &allocator)
+//  convert_array_to_rapidjson (const vectorized::Array &array, 
rapidjson::Value &target, rapidjson::Document::AllocatorType &allocator)
+
+namespace doris::vectorized {
+
+class CommonDataTypeSerdeTest : public ::testing::Test {
+public:
+    
////==================================================================================================================
+    // this is a common function to check the data in columns against the expected 
results, according to the checks done in the assert callback function
+    // which can be used in all column test
+    // such as run regress tests
+    //  step1. we can set gen_check_data_in_assert to true, then we will 
generate a file for check data, otherwise we will read the file to check data
+    //  step2. we should write assert callback function to check data
+    static void check_data(
+            MutableColumns& columns, DataTypeSerDeSPtrs serders, char 
col_spliter,
+            std::set<int> idxes, const std::string& column_data_file,
+            std::function<void(MutableColumns& load_cols, DataTypeSerDeSPtrs 
serders)>
+                    assert_callback,
+            bool is_hive_format = false, DataTypes dataTypes = {}) {
+        ASSERT_EQ(serders.size(), columns.size());
+        // Step 1: Insert data from `column_data_file` into the column and 
check result with `check_data_file`
+        // Load column data and expected data from CSV files
+        std::vector<std::vector<std::string>> res;
+        struct stat buff;
+        if (stat(column_data_file.c_str(), &buff) == 0) {
+            if (S_ISREG(buff.st_mode)) {
+                // file
+                if (is_hive_format) {
+                    load_data_and_assert_from_csv<true, true>(serders, 
columns, column_data_file,
+                                                              col_spliter, 
idxes);
+                } else {
+                    load_data_and_assert_from_csv<false, true>(serders, 
columns, column_data_file,
+                                                               col_spliter, 
idxes);
+                }
+            } else if (S_ISDIR(buff.st_mode)) {
+                // dir
+                std::filesystem::path fs_path(column_data_file);
+                for (const auto& entry : 
std::filesystem::directory_iterator(fs_path)) {
+                    std::string file_path = entry.path().string();
+                    std::cout << "load data from file: " << file_path << 
std::endl;
+                    if (is_hive_format) {
+                        load_data_and_assert_from_csv<true, true>(serders, 
columns, file_path,
+                                                                  col_spliter, 
idxes);
+                    } else {
+                        load_data_and_assert_from_csv<false, true>(serders, 
columns, file_path,
+                                                                   
col_spliter, idxes);
+                    }
+                }
+            }
+        }
+
+        // Step 2: Validate the data in `column` matches `expected_data`
+        assert_callback(columns, serders);
+    }
+
+    // Helper function to load data from CSV: each line is split by spliter and 
the fields whose index is in idxes are loaded into columns
+    template <bool is_hive_format, bool generate_res_file>
+    static void load_data_and_assert_from_csv(const DataTypeSerDeSPtrs serders,
+                                              MutableColumns& columns, const 
std::string& file_path,
+                                              const char spliter = ';',
+                                              const std::set<int> idxes = {0}) 
{
+        ASSERT_EQ(serders.size(), columns.size())
+                << "serder size: " << serders.size() << " column size: " << 
columns.size();
+        ASSERT_EQ(serders.size(), idxes.size())
+                << "serder size: " << serders.size() << " idxes size: " << 
idxes.size();
+        std::ifstream file(file_path);
+        if (!file) {
+            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "can not open 
the file: {} ",
+                                   file_path);
+        }
+
+        std::string line;
+        DataTypeSerDe::FormatOptions options;
+        std::vector<std::vector<std::string>> res;
+        MutableColumns assert_str_cols(columns.size());
+        for (size_t i = 0; i < columns.size(); ++i) {
+            assert_str_cols[i] = ColumnString::create();
+        }
+
+        while (std::getline(file, line)) {
+            std::stringstream lineStream(line);
+            std::string value;
+            int l_idx = 0;
+            int c_idx = 0;
+            std::vector<std::string> row;
+            while (std::getline(lineStream, value, spliter)) {
+                if (!value.starts_with("//") && idxes.contains(l_idx)) {
+                    // load csv data
+                    Slice string_slice(value.data(), value.size());
+                    Status st;
+                    // deserialize data
+                    if constexpr (is_hive_format) {
+                        st = 
serders[c_idx]->deserialize_one_cell_from_hive_text(
+                                *columns[c_idx], string_slice, options);
+                    } else {
+                        st = 
serders[c_idx]->deserialize_one_cell_from_json(*columns[c_idx],
+                                                                            
string_slice, options);
+                    }
+                    if (!st.ok()) {
+                        // if an error happens in deserialize, we do not insert 
any value into the input column,
+                        // so we push a default value to the column for row 
alignment
+                        columns[c_idx]->insert_default();
+                        std::cout << "error in deserialize but continue: " << 
st.to_string()
+                                  << std::endl;
+                    }
+                    // serialize data
+                    size_t row_num = columns[c_idx]->size() - 1;
+                    assert_str_cols[c_idx]->reserve(columns[c_idx]->size());
+                    VectorBufferWriter 
bw(assert_cast<ColumnString&>(*assert_str_cols[c_idx]));
+                    if constexpr (is_hive_format) {
+                        st = 
serders[c_idx]->serialize_one_cell_to_hive_text(*columns[c_idx],
+                                                                             
row_num, bw, options);
+                        EXPECT_TRUE(st.ok()) << st.to_string();
+                    } else {
+                        st = 
serders[c_idx]->serialize_one_cell_to_json(*columns[c_idx], row_num,
+                                                                        bw, 
options);
+                        EXPECT_TRUE(st.ok()) << st.to_string();
+                    }
+                    bw.commit();
+                    // assert data: the original data and the serialized data should be 
equal, or a file is generated
+                    // to check the data
+                    size_t assert_size = assert_str_cols[c_idx]->size();
+                    if constexpr (!generate_res_file) {
+                        
EXPECT_EQ(assert_str_cols[c_idx]->get_data_at(assert_size - 1).to_string(),
+                                  string_slice.to_string())
+                                << "column: " << columns[c_idx]->get_name() << 
" row: " << row_num
+                                << " is_hive_format: " << is_hive_format;
+                    }
+                    ++c_idx;
+                }
+                res.push_back(row);
+                ++l_idx;
+            }
+        }
+
+        if (generate_res_file) {
+            // generate res
+            auto pos = file_path.find_last_of(".");
+            std::string hive_format = is_hive_format ? "_hive" : "";
+            std::string res_file = file_path.substr(0, pos) + hive_format + 
"_serde_res.csv";
+            std::ofstream res_f(res_file);
+            if (!res_f.is_open()) {
+                throw std::ios_base::failure("Failed to open file." + 
res_file);
+            }
+            for (size_t r = 0; r < assert_str_cols[0]->size(); ++r) {
+                for (size_t c = 0; c < assert_str_cols.size(); ++c) {
+                    res_f << assert_str_cols[c]->get_data_at(r).to_string() << 
spliter;
+                }
+                res_f << std::endl;
+            }
+            res_f.close();
+            std::cout << "generate res file: " << res_file << std::endl;
+        }
+    }
+
+    // protobuf (pb) ser-deserialize assert function
+    // pb serde is now only used by RPCFncall and fold_constant_executor, which 
just write column data to pb values, which means
+    // they just call write_column_to_pb
+    static void assert_pb_format(MutableColumns& load_cols, DataTypeSerDeSPtrs 
serders) {
+        for (size_t i = 0; i < load_cols.size(); ++i) {
+            auto& col = load_cols[i];
+            std::cout << " now we are testing column : " << col->get_name() << 
std::endl;
+            // serialize to pb
+            PValues pv = PValues();
+            Status st = serders[i]->write_column_to_pb(*col, pv, 0, 
col->size());
+            if (!st.ok()) {
+                std::cerr << "write_column_to_pb error: " << st.msg() << 
std::endl;
+                continue;
+            }
+            // deserialize from pb
+            auto except_column = col->clone_empty();
+            st = serders[i]->read_column_from_pb(*except_column, pv);
+            EXPECT_TRUE(st.ok()) << st.to_string();
+            // check pb value from expected column
+            PValues as_pv = PValues();
+            st = serders[i]->write_column_to_pb(*except_column, as_pv, 0, 
except_column->size());
+            EXPECT_TRUE(st.ok()) << st.to_string();
+            EXPECT_EQ(pv.bytes_value_size(), as_pv.bytes_value_size());
+            // check column value
+            for (size_t j = 0; j < col->size(); ++j) {
+                auto cell = col->operator[](j);
+                auto except_cell = except_column->operator[](j);
+                EXPECT_EQ(cell, except_cell) << "column: " << col->get_name() 
<< " row: " << j;
+            }
+        }
+    }
+
+    // actually this is block_to_jsonb and jsonb_to_block test
+    // static void assert_jsonb_format(MutableColumns& load_cols, 
DataTypeSerDeSPtrs serders) {
+    //     Arena pool;
+    //     auto jsonb_column = ColumnString::create(); // jsonb column
+    //     // maybe these load_cols has different size, so we keep it same
+    //     size_t max_row_size = load_cols[0]->size();
+    //     for (size_t i = 1; i < load_cols.size(); ++i) {
+    //         if (load_cols[i]->size() > max_row_size) {
+    //             max_row_size = load_cols[i]->size();
+    //         }
+    //     }
+    //     // keep same rows
+    //     for (size_t i = 0; i < load_cols.size(); ++i) {
+    //         if (load_cols[i]->size() < max_row_size) {
+    //             load_cols[i]->insert_many_defaults(max_row_size - 
load_cols[i]->size());
+    //         } else if (load_cols[i]->size() > max_row_size) {
+    //             load_cols[i]->resize(max_row_size);
+    //         }
+    //     }
+    //     jsonb_column->reserve(load_cols[0]->size());
+    //     MutableColumns assert_cols;
+    //     for (size_t i = 0; i < load_cols.size(); ++i) {
+    //         assert_cols.push_back(load_cols[i]->assume_mutable());
+    //     }
+    //     for (size_t r = 0; r < load_cols[0]->size(); ++r) {
+    //         JsonbWriterT<JsonbOutStream> jw;
+    //         jw.writeStartObject();
+    //         // serialize to jsonb
+    //         for (size_t i = 0; i < load_cols.size(); ++i) {
+    //             auto& col = load_cols[i];
+    //             serders[i]->write_one_cell_to_jsonb(*col, jw, &pool, i, r);
+    //         }
+    //         jw.writeEndObject();
+    //         jsonb_column->insert_data(jw.getOutput()->getBuffer(), 
jw.getOutput()->getSize());
+    //     }
+    //     // deserialize jsonb column to assert column
+    //     EXPECT_EQ(jsonb_column->size(), load_cols[0]->size());
+    //     for (size_t r = 0; r < jsonb_column->size(); ++r) {
+    //         StringRef jsonb_data = jsonb_column->get_data_at(r);
+    //         auto pdoc = 
JsonbDocument::checkAndCreateDocument(jsonb_data.data, jsonb_data.size);
+    //         JsonbDocument& doc = *pdoc;
+    //         size_t cIdx = 0;
+    //         for (auto it = doc->begin(); it != doc->end(); ++it) {
+    //             serders[cIdx]->read_one_cell_from_jsonb(*assert_cols[cIdx], 
it->value());
+    //             ++cIdx;
+    //         }
+    //     }
+    //     // check column value
+    //     for (size_t i = 0; i < load_cols.size(); ++i) {
+    //         auto& col = load_cols[i];
+    //         auto& assert_col = assert_cols[i];
+    //         for (size_t j = 0; j < col->size(); ++j) {
+    //             auto cell = col->operator[](j);
+    //             auto assert_cell = assert_col->operator[](j);
+    //             EXPECT_EQ(cell, assert_cell) << "column: " << 
col->get_name() << " row: " << j;
+    //         }
+    //     }
+    // }
+
+    // assert mysql text format; for now we simply assert that it does not hit a fatal 
failure or exception here
+    static void assert_mysql_format(MutableColumns& load_cols, 
DataTypeSerDeSPtrs serders) {
+        MysqlRowBuffer<false> row_buffer;
+        for (size_t i = 0; i < load_cols.size(); ++i) {
+            auto& col = load_cols[i];
+            for (size_t j = 0; j < col->size(); ++j) {
+                Status st;
+                EXPECT_NO_FATAL_FAILURE(
+                        st = serders[i]->write_column_to_mysql(*col, 
row_buffer, j, false, {}));
+                EXPECT_TRUE(st.ok()) << st.to_string();
+            }
+        }
+    }
+
+    // assert arrow serialize
+    static void assert_arrow_format(MutableColumns& load_cols, DataTypes 
types) {
+        // make a block to write to arrow
+        auto block = std::make_shared<Block>();
+        build_block(block, load_cols, types);
+        auto record_batch = serialize_arrow(block);
+        auto assert_block = std::make_shared<Block>(block->clone_empty());
+        deserialize_arrow(assert_block, record_batch);
+        compare_two_blocks(block, assert_block);
+    }
+
+    static void build_block(const std::shared_ptr<Block>& block, 
MutableColumns& load_cols,
+                            DataTypes types) {
+        // these load_cols may have different sizes, so we make them the same size
+        size_t max_row_size = load_cols[0]->size();
+        for (size_t i = 1; i < load_cols.size(); ++i) {
+            if (load_cols[i]->size() > max_row_size) {
+                max_row_size = load_cols[i]->size();
+            }
+        }
+        // keep same rows
+        for (auto& load_col : load_cols) {
+            if (load_col->size() < max_row_size) {
+                load_col->insert_many_defaults(max_row_size - 
load_col->size());
+            } else if (load_col->size() > max_row_size) {
+                load_col->resize(max_row_size);
+            }
+        }
+        for (size_t i = 0; i < load_cols.size(); ++i) {
+            auto& col = load_cols[i];
+            block->insert(ColumnWithTypeAndName(std::move(col), types[i], 
types[i]->get_name()));
+        }
+        // print block
+        std::cout << "build block structure: " << block->dump_structure() << 
std::endl;
+        std::cout << "build block data: "
+                  << block->dump_data(0, std::min(max_row_size, 
static_cast<size_t>(5)))
+                  << std::endl;
+        for (int i = 0; i < block->columns(); i++) {
+            auto col = block->get_by_position(i);
+            std::cout << "col: " << i << ", " << col.column->get_name() << ", "
+                      << col.type->get_name() << ", " << col.to_string(0) << 
std::endl;
+        }
+    }
+
+    // Converts `block` into an Arrow record batch, using an Arrow schema derived from the block
+    // itself. Failures are reported through gtest expectations. When the schema or the record
+    // batch was not produced, nullptr is returned instead of dereferencing a null pointer.
+    static std::shared_ptr<arrow::RecordBatch> serialize_arrow(
+            const std::shared_ptr<Block>& block) {
+        std::shared_ptr<arrow::Schema> block_arrow_schema;
+        EXPECT_EQ(get_arrow_schema_from_block(*block, &block_arrow_schema, "UTC"), Status::OK());
+        // EXPECT_EQ is non-fatal, so guard the dereference below explicitly.
+        if (block_arrow_schema == nullptr) {
+            return nullptr;
+        }
+        std::cout << "schema: " << block_arrow_schema->ToString(true) << std::endl;
+        // convert block to arrow
+        std::shared_ptr<arrow::RecordBatch> result;
+        cctz::time_zone _timezone_obj; //default UTC
+        Status stt = convert_to_arrow_batch(*block, block_arrow_schema,
+                                            arrow::default_memory_pool(), &result, _timezone_obj);
+        EXPECT_EQ(Status::OK(), stt) << "convert block to arrow failed" << stt.to_string();
+        // A failed conversion may leave `result` unset; do not touch it in that case.
+        if (result == nullptr) {
+            return nullptr;
+        }
+        std::cout << "arrow serialize result: " << result->num_columns() << ", "
+                  << result->num_rows() << std::endl;
+        return result;
+    }
+
+    // Deserializes every column of `record_batch` into the column at the same position of
+    // `new_block`, using that block column's type. Conversion failures are reported through
+    // gtest expectations; progress and the resulting block are printed for debugging.
+    static void deserialize_arrow(const std::shared_ptr<Block>& new_block,
+                                  std::shared_ptr<arrow::RecordBatch> record_batch) {
+        // Fail this helper cleanly instead of crashing on a missing batch.
+        ASSERT_TRUE(record_batch != nullptr) << "no arrow record batch to deserialize";
+        // deserialize arrow to block
+        auto rows = record_batch->num_rows();
+        const auto column_count = static_cast<size_t>(record_batch->num_columns());
+        // Each arrow column is written to the block column at the same position.
+        ASSERT_LE(column_count, new_block->columns());
+        for (size_t i = 0; i < column_count; ++i) {
+            auto array = record_batch->column(static_cast<int>(i));
+            std::cout << "arrow record_batch pos: " << i << ", array: " << array->ToString()
+                      << std::endl;
+            auto& column_with_type_and_name = new_block->get_by_position(i);
+            std::cout << "now we are testing column: "
+                      << column_with_type_and_name.column->get_name()
+                      << ", type: " << column_with_type_and_name.type->get_name() << std::endl;
+            auto ret =
+                    arrow_column_to_doris_column(array.get(), 0, column_with_type_and_name.column,
+                                                 column_with_type_and_name.type, rows, "UTC");
+            // Report the status first so a failure is visible before the data is inspected.
+            EXPECT_EQ(Status::OK(), ret) << "convert arrow to block failed" << ret.to_string();
+            // do check data
+            std::cout << "arrow_column_to_doris_column done, column data: ";
+            // Row 0 only exists when the conversion produced at least one row.
+            if (column_with_type_and_name.column->size() > 0) {
+                std::cout << column_with_type_and_name.to_string(0);
+            }
+            std::cout << ", column size: " << column_with_type_and_name.column->size()
+                      << std::endl;
+        }
+        std::cout << "arrow deserialize block structure: " << new_block->dump_structure()
+                  << std::endl;
+        std::cout << "arrow deserialize block data: "
+                  << new_block->dump_data(
+                             0, std::min(static_cast<size_t>(rows), static_cast<size_t>(5)))
+                  << std::endl;
+    }
+
+    // Expects `first_block` and `second_block` to hold the same data: the same number of
+    // columns, and for every column the same type handle, the same size, and for every row the
+    // same string rendering and the same cell value. The full data dumps are compared last.
+    // Only positions present in both blocks are indexed, so a size mismatch is reported as a
+    // test failure rather than reading out of range.
+    static void compare_two_blocks(const std::shared_ptr<Block>& first_block,
+                                   const std::shared_ptr<Block>& second_block) {
+        EXPECT_EQ(first_block->columns(), second_block->columns());
+        const size_t column_count = std::min(first_block->columns(), second_block->columns());
+        for (size_t i = 0; i < column_count; ++i) {
+            EXPECT_EQ(first_block->get_by_position(i).type, second_block->get_by_position(i).type);
+            auto& col = first_block->get_by_position(i).column;
+            auto& assert_col = second_block->get_by_position(i).column;
+            std::cout << "compare_two_blocks, column: " << col->get_name()
+                      << ", type: " << first_block->get_by_position(i).type->get_name()
+                      << ", size: " << col->size() << ", assert size: " << assert_col->size()
+                      << std::endl;
+            EXPECT_EQ(assert_col->size(), col->size());
+            // The size expectation above is non-fatal, so bound the loop by the shorter column.
+            const size_t row_count = std::min(col->size(), assert_col->size());
+            for (size_t j = 0; j < row_count; ++j) {
+                EXPECT_EQ(first_block->get_by_position(i).to_string(j),
+                          second_block->get_by_position(i).to_string(j));
+                auto cell = col->operator[](j);
+                auto assert_cell = assert_col->operator[](j);
+                EXPECT_EQ(cell, assert_cell) << "column: " << col->get_name() << " row: " << j;
+            }
+        }
+        EXPECT_EQ(first_block->dump_data(), second_block->dump_data());
+    }
+
+    // assert rapidjson format
+    // For now, rapidjson write_one_cell_to_json and read_one_cell_from_json are only used in
+    // column_object and can simply be replaced by the jsonb format.
+};
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/data_type_map_test.cpp 
b/be/test/vec/data_types/data_type_map_test.cpp
new file mode 100644
index 00000000000..548802b0d91
--- /dev/null
+++ b/be/test/vec/data_types/data_type_map_test.cpp
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_map.h"
+
+#include <execinfo.h> // for backtrace on Linux
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+
+#include "common/exception.h"
+#include "vec/columns/column.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/function/function_test_util.h"
+
+namespace doris::vectorized {
+
+// TODO: `DataTypeMapSerDe::deserialize_one_cell_from_json` has a bug, so `SerdeArrowTest`
+// cannot cover a Map type nested with Array, Struct or Map. The data is therefore constructed
+// by hand here. Delete this TEST once `deserialize_one_cell_from_json` is fixed.
+TEST(DataTypeMapTest, SerdeNestedTypeArrowTest) {
+    auto block = std::make_shared<Block>();
+
+    // Creates a column of `type`, inserts `row` as its only row and adds it to the block.
+    auto append_single_row_column = [&block](const DataTypePtr& type, const Field& row,
+                                             const std::string& name) {
+        MutableColumnPtr column = type->create_column();
+        column->reserve(1);
+        column->insert(row);
+        vectorized::ColumnWithTypeAndName entry(column->get_ptr(), type, name);
+        block->insert(entry);
+    };
+
+    // Map<Array<String>, Array<Int32>>
+    {
+        std::string col_name = "map_nesting_array";
+        DataTypePtr string_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr int32_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+        DataTypePtr key_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(string_type));
+        DataTypePtr value_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(int32_type));
+        DataTypePtr map_type = std::make_shared<DataTypeMap>(key_type, value_type);
+
+        Array key_array_1;
+        key_array_1.push_back(Field("cute"));
+        key_array_1.push_back(Null());
+        key_array_1.push_back(Field("hello"));
+        Array key_array_2;
+        key_array_2.push_back(Field("clever"));
+
+        Array value_array_1;
+        value_array_1.push_back(1);
+        value_array_1.push_back(2);
+        Array value_array_2;
+        value_array_2.push_back(11);
+        value_array_2.push_back(22);
+
+        Array keys;
+        keys.push_back(key_array_1);
+        keys.push_back(key_array_2);
+        Array values;
+        values.push_back(value_array_1);
+        values.push_back(value_array_2);
+
+        Map row;
+        row.push_back(keys);
+        row.push_back(values);
+
+        append_single_row_column(map_type, row, col_name);
+    }
+
+    // Map<Struct<String, Int128, UInt8>, Struct<String>>
+    {
+        std::string col_name = "map_nesting_struct";
+        DataTypePtr key_field_1 =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr key_field_2 =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+        DataTypePtr key_field_3 =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+        DataTypePtr value_field_1 =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr key_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeStruct>(
+                        std::vector<DataTypePtr> {key_field_1, key_field_2, key_field_3}));
+        DataTypePtr value_type = std::make_shared<DataTypeNullable>(
+                std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {value_field_1}));
+        DataTypePtr map_type = std::make_shared<DataTypeMap>(key_type, value_type);
+
+        Tuple key_tuple_1;
+        key_tuple_1.push_back(Field("clever"));
+        key_tuple_1.push_back(__int128_t(37));
+        key_tuple_1.push_back(true);
+        Tuple key_tuple_2;
+        key_tuple_2.push_back("null");
+        key_tuple_2.push_back(__int128_t(26));
+        key_tuple_2.push_back(false);
+
+        Tuple value_tuple_1;
+        value_tuple_1.push_back(Field("cute"));
+        Tuple value_tuple_2;
+        value_tuple_2.push_back("null");
+
+        Array keys;
+        keys.push_back(key_tuple_1);
+        keys.push_back(key_tuple_2);
+        Array values;
+        values.push_back(value_tuple_1);
+        values.push_back(value_tuple_2);
+
+        Map row;
+        row.push_back(keys);
+        row.push_back(values);
+
+        append_single_row_column(map_type, row, col_name);
+    }
+
+    // Map<Map<Int32, String>, Map<Int128, UInt8>>
+    {
+        std::string col_name = "map_nesting_map";
+        DataTypePtr int32_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+        DataTypePtr string_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr int128_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+        DataTypePtr uint8_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+        DataTypePtr key_type = std::make_shared<DataTypeNullable>(
+                std::make_shared<DataTypeMap>(int32_type, string_type));
+        DataTypePtr value_type = std::make_shared<DataTypeNullable>(
+                std::make_shared<DataTypeMap>(int128_type, uint8_type));
+        DataTypePtr map_type = std::make_shared<DataTypeMap>(key_type, value_type);
+
+        // Keys and values of the two inner maps that act as outer keys.
+        Array outer_key_1_keys;
+        outer_key_1_keys.push_back(1);
+        outer_key_1_keys.push_back(2);
+        Array outer_key_1_values;
+        outer_key_1_values.push_back(Field("map"));
+        outer_key_1_values.push_back(Null());
+        Array outer_key_2_keys;
+        outer_key_2_keys.push_back(11);
+        outer_key_2_keys.push_back(22);
+        Array outer_key_2_values;
+        outer_key_2_values.push_back(Field("clever map"));
+        outer_key_2_values.push_back(Field("hello map"));
+
+        // Keys and values of the two inner maps that act as outer values.
+        Array outer_value_1_keys;
+        outer_value_1_keys.push_back(__int128_t(37));
+        outer_value_1_keys.push_back(__int128_t(26));
+        Array outer_value_1_values;
+        outer_value_1_values.push_back(true);
+        outer_value_1_values.push_back(false);
+        Array outer_value_2_keys;
+        outer_value_2_keys.push_back(__int128_t(1111));
+        outer_value_2_keys.push_back(__int128_t(432535423));
+        Array outer_value_2_values;
+        outer_value_2_values.push_back(false);
+        outer_value_2_values.push_back(true);
+
+        Map outer_key_1;
+        outer_key_1.push_back(outer_key_1_keys);
+        outer_key_1.push_back(outer_key_1_values);
+        Map outer_key_2;
+        outer_key_2.push_back(outer_key_2_keys);
+        outer_key_2.push_back(outer_key_2_values);
+        Map outer_value_1;
+        outer_value_1.push_back(outer_value_1_keys);
+        outer_value_1.push_back(outer_value_1_values);
+        Map outer_value_2;
+        outer_value_2.push_back(outer_value_2_keys);
+        outer_value_2.push_back(outer_value_2_values);
+
+        Array keys;
+        keys.push_back(outer_key_1);
+        keys.push_back(outer_key_2);
+        Array values;
+        values.push_back(outer_value_1);
+        values.push_back(outer_value_2);
+
+        Map row;
+        row.push_back(keys);
+        row.push_back(values);
+
+        append_single_row_column(map_type, row, col_name);
+    }
+
+    // Block -> Arrow -> Block, then compare the result with the original block.
+    std::shared_ptr<arrow::RecordBatch> record_batch =
+            CommonDataTypeSerdeTest::serialize_arrow(block);
+    auto assert_block = std::make_shared<Block>(block->clone_empty());
+    CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+    CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/data_type_struct_test.cpp 
b/be/test/vec/data_types/data_type_struct_test.cpp
new file mode 100644
index 00000000000..1fc8aac0312
--- /dev/null
+++ b/be/test/vec/data_types/data_type_struct_test.cpp
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_struct.h"
+
+#include <execinfo.h> // for backtrace on Linux
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+
+#include "vec/columns/column.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/function/function_test_util.h"
+
+namespace doris::vectorized {
+
+// TODO: `DataTypeStructSerDe::deserialize_one_cell_from_json` has a bug, so `SerdeArrowTest`
+// cannot cover a Struct type nested with Array, Map or Struct. The data is therefore
+// constructed by hand here. Delete this TEST once `deserialize_one_cell_from_json` is fixed.
+TEST(DataTypeStructTest, SerdeNestedTypeArrowTest) {
+    auto block = std::make_shared<Block>();
+    {
+        std::string col_name = "struct_nesting_array_map_struct";
+
+        // Struct<Array<String>, Map<Int32, String>, Struct<String, Int128, UInt8>>
+        DataTypePtr array_item_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr map_key_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+        DataTypePtr map_value_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr inner_field_1 =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+        DataTypePtr inner_field_2 =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+        DataTypePtr inner_field_3 =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+        DataTypePtr array_type = std::make_shared<DataTypeNullable>(
+                std::make_shared<DataTypeArray>(array_item_type));
+        DataTypePtr map_type = std::make_shared<DataTypeNullable>(
+                std::make_shared<DataTypeMap>(map_key_type, map_value_type));
+        DataTypePtr inner_struct_type =
+                std::make_shared<DataTypeNullable>(std::make_shared<DataTypeStruct>(
+                        std::vector<DataTypePtr> {inner_field_1, inner_field_2, inner_field_3}));
+        DataTypePtr struct_type = std::make_shared<DataTypeStruct>(
+                std::vector<DataTypePtr> {array_type, map_type, inner_struct_type});
+
+        // nested Array, one per row
+        Array array_row_1;
+        array_row_1.push_back(Field("array"));
+        array_row_1.push_back(Null());
+        Array array_row_2;
+        array_row_2.push_back(Field("lucky array"));
+        array_row_2.push_back(Field("cute array"));
+
+        // nested Map, one per row, each built from a key array and a value array
+        Array map_keys_1;
+        map_keys_1.push_back(1);
+        map_keys_1.push_back(2);
+        Array map_values_1;
+        map_values_1.push_back(Field("map"));
+        map_values_1.push_back(Null());
+        Array map_keys_2;
+        map_keys_2.push_back(11);
+        map_keys_2.push_back(22);
+        Array map_values_2;
+        map_values_2.push_back(Field("clever map"));
+        map_values_2.push_back(Field("hello map"));
+
+        Map map_row_1;
+        map_row_1.push_back(map_keys_1);
+        map_row_1.push_back(map_values_1);
+        Map map_row_2;
+        map_row_2.push_back(map_keys_2);
+        map_row_2.push_back(map_values_2);
+
+        // nested Struct, one per row
+        Tuple inner_row_1;
+        inner_row_1.push_back(Field("clever"));
+        inner_row_1.push_back(__int128_t(37));
+        inner_row_1.push_back(true);
+        Tuple inner_row_2;
+        inner_row_2.push_back("null");
+        inner_row_2.push_back(__int128_t(26));
+        inner_row_2.push_back(false);
+
+        // outer Struct rows
+        Tuple struct_row_1;
+        struct_row_1.push_back(array_row_1);
+        struct_row_1.push_back(map_row_1);
+        struct_row_1.push_back(inner_row_1);
+        Tuple struct_row_2;
+        struct_row_2.push_back(array_row_2);
+        struct_row_2.push_back(map_row_2);
+        struct_row_2.push_back(inner_row_2);
+
+        MutableColumnPtr struct_column = struct_type->create_column();
+        struct_column->reserve(2);
+        struct_column->insert(struct_row_1);
+        struct_column->insert(struct_row_2);
+        vectorized::ColumnWithTypeAndName type_and_name(struct_column->get_ptr(), struct_type,
+                                                        col_name);
+        block->insert(type_and_name);
+    }
+
+    // Block -> Arrow -> Block, then compare the result with the original block.
+    std::shared_ptr<arrow::RecordBatch> record_batch =
+            CommonDataTypeSerdeTest::serialize_arrow(block);
+    auto assert_block = std::make_shared<Block>(block->clone_empty());
+    CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+    CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp 
b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
index eb960abdfc1..f4caa35069f 100644
--- a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -1,4 +1,3 @@
-
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -31,11 +30,10 @@
 #include <gen_cpp/types.pb.h>
 #include <gtest/gtest-message.h>
 #include <gtest/gtest-test-part.h>
-#include <math.h>
-#include <stdint.h>
-#include <stdlib.h>
-#include <time.h>
+#include <gtest/gtest.h>
 
+#include <cmath>
+#include <cstdint>
 #include <iostream>
 #include <memory>
 #include <string>
@@ -43,26 +41,21 @@
 #include <utility>
 #include <vector>
 
-#include "gtest/gtest_pred_impl.h"
 #include "olap/hll.h"
 #include "runtime/descriptors.cpp"
-#include "runtime/descriptors.h"
 #include "util/arrow/block_convertor.h"
 #include "util/arrow/row_batch.h"
-#include "util/bitmap_value.h"
-#include "util/quantile_state.h"
 #include "util/string_parser.hpp"
 #include "vec/columns/column.h"
-#include "vec/columns/column_array.h"
 #include "vec/columns/column_complex.h"
 #include "vec/columns/column_decimal.h"
-#include "vec/columns/column_map.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_string.h"
 #include "vec/columns/column_vector.h"
 #include "vec/core/block.h"
 #include "vec/core/field.h"
 #include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_bitmap.h"
@@ -79,62 +72,29 @@
 #include "vec/data_types/data_type_string.h"
 #include "vec/data_types/data_type_struct.h"
 #include "vec/data_types/data_type_time_v2.h"
-#include "vec/io/io_helper.h"
 #include "vec/runtime/vdatetime_value.h"
 #include "vec/utils/arrow_column_to_doris_column.h"
 
 namespace doris::vectorized {
 
-template <bool is_scalar>
-void serialize_and_deserialize_arrow_test() {
-    vectorized::Block block;
-    std::vector<std::tuple<std::string, FieldType, int, PrimitiveType, bool>> 
cols;
-    if constexpr (is_scalar) {
-        cols = {
-                {"k1", FieldType::OLAP_FIELD_TYPE_INT, 1, TYPE_INT, false},
-                {"k7", FieldType::OLAP_FIELD_TYPE_INT, 7, TYPE_INT, true},
-                {"k2", FieldType::OLAP_FIELD_TYPE_STRING, 2, TYPE_STRING, 
false},
-                {"k3", FieldType::OLAP_FIELD_TYPE_DECIMAL128I, 3, 
TYPE_DECIMAL128I, false},
-                {"k11", FieldType::OLAP_FIELD_TYPE_DATETIME, 11, 
TYPE_DATETIME, false},
-                {"k4", FieldType::OLAP_FIELD_TYPE_BOOL, 4, TYPE_BOOLEAN, 
false},
-                {"k5", FieldType::OLAP_FIELD_TYPE_DECIMAL32, 5, 
TYPE_DECIMAL32, false},
-                {"k6", FieldType::OLAP_FIELD_TYPE_DECIMAL64, 6, 
TYPE_DECIMAL64, false},
-                {"k12", FieldType::OLAP_FIELD_TYPE_DATETIMEV2, 12, 
TYPE_DATETIMEV2, false},
-                {"k8", FieldType::OLAP_FIELD_TYPE_IPV4, 8, TYPE_IPV4, false},
-                {"k9", FieldType::OLAP_FIELD_TYPE_IPV6, 9, TYPE_IPV6, false},
-        };
-    } else {
-        cols = {{"a", FieldType::OLAP_FIELD_TYPE_ARRAY, 6, TYPE_ARRAY, true},
-                {"m", FieldType::OLAP_FIELD_TYPE_MAP, 8, TYPE_MAP, true},
-                {"s", FieldType::OLAP_FIELD_TYPE_STRUCT, 5, TYPE_STRUCT, 
true}};
-    }
-
-    int row_num = 7;
-    // make desc and generate block
-    TupleDescriptor tuple_desc(PTupleDescriptor(), true);
-    for (auto t : cols) {
-        TSlotDescriptor tslot;
-        std::string col_name = std::get<0>(t);
-        tslot.__set_colName(col_name);
-        TypeDescriptor type_desc(std::get<3>(t));
-        bool is_nullable(std::get<4>(t));
-        switch (std::get<3>(t)) {
-        case TYPE_BOOLEAN:
-            tslot.__set_slotType(type_desc.to_thrift());
-            {
-                auto vec = vectorized::ColumnVector<UInt8>::create();
-                auto& data = vec->get_data();
-                for (int i = 0; i < row_num; ++i) {
-                    data.push_back(i % 2);
-                }
-                vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeUInt8>());
-                vectorized::ColumnWithTypeAndName 
type_and_name(vec->get_ptr(), data_type,
-                                                                col_name);
-                block.insert(std::move(type_and_name));
+void serialize_and_deserialize_arrow_test(std::vector<PrimitiveType> cols, int 
row_num,
+                                          bool is_nullable) {
+    auto block = std::make_shared<Block>();
+    for (int i = 0; i < cols.size(); i++) {
+        std::string col_name = std::to_string(i);
+        TypeDescriptor type_desc(cols[i]);
+        switch (cols[i]) {
+        case TYPE_BOOLEAN: {
+            auto vec = vectorized::ColumnVector<UInt8>::create();
+            auto& data = vec->get_data();
+            for (int i = 0; i < row_num; ++i) {
+                data.push_back(i % 2);
             }
-            break;
+            vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeUInt8>());
+            vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), 
data_type, col_name);
+            block->insert(std::move(type_and_name));
+        } break;
         case TYPE_INT:
-            tslot.__set_slotType(type_desc.to_thrift());
             if (is_nullable) {
                 {
                     auto column_vector_int32 = 
vectorized::ColumnVector<Int32>::create();
@@ -152,7 +112,7 @@ void serialize_and_deserialize_arrow_test() {
                             std::make_shared<vectorized::DataTypeInt32>());
                     vectorized::ColumnWithTypeAndName type_and_name(
                             mutable_nullable_vector->get_ptr(), data_type, 
col_name);
-                    block.insert(type_and_name);
+                    block->insert(type_and_name);
                 }
             } else {
                 auto vec = vectorized::ColumnVector<Int32>::create();
@@ -163,13 +123,12 @@ void serialize_and_deserialize_arrow_test() {
                 vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeInt32>());
                 vectorized::ColumnWithTypeAndName 
type_and_name(vec->get_ptr(), data_type,
                                                                 col_name);
-                block.insert(std::move(type_and_name));
+                block->insert(std::move(type_and_name));
             }
             break;
         case TYPE_DECIMAL32:
             type_desc.precision = 9;
             type_desc.scale = 2;
-            tslot.__set_slotType(type_desc.to_thrift());
             {
                 vectorized::DataTypePtr decimal_data_type =
                         
std::make_shared<DataTypeDecimal<Decimal32>>(type_desc.precision,
@@ -197,13 +156,12 @@ void serialize_and_deserialize_arrow_test() {
 
                 vectorized::ColumnWithTypeAndName 
type_and_name(decimal_column->get_ptr(),
                                                                 
decimal_data_type, col_name);
-                block.insert(type_and_name);
+                block->insert(type_and_name);
             }
             break;
         case TYPE_DECIMAL64:
             type_desc.precision = 18;
             type_desc.scale = 6;
-            tslot.__set_slotType(type_desc.to_thrift());
             {
                 vectorized::DataTypePtr decimal_data_type =
                         
std::make_shared<DataTypeDecimal<Decimal64>>(type_desc.precision,
@@ -229,13 +187,12 @@ void serialize_and_deserialize_arrow_test() {
                 }
                 vectorized::ColumnWithTypeAndName 
type_and_name(decimal_column->get_ptr(),
                                                                 
decimal_data_type, col_name);
-                block.insert(type_and_name);
+                block->insert(type_and_name);
             }
             break;
         case TYPE_DECIMAL128I:
             type_desc.precision = 27;
             type_desc.scale = 9;
-            tslot.__set_slotType(type_desc.to_thrift());
             {
                 vectorized::DataTypePtr decimal_data_type(
                         doris::vectorized::create_decimal(27, 9, true));
@@ -244,146 +201,125 @@ void serialize_and_deserialize_arrow_test() {
                                       decimal_column.get())
                                      ->get_data();
                 for (int i = 0; i < row_num; ++i) {
-                    __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10, 
8));
+                    auto value = __int128_t(i * pow(10, 9) + i * pow(10, 8));
                     data.push_back(value);
                 }
                 vectorized::ColumnWithTypeAndName 
type_and_name(decimal_column->get_ptr(),
                                                                 
decimal_data_type, col_name);
-                block.insert(type_and_name);
+                block->insert(type_and_name);
             }
             break;
-        case TYPE_STRING:
-            tslot.__set_slotType(type_desc.to_thrift());
-            {
-                auto strcol = vectorized::ColumnString::create();
-                for (int i = 0; i < row_num; ++i) {
-                    std::string is = std::to_string(i);
-                    strcol->insert_data(is.c_str(), is.size());
-                }
-                vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeString>());
-                vectorized::ColumnWithTypeAndName 
type_and_name(strcol->get_ptr(), data_type,
-                                                                col_name);
-                block.insert(type_and_name);
+        case TYPE_STRING: {
+            auto strcol = vectorized::ColumnString::create();
+            for (int i = 0; i < row_num; ++i) {
+                std::string is = std::to_string(i);
+                strcol->insert_data(is.c_str(), is.size());
             }
-            break;
-        case TYPE_HLL:
-            tslot.__set_slotType(type_desc.to_thrift());
-            {
-                vectorized::DataTypePtr 
hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
-                auto hll_column = hll_data_type->create_column();
-                std::vector<HyperLogLog>& container =
-                        ((vectorized::ColumnHLL*)hll_column.get())->get_data();
-                for (int i = 0; i < row_num; ++i) {
-                    HyperLogLog hll;
-                    hll.update(i);
-                    container.push_back(hll);
-                }
-                vectorized::ColumnWithTypeAndName 
type_and_name(hll_column->get_ptr(),
-                                                                hll_data_type, 
col_name);
-
-                block.insert(type_and_name);
+            vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeString>());
+            vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), 
data_type, col_name);
+            block->insert(type_and_name);
+        } break;
+        case TYPE_HLL: {
+            vectorized::DataTypePtr 
hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+            auto hll_column = hll_data_type->create_column();
+            std::vector<HyperLogLog>& container =
+                    ((vectorized::ColumnHLL*)hll_column.get())->get_data();
+            for (int i = 0; i < row_num; ++i) {
+                HyperLogLog hll;
+                hll.update(i);
+                container.push_back(hll);
             }
-            break;
-        case TYPE_DATEV2:
-            tslot.__set_slotType(type_desc.to_thrift());
-            {
-                auto column_vector_date_v2 = 
vectorized::ColumnVector<vectorized::UInt32>::create();
-                auto& date_v2_data = column_vector_date_v2->get_data();
-                for (int i = 0; i < row_num; ++i) {
-                    DateV2Value<DateV2ValueType> value;
-                    value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
-                    
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
-                }
-                vectorized::DataTypePtr date_v2_type(
-                        std::make_shared<vectorized::DataTypeDateV2>());
-                vectorized::ColumnWithTypeAndName 
test_date_v2(column_vector_date_v2->get_ptr(),
-                                                               date_v2_type, 
col_name);
-                block.insert(test_date_v2);
+            vectorized::ColumnWithTypeAndName 
type_and_name(hll_column->get_ptr(), hll_data_type,
+                                                            col_name);
+
+            block->insert(type_and_name);
+        } break;
+        case TYPE_DATEV2: {
+            auto column_vector_date_v2 = 
vectorized::ColumnVector<vectorized::UInt32>::create();
+            auto& date_v2_data = column_vector_date_v2->get_data();
+            for (int i = 0; i < row_num; ++i) {
+                DateV2Value<DateV2ValueType> value;
+                value.from_date_int64(20210501);
+                
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
             }
-            break;
+            vectorized::DataTypePtr 
date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
+            vectorized::ColumnWithTypeAndName 
test_date_v2(column_vector_date_v2->get_ptr(),
+                                                           date_v2_type, 
col_name);
+            block->insert(test_date_v2);
+        } break;
         case TYPE_DATE: // int64
-            tslot.__set_slotType(type_desc.to_thrift());
-            {
-                auto column_vector_date = 
vectorized::ColumnVector<vectorized::Int64>::create();
-                auto& date_data = column_vector_date->get_data();
-                for (int i = 0; i < row_num; ++i) {
-                    VecDateTimeValue value;
-                    value.from_date_int64(20210501);
-                    
date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
-                }
-                vectorized::DataTypePtr 
date_type(std::make_shared<vectorized::DataTypeDate>());
-                vectorized::ColumnWithTypeAndName 
test_date(column_vector_date->get_ptr(),
-                                                            date_type, 
col_name);
-                block.insert(test_date);
+        {
+            auto column_vector_date = 
vectorized::ColumnVector<vectorized::Int64>::create();
+            auto& date_data = column_vector_date->get_data();
+            for (int i = 0; i < row_num; ++i) {
+                VecDateTimeValue value;
+                value.from_date_int64(20210501);
+                
date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
             }
-            break;
+            vectorized::DataTypePtr 
date_type(std::make_shared<vectorized::DataTypeDate>());
+            vectorized::ColumnWithTypeAndName 
test_date(column_vector_date->get_ptr(), date_type,
+                                                        col_name);
+            block->insert(test_date);
+        } break;
         case TYPE_DATETIME: // int64
-            tslot.__set_slotType(type_desc.to_thrift());
-            {
-                auto column_vector_datetime = 
vectorized::ColumnVector<vectorized::Int64>::create();
-                auto& datetime_data = column_vector_datetime->get_data();
-                for (int i = 0; i < row_num; ++i) {
-                    VecDateTimeValue value;
-                    value.from_date_int64(20210501080910);
-                    
datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
-                }
-                vectorized::DataTypePtr datetime_type(
-                        std::make_shared<vectorized::DataTypeDateTime>());
-                vectorized::ColumnWithTypeAndName 
test_datetime(column_vector_datetime->get_ptr(),
-                                                                datetime_type, 
col_name);
-                block.insert(test_datetime);
+        {
+            auto column_vector_datetime = 
vectorized::ColumnVector<vectorized::Int64>::create();
+            auto& datetime_data = column_vector_datetime->get_data();
+            for (int i = 0; i < row_num; ++i) {
+                VecDateTimeValue value;
+                value.from_date_int64(20210501080910);
+                
datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
             }
-            break;
+            vectorized::DataTypePtr 
datetime_type(std::make_shared<vectorized::DataTypeDateTime>());
+            vectorized::ColumnWithTypeAndName 
test_datetime(column_vector_datetime->get_ptr(),
+                                                            datetime_type, 
col_name);
+            block->insert(test_datetime);
+        } break;
         case TYPE_DATETIMEV2: // uint64
-            tslot.__set_slotType(type_desc.to_thrift());
-            {
-                // 2022-01-01 11:11:11.111
-                auto column_vector_datetimev2 =
-                        vectorized::ColumnVector<vectorized::UInt64>::create();
-                //                auto& datetimev2_data = 
column_vector_datetimev2->get_data();
-                DateV2Value<DateTimeV2ValueType> value;
-                string date_literal = "2022-01-01 11:11:11.111";
-                value.from_date_str(date_literal.c_str(), date_literal.size());
-                char to[64] = {};
-                std::cout << "value: " << value.to_string(to) << std::endl;
-                for (int i = 0; i < row_num; ++i) {
-                    column_vector_datetimev2->insert(value.to_date_int_val());
-                }
-                vectorized::DataTypePtr datetimev2_type(
-                        std::make_shared<vectorized::DataTypeDateTimeV2>());
-                vectorized::ColumnWithTypeAndName test_datetimev2(
-                        column_vector_datetimev2->get_ptr(), datetimev2_type, 
col_name);
-                block.insert(test_datetimev2);
+        {
+            auto column_vector_datetimev2 = 
vectorized::ColumnVector<vectorized::UInt64>::create();
+            DateV2Value<DateTimeV2ValueType> value;
+            string date_literal = "2022-01-01 11:11:11.111";
+            cctz::time_zone ctz;
+            TimezoneUtils::find_cctz_time_zone("UTC", ctz);
+            EXPECT_TRUE(value.from_date_str(date_literal.c_str(), date_literal.size(), ctz, 3));
+            char to[64] = {};
+            std::cout << "value: " << value.to_string(to) << std::endl;
+            for (int i = 0; i < row_num; ++i) {
+                column_vector_datetimev2->insert(value.to_date_int_val());
             }
-            break;
+            vectorized::DataTypePtr datetimev2_type(
+                    std::make_shared<vectorized::DataTypeDateTimeV2>(3));
+            vectorized::ColumnWithTypeAndName 
test_datetimev2(column_vector_datetimev2->get_ptr(),
+                                                              datetimev2_type, 
col_name);
+            block->insert(test_datetimev2);
+        } break;
         case TYPE_ARRAY: // array
             type_desc.add_sub_type(TYPE_STRING, true);
-            tslot.__set_slotType(type_desc.to_thrift());
             {
                 DataTypePtr s =
                         
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
                 DataTypePtr au = std::make_shared<DataTypeArray>(s);
                 Array a1, a2;
-                a1.push_back(Field(String("sss")));
+                a1.push_back(Field("sss"));
                 a1.push_back(Null());
-                a1.push_back(Field(String("clever amory")));
-                a2.push_back(Field(String("hello amory")));
+                a1.push_back(Field("clever amory"));
+                a2.push_back(Field("hello amory"));
                 a2.push_back(Null());
-                a2.push_back(Field(String("cute amory")));
-                a2.push_back(Field(String("sf")));
+                a2.push_back(Field("cute amory"));
+                a2.push_back(Field("sf"));
                 MutableColumnPtr array_column = au->create_column();
                 array_column->reserve(2);
                 array_column->insert(a1);
                 array_column->insert(a2);
                 vectorized::ColumnWithTypeAndName 
type_and_name(array_column->get_ptr(), au,
                                                                 col_name);
-                block.insert(type_and_name);
+                block->insert(type_and_name);
             }
             break;
         case TYPE_MAP:
             type_desc.add_sub_type(TYPE_STRING, true);
             type_desc.add_sub_type(TYPE_STRING, true);
-            tslot.__set_slotType(type_desc.to_thrift());
             {
                 DataTypePtr s =
                         
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
@@ -416,14 +352,13 @@ void serialize_and_deserialize_arrow_test() {
                 map_column->insert(m1);
                 map_column->insert(m2);
                 vectorized::ColumnWithTypeAndName 
type_and_name(map_column->get_ptr(), m, col_name);
-                block.insert(type_and_name);
+                block->insert(type_and_name);
             }
             break;
         case TYPE_STRUCT:
             type_desc.add_sub_type(TYPE_STRING, "name", true);
             type_desc.add_sub_type(TYPE_LARGEINT, "age", true);
             type_desc.add_sub_type(TYPE_BOOLEAN, "is", true);
-            tslot.__set_slotType(type_desc.to_thrift());
             {
                 DataTypePtr s =
                         
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
@@ -434,10 +369,10 @@ void serialize_and_deserialize_arrow_test() {
                 DataTypePtr st =
                         
std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {s, d, m});
                 Tuple t1, t2;
-                t1.push_back(Field(String("amory cute")));
+                t1.push_back(Field("amory cute"));
                 t1.push_back(__int128_t(37));
                 t1.push_back(true);
-                t2.push_back(Field("null"));
+                t2.push_back("null");
                 t2.push_back(__int128_t(26));
                 t2.push_back(false);
                 MutableColumnPtr struct_column = st->create_column();
@@ -446,144 +381,61 @@ void serialize_and_deserialize_arrow_test() {
                 struct_column->insert(t2);
                 vectorized::ColumnWithTypeAndName 
type_and_name(struct_column->get_ptr(), st,
                                                                 col_name);
-                block.insert(type_and_name);
+                block->insert(type_and_name);
             }
             break;
-        case TYPE_IPV4:
-            tslot.__set_slotType(type_desc.to_thrift());
-            {
-                auto vec = vectorized::ColumnIPv4::create();
-                auto& data = vec->get_data();
-                for (int i = 0; i < row_num; ++i) {
-                    data.push_back(i);
-                }
-                vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeIPv4>());
-                vectorized::ColumnWithTypeAndName 
type_and_name(vec->get_ptr(), data_type,
-                                                                col_name);
-                block.insert(std::move(type_and_name));
+        case TYPE_IPV4: {
+            auto vec = vectorized::ColumnIPv4::create();
+            auto& data = vec->get_data();
+            for (int i = 0; i < row_num; ++i) {
+                data.push_back(i);
             }
-            break;
-        case TYPE_IPV6:
-            tslot.__set_slotType(type_desc.to_thrift());
-            {
-                auto vec = vectorized::ColumnIPv6::create();
-                auto& data = vec->get_data();
-                for (int i = 0; i < row_num; ++i) {
-                    data.push_back(i);
-                }
-                vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeIPv6>());
-                vectorized::ColumnWithTypeAndName 
type_and_name(vec->get_ptr(), data_type,
-                                                                col_name);
-                block.insert(std::move(type_and_name));
+            vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeIPv4>());
+            vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), 
data_type, col_name);
+            block->insert(std::move(type_and_name));
+        } break;
+        case TYPE_IPV6: {
+            auto vec = vectorized::ColumnIPv6::create();
+            auto& data = vec->get_data();
+            for (int i = 0; i < row_num; ++i) {
+                data.push_back(i);
             }
-            break;
+            vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeIPv6>());
+            vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), 
data_type, col_name);
+            block->insert(std::move(type_and_name));
+        } break;
         default:
-            break;
+            LOG(FATAL) << "error column type";
         }
-
-        tslot.__set_col_unique_id(std::get<2>(t));
-        SlotDescriptor* slot = new SlotDescriptor(tslot);
-        tuple_desc.add_slot(slot);
     }
-
-    RowDescriptor row_desc(&tuple_desc, true);
-    // arrow schema
-    std::shared_ptr<arrow::Schema> _arrow_schema;
-    EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), 
Status::OK());
-
-    // serialize
-    std::shared_ptr<arrow::RecordBatch> result;
-    std::cout << "block data: " << block.dump_data(0, row_num) << std::endl;
-    std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) << 
std::endl;
-
-    cctz::time_zone timezone_obj;
-    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, 
timezone_obj);
-    static_cast<void>(convert_to_arrow_batch(block, _arrow_schema, 
arrow::default_memory_pool(),
-                                             &result, timezone_obj));
-    Block new_block = block.clone_empty();
-    EXPECT_TRUE(result != nullptr);
-    std::cout << "result: " << result->ToString() << std::endl;
-    // deserialize
-    for (auto t : cols) {
-        std::string real_column_name = std::get<0>(t);
-        auto* array = result->GetColumnByName(real_column_name).get();
-        auto& column_with_type_and_name = 
new_block.get_by_name(real_column_name);
-        if (std::get<3>(t) == PrimitiveType::TYPE_DATE ||
-            std::get<3>(t) == PrimitiveType::TYPE_DATETIME) {
-            {
-                auto strcol = vectorized::ColumnString::create();
-                vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeString>());
-                vectorized::ColumnWithTypeAndName 
type_and_name(strcol->get_ptr(), data_type,
-                                                                
real_column_name);
-                static_cast<void>(arrow_column_to_doris_column(
-                        array, 0, type_and_name.column, type_and_name.type, 
block.rows(), "UTC"));
-                {
-                    auto& col = 
column_with_type_and_name.column.get()->assume_mutable_ref();
-                    auto& date_data = 
static_cast<ColumnVector<Int64>&>(col).get_data();
-                    for (int i = 0; i < strcol->size(); ++i) {
-                        StringRef str = strcol->get_data_at(i);
-                        VecDateTimeValue value;
-                        value.from_date_str(str.data, str.size);
-                        
date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
-                    }
-                }
-            }
-            continue;
-        } else if (std::get<3>(t) == PrimitiveType::TYPE_DATEV2) {
-            auto strcol = vectorized::ColumnString::create();
-            vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeString>());
-            vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), 
data_type,
-                                                            real_column_name);
-            static_cast<void>(arrow_column_to_doris_column(
-                    array, 0, type_and_name.column, type_and_name.type, 
block.rows(), "UTC"));
-            {
-                auto& col = 
column_with_type_and_name.column.get()->assume_mutable_ref();
-                auto& date_data = 
static_cast<ColumnVector<UInt32>&>(col).get_data();
-                for (int i = 0; i < strcol->size(); ++i) {
-                    StringRef str = strcol->get_data_at(i);
-                    DateV2Value<DateV2ValueType> value;
-                    value.from_date_str(str.data, str.size);
-                    
date_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
-                }
-            }
-            continue;
-        } else if (std::get<3>(t) == PrimitiveType::TYPE_DATETIMEV2) {
-            // now we only support read doris datetimev2 to arrow
-            block.erase(real_column_name);
-            new_block.erase(real_column_name);
-            continue;
-        }
-        static_cast<void>(arrow_column_to_doris_column(array, 0, 
column_with_type_and_name.column,
-                                                       
column_with_type_and_name.type, block.rows(),
-                                                       "UTC"));
-    }
-
-    std::cout << block.dump_data() << std::endl;
-    std::cout << new_block.dump_data() << std::endl;
-    EXPECT_EQ(block.dump_data(), new_block.dump_data());
+    std::shared_ptr<arrow::RecordBatch> record_batch =
+            CommonDataTypeSerdeTest::serialize_arrow(block);
+    auto assert_block = std::make_shared<Block>(block->clone_empty());
+    CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+    CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
 }
 
 TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) {
-    serialize_and_deserialize_arrow_test<true>();
+    std::vector<PrimitiveType> cols = {
+            TYPE_INT,        TYPE_INT,       TYPE_STRING, TYPE_DECIMAL128I, TYPE_BOOLEAN,
+            TYPE_DECIMAL32,  TYPE_DECIMAL64, TYPE_IPV4,   TYPE_IPV6,        TYPE_DATETIME,
+            TYPE_DATETIMEV2, TYPE_DATE,      TYPE_DATEV2,
+    };
+    serialize_and_deserialize_arrow_test(cols, 7, true);
+    serialize_and_deserialize_arrow_test(cols, 7, false);
 }
 
 TEST(DataTypeSerDeArrowTest, DataTypeCollectionSerDeTest) {
-    serialize_and_deserialize_arrow_test<false>();
+    std::vector<PrimitiveType> cols = {TYPE_ARRAY, TYPE_MAP, TYPE_STRUCT};
+    serialize_and_deserialize_arrow_test(cols, 7, true);
+    serialize_and_deserialize_arrow_test(cols, 7, false);
 }
 
 TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
-    TupleDescriptor tuple_desc(PTupleDescriptor(), true);
-    TSlotDescriptor tslot;
     std::string col_name = "map_null_key";
-    tslot.__set_colName(col_name);
-    TypeDescriptor type_desc(TYPE_MAP);
-    type_desc.add_sub_type(TYPE_STRING, true);
-    type_desc.add_sub_type(TYPE_INT, true);
-    tslot.__set_slotType(type_desc.to_thrift());
-    vectorized::Block block;
+    auto block = std::make_shared<Block>();
     {
         DataTypePtr s = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
-        ;
         DataTypePtr d = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
         DataTypePtr m = std::make_shared<DataTypeMap>(s, d);
         Array k1, k2, v1, v2, k3, v3;
@@ -614,41 +466,14 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
         map_column->insert(m2);
         map_column->insert(m3);
         vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), m, col_name);
-        block.insert(type_and_name);
+        block->insert(type_and_name);
     }
 
-    tslot.__set_col_unique_id(1);
-    SlotDescriptor* slot = new SlotDescriptor(tslot);
-    tuple_desc.add_slot(slot);
-    RowDescriptor row_desc(&tuple_desc, true);
-    // arrow schema
-    std::shared_ptr<arrow::Schema> _arrow_schema;
-    EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), 
Status::OK());
-
-    // serialize
-    std::shared_ptr<arrow::RecordBatch> result;
-    std::cout << "block structure: " << block.dump_structure() << std::endl;
-    std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) << 
std::endl;
-
-    cctz::time_zone timezone_obj;
-    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, 
timezone_obj);
-    static_cast<void>(convert_to_arrow_batch(block, _arrow_schema, 
arrow::default_memory_pool(),
-                                             &result, timezone_obj));
-    Block new_block = block.clone_empty();
-    EXPECT_TRUE(result != nullptr);
-    std::cout << "result: " << result->ToString() << std::endl;
-    // deserialize
-    auto* array = result->GetColumnByName(col_name).get();
-    auto& column_with_type_and_name = new_block.get_by_name(col_name);
-    static_cast<void>(arrow_column_to_doris_column(array, 0, 
column_with_type_and_name.column,
-                                                   
column_with_type_and_name.type, block.rows(),
-                                                   "UTC"));
-    std::cout << block.dump_data() << std::endl;
-    std::cout << new_block.dump_data() << std::endl;
-    // new block row_index 0, 2 which row has key null will be filter
-    EXPECT_EQ(new_block.dump_one_line(0, 1), "{\"doris\":null, \"clever amory\":30}");
-    EXPECT_EQ(new_block.dump_one_line(2, 1), "{\"test\":11}");
-    EXPECT_EQ(block.dump_data(1, 1), new_block.dump_data(1, 1));
+    std::shared_ptr<arrow::RecordBatch> record_batch =
+            CommonDataTypeSerdeTest::serialize_arrow(block);
+    auto assert_block = std::make_shared<Block>(block->clone_empty());
+    CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+    CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
 }
 
 } // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 19b21c16a45..be8c3dfd201 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -294,8 +294,10 @@ static doris::TupleDescriptor* create_tuple_desc(
             t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27, 9).to_thrift());
         } else {
             TypeDescriptor descriptor(column_descs[i].type);
-            if (column_descs[i].precision >= 0 && column_descs[i].scale >= 0) {
+            if (column_descs[i].precision >= 0) {
                 descriptor.precision = column_descs[i].precision;
+            }
+            if (column_descs[i].scale >= 0) {
                 descriptor.scale = column_descs[i].scale;
             }
             t_slot_desc.__set_slotType(descriptor.to_thrift());
diff --git a/be/test/vec/exprs/vexpr_test.cpp b/be/test/vec/exprs/vexpr_test.cpp
index 4c075cba848..76982d399fb 100644
--- a/be/test/vec/exprs/vexpr_test.cpp
+++ b/be/test/vec/exprs/vexpr_test.cpp
@@ -93,8 +93,10 @@ static doris::TupleDescriptor* create_tuple_desc(
             t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27, 9).to_thrift());
         } else {
             TypeDescriptor descriptor(column_descs[i].type);
-            if (column_descs[i].precision >= 0 && column_descs[i].scale >= 0) {
+            if (column_descs[i].precision >= 0) {
                 descriptor.precision = column_descs[i].precision;
+            }
+            if (column_descs[i].scale >= 0) {
                 descriptor.scale = column_descs[i].scale;
             }
             t_slot_desc.__set_slotType(descriptor.to_thrift());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to