This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e9a4cbcdf9 [Refact](type system) refact column with arrow serde 
(#19091)
e9a4cbcdf9 is described below

commit e9a4cbcdf9442e3577e54e5742d9c26335453551
Author: amory <wangqian...@selectdb.com>
AuthorDate: Thu May 4 15:28:46 2023 +0800

    [Refact](type system) refact column with arrow serde (#19091)
    
    * refact arrow serde
    
    * add date serde
    
    * update arrow and fix nullable and date type
---
 be/src/util/arrow/block_convertor.cpp              |   6 +-
 be/src/vec/CMakeLists.txt                          |   3 +
 be/src/vec/columns/column.h                        |   1 +
 be/src/vec/data_types/data_type_date.h             |   3 +
 be/src/vec/data_types/data_type_date_time.h        |   3 +
 be/src/vec/data_types/data_type_time_v2.h          |   6 +-
 .../vec/data_types/serde/data_type_array_serde.cpp |  40 +++
 .../vec/data_types/serde/data_type_array_serde.h   |   6 +
 .../vec/data_types/serde/data_type_bitmap_serde.h  |   9 +
 .../data_types/serde/data_type_date64_serde.cpp    | 111 +++++++
 ...pe_array_serde.cpp => data_type_date64_serde.h} |  41 +--
 .../serde/data_type_datetimev2_serde.cpp           |  49 +++
 ..._array_serde.h => data_type_datetimev2_serde.h} |  42 ++-
 .../data_types/serde/data_type_datev2_serde.cpp    |  68 +++++
 ...pe_array_serde.cpp => data_type_datev2_serde.h} |  41 +--
 .../data_types/serde/data_type_decimal_serde.cpp   | 133 ++++++++-
 .../vec/data_types/serde/data_type_decimal_serde.h |   6 +
 .../serde/data_type_fixedlengthobject_serde.h      |   9 +
 .../vec/data_types/serde/data_type_hll_serde.cpp   |  21 ++
 be/src/vec/data_types/serde/data_type_hll_serde.h  |   7 +
 .../vec/data_types/serde/data_type_map_serde.cpp   |  12 +
 be/src/vec/data_types/serde/data_type_map_serde.h  |   5 +
 .../data_types/serde/data_type_nullable_serde.cpp  |  37 +++
 .../data_types/serde/data_type_nullable_serde.h    |   5 +
 .../data_types/serde/data_type_number_serde.cpp    | 110 ++++++-
 .../vec/data_types/serde/data_type_number_serde.h  |  12 +
 .../vec/data_types/serde/data_type_object_serde.h  |  10 +
 .../serde/data_type_quantilestate_serde.h          |   9 +
 be/src/vec/data_types/serde/data_type_serde.h      |  24 ++
 .../data_types/serde/data_type_string_serde.cpp    |  60 ++++
 .../vec/data_types/serde/data_type_string_serde.h  |   6 +
 .../data_types/serde/data_type_struct_serde.cpp    |  12 +
 .../vec/data_types/serde/data_type_struct_serde.h  |   6 +
 be/src/vec/utils/arrow_column_to_doris_column.cpp  | 329 +--------------------
 be/test/CMakeLists.txt                             |   3 +-
 .../serde/data_type_serde_arrow_test.cpp           | 321 ++++++++++++++++++++
 .../data_types/serde/data_type_serde_pb_test.cpp   | 200 +++++++++++++
 37 files changed, 1375 insertions(+), 391 deletions(-)

diff --git a/be/src/util/arrow/block_convertor.cpp 
b/be/src/util/arrow/block_convertor.cpp
index 2b426ca2f2..89a1e09a73 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -389,10 +389,8 @@ Status 
FromBlockConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
             return to_status(arrow_st);
         }
         _cur_builder = builder.get();
-        arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), this);
-        if (!arrow_st.ok()) {
-            return to_status(arrow_st);
-        }
+        _cur_type->get_serde()->write_column_to_arrow(*_cur_col, nullptr, 
_cur_builder, _cur_start,
+                                                      _cur_start + _cur_rows);
         arrow_st = _cur_builder->Finish(&_arrays[_cur_field_idx]);
         if (!arrow_st.ok()) {
             return to_status(arrow_st);
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 6c82623816..086a301456 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -87,6 +87,9 @@ set(VEC_FILES
   data_types/serde/data_type_array_serde.cpp
   data_types/serde/data_type_struct_serde.cpp
   data_types/serde/data_type_number_serde.cpp
+  data_types/serde/data_type_datev2_serde.cpp
+  data_types/serde/data_type_datetimev2_serde.cpp
+  data_types/serde/data_type_date64_serde.cpp
   data_types/serde/data_type_string_serde.cpp
   data_types/serde/data_type_decimal_serde.cpp
   data_types/serde/data_type_object_serde.cpp
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index e30c35ba31..8bf83b533c 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -148,6 +148,7 @@ public:
     virtual void set_rowset_segment_id(std::pair<RowsetId, uint32_t> 
rowset_segment_id) {}
 
     virtual std::pair<RowsetId, uint32_t> get_rowset_segment_id() const { 
return {}; }
+    // todo(Amory) from column to get data type is not correct ,column is 
memory data,can not to assume memory data belong to which data type
     virtual TypeIndex get_data_type() const {
         LOG(FATAL) << "Cannot get_data_type() column " << get_name();
         __builtin_unreachable();
diff --git a/be/src/vec/data_types/data_type_date.h 
b/be/src/vec/data_types/data_type_date.h
index 6648932d5f..54e45e63d3 100644
--- a/be/src/vec/data_types/data_type_date.h
+++ b/be/src/vec/data_types/data_type_date.h
@@ -32,6 +32,7 @@
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_number_base.h"
+#include "vec/data_types/serde/data_type_date64_serde.h"
 
 namespace doris {
 namespace vectorized {
@@ -64,6 +65,8 @@ public:
     static void cast_to_date(Int64& x);
 
     MutableColumnPtr create_column() const override;
+
+    DataTypeSerDeSPtr get_serde() const override { return 
std::make_shared<DataTypeDate64SerDe>(); }
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_date_time.h 
b/be/src/vec/data_types/data_type_date_time.h
index c4056f3612..98aa4b26be 100644
--- a/be/src/vec/data_types/data_type_date_time.h
+++ b/be/src/vec/data_types/data_type_date_time.h
@@ -32,6 +32,7 @@
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_number_base.h"
+#include "vec/data_types/serde/data_type_date64_serde.h"
 
 namespace doris {
 namespace vectorized {
@@ -84,6 +85,8 @@ public:
 
     std::string to_string(const IColumn& column, size_t row_num) const 
override;
 
+    DataTypeSerDeSPtr get_serde() const override { return 
std::make_shared<DataTypeDate64SerDe>(); }
+
     void to_string(const IColumn& column, size_t row_num, BufferWritable& 
ostr) const override;
 
     Status from_string(ReadBuffer& rb, IColumn* column) const override;
diff --git a/be/src/vec/data_types/data_type_time_v2.h 
b/be/src/vec/data_types/data_type_time_v2.h
index 8872eeafc0..336048b39a 100644
--- a/be/src/vec/data_types/data_type_time_v2.h
+++ b/be/src/vec/data_types/data_type_time_v2.h
@@ -34,6 +34,8 @@
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_number_base.h"
+#include "vec/data_types/serde/data_type_datetimev2_serde.h"
+#include "vec/data_types/serde/data_type_datev2_serde.h"
 #include "vec/data_types/serde/data_type_number_serde.h"
 #include "vec/data_types/serde/data_type_serde.h"
 
@@ -66,6 +68,8 @@ public:
     bool can_be_used_as_version() const override { return true; }
     bool can_be_inside_nullable() const override { return true; }
 
+    DataTypeSerDeSPtr get_serde() const override { return 
std::make_shared<DataTypeDateV2SerDe>(); }
+
     bool equals(const IDataType& rhs) const override;
     std::string to_string(const IColumn& column, size_t row_num) const 
override;
     void to_string(const IColumn& column, size_t row_num, BufferWritable& 
ostr) const override;
@@ -113,7 +117,7 @@ public:
     void to_string(const IColumn& column, size_t row_num, BufferWritable& 
ostr) const override;
     Status from_string(ReadBuffer& rb, IColumn* column) const override;
     DataTypeSerDeSPtr get_serde() const override {
-        return std::make_shared<DataTypeNumberSerDe<UInt64>>();
+        return std::make_shared<DataTypeDateTimeV2SerDe>();
     };
 
     MutableColumnPtr create_column() const override;
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp 
b/be/src/vec/data_types/serde/data_type_array_serde.cpp
index 8ae9c61858..15c1a4f683 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp
@@ -17,8 +17,13 @@
 
 #include "data_type_array_serde.h"
 
+#include <arrow/array/builder_nested.h>
+
+#include "gutil/casts.h"
 #include "util/jsonb_document.h"
 #include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/common/assert_cast.h"
 #include "vec/common/string_ref.h"
 
 namespace doris {
@@ -43,5 +48,40 @@ void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn& 
column, const JsonbVa
     column.deserialize_and_insert_from_arena(blob->getBlob());
 }
 
+void DataTypeArraySerDe::write_column_to_arrow(const IColumn& column, const 
UInt8* null_map,
+                                               arrow::ArrayBuilder* 
array_builder, int start,
+                                               int end) const {
+    auto& array_column = static_cast<const ColumnArray&>(column);
+    auto& offsets = array_column.get_offsets();
+    auto& nested_data = array_column.get_data();
+    auto& builder = assert_cast<arrow::ListBuilder&>(*array_builder);
+    auto nested_builder = builder.value_builder();
+    for (size_t array_idx = start; array_idx < end; ++array_idx) {
+        checkArrowStatus(builder.Append(), column.get_name(), 
array_builder->type()->name());
+        nested_serde->write_column_to_arrow(nested_data, null_map, 
nested_builder,
+                                            offsets[array_idx - 1], 
offsets[array_idx]);
+    }
+}
+
+void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                int start, int end,
+                                                const cctz::time_zone& ctz) 
const {
+    auto& column_array = static_cast<ColumnArray&>(column);
+    auto& offsets_data = column_array.get_offsets();
+    auto concrete_array = down_cast<const arrow::ListArray*>(arrow_array);
+    auto arrow_offsets_array = concrete_array->offsets();
+    auto arrow_offsets = 
down_cast<arrow::Int32Array*>(arrow_offsets_array.get());
+    auto prev_size = offsets_data.back();
+    auto arrow_nested_start_offset = arrow_offsets->Value(start);
+    auto arrow_nested_end_offset = arrow_offsets->Value(end);
+    for (int64_t i = start + 1; i < end + 1; ++i) {
+        // convert to doris offset, start from offsets.back()
+        offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) - 
arrow_nested_start_offset);
+    }
+    return nested_serde->read_column_from_arrow(
+            column_array.get_data(), concrete_array->values().get(), 
arrow_nested_start_offset,
+            arrow_nested_end_offset, ctz);
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h 
b/be/src/vec/data_types/serde/data_type_array_serde.h
index cf28e33728..e8d08bbf10 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.h
+++ b/be/src/vec/data_types/serde/data_type_array_serde.h
@@ -51,6 +51,12 @@ public:
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
+    void write_column_to_arrow(const IColumn& column, const UInt8* 
null_bytemap,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
+
 private:
     DataTypeSerDeSPtr nested_serde;
 };
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h 
b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
index 23a2a689d9..dc3dd8f096 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
@@ -41,6 +41,15 @@ public:
                                  int32_t col_id, int row_num) const override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
+    void write_column_to_arrow(const IColumn& column, const UInt8* 
null_bytemap,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override {
+        LOG(FATAL) << "Not support write bitmap column to arrow";
+    }
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override {
+        LOG(FATAL) << "Not support read bitmap column from arrow";
+    }
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp 
b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
new file mode 100644
index 0000000000..1f6a3766f5
--- /dev/null
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "data_type_date64_serde.h"
+
+#include <arrow/builder.h>
+
+#include <type_traits>
+
+#include "gutil/casts.h"
+
+namespace doris {
+namespace vectorized {
+
+void DataTypeDate64SerDe::write_column_to_arrow(const IColumn& column, const 
UInt8* null_map,
+                                                arrow::ArrayBuilder* 
array_builder, int start,
+                                                int end) const {
+    auto& col_data = static_cast<const 
ColumnVector<Int64>&>(column).get_data();
+    auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+    for (size_t i = start; i < end; ++i) {
+        char buf[64];
+        const vectorized::VecDateTimeValue* time_val =
+                (const vectorized::VecDateTimeValue*)(&col_data[i]);
+        int len = time_val->to_buffer(buf);
+        if (null_map && null_map[i]) {
+            checkArrowStatus(string_builder.AppendNull(), column.get_name(),
+                             array_builder->type()->name());
+        } else {
+            checkArrowStatus(string_builder.Append(buf, len), 
column.get_name(),
+                             array_builder->type()->name());
+        }
+    }
+}
+
+static int64_t time_unit_divisor(arrow::TimeUnit::type unit) {
+    // Doris only supports seconds
+    switch (unit) {
+    case arrow::TimeUnit::type::SECOND: {
+        return 1L;
+    }
+    case arrow::TimeUnit::type::MILLI: {
+        return 1000L;
+    }
+    case arrow::TimeUnit::type::MICRO: {
+        return 1000000L;
+    }
+    case arrow::TimeUnit::type::NANO: {
+        return 1000000000L;
+    }
+    default:
+        return 0L;
+    }
+}
+
+void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                 int start, int end,
+                                                 const cctz::time_zone& ctz) 
const {
+    auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data();
+    int64_t divisor = 1;
+    int64_t multiplier = 1;
+    if (arrow_array->type()->id() == arrow::Type::DATE64) {
+        auto concrete_array = down_cast<const 
arrow::Date64Array*>(arrow_array);
+        divisor = 1000; //ms => secs
+        for (size_t value_i = start; value_i < end; ++value_i) {
+            VecDateTimeValue v;
+            v.from_unixtime(
+                    static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier, ctz);
+            col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
+        }
+    } else if (arrow_array->type()->id() == arrow::Type::TIMESTAMP) {
+        auto concrete_array = down_cast<const 
arrow::TimestampArray*>(arrow_array);
+        const auto type = 
std::static_pointer_cast<arrow::TimestampType>(arrow_array->type());
+        divisor = time_unit_divisor(type->unit());
+        if (divisor == 0L) {
+            LOG(FATAL) << "Invalid Time Type:" << type->name();
+        }
+        for (size_t value_i = start; value_i < end; ++value_i) {
+            VecDateTimeValue v;
+            v.from_unixtime(
+                    static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier, ctz);
+            col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
+        }
+    } else if (arrow_array->type()->id() == arrow::Type::DATE32) {
+        auto concrete_array = down_cast<const 
arrow::Date32Array*>(arrow_array);
+        multiplier = 24 * 60 * 60; // day => secs
+        for (size_t value_i = start; value_i < end; ++value_i) {
+            //            std::cout << "serde : " <<  
concrete_array->Value(value_i) << std::endl;
+            VecDateTimeValue v;
+            v.from_unixtime(
+                    static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier, ctz);
+            v.cast_to_date();
+            col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
+        }
+    }
+}
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp 
b/be/src/vec/data_types/serde/data_type_date64_serde.h
similarity index 54%
copy from be/src/vec/data_types/serde/data_type_array_serde.cpp
copy to be/src/vec/data_types/serde/data_type_date64_serde.h
index 8ae9c61858..80a0a1518c 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.h
@@ -15,33 +15,38 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "data_type_array_serde.h"
+#pragma once
 
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <ostream>
+#include <string>
+
+#include "common/status.h"
+#include "data_type_number_serde.h"
+#include "olap/olap_common.h"
 #include "util/jsonb_document.h"
+#include "util/jsonb_writer.h"
 #include "vec/columns/column.h"
+#include "vec/columns/column_vector.h"
 #include "vec/common/string_ref.h"
+#include "vec/core/types.h"
 
 namespace doris {
+class JsonbOutStream;
 
 namespace vectorized {
 class Arena;
 
-void DataTypeArraySerDe::write_one_cell_to_jsonb(const IColumn& column, 
JsonbWriter& result,
-                                                 Arena* mem_pool, int32_t 
col_id,
-                                                 int row_num) const {
-    result.writeKey(col_id);
-    const char* begin = nullptr;
-    // maybe serialize_value_into_arena should move to here later.
-    StringRef value = column.serialize_value_into_arena(row_num, *mem_pool, 
begin);
-    result.writeStartBinary();
-    result.writeBinary(value.data, value.size);
-    result.writeEndBinary();
-}
-
-void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn& column, const 
JsonbValue* arg) const {
-    auto blob = static_cast<const JsonbBlobVal*>(arg);
-    column.deserialize_and_insert_from_arena(blob->getBlob());
-}
-
+class DataTypeDate64SerDe : public DataTypeNumberSerDe<Int64> {
+    void write_column_to_arrow(const IColumn& column, const UInt8* 
null_bytemap,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
+};
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp 
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
new file mode 100644
index 0000000000..a8476197dd
--- /dev/null
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "data_type_datetimev2_serde.h"
+
+#include <arrow/builder.h>
+
+#include <type_traits>
+
+#include "gutil/casts.h"
+
+namespace doris {
+namespace vectorized {
+
+void DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column, 
const UInt8* null_map,
+                                                    arrow::ArrayBuilder* 
array_builder, int start,
+                                                    int end) const {
+    auto& col_data = static_cast<const 
ColumnVector<UInt64>&>(column).get_data();
+    auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+    for (size_t i = start; i < end; ++i) {
+        char buf[64];
+        const vectorized::DateV2Value<vectorized::DateTimeV2ValueType>* 
time_val =
+                (const 
vectorized::DateV2Value<vectorized::DateTimeV2ValueType>*)(col_data[i]);
+        int len = time_val->to_buffer(buf);
+        if (null_map && null_map[i]) {
+            checkArrowStatus(string_builder.AppendNull(), column.get_name(),
+                             array_builder->type()->name());
+        } else {
+            checkArrowStatus(string_builder.Append(buf, len), 
column.get_name(),
+                             array_builder->type()->name());
+        }
+    }
+}
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h 
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
similarity index 53%
copy from be/src/vec/data_types/serde/data_type_array_serde.h
copy to be/src/vec/data_types/serde/data_type_datetimev2_serde.h
index cf28e33728..7302a1d4d5 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.h
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
@@ -17,42 +17,38 @@
 
 #pragma once
 
+#include <gen_cpp/types.pb.h>
 #include <glog/logging.h>
+#include <stddef.h>
 #include <stdint.h>
 
 #include <ostream>
+#include <string>
 
 #include "common/status.h"
-#include "data_type_serde.h"
+#include "data_type_number_serde.h"
+#include "olap/olap_common.h"
+#include "util/jsonb_document.h"
 #include "util/jsonb_writer.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_vector.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/types.h"
 
 namespace doris {
-class PValues;
-class JsonbValue;
+class JsonbOutStream;
 
 namespace vectorized {
-class IColumn;
 class Arena;
 
-class DataTypeArraySerDe : public DataTypeSerDe {
-public:
-    DataTypeArraySerDe(const DataTypeSerDeSPtr& _nested_serde) : 
nested_serde(_nested_serde) {}
-
-    Status write_column_to_pb(const IColumn& column, PValues& result, int 
start,
-                              int end) const override {
-        LOG(FATAL) << "Not support write array column to pb";
-    }
-    Status read_column_from_pb(IColumn& column, const PValues& arg) const 
override {
-        LOG(FATAL) << "Not support read from pb to array";
+class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<UInt64> {
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override {
+        LOG(FATAL) << "not support read arrow array to uint64 column";
     }
-
-    void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, 
Arena* mem_pool,
-                                 int32_t col_id, int row_num) const override;
-
-    void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
-
-private:
-    DataTypeSerDeSPtr nested_serde;
 };
 } // namespace vectorized
-} // namespace doris
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp 
b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
new file mode 100644
index 0000000000..4095c8d872
--- /dev/null
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "data_type_datev2_serde.h"
+
+#include <arrow/builder.h>
+
+#include <type_traits>
+
+#include "gutil/casts.h"
+
+namespace doris {
+namespace vectorized {
+
+void DataTypeDateV2SerDe::write_column_to_arrow(const IColumn& column, const 
UInt8* null_map,
+                                                arrow::ArrayBuilder* 
array_builder, int start,
+                                                int end) const {
+    auto& col_data = static_cast<const 
ColumnVector<UInt32>&>(column).get_data();
+    auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+    for (size_t i = start; i < end; ++i) {
+        char buf[64];
+        const vectorized::DateV2Value<vectorized::DateV2ValueType>* time_val =
+                (const 
vectorized::DateV2Value<vectorized::DateV2ValueType>*)(&col_data[i]);
+        int len = time_val->to_buffer(buf);
+        if (null_map && null_map[i]) {
+            checkArrowStatus(string_builder.AppendNull(), column.get_name(),
+                             array_builder->type()->name());
+        } else {
+            checkArrowStatus(string_builder.Append(buf, len), 
column.get_name(),
+                             array_builder->type()->name());
+        }
+    }
+}
+
+void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                 int start, int end,
+                                                 const cctz::time_zone& ctz) 
const {
+    std::cout << "column : " << column.get_name() << " data" << 
getTypeName(column.get_data_type())
+              << " array " << arrow_array->type_id() << std::endl;
+    auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data();
+    auto concrete_array = down_cast<const arrow::Date64Array*>(arrow_array);
+    int64_t divisor = 1;
+    int64_t multiplier = 1;
+
+    multiplier = 24 * 60 * 60; // day => secs
+    for (size_t value_i = start; value_i < end; ++value_i) {
+        DateV2Value<DateV2ValueType> v;
+        v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier,
+                        ctz);
+        col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, 
UInt32>(v));
+    }
+}
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp 
b/be/src/vec/data_types/serde/data_type_datev2_serde.h
similarity index 54%
copy from be/src/vec/data_types/serde/data_type_array_serde.cpp
copy to be/src/vec/data_types/serde/data_type_datev2_serde.h
index 8ae9c61858..587f2be0c2 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.h
@@ -15,33 +15,38 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "data_type_array_serde.h"
+#pragma once
 
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <ostream>
+#include <string>
+
+#include "common/status.h"
+#include "data_type_number_serde.h"
+#include "olap/olap_common.h"
 #include "util/jsonb_document.h"
+#include "util/jsonb_writer.h"
 #include "vec/columns/column.h"
+#include "vec/columns/column_vector.h"
 #include "vec/common/string_ref.h"
+#include "vec/core/types.h"
 
 namespace doris {
+class JsonbOutStream;
 
 namespace vectorized {
 class Arena;
 
-void DataTypeArraySerDe::write_one_cell_to_jsonb(const IColumn& column, 
JsonbWriter& result,
-                                                 Arena* mem_pool, int32_t 
col_id,
-                                                 int row_num) const {
-    result.writeKey(col_id);
-    const char* begin = nullptr;
-    // maybe serialize_value_into_arena should move to here later.
-    StringRef value = column.serialize_value_into_arena(row_num, *mem_pool, 
begin);
-    result.writeStartBinary();
-    result.writeBinary(value.data, value.size);
-    result.writeEndBinary();
-}
-
-void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn& column, const 
JsonbValue* arg) const {
-    auto blob = static_cast<const JsonbBlobVal*>(arg);
-    column.deserialize_and_insert_from_arena(blob->getBlob());
-}
-
+class DataTypeDateV2SerDe : public DataTypeNumberSerDe<UInt32> {
+    void write_column_to_arrow(const IColumn& column, const UInt8* 
null_bytemap,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
+};
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp 
b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
index 6e6fa5f4d3..893e10769a 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
@@ -16,7 +16,138 @@
 // under the License.
 
 #include "data_type_decimal_serde.h"
+
+#include <arrow/array/array_base.h>
+#include <arrow/array/array_decimal.h>
+#include <arrow/builder.h>
+#include <arrow/util/decimal.h>
+
+#include "arrow/type.h"
+#include "gutil/casts.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/common/arithmetic_overflow.h"
+
 namespace doris {
 
-namespace vectorized {} // namespace vectorized
+namespace vectorized {
+
+template <typename T>
+void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, 
const UInt8* null_map,
+                                                    arrow::ArrayBuilder* 
array_builder, int start,
+                                                    int end) const {
+    auto& col = reinterpret_cast<const ColumnDecimal<T>&>(column);
+    auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
+    if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+        std::shared_ptr<arrow::DataType> s_decimal_ptr =
+                std::make_shared<arrow::Decimal128Type>(27, 9);
+        for (size_t i = start; i < end; ++i) {
+            if (null_map && null_map[i]) {
+                checkArrowStatus(builder.AppendNull(), column.get_name(),
+                                 array_builder->type()->name());
+                continue;
+            }
+            const auto& data_ref = col.get_data_at(i);
+            const PackedInt128* p_value = reinterpret_cast<const 
PackedInt128*>(data_ref.data);
+            int64_t high = (p_value->value) >> 64;
+            uint64 low = p_value->value;
+            arrow::Decimal128 value(high, low);
+            checkArrowStatus(builder.Append(value), column.get_name(),
+                             array_builder->type()->name());
+        }
+    } else if constexpr (std::is_same_v<T, Decimal<Int128I>>) {
+        std::shared_ptr<arrow::DataType> s_decimal_ptr =
+                std::make_shared<arrow::Decimal128Type>(38, col.get_scale());
+        for (size_t i = start; i < end; ++i) {
+            if (null_map && null_map[i]) {
+                checkArrowStatus(builder.AppendNull(), column.get_name(),
+                                 array_builder->type()->name());
+                continue;
+            }
+            const auto& data_ref = col.get_data_at(i);
+            const PackedInt128* p_value = reinterpret_cast<const 
PackedInt128*>(data_ref.data);
+            int64_t high = (p_value->value) >> 64;
+            uint64 low = p_value->value;
+            arrow::Decimal128 value(high, low);
+            checkArrowStatus(builder.Append(value), column.get_name(),
+                             array_builder->type()->name());
+        }
+    } else if constexpr (std::is_same_v<T, Decimal<Int32>>) {
+        std::shared_ptr<arrow::DataType> s_decimal_ptr =
+                std::make_shared<arrow::Decimal128Type>(8, col.get_scale());
+        for (size_t i = start; i < end; ++i) {
+            if (null_map && null_map[i]) {
+                checkArrowStatus(builder.AppendNull(), column.get_name(),
+                                 array_builder->type()->name());
+                continue;
+            }
+            const auto& data_ref = col.get_data_at(i);
+            const int32_t* p_value = reinterpret_cast<const 
int32_t*>(data_ref.data);
+            int64_t high = *p_value > 0 ? 0 : 1UL << 63;
+            arrow::Decimal128 value(high, *p_value > 0 ? *p_value : -*p_value);
+            checkArrowStatus(builder.Append(value), column.get_name(),
+                             array_builder->type()->name());
+        }
+    } else if constexpr (std::is_same_v<T, Decimal<Int64>>) {
+        std::shared_ptr<arrow::DataType> s_decimal_ptr =
+                std::make_shared<arrow::Decimal128Type>(18, col.get_scale());
+        for (size_t i = start; i < end; ++i) {
+            if (null_map && null_map[i]) {
+                checkArrowStatus(builder.AppendNull(), column.get_name(),
+                                 array_builder->type()->name());
+                continue;
+            }
+            const auto& data_ref = col.get_data_at(i);
+            const int64_t* p_value = reinterpret_cast<const 
int64_t*>(data_ref.data);
+            int64_t high = *p_value > 0 ? 0 : 1UL << 63;
+            arrow::Decimal128 value(high, *p_value > 0 ? *p_value : -*p_value);
+            checkArrowStatus(builder.Append(value), column.get_name(),
+                             array_builder->type()->name());
+        }
+    } else {
+        LOG(FATAL) << "Not support write " << column.get_name() << " to arrow";
+    }
+}
+
+template <typename T>
+void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
+                                                     const arrow::Array* 
arrow_array, int start,
+                                                     int end, const 
cctz::time_zone& ctz) const {
+    if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+        auto& column_data = 
static_cast<ColumnDecimal<vectorized::Decimal128>&>(column).get_data();
+        auto concrete_array = down_cast<const 
arrow::DecimalArray*>(arrow_array);
+        const auto* arrow_decimal_type =
+                static_cast<const 
arrow::DecimalType*>(arrow_array->type().get());
+        // TODO check precision
+        const auto scale = arrow_decimal_type->scale();
+        for (size_t value_i = start; value_i < end; ++value_i) {
+            auto value = *reinterpret_cast<const vectorized::Decimal128*>(
+                    concrete_array->Value(value_i));
+            // convert scale to 9;
+            if (9 > scale) {
+                using MaxNativeType = typename Decimal128::NativeType;
+                MaxNativeType converted_value = common::exp10_i128(9 - scale);
+                if (common::mul_overflow(static_cast<MaxNativeType>(value), 
converted_value,
+                                         converted_value)) {
+                    VLOG_DEBUG << "Decimal convert overflow";
+                    value = converted_value < 0
+                                    ? std::numeric_limits<typename Decimal128 
::NativeType>::min()
+                                    : std::numeric_limits<typename Decimal128 
::NativeType>::max();
+                } else {
+                    value = converted_value;
+                }
+            } else if (9 < scale) {
+                value = value / common::exp10_i128(scale - 9);
+            }
+            column_data.emplace_back(value);
+        }
+    } else {
+        LOG(FATAL) << "Not support read " << column.get_name() << " from 
arrow";
+    }
+}
+
+template class DataTypeDecimalSerDe<Decimal32>;
+template class DataTypeDecimalSerDe<Decimal64>;
+template class DataTypeDecimalSerDe<Decimal128>;
+template class DataTypeDecimalSerDe<Decimal128I>;
+} // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.h 
b/be/src/vec/data_types/serde/data_type_decimal_serde.h
index 79293e9dd8..0e9685d936 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.h
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.h
@@ -54,6 +54,12 @@ public:
                                  int32_t col_id, int row_num) const override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
+
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
 };
 
 template <typename T>
diff --git a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h 
b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
index 7d5db6c041..af02dc9824 100644
--- a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
+++ b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
@@ -51,6 +51,15 @@ public:
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override {
         LOG(FATAL) << "Not support read from jsonb to FixedLengthObject";
     }
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override {
+        LOG(FATAL) << "Not support write FixedLengthObject column to arrow";
+    }
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override {
+        LOG(FATAL) << "Not support read FixedLengthObject column from arrow";
+    }
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp 
b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
index b7e1afb3a7..a3fc40aebc 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
@@ -23,6 +23,7 @@
 
 #include <string>
 
+#include "arrow/array/builder_binary.h"
 #include "olap/hll.h"
 #include "util/jsonb_document.h"
 #include "util/slice.h"
@@ -79,5 +80,25 @@ void DataTypeHLLSerDe::read_one_cell_from_jsonb(IColumn& 
column, const JsonbValu
     col.insert_value(hyper_log_log);
 }
 
+void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const 
UInt8* null_map,
+                                             arrow::ArrayBuilder* 
array_builder, int start,
+                                             int end) const {
+    const auto& col = assert_cast<const ColumnHLL&>(column);
+    auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+    for (size_t string_i = start; string_i < end; ++string_i) {
+        if (null_map && null_map[string_i]) {
+            checkArrowStatus(builder.AppendNull(), column.get_name(),
+                             array_builder->type()->name());
+        } else {
+            auto& hll_value = 
const_cast<HyperLogLog&>(col.get_element(string_i));
+            std::string memory_buffer(hll_value.max_serialized_size(), '0');
+            hll_value.serialize((uint8_t*)memory_buffer.data());
+            checkArrowStatus(
+                    builder.Append(memory_buffer.data(), 
static_cast<int>(memory_buffer.size())),
+                    column.get_name(), array_builder->type()->name());
+        }
+    }
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h 
b/be/src/vec/data_types/serde/data_type_hll_serde.h
index facd6aaa72..9a47f8fbd7 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.h
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.h
@@ -41,6 +41,13 @@ public:
                                  int32_t col_id, int row_num) const override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override {
+        LOG(FATAL) << "Not support read hll column from arrow";
+    }
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.cpp 
b/be/src/vec/data_types/serde/data_type_map_serde.cpp
index 9f430e3198..87dd4e5615 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_map_serde.cpp
@@ -40,5 +40,17 @@ void DataTypeMapSerDe::write_one_cell_to_jsonb(const 
IColumn& column, JsonbWrite
     result.writeBinary(value.data, value.size);
     result.writeEndBinary();
 }
+
+void DataTypeMapSerDe::write_column_to_arrow(const IColumn& column, const 
UInt8* null_map,
+                                             arrow::ArrayBuilder* 
array_builder, int start,
+                                             int end) const {
+    LOG(FATAL) << "Not support write " << column.get_name() << " to arrow";
+}
+
+void DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                              int start, int end,
+                                              const cctz::time_zone& ctz) 
const {
+    LOG(FATAL) << "Not support read " << column.get_name() << " from arrow";
+}
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.h 
b/be/src/vec/data_types/serde/data_type_map_serde.h
index dfbe55cf19..4708e36afd 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.h
+++ b/be/src/vec/data_types/serde/data_type_map_serde.h
@@ -50,6 +50,11 @@ public:
                                  int32_t col_id, int row_num) const override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
 
 private:
     DataTypeSerDeSPtr key_serde;
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp 
b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index 8072c785c9..0c7e72bfda 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -17,6 +17,7 @@
 
 #include "data_type_nullable_serde.h"
 
+#include <arrow/array/array_base.h>
 #include <gen_cpp/types.pb.h>
 
 #include <algorithm>
@@ -94,5 +95,41 @@ void 
DataTypeNullableSerDe::read_one_cell_from_jsonb(IColumn& column, const Json
     auto& null_map_data = col.get_null_map_data();
     null_map_data.push_back(0);
 }
+
+/**nullable serialize to arrow
+   1/ convert the null_map from doris to arrow null byte map
+   2/ pass the arrow null byteamp to nested column , and call AppendValues
+**/
+void DataTypeNullableSerDe::write_column_to_arrow(const IColumn& column, const 
UInt8* null_map,
+                                                  arrow::ArrayBuilder* 
array_builder, int start,
+                                                  int end) const {
+    const auto& column_nullable = assert_cast<const ColumnNullable&>(column);
+    const PaddedPODArray<UInt8>& bytemap = column_nullable.get_null_map_data();
+    PaddedPODArray<UInt8> res;
+    if (column_nullable.has_null()) {
+        res.reserve(end - start);
+        for (size_t i = start; i < end; ++i) {
+            res.emplace_back(
+                    !(bytemap)[i]); //Invert values since Arrow interprets 1 
as a non-null value
+        }
+    }
+    const UInt8* arrow_null_bytemap_raw_ptr = res.empty() ? nullptr : 
res.data();
+    nested_serde->write_column_to_arrow(column_nullable.get_nested_column(),
+                                        arrow_null_bytemap_raw_ptr, 
array_builder, start, end);
+}
+
+void DataTypeNullableSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                   int start, int end,
+                                                   const cctz::time_zone& ctz) 
const {
+    auto& col = reinterpret_cast<ColumnNullable&>(column);
+    NullMap& map_data = col.get_null_map_data();
+    for (size_t i = start; i < end; ++i) {
+        auto is_null = arrow_array->IsNull(i);
+        map_data.emplace_back(is_null);
+    }
+    return nested_serde->read_column_from_arrow(col.get_nested_column(), 
arrow_array, start, end,
+                                                ctz);
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h 
b/be/src/vec/data_types/serde/data_type_nullable_serde.h
index 3975675715..7631ed90ef 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -43,6 +43,11 @@ public:
                                  int32_t col_id, int row_num) const override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
 
 private:
     DataTypeSerDeSPtr nested_serde;
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp 
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index f6667e8930..d8736d30a8 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -17,6 +17,114 @@
 
 #include "data_type_number_serde.h"
 
+#include <arrow/builder.h>
+
+#include <type_traits>
+
+#include "gutil/casts.h"
+
 namespace doris {
-namespace vectorized {} // namespace vectorized
+namespace vectorized {
+
+// Type map的基本结构
+template <typename Key, typename Value, typename... Rest>
+struct TypeMap {
+    using KeyType = Key;
+    using ValueType = Value;
+    using Next = TypeMap<Rest...>;
+};
+
+// Type map的末端
+template <>
+struct TypeMap<void, void> {};
+
+// TypeMapLookup 前向声明
+template <typename Key, typename Map>
+struct TypeMapLookup;
+
+// Type map查找:找到匹配的键时的情况
+template <typename Key, typename Value, typename... Rest>
+struct TypeMapLookup<Key, TypeMap<Key, Value, Rest...>> {
+    using ValueType = Value;
+};
+
+// Type map查找:递归查找
+template <typename Key, typename K, typename V, typename... Rest>
+struct TypeMapLookup<Key, TypeMap<K, V, Rest...>> {
+    using ValueType = typename TypeMapLookup<Key, TypeMap<Rest...>>::ValueType;
+};
+
+using DORIS_NUMERIC_ARROW_BUILDER =
+        TypeMap<UInt8, arrow::BooleanBuilder, Int8, arrow::Int8Builder, UInt16,
+                arrow::UInt16Builder, Int16, arrow::Int16Builder, UInt32, 
arrow::UInt32Builder,
+                Int32, arrow::Int32Builder, UInt64, arrow::UInt64Builder, 
Int64,
+                arrow::Int64Builder, UInt128, arrow::FixedSizeBinaryBuilder, 
Int128,
+                arrow::FixedSizeBinaryBuilder, Float32, arrow::FloatBuilder, 
Float64,
+                arrow::DoubleBuilder, void,
+                void // 添加这一行来表示TypeMap的末端
+                >;
+
+template <typename T>
+void DataTypeNumberSerDe<T>::write_column_to_arrow(const IColumn& column, 
const UInt8* null_map,
+                                                   arrow::ArrayBuilder* 
array_builder, int start,
+                                                   int end) const {
+    auto& col_data = assert_cast<const ColumnType&>(column).get_data();
+    using ARROW_BUILDER_TYPE = typename TypeMapLookup<T, 
DORIS_NUMERIC_ARROW_BUILDER>::ValueType;
+    if constexpr (std::is_same_v<T, UInt8>) {
+        ARROW_BUILDER_TYPE& builder = 
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
+        checkArrowStatus(
+                builder.AppendValues(reinterpret_cast<const 
uint8_t*>(col_data.data() + start),
+                                     end - start, reinterpret_cast<const 
uint8_t*>(null_map)),
+                column.get_name(), array_builder->type()->name());
+    } else if constexpr (std::is_same_v<T, Int128> || std::is_same_v<T, 
UInt128>) {
+        ARROW_BUILDER_TYPE& builder = 
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
+        size_t fixed_length = sizeof(typename ColumnType::value_type);
+        const uint8_t* data_start =
+                reinterpret_cast<const uint8_t*>(col_data.data()) + start * 
fixed_length;
+        checkArrowStatus(builder.AppendValues(data_start, end - start,
+                                              reinterpret_cast<const 
uint8_t*>(null_map)),
+                         column.get_name(), array_builder->type()->name());
+    } else {
+        ARROW_BUILDER_TYPE& builder = 
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
+        checkArrowStatus(builder.AppendValues(col_data.data() + start, end - 
start,
+                                              reinterpret_cast<const 
uint8_t*>(null_map)),
+                         column.get_name(), array_builder->type()->name());
+    }
+}
+
+template <typename T>
+void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
+                                                    const arrow::Array* 
arrow_array, int start,
+                                                    int end, const 
cctz::time_zone& ctz) const {
+    int row_count = end - start;
+    auto& col_data = static_cast<ColumnVector<T>&>(column).get_data();
+
+    // now uint8 for bool
+    if constexpr (std::is_same_v<T, UInt8>) {
+        auto concrete_array = down_cast<const 
arrow::BooleanArray*>(arrow_array);
+        for (size_t bool_i = 0; bool_i != 
static_cast<size_t>(concrete_array->length()); ++bool_i) {
+            col_data.emplace_back(concrete_array->Value(bool_i));
+        }
+        return;
+    }
+    /// buffers[0] is a null bitmap and buffers[1] are actual values
+    std::shared_ptr<arrow::Buffer> buffer = arrow_array->data()->buffers[1];
+    const auto* raw_data = reinterpret_cast<const T*>(buffer->data()) + start;
+    col_data.insert(raw_data, raw_data + row_count);
+}
+
+/// Explicit template instantiations - to avoid code bloat in headers.
+template class DataTypeNumberSerDe<UInt8>;
+template class DataTypeNumberSerDe<UInt16>;
+template class DataTypeNumberSerDe<UInt32>;
+template class DataTypeNumberSerDe<UInt64>;
+template class DataTypeNumberSerDe<UInt128>;
+template class DataTypeNumberSerDe<Int8>;
+template class DataTypeNumberSerDe<Int16>;
+template class DataTypeNumberSerDe<Int32>;
+template class DataTypeNumberSerDe<Int64>;
+template class DataTypeNumberSerDe<Int128>;
+template class DataTypeNumberSerDe<Float32>;
+template class DataTypeNumberSerDe<Float64>;
+} // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h 
b/be/src/vec/data_types/serde/data_type_number_serde.h
index 62632b946d..90566510ca 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.h
+++ b/be/src/vec/data_types/serde/data_type_number_serde.h
@@ -41,6 +41,12 @@ class JsonbOutStream;
 namespace vectorized {
 class Arena;
 
+// special data type using, maybe has various serde actions, so use specific 
date serde
+//  DataTypeDateV2 => T:UInt32
+//  DataTypeDateTimeV2 => T:UInt64
+//  DataTypeTime => T:Float64
+//  DataTypeDate => T:Int64
+//  DataTypeDateTime => T:Int64
 template <typename T>
 class DataTypeNumberSerDe : public DataTypeSerDe {
     static_assert(IsNumber<T>);
@@ -55,6 +61,12 @@ public:
                                  int32_t col_id, int row_num) const override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
+
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
 };
 
 template <typename T>
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h 
b/be/src/vec/data_types/serde/data_type_object_serde.h
index c72deb59ea..d8c93b9afe 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.h
+++ b/be/src/vec/data_types/serde/data_type_object_serde.h
@@ -51,6 +51,16 @@ public:
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override {
         LOG(FATAL) << "Not support write json object to column";
     }
+
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override {
+        LOG(FATAL) << "Not support write object column to arrow";
+    }
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override {
+        LOG(FATAL) << "Not support read object column from arrow";
+    }
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h 
b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
index bf5912446b..dc2031c5ef 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
@@ -47,6 +47,15 @@ public:
                                  int32_t col_id, int row_num) const override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override {
+        LOG(FATAL) << "Not support write " << column.get_name() << " to arrow";
+    }
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override {
+        LOG(FATAL) << "Not support read " << column.get_name() << " from 
arrow";
+    }
 };
 
 template <typename T>
diff --git a/be/src/vec/data_types/serde/data_type_serde.h 
b/be/src/vec/data_types/serde/data_type_serde.h
index 5908c0836e..2d196fcb9e 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -22,8 +22,19 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/status.h"
 #include "common/status.h"
 #include "util/jsonb_writer.h"
+#include "vec/common/pod_array_fwd.h"
+#include "vec/core/types.h"
+
+namespace arrow {
+class ArrayBuilder;
+class Array;
+} // namespace arrow
+namespace cctz {
+class time_zone;
+} // namespace cctz
 
 namespace doris {
 class PValues;
@@ -70,8 +81,21 @@ public:
     // JSON serializer and deserializer
 
     // Arrow serializer and deserializer
+    virtual void write_column_to_arrow(const IColumn& column, const UInt8* 
null_map,
+                                       arrow::ArrayBuilder* array_builder, int 
start,
+                                       int end) const = 0;
+    virtual void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                        int end, const cctz::time_zone& ctz) 
const = 0;
 };
 
+inline void checkArrowStatus(const arrow::Status& status, const std::string& 
column,
+                             const std::string& format_name) {
+    if (!status.ok()) {
+        LOG(FATAL) << "arrow serde with arrow: " << format_name << " with 
column : " << column
+                   << " with error msg: " << status.ToString();
+    }
+}
+
 using DataTypeSerDeSPtr = std::shared_ptr<DataTypeSerDe>;
 using DataTypeSerDeSPtrs = std::vector<DataTypeSerDeSPtr>;
 
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.cpp 
b/be/src/vec/data_types/serde/data_type_string_serde.cpp
index 081b9b1748..ba230da987 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_string_serde.cpp
@@ -21,7 +21,10 @@
 #include <gen_cpp/types.pb.h>
 #include <stddef.h>
 
+#include "arrow/array/builder_binary.h"
+#include "gutil/casts.h"
 #include "util/jsonb_document.h"
+#include "util/jsonb_utils.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_string.h"
 #include "vec/common/string_ref.h"
@@ -63,5 +66,62 @@ void DataTypeStringSerDe::read_one_cell_from_jsonb(IColumn& 
column, const JsonbV
     auto blob = static_cast<const JsonbBlobVal*>(arg);
     col.insert_data(blob->getBlob(), blob->getBlobLen());
 }
+
+void DataTypeStringSerDe::write_column_to_arrow(const IColumn& column, const 
UInt8* null_map,
+                                                arrow::ArrayBuilder* 
array_builder, int start,
+                                                int end) const {
+    const auto& string_column = assert_cast<const ColumnString&>(column);
+    auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+    for (size_t string_i = start; string_i < end; ++string_i) {
+        if (null_map && null_map[string_i]) {
+            checkArrowStatus(builder.AppendNull(), column.get_name(),
+                             array_builder->type()->name());
+            continue;
+        }
+        std::string_view string_ref = 
string_column.get_data_at(string_i).to_string_view();
+        if (column.get_data_type() == TypeIndex::JSONB) {
+            std::string json_string =
+                    JsonbToJson::jsonb_to_json_string(string_ref.data(), 
string_ref.size());
+            checkArrowStatus(builder.Append(json_string.data(), 
json_string.size()),
+                             column.get_name(), array_builder->type()->name());
+        } else {
+            checkArrowStatus(builder.Append(string_ref.data(), 
string_ref.size()),
+                             column.get_name(), array_builder->type()->name());
+        }
+    }
+}
+
+void DataTypeStringSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                 int start, int end,
+                                                 const cctz::time_zone& ctz) 
const {
+    auto& column_chars_t = assert_cast<ColumnString&>(column).get_chars();
+    auto& column_offsets = assert_cast<ColumnString&>(column).get_offsets();
+    if (arrow_array->type_id() == arrow::Type::STRING ||
+        arrow_array->type_id() == arrow::Type::BINARY) {
+        auto concrete_array = down_cast<const 
arrow::BinaryArray*>(arrow_array);
+        std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();
+
+        for (size_t offset_i = start; offset_i < end; ++offset_i) {
+            if (!concrete_array->IsNull(offset_i)) {
+                const auto* raw_data = buffer->data() + 
concrete_array->value_offset(offset_i);
+                column_chars_t.insert(raw_data, raw_data + 
concrete_array->value_length(offset_i));
+            }
+            column_offsets.emplace_back(column_chars_t.size());
+        }
+    } else if (arrow_array->type_id() == arrow::Type::FIXED_SIZE_BINARY) {
+        auto concrete_array = down_cast<const 
arrow::FixedSizeBinaryArray*>(arrow_array);
+        uint32_t width = concrete_array->byte_width();
+        const auto* array_data = concrete_array->GetValue(start);
+
+        for (size_t offset_i = 0; offset_i < end - start; ++offset_i) {
+            if (!concrete_array->IsNull(offset_i)) {
+                const auto* raw_data = array_data + (offset_i * width);
+                column_chars_t.insert(raw_data, raw_data + width);
+            }
+            column_offsets.emplace_back(column_chars_t.size());
+        }
+    }
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h 
b/be/src/vec/data_types/serde/data_type_string_serde.h
index 9893c5721e..5fe7d00db8 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.h
+++ b/be/src/vec/data_types/serde/data_type_string_serde.h
@@ -40,6 +40,12 @@ public:
                                  int32_t col_id, int row_num) const override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
+
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.cpp 
b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
index 2af87dd484..be21680009 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
@@ -42,5 +42,17 @@ void DataTypeStructSerDe::read_one_cell_from_jsonb(IColumn& 
column, const JsonbV
     auto blob = static_cast<const JsonbBlobVal*>(arg);
     column.deserialize_and_insert_from_arena(blob->getBlob());
 }
+
+void DataTypeStructSerDe::write_column_to_arrow(const IColumn& column, const 
UInt8* null_map,
+                                                arrow::ArrayBuilder* 
array_builder, int start,
+                                                int end) const {
+    LOG(FATAL) << "Not support write " << column.get_name() << " to arrow";
+}
+
+void DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                 int start, int end,
+                                                 const cctz::time_zone& ctz) 
const {
+    LOG(FATAL) << "Not support read " << column.get_name() << " from arrow";
+}
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.h 
b/be/src/vec/data_types/serde/data_type_struct_serde.h
index e6abe47b7d..836d5bdbdd 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.h
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.h
@@ -51,6 +51,12 @@ public:
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const 
override;
+
 private:
     DataTypeSerDeSPtrs elemSerDeSPtrs;
 };
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp 
b/be/src/vec/utils/arrow_column_to_doris_column.cpp
index af94664f1b..5de57156dd 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.cpp
+++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp
@@ -71,19 +71,6 @@
     M(::arrow::Type::DATE64, TYPE_DATETIME)           \
     M(::arrow::Type::DECIMAL, TYPE_DECIMALV2)
 
-#define FOR_ARROW_NUMERIC_TYPES(M)      \
-    M(arrow::Type::UINT8, UInt8)        \
-    M(arrow::Type::INT8, Int8)          \
-    M(arrow::Type::INT16, Int16)        \
-    M(arrow::Type::UINT16, UInt16)      \
-    M(arrow::Type::INT32, Int32)        \
-    M(arrow::Type::UINT32, UInt32)      \
-    M(arrow::Type::UINT64, UInt64)      \
-    M(arrow::Type::INT64, Int64)        \
-    M(arrow::Type::HALF_FLOAT, Float32) \
-    M(arrow::Type::FLOAT, Float32)      \
-    M(arrow::Type::DOUBLE, Float64)
-
 namespace doris::vectorized {
 
 PrimitiveType arrow_type_to_primitive_type(::arrow::Type::type type) {
@@ -100,254 +87,7 @@ PrimitiveType 
arrow_type_to_primitive_type(::arrow::Type::type type) {
     return INVALID_TYPE;
 }
 
-static size_t fill_nullable_column(const arrow::Array* array, size_t array_idx,
-                                   vectorized::ColumnNullable* nullable_column,
-                                   size_t num_elements) {
-    size_t null_elements_count = 0;
-    NullMap& map_data = nullable_column->get_null_map_data();
-    for (size_t i = 0; i < num_elements; ++i) {
-        auto is_null = array->IsNull(array_idx + i);
-        map_data.emplace_back(is_null);
-        null_elements_count += is_null;
-    }
-    return null_elements_count;
-}
-
-/// Inserts chars and offsets right into internal column data to reduce an 
overhead.
-/// Internal offsets are shifted by one to the right in comparison with Arrow 
ones. So the last offset should map to the end of all chars.
-/// Also internal strings are null terminated.
-static Status convert_column_with_string_data(const arrow::Array* array, 
size_t array_idx,
-                                              MutableColumnPtr& data_column, 
size_t num_elements) {
-    auto& column_chars_t = 
assert_cast<ColumnString&>(*data_column).get_chars();
-    auto& column_offsets = 
assert_cast<ColumnString&>(*data_column).get_offsets();
-
-    auto concrete_array = down_cast<const arrow::BinaryArray*>(array);
-    std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();
-
-    for (size_t offset_i = array_idx; offset_i < array_idx + num_elements; 
++offset_i) {
-        if (!concrete_array->IsNull(offset_i) && buffer) {
-            const auto* raw_data = buffer->data() + 
concrete_array->value_offset(offset_i);
-            column_chars_t.insert(raw_data, raw_data + 
concrete_array->value_length(offset_i));
-        }
-
-        column_offsets.emplace_back(column_chars_t.size());
-    }
-    return Status::OK();
-}
-
-static Status convert_column_with_fixed_size_data(const arrow::Array* array, 
size_t array_idx,
-                                                  MutableColumnPtr& 
data_column,
-                                                  size_t num_elements) {
-    auto& column_chars_t = 
assert_cast<ColumnString&>(*data_column).get_chars();
-    auto& column_offsets = 
assert_cast<ColumnString&>(*data_column).get_offsets();
-
-    auto concrete_array = down_cast<const arrow::FixedSizeBinaryArray*>(array);
-    uint32_t width = concrete_array->byte_width();
-    const auto* array_data = concrete_array->GetValue(array_idx);
-
-    for (size_t offset_i = 0; offset_i < num_elements; ++offset_i) {
-        if (!concrete_array->IsNull(offset_i)) {
-            const auto* raw_data = array_data + (offset_i * width);
-            column_chars_t.insert(raw_data, raw_data + width);
-        }
-        column_offsets.emplace_back(column_chars_t.size());
-    }
-    return Status::OK();
-}
-
-/// Inserts numeric data right into internal column data to reduce an overhead
-template <typename NumericType, typename VectorType = 
ColumnVector<NumericType>>
-Status convert_column_with_numeric_data(const arrow::Array* array, size_t 
array_idx,
-                                        MutableColumnPtr& data_column, size_t 
num_elements) {
-    auto& column_data = static_cast<VectorType&>(*data_column).get_data();
-    /// buffers[0] is a null bitmap and buffers[1] are actual values
-    std::shared_ptr<arrow::Buffer> buffer = array->data()->buffers[1];
-    const auto* raw_data = reinterpret_cast<const 
NumericType*>(buffer->data()) + array_idx;
-    column_data.insert(raw_data, raw_data + num_elements);
-    return Status::OK();
-}
-
-static Status convert_column_with_boolean_data(const arrow::Array* array, 
size_t array_idx,
-                                               MutableColumnPtr& data_column, 
size_t num_elements) {
-    auto& column_data = 
static_cast<ColumnVector<UInt8>&>(*data_column).get_data();
-    auto concrete_array = down_cast<const arrow::BooleanArray*>(array);
-    for (size_t bool_i = array_idx; bool_i < array_idx + num_elements; 
++bool_i) {
-        column_data.emplace_back(concrete_array->Value(bool_i));
-    }
-    return Status::OK();
-}
-
-static int64_t time_unit_divisor(arrow::TimeUnit::type unit) {
-    // Doris only supports seconds
-    switch (unit) {
-    case arrow::TimeUnit::type::SECOND: {
-        return 1L;
-    }
-    case arrow::TimeUnit::type::MILLI: {
-        return 1000L;
-    }
-    case arrow::TimeUnit::type::MICRO: {
-        return 1000000L;
-    }
-    case arrow::TimeUnit::type::NANO: {
-        return 1000000000L;
-    }
-    default:
-        return 0L;
-    }
-}
-
-template <typename ArrowType>
-Status convert_column_with_timestamp_data(const arrow::Array* array, size_t 
array_idx,
-                                          MutableColumnPtr& data_column, 
size_t num_elements,
-                                          const cctz::time_zone& ctz) {
-    auto& column_data = 
static_cast<ColumnVector<Int64>&>(*data_column).get_data();
-    auto concrete_array = down_cast<const ArrowType*>(array);
-    int64_t divisor = 1;
-    int64_t multiplier = 1;
-    if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) {
-        const auto type = 
std::static_pointer_cast<arrow::TimestampType>(array->type());
-        divisor = time_unit_divisor(type->unit());
-        if (divisor == 0L) {
-            return Status::InternalError(fmt::format("Invalid Time Type:{}", 
type->name()));
-        }
-    } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
-        multiplier = 24 * 60 * 60; // day => secs
-    } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) {
-        divisor = 1000; //ms => secs
-    }
-
-    for (size_t value_i = array_idx; value_i < array_idx + num_elements; 
++value_i) {
-        VecDateTimeValue v;
-        v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier,
-                        ctz);
-        if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
-            v.cast_to_date();
-        }
-        column_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
-    }
-    return Status::OK();
-}
-
-template <typename ArrowType>
-Status convert_column_with_date_v2_data(const arrow::Array* array, size_t 
array_idx,
-                                        MutableColumnPtr& data_column, size_t 
num_elements,
-                                        const cctz::time_zone& ctz) {
-    auto& column_data = 
static_cast<ColumnVector<UInt32>&>(*data_column).get_data();
-    auto concrete_array = down_cast<const ArrowType*>(array);
-    int64_t divisor = 1;
-    int64_t multiplier = 1;
-    if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) {
-        const auto type = 
std::static_pointer_cast<arrow::TimestampType>(array->type());
-        divisor = time_unit_divisor(type->unit());
-        if (divisor == 0L) {
-            return Status::InternalError(fmt::format("Invalid Time Type:{}", 
type->name()));
-        }
-    } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
-        multiplier = 24 * 60 * 60; // day => secs
-    } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) {
-        divisor = 1000; //ms => secs
-    }
-
-    for (size_t value_i = array_idx; value_i < array_idx + num_elements; 
++value_i) {
-        DateV2Value<DateV2ValueType> v;
-        v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier,
-                        ctz);
-        column_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, 
UInt32>(v));
-    }
-    return Status::OK();
-}
-
-template <typename ArrowType>
-Status convert_column_with_datetime_v2_data(const arrow::Array* array, size_t 
array_idx,
-                                            MutableColumnPtr& data_column, 
size_t num_elements,
-                                            const cctz::time_zone& ctz) {
-    auto& column_data = 
static_cast<ColumnVector<UInt64>&>(*data_column).get_data();
-    auto concrete_array = down_cast<const ArrowType*>(array);
-    int64_t divisor = 1;
-    int64_t multiplier = 1;
-    if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) {
-        const auto type = 
std::static_pointer_cast<arrow::TimestampType>(array->type());
-        divisor = time_unit_divisor(type->unit());
-        if (divisor == 0L) {
-            return Status::InternalError(fmt::format("Invalid Time Type:{}", 
type->name()));
-        }
-    } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
-        multiplier = 24 * 60 * 60; // day => secs
-    } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) {
-        divisor = 1000; //ms => secs
-    }
-
-    for (size_t value_i = array_idx; value_i < array_idx + num_elements; 
++value_i) {
-        DateV2Value<DateTimeV2ValueType> v;
-        v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier,
-                        ctz);
-        column_data.emplace_back(binary_cast<DateV2Value<DateTimeV2ValueType>, 
UInt64>(v));
-    }
-    return Status::OK();
-}
-
-static Status convert_column_with_decimal_data(const arrow::Array* array, 
size_t array_idx,
-                                               MutableColumnPtr& data_column, 
size_t num_elements) {
-    auto& column_data =
-            
static_cast<ColumnDecimal<vectorized::Decimal128>&>(*data_column).get_data();
-    auto concrete_array = down_cast<const arrow::DecimalArray*>(array);
-    const auto* arrow_decimal_type = 
static_cast<arrow::DecimalType*>(array->type().get());
-    // TODO check precision
-    //size_t precision = arrow_decimal_type->precision();
-    const auto scale = arrow_decimal_type->scale();
-
-    for (size_t value_i = array_idx; value_i < array_idx + num_elements; 
++value_i) {
-        auto value =
-                *reinterpret_cast<const 
vectorized::Decimal128*>(concrete_array->Value(value_i));
-        // convert scale to 9
-        if (scale != 9) {
-            value = 
convert_decimals<vectorized::DataTypeDecimal<vectorized::Decimal128>,
-                                     
vectorized::DataTypeDecimal<vectorized::Decimal128>>(value,
-                                                                               
           scale, 9);
-        }
-        column_data.emplace_back(value);
-    }
-    return Status::OK();
-}
-
-static Status convert_offset_from_list_column(const arrow::Array* array, 
size_t array_idx,
-                                              MutableColumnPtr& data_column, 
size_t num_elements,
-                                              size_t* start_idx_for_data, 
size_t* num_for_data) {
-    auto& offsets_data = static_cast<ColumnArray&>(*data_column).get_offsets();
-    auto concrete_array = down_cast<const arrow::ListArray*>(array);
-    auto arrow_offsets_array = concrete_array->offsets();
-    auto arrow_offsets = 
down_cast<arrow::Int32Array*>(arrow_offsets_array.get());
-    auto prev_size = offsets_data.back();
-    for (int64_t i = array_idx + 1; i < array_idx + num_elements + 1; ++i) {
-        // convert to doris offset, start from offsets.back()
-        offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) -
-                                  arrow_offsets->Value(array_idx));
-    }
-    *start_idx_for_data = arrow_offsets->Value(array_idx);
-    *num_for_data = offsets_data.back() - prev_size;
-
-    return Status::OK();
-}
-
-static Status convert_column_with_list_data(const arrow::Array* array, size_t 
array_idx,
-                                            MutableColumnPtr& data_column, 
size_t num_elements,
-                                            const cctz::time_zone& ctz,
-                                            const DataTypePtr& nested_type) {
-    size_t start_idx_of_data = 0;
-    size_t num_of_data = 0;
-    // get start idx and num of values from arrow offsets
-    RETURN_IF_ERROR(convert_offset_from_list_column(array, array_idx, 
data_column, num_elements,
-                                                    &start_idx_of_data, 
&num_of_data));
-    auto& data_column_ptr = 
static_cast<ColumnArray&>(*data_column).get_data_ptr();
-    auto concrete_array = down_cast<const arrow::ListArray*>(array);
-    std::shared_ptr<arrow::Array> arrow_data = concrete_array->values();
-
-    return arrow_column_to_doris_column(arrow_data.get(), start_idx_of_data, 
data_column_ptr,
-                                        nested_type, num_of_data, ctz);
-}
-
-// For convenient unit test. Not use this in formal code.
+//// For convenient unit test. Not use this in formal code.
 Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t 
arrow_batch_cur_idx,
                                     ColumnPtr& doris_column, const 
DataTypePtr& type,
                                     size_t num_elements, const std::string& 
timezone) {
@@ -360,69 +100,10 @@ Status arrow_column_to_doris_column(const arrow::Array* 
arrow_column, size_t arr
 Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t 
arrow_batch_cur_idx,
                                     ColumnPtr& doris_column, const 
DataTypePtr& type,
                                     size_t num_elements, const 
cctz::time_zone& ctz) {
-    // src column always be nullable for simplify converting
-    CHECK(doris_column->is_nullable());
-    MutableColumnPtr data_column = nullptr;
-    auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-            (*std::move(doris_column)).mutate().get());
-    fill_nullable_column(arrow_column, arrow_batch_cur_idx, nullable_column, 
num_elements);
-    data_column = nullable_column->get_nested_column_ptr();
-    WhichDataType which_type(remove_nullable(type));
-    // process data
-    switch (arrow_column->type()->id()) {
-    case arrow::Type::STRING:
-    case arrow::Type::BINARY:
-        return convert_column_with_string_data(arrow_column, 
arrow_batch_cur_idx, data_column,
-                                               num_elements);
-    case arrow::Type::FIXED_SIZE_BINARY:
-        return convert_column_with_fixed_size_data(arrow_column, 
arrow_batch_cur_idx, data_column,
-                                                   num_elements);
-#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE)             \
-    case ARROW_NUMERIC_TYPE:                                       \
-        return convert_column_with_numeric_data<CPP_NUMERIC_TYPE>( \
-                arrow_column, arrow_batch_cur_idx, data_column, num_elements);
-        FOR_ARROW_NUMERIC_TYPES(DISPATCH)
-#undef DISPATCH
-    case arrow::Type::BOOL:
-        return convert_column_with_boolean_data(arrow_column, 
arrow_batch_cur_idx, data_column,
-                                                num_elements);
-    case arrow::Type::DATE32:
-        if (which_type.is_date_v2()) {
-            return convert_column_with_date_v2_data<arrow::Date32Array>(
-                    arrow_column, arrow_batch_cur_idx, data_column, 
num_elements, ctz);
-        } else {
-            return convert_column_with_timestamp_data<arrow::Date32Array>(
-                    arrow_column, arrow_batch_cur_idx, data_column, 
num_elements, ctz);
-        }
-    case arrow::Type::DATE64:
-        if (which_type.is_date_v2_or_datetime_v2()) {
-            return convert_column_with_datetime_v2_data<arrow::Date64Array>(
-                    arrow_column, arrow_batch_cur_idx, data_column, 
num_elements, ctz);
-        } else {
-            return convert_column_with_timestamp_data<arrow::Date64Array>(
-                    arrow_column, arrow_batch_cur_idx, data_column, 
num_elements, ctz);
-        }
-    case arrow::Type::TIMESTAMP:
-        if (which_type.is_date_v2_or_datetime_v2()) {
-            return convert_column_with_datetime_v2_data<arrow::TimestampArray>(
-                    arrow_column, arrow_batch_cur_idx, data_column, 
num_elements, ctz);
-        } else {
-            return convert_column_with_timestamp_data<arrow::TimestampArray>(
-                    arrow_column, arrow_batch_cur_idx, data_column, 
num_elements, ctz);
-        }
-    case arrow::Type::DECIMAL:
-        return convert_column_with_decimal_data(arrow_column, 
arrow_batch_cur_idx, data_column,
-                                                num_elements);
-    case arrow::Type::LIST:
-        CHECK(type->have_subtypes());
-        return convert_column_with_list_data(
-                arrow_column, arrow_batch_cur_idx, data_column, num_elements, 
ctz,
-                (reinterpret_cast<const 
DataTypeArray*>(type.get()))->get_nested_type());
-    default:
-        break;
-    }
-    return Status::NotSupported(
-            fmt::format("Not support arrow type:{}", 
arrow_column->type()->name()));
+    
type->get_serde()->read_column_from_arrow(doris_column->assume_mutable_ref(), 
arrow_column,
+                                              arrow_batch_cur_idx,
+                                              arrow_batch_cur_idx + 
num_elements, ctz);
+    return Status::OK();
 }
 
 } // namespace doris::vectorized
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index d543c48570..50cc5ad6c4 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -228,7 +228,8 @@ set(VEC_TEST_FILES
     vec/columns/column_decimal_test.cpp
     vec/columns/column_fixed_length_object_test.cpp
     vec/data_types/complex_type_test.cpp
-    vec/data_types/serde/data_type_serde_test.cpp
+    vec/data_types/serde/data_type_serde_pb_test.cpp
+    vec/data_types/serde/data_type_serde_arrow_test.cpp
     vec/core/block_test.cpp
     vec/core/block_spill_test.cpp
     vec/core/column_array_test.cpp
diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp 
b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
new file mode 100644
index 0000000000..e7917e7cea
--- /dev/null
+++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -0,0 +1,321 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/array/builder_base.h>
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_decimal.h>
+#include <arrow/array/builder_nested.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/record_batch.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+#include <arrow/visit_type_inline.h>
+#include <arrow/visitor.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/types.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <math.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/hll.h"
+#include "runtime/descriptors.cpp"
+#include "runtime/descriptors.h"
+#include "util/arrow/block_convertor.h"
+#include "util/arrow/row_batch.h"
+#include "util/bitmap_value.h"
+#include "util/quantile_state.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/block.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_time_v2.h"
+#include "vec/runtime/vdatetime_value.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+namespace doris::vectorized {
+
+void serialize_and_deserialize_arrow_test() {
+    vectorized::Block block;
+    std::vector<std::tuple<std::string, FieldType, int, PrimitiveType, bool>> 
cols {
+            {"k1", FieldType::OLAP_FIELD_TYPE_INT, 1, TYPE_INT, false},
+            {"k7", FieldType::OLAP_FIELD_TYPE_INT, 7, TYPE_INT, true},
+            {"k2", FieldType::OLAP_FIELD_TYPE_STRING, 2, TYPE_STRING, false},
+            {"k3", FieldType::OLAP_FIELD_TYPE_DECIMAL128I, 3, 
TYPE_DECIMAL128I, false},
+            {"k11", FieldType::OLAP_FIELD_TYPE_DATETIME, 11, TYPE_DATETIME, 
false},
+            {"k4", FieldType::OLAP_FIELD_TYPE_BOOL, 4, TYPE_BOOLEAN, false}};
+    int row_num = 7;
+    // make desc and generate block
+    TupleDescriptor tuple_desc(PTupleDescriptor(), true);
+    for (auto t : cols) {
+        TSlotDescriptor tslot;
+        std::string col_name = std::get<0>(t);
+        tslot.__set_colName(col_name);
+        TypeDescriptor type_desc(std::get<3>(t));
+        bool is_nullable(std::get<4>(t));
+        switch (std::get<3>(t)) {
+        case TYPE_BOOLEAN:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto vec = vectorized::ColumnVector<UInt8>::create();
+                auto& data = vec->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    data.push_back(i % 2);
+                }
+                vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeUInt8>());
+                vectorized::ColumnWithTypeAndName 
type_and_name(vec->get_ptr(), data_type,
+                                                                col_name);
+                block.insert(std::move(type_and_name));
+            }
+            break;
+        case TYPE_INT:
+            tslot.__set_slotType(type_desc.to_thrift());
+            if (is_nullable) {
+                {
+                    auto column_vector_int32 = 
vectorized::ColumnVector<Int32>::create();
+                    auto column_nullable_vector =
+                            
vectorized::make_nullable(std::move(column_vector_int32));
+                    auto mutable_nullable_vector = 
std::move(*column_nullable_vector).mutate();
+                    for (int i = 0; i < row_num; i++) {
+                        mutable_nullable_vector->insert(int32(i));
+                    }
+                    auto data_type = vectorized::make_nullable(
+                            std::make_shared<vectorized::DataTypeInt32>());
+                    vectorized::ColumnWithTypeAndName type_and_name(
+                            mutable_nullable_vector->get_ptr(), data_type, 
col_name);
+                    block.insert(type_and_name);
+                }
+            } else {
+                auto vec = vectorized::ColumnVector<Int32>::create();
+                auto& data = vec->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    data.push_back(i);
+                }
+                vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeInt32>());
+                vectorized::ColumnWithTypeAndName 
type_and_name(vec->get_ptr(), data_type,
+                                                                col_name);
+                block.insert(std::move(type_and_name));
+            }
+            break;
+        case TYPE_DECIMAL128I:
+            type_desc.precision = 27;
+            type_desc.scale = 9;
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                vectorized::DataTypePtr decimal_data_type(
+                        doris::vectorized::create_decimal(27, 9, true));
+                auto decimal_column = decimal_data_type->create_column();
+                auto& data = 
((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)
+                                      decimal_column.get())
+                                     ->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    __int128_t value = i * pow(10, 9) + i * pow(10, 8);
+                    data.push_back(value);
+                }
+                vectorized::ColumnWithTypeAndName 
type_and_name(decimal_column->get_ptr(),
+                                                                
decimal_data_type, col_name);
+                block.insert(type_and_name);
+            }
+            break;
+        case TYPE_STRING:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto strcol = vectorized::ColumnString::create();
+                for (int i = 0; i < row_num; ++i) {
+                    std::string is = std::to_string(i);
+                    strcol->insert_data(is.c_str(), is.size());
+                }
+                vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeString>());
+                vectorized::ColumnWithTypeAndName 
type_and_name(strcol->get_ptr(), data_type,
+                                                                col_name);
+                block.insert(type_and_name);
+            }
+            break;
+        case TYPE_HLL:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                vectorized::DataTypePtr 
hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+                auto hll_column = hll_data_type->create_column();
+                std::vector<HyperLogLog>& container =
+                        ((vectorized::ColumnHLL*)hll_column.get())->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    HyperLogLog hll;
+                    hll.update(i);
+                    container.push_back(hll);
+                }
+                vectorized::ColumnWithTypeAndName 
type_and_name(hll_column->get_ptr(),
+                                                                hll_data_type, 
col_name);
+
+                block.insert(type_and_name);
+            }
+            break;
+        case TYPE_DATEV2:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto column_vector_date_v2 = 
vectorized::ColumnVector<vectorized::UInt32>::create();
+                auto& date_v2_data = column_vector_date_v2->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    
vectorized::DateV2Value<doris::vectorized::DateV2ValueType> value;
+                    value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
+                    
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+                }
+                vectorized::DataTypePtr date_v2_type(
+                        std::make_shared<vectorized::DataTypeDateV2>());
+                vectorized::ColumnWithTypeAndName 
test_date_v2(column_vector_date_v2->get_ptr(),
+                                                               date_v2_type, 
col_name);
+                block.insert(test_date_v2);
+            }
+            break;
+        case TYPE_DATE: // int64
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto column_vector_date = 
vectorized::ColumnVector<vectorized::Int64>::create();
+                auto& date_data = column_vector_date->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    vectorized::VecDateTimeValue value;
+                    value.from_date_int64(20210501);
+                    
date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+                }
+                vectorized::DataTypePtr 
date_type(std::make_shared<vectorized::DataTypeDate>());
+                vectorized::ColumnWithTypeAndName 
test_date(column_vector_date->get_ptr(),
+                                                            date_type, 
col_name);
+                block.insert(test_date);
+            }
+            break;
+        case TYPE_DATETIME: // int64
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto column_vector_datetime = 
vectorized::ColumnVector<vectorized::Int64>::create();
+                auto& datetime_data = column_vector_datetime->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    vectorized::VecDateTimeValue value;
+                    value.from_date_int64(20210501080910);
+                    
datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+                }
+                vectorized::DataTypePtr datetime_type(
+                        std::make_shared<vectorized::DataTypeDateTime>());
+                vectorized::ColumnWithTypeAndName 
test_datetime(column_vector_datetime->get_ptr(),
+                                                                datetime_type, 
col_name);
+                block.insert(test_datetime);
+            }
+            break;
+        default:
+            break;
+        }
+
+        tslot.__set_col_unique_id(std::get<2>(t));
+        SlotDescriptor* slot = new SlotDescriptor(tslot);
+        tuple_desc.add_slot(slot);
+    }
+
+    RowDescriptor row_desc(&tuple_desc, true);
+    // arrow schema
+    std::shared_ptr<arrow::Schema> _arrow_schema;
+    EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
+
+    // serialize
+    std::shared_ptr<arrow::RecordBatch> result;
+    std::cout << "block structure: " << block.dump_structure() << std::endl;
+    std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) << 
std::endl;
+
+    convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), 
&result);
+    Block new_block = block.clone_empty();
+    // deserialize
+    for (auto t : cols) {
+        std::string real_column_name = std::get<0>(t);
+        auto* array = result->GetColumnByName(real_column_name).get();
+        auto& column_with_type_and_name = 
new_block.get_by_name(real_column_name);
+        if (std::get<3>(t) == PrimitiveType::TYPE_DATE ||
+            std::get<3>(t) == PrimitiveType::TYPE_DATETIME) {
+            {
+                auto strcol = vectorized::ColumnString::create();
+                vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeString>());
+                vectorized::ColumnWithTypeAndName 
type_and_name(strcol->get_ptr(), data_type,
+                                                                
real_column_name);
+                arrow_column_to_doris_column(array, 0, type_and_name.column, 
type_and_name.type,
+                                             block.rows(), "UTC");
+                {
+                    auto& col = 
column_with_type_and_name.column.get()->assume_mutable_ref();
+                    auto& date_data = 
static_cast<ColumnVector<Int64>&>(col).get_data();
+                    for (int i = 0; i < strcol->size(); ++i) {
+                        StringRef str = strcol->get_data_at(i);
+                        vectorized::VecDateTimeValue value;
+                        value.from_date_str(str.data, str.size);
+                        
date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+                    }
+                }
+            }
+            continue;
+        } else if (std::get<3>(t) == PrimitiveType::TYPE_DATEV2) {
+            auto strcol = vectorized::ColumnString::create();
+            vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeString>());
+            vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), 
data_type,
+                                                            real_column_name);
+            arrow_column_to_doris_column(array, 0, type_and_name.column, 
type_and_name.type,
+                                         block.rows(), "UTC");
+            {
+                auto& col = 
column_with_type_and_name.column.get()->assume_mutable_ref();
+                auto& date_data = 
static_cast<ColumnVector<UInt32>&>(col).get_data();
+                for (int i = 0; i < strcol->size(); ++i) {
+                    StringRef str = strcol->get_data_at(i);
+                    DateV2Value<DateV2ValueType> value;
+                    value.from_date_str(str.data, str.size);
+                    
date_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+                }
+            }
+            continue;
+        }
+        arrow_column_to_doris_column(array, 0, 
column_with_type_and_name.column,
+                                     column_with_type_and_name.type, 
block.rows(), "UTC");
+    }
+
+    std::cout << block.dump_data() << std::endl;
+    std::cout << new_block.dump_data() << std::endl;
+    EXPECT_EQ(block.dump_data(), new_block.dump_data());
+}
+
+TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) {
+    serialize_and_deserialize_arrow_test();
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp 
b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
new file mode 100644
index 0000000000..7bed95be8d
--- /dev/null
+++ b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
@@ -0,0 +1,200 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/types.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <math.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/hll.h"
+#include "util/bitmap_value.h"
+#include "util/quantile_state.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/serde/data_type_serde.h"
+
+namespace doris::vectorized {
+
+void column_to_pb(const DataTypePtr data_type, const IColumn& col, PValues* 
result) {
+    const DataTypeSerDeSPtr serde = data_type->get_serde();
+    serde->write_column_to_pb(col, *result, 0, col.size());
+}
+
+void pb_to_column(const DataTypePtr data_type, PValues& result, IColumn& col) {
+    auto serde = data_type->get_serde();
+    serde->read_column_from_pb(col, result);
+}
+
+void check_pb_col(const DataTypePtr data_type, const IColumn& col) {
+    PValues pv = PValues();
+    column_to_pb(data_type, col, &pv);
+    std::string s1 = pv.DebugString();
+
+    auto col1 = data_type->create_column();
+    pb_to_column(data_type, pv, *col1);
+    PValues as_pv = PValues();
+    column_to_pb(data_type, *col1, &as_pv);
+
+    std::string s2 = as_pv.DebugString();
+    EXPECT_EQ(s1, s2);
+}
+
+void serialize_and_deserialize_pb_test() {
+    // int
+    {
+        auto vec = vectorized::ColumnVector<Int32>::create();
+        auto& data = vec->get_data();
+        for (int i = 0; i < 1024; ++i) {
+            data.push_back(i);
+        }
+        vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeInt32>());
+        check_pb_col(data_type, *vec.get());
+    }
+    // string
+    {
+        auto strcol = vectorized::ColumnString::create();
+        for (int i = 0; i < 1024; ++i) {
+            std::string is = std::to_string(i);
+            strcol->insert_data(is.c_str(), is.size());
+        }
+        vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeString>());
+        check_pb_col(data_type, *strcol.get());
+    }
+    // decimal
+    {
+        vectorized::DataTypePtr 
decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
+        auto decimal_column = decimal_data_type->create_column();
+        auto& data = 
((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)
+                              decimal_column.get())
+                             ->get_data();
+        for (int i = 0; i < 1024; ++i) {
+            __int128_t value = i * pow(10, 9) + i * pow(10, 8);
+            data.push_back(value);
+        }
+        check_pb_col(decimal_data_type, *decimal_column.get());
+    }
+    // bitmap
+    {
+        vectorized::DataTypePtr 
bitmap_data_type(std::make_shared<vectorized::DataTypeBitMap>());
+        auto bitmap_column = bitmap_data_type->create_column();
+        std::vector<BitmapValue>& container =
+                ((vectorized::ColumnBitmap*)bitmap_column.get())->get_data();
+        for (int i = 0; i < 4; ++i) {
+            BitmapValue bv;
+            for (int j = 0; j <= i; ++j) {
+                bv.add(j);
+            }
+            container.push_back(bv);
+        }
+        check_pb_col(bitmap_data_type, *bitmap_column.get());
+    }
+    // hll
+    {
+        vectorized::DataTypePtr 
hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+        auto hll_column = hll_data_type->create_column();
+        std::vector<HyperLogLog>& container =
+                ((vectorized::ColumnHLL*)hll_column.get())->get_data();
+        for (int i = 0; i < 4; ++i) {
+            HyperLogLog hll;
+            hll.update(i);
+            container.push_back(hll);
+        }
+        check_pb_col(hll_data_type, *hll_column.get());
+    }
+    // quantilestate
+    {
+        vectorized::DataTypePtr quantile_data_type(
+                std::make_shared<vectorized::DataTypeQuantileStateDouble>());
+        auto quantile_column = quantile_data_type->create_column();
+        std::vector<QuantileStateDouble>& container =
+                
((vectorized::ColumnQuantileStateDouble*)quantile_column.get())->get_data();
+        const long max_rand = 1000000L;
+        double lower_bound = 0;
+        double upper_bound = 100;
+        srandom(time(nullptr));
+        for (int i = 0; i < 1024; ++i) {
+            QuantileStateDouble q;
+            double random_double =
+                    lower_bound + (upper_bound - lower_bound) * (random() % 
max_rand) / max_rand;
+            q.add_value(random_double);
+            container.push_back(q);
+        }
+        check_pb_col(quantile_data_type, *quantile_column.get());
+    }
+    // nullable string
+    {
+        vectorized::DataTypePtr 
string_data_type(std::make_shared<vectorized::DataTypeString>());
+        vectorized::DataTypePtr nullable_data_type(
+                
std::make_shared<vectorized::DataTypeNullable>(string_data_type));
+        auto nullable_column = nullable_data_type->create_column();
+        
((vectorized::ColumnNullable*)nullable_column.get())->insert_null_elements(1024);
+        check_pb_col(nullable_data_type, *nullable_column.get());
+    }
+    // nullable decimal
+    {
+        vectorized::DataTypePtr 
decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
+        vectorized::DataTypePtr nullable_data_type(
+                
std::make_shared<vectorized::DataTypeNullable>(decimal_data_type));
+        auto nullable_column = nullable_data_type->create_column();
+        
((vectorized::ColumnNullable*)nullable_column.get())->insert_null_elements(1024);
+        check_pb_col(nullable_data_type, *nullable_column.get());
+    }
+    // int with 1024 batch size
+    {
+        auto vec = vectorized::ColumnVector<Int32>::create();
+        auto& data = vec->get_data();
+        for (int i = 0; i < 1024; ++i) {
+            data.push_back(i);
+        }
+        std::cout << vec->size() << std::endl;
+        vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeInt32>());
+        vectorized::DataTypePtr nullable_data_type(
+                std::make_shared<vectorized::DataTypeNullable>(data_type));
+        auto nullable_column = nullable_data_type->create_column();
+        ((vectorized::ColumnNullable*)nullable_column.get())
+                ->insert_range_from_not_nullable(*vec, 0, 1024);
+        check_pb_col(nullable_data_type, *nullable_column.get());
+    }
+}
+
+TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest) {
+    serialize_and_deserialize_pb_test();
+}
+
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to