This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 82679824d83 [Feature](agg-state) OLAP_FIELD_TYPE_AGG_STATE support 
more serialized_type (#35628)
82679824d83 is described below

commit 82679824d8389bfde7c44f3772a80699e3727c31
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Jun 6 12:21:47 2024 +0800

    [Feature](agg-state) OLAP_FIELD_TYPE_AGG_STATE support more serialized_type 
(#35628)
    
    ## Proposed changes
    OLAP_FIELD_TYPE_AGG_STATE support more serialized_type
---
 be/src/olap/field.h                                |   2 -
 be/src/olap/rowset/segment_v2/binary_plain_page.h  |   2 +-
 be/src/olap/rowset/segment_v2/column_reader.cpp    | 353 +++++++++------
 be/src/olap/rowset/segment_v2/column_reader.h      |  21 +-
 be/src/olap/rowset/segment_v2/column_writer.cpp    | 479 ++++++++++-----------
 be/src/olap/rowset/segment_v2/column_writer.h      |  12 +
 be/src/olap/rowset/segment_v2/segment.cpp          |   2 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |   3 +
 be/src/olap/tablet_schema.h                        |  23 +-
 be/src/olap/types.cpp                              |   2 +-
 be/src/vec/data_types/data_type_factory.cpp        |  28 +-
 be/src/vec/olap/olap_data_convertor.cpp            |  80 ++--
 be/src/vec/olap/olap_data_convertor.h              |  20 +-
 gensrc/proto/segment_v2.proto                      |   3 +
 .../diffrent_serialize/diffrent_serialize.out      |  22 +
 .../diffrent_serialize/diffrent_serialize.groovy   |  93 ++++
 16 files changed, 693 insertions(+), 452 deletions(-)

diff --git a/be/src/olap/field.h b/be/src/olap/field.h
index 6a2d407ff6c..91b54e89474 100644
--- a/be/src/olap/field.h
+++ b/be/src/olap/field.h
@@ -490,8 +490,6 @@ public:
             case FieldType::OLAP_FIELD_TYPE_CHAR:
                 return new CharField(column);
             case FieldType::OLAP_FIELD_TYPE_VARCHAR:
-            case FieldType::OLAP_FIELD_TYPE_AGG_STATE:
-                return new VarcharField(column);
             case FieldType::OLAP_FIELD_TYPE_STRING:
                 return new StringField(column);
             case FieldType::OLAP_FIELD_TYPE_STRUCT: {
diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h 
b/be/src/olap/rowset/segment_v2/binary_plain_page.h
index 674f8a278cf..b05ab4906d1 100644
--- a/be/src/olap/rowset/segment_v2/binary_plain_page.h
+++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h
@@ -69,7 +69,7 @@ public:
 
         // If the page is full, should stop adding more items.
         while (!is_page_full() && i < *count) {
-            auto src = reinterpret_cast<const Slice*>(vals);
+            const auto* src = reinterpret_cast<const Slice*>(vals);
             if constexpr (Type == FieldType::OLAP_FIELD_TYPE_OBJECT) {
                 if (_options.need_check_bitmap) {
                     RETURN_IF_ERROR(BitmapTypeCode::validate(*(src->data)));
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index a069034cd23..392917e0d83 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -24,8 +24,10 @@
 #include <memory>
 #include <ostream>
 #include <set>
+#include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/exception.h"
 #include "common/status.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_system.h"
@@ -72,13 +74,140 @@
 #include "vec/common/schema_util.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type_agg_state.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/runtime/vdatetime_value.h" //for VecDateTime
 
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
+
+inline bool read_as_string(PrimitiveType type) {
+    return type == PrimitiveType::TYPE_STRING || type == 
PrimitiveType::INVALID_TYPE ||
+           type == PrimitiveType::TYPE_OBJECT;
+}
 
 static bvar::Adder<size_t> 
g_column_reader_memory_bytes("doris_column_reader_memory_bytes");
 static bvar::Adder<size_t> g_column_reader_num("doris_column_reader_num");
+Status ColumnReader::create_array(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
+                                  const io::FileReaderSPtr& file_reader,
+                                  std::unique_ptr<ColumnReader>* reader) {
+    DCHECK(meta.children_columns_size() == 2 || meta.children_columns_size() 
== 3);
+
+    std::unique_ptr<ColumnReader> item_reader;
+    RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0),
+                                         meta.children_columns(0).num_rows(), 
file_reader,
+                                         &item_reader));
+
+    std::unique_ptr<ColumnReader> offset_reader;
+    RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1),
+                                         meta.children_columns(1).num_rows(), 
file_reader,
+                                         &offset_reader));
+
+    std::unique_ptr<ColumnReader> null_reader;
+    if (meta.is_nullable()) {
+        RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2),
+                                             
meta.children_columns(2).num_rows(), file_reader,
+                                             &null_reader));
+    }
+
+    // The num rows of the array reader equals to the num rows of the length 
reader.
+    uint64_t array_num_rows = meta.children_columns(1).num_rows();
+    std::unique_ptr<ColumnReader> array_reader(
+            new ColumnReader(opts, meta, array_num_rows, file_reader));
+    //  array reader do not need to init
+    array_reader->_sub_readers.resize(meta.children_columns_size());
+    array_reader->_sub_readers[0] = std::move(item_reader);
+    array_reader->_sub_readers[1] = std::move(offset_reader);
+    if (meta.is_nullable()) {
+        array_reader->_sub_readers[2] = std::move(null_reader);
+    }
+    *reader = std::move(array_reader);
+    return Status::OK();
+}
+
+Status ColumnReader::create_map(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
+                                const io::FileReaderSPtr& file_reader,
+                                std::unique_ptr<ColumnReader>* reader) {
+    // map reader now has 3 sub readers for key, value, offsets(scalar), 
null(scala)
+    std::unique_ptr<ColumnReader> key_reader;
+    RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0),
+                                         meta.children_columns(0).num_rows(), 
file_reader,
+                                         &key_reader));
+    std::unique_ptr<ColumnReader> val_reader;
+    RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1),
+                                         meta.children_columns(1).num_rows(), 
file_reader,
+                                         &val_reader));
+    std::unique_ptr<ColumnReader> offset_reader;
+    RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2),
+                                         meta.children_columns(2).num_rows(), 
file_reader,
+                                         &offset_reader));
+    std::unique_ptr<ColumnReader> null_reader;
+    if (meta.is_nullable()) {
+        RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(3),
+                                             
meta.children_columns(3).num_rows(), file_reader,
+                                             &null_reader));
+    }
+
+    // The num rows of the map reader equals to the num rows of the length 
reader.
+    uint64_t map_num_rows = meta.children_columns(2).num_rows();
+    std::unique_ptr<ColumnReader> map_reader(
+            new ColumnReader(opts, meta, map_num_rows, file_reader));
+    map_reader->_sub_readers.resize(meta.children_columns_size());
+
+    map_reader->_sub_readers[0] = std::move(key_reader);
+    map_reader->_sub_readers[1] = std::move(val_reader);
+    map_reader->_sub_readers[2] = std::move(offset_reader);
+    if (meta.is_nullable()) {
+        map_reader->_sub_readers[3] = std::move(null_reader);
+    }
+    *reader = std::move(map_reader);
+    return Status::OK();
+}
+
+Status ColumnReader::create_struct(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
+                                   uint64_t num_rows, const 
io::FileReaderSPtr& file_reader,
+                                   std::unique_ptr<ColumnReader>* reader) {
+    // not support empty struct
+    DCHECK(meta.children_columns_size() >= 1);
+    // create struct column reader
+    std::unique_ptr<ColumnReader> struct_reader(
+            new ColumnReader(opts, meta, num_rows, file_reader));
+    struct_reader->_sub_readers.reserve(meta.children_columns_size());
+    for (size_t i = 0; i < meta.children_columns_size(); i++) {
+        std::unique_ptr<ColumnReader> sub_reader;
+        RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(i),
+                                             
meta.children_columns(i).num_rows(), file_reader,
+                                             &sub_reader));
+        struct_reader->_sub_readers.push_back(std::move(sub_reader));
+    }
+    *reader = std::move(struct_reader);
+    return Status::OK();
+}
+
+Status ColumnReader::create_agg_state(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
+                                      uint64_t num_rows, const 
io::FileReaderSPtr& file_reader,
+                                      std::unique_ptr<ColumnReader>* reader) {
+    if (!meta.has_function_name()) { // meet old version ColumnMetaPB
+        std::unique_ptr<ColumnReader> reader_local(
+                new ColumnReader(opts, meta, num_rows, file_reader));
+        RETURN_IF_ERROR(reader_local->init(&meta));
+        *reader = std::move(reader_local);
+        return Status::OK();
+    }
+
+    auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(meta);
+    const auto* agg_state_type = assert_cast<const 
vectorized::DataTypeAggState*>(data_type.get());
+    auto type = 
agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type;
+
+    if (read_as_string(type)) {
+        std::unique_ptr<ColumnReader> reader_local(
+                new ColumnReader(opts, meta, num_rows, file_reader));
+        RETURN_IF_ERROR(reader_local->init(&meta));
+        *reader = std::move(reader_local);
+        return Status::OK();
+    }
+
+    return Status::InternalError("Not supported");
+}
 
 Status ColumnReader::create(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
                             uint64_t num_rows, const io::FileReaderSPtr& 
file_reader,
@@ -92,92 +221,17 @@ Status ColumnReader::create(const ColumnReaderOptions& 
opts, const ColumnMetaPB&
     } else {
         auto type = (FieldType)meta.type();
         switch (type) {
+        case FieldType::OLAP_FIELD_TYPE_AGG_STATE: {
+            return create_agg_state(opts, meta, num_rows, file_reader, reader);
+        }
         case FieldType::OLAP_FIELD_TYPE_STRUCT: {
-            // not support empty struct
-            DCHECK(meta.children_columns_size() >= 1);
-            // create struct column reader
-            std::unique_ptr<ColumnReader> struct_reader(
-                    new ColumnReader(opts, meta, num_rows, file_reader));
-            struct_reader->_sub_readers.reserve(meta.children_columns_size());
-            for (size_t i = 0; i < meta.children_columns_size(); i++) {
-                std::unique_ptr<ColumnReader> sub_reader;
-                RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(i),
-                                                     
meta.children_columns(i).num_rows(),
-                                                     file_reader, 
&sub_reader));
-                struct_reader->_sub_readers.push_back(std::move(sub_reader));
-            }
-            *reader = std::move(struct_reader);
-            return Status::OK();
+            return create_struct(opts, meta, num_rows, file_reader, reader);
         }
         case FieldType::OLAP_FIELD_TYPE_ARRAY: {
-            DCHECK(meta.children_columns_size() == 2 || 
meta.children_columns_size() == 3);
-
-            std::unique_ptr<ColumnReader> item_reader;
-            RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(0),
-                                                 
meta.children_columns(0).num_rows(), file_reader,
-                                                 &item_reader));
-
-            std::unique_ptr<ColumnReader> offset_reader;
-            RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(1),
-                                                 
meta.children_columns(1).num_rows(), file_reader,
-                                                 &offset_reader));
-
-            std::unique_ptr<ColumnReader> null_reader;
-            if (meta.is_nullable()) {
-                RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(2),
-                                                     
meta.children_columns(2).num_rows(),
-                                                     file_reader, 
&null_reader));
-            }
-
-            // The num rows of the array reader equals to the num rows of the 
length reader.
-            num_rows = meta.children_columns(1).num_rows();
-            std::unique_ptr<ColumnReader> array_reader(
-                    new ColumnReader(opts, meta, num_rows, file_reader));
-            //  array reader do not need to init
-            array_reader->_sub_readers.resize(meta.children_columns_size());
-            array_reader->_sub_readers[0] = std::move(item_reader);
-            array_reader->_sub_readers[1] = std::move(offset_reader);
-            if (meta.is_nullable()) {
-                array_reader->_sub_readers[2] = std::move(null_reader);
-            }
-            *reader = std::move(array_reader);
-            return Status::OK();
+            return create_array(opts, meta, file_reader, reader);
         }
         case FieldType::OLAP_FIELD_TYPE_MAP: {
-            // map reader now has 3 sub readers for key, value, 
offsets(scalar), null(scala)
-            std::unique_ptr<ColumnReader> key_reader;
-            RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(0),
-                                                 
meta.children_columns(0).num_rows(), file_reader,
-                                                 &key_reader));
-            std::unique_ptr<ColumnReader> val_reader;
-            RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(1),
-                                                 
meta.children_columns(1).num_rows(), file_reader,
-                                                 &val_reader));
-            std::unique_ptr<ColumnReader> offset_reader;
-            RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(2),
-                                                 
meta.children_columns(2).num_rows(), file_reader,
-                                                 &offset_reader));
-            std::unique_ptr<ColumnReader> null_reader;
-            if (meta.is_nullable()) {
-                RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(3),
-                                                     
meta.children_columns(3).num_rows(),
-                                                     file_reader, 
&null_reader));
-            }
-
-            // The num rows of the map reader equals to the num rows of the 
length reader.
-            num_rows = meta.children_columns(2).num_rows();
-            std::unique_ptr<ColumnReader> map_reader(
-                    new ColumnReader(opts, meta, num_rows, file_reader));
-            map_reader->_sub_readers.resize(meta.children_columns_size());
-
-            map_reader->_sub_readers[0] = std::move(key_reader);
-            map_reader->_sub_readers[1] = std::move(val_reader);
-            map_reader->_sub_readers[2] = std::move(offset_reader);
-            if (meta.is_nullable()) {
-                map_reader->_sub_readers[3] = std::move(null_reader);
-            }
-            *reader = std::move(map_reader);
-            return Status::OK();
+            return create_map(opts, meta, file_reader, reader);
         }
         case FieldType::OLAP_FIELD_TYPE_VARIANT: {
             // Read variant only root data using a single ColumnReader
@@ -195,12 +249,14 @@ Status ColumnReader::create(const ColumnReaderOptions& 
opts, const ColumnMetaPB&
 }
 
 ColumnReader::ColumnReader(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
-                           uint64_t num_rows, io::FileReaderSPtr file_reader)
+                           uint64_t num_rows, io::FileReaderSPtr file_reader,
+                           vectorized::DataTypePtr agg_state_ptr)
         : _use_index_page_cache(!config::disable_storage_page_cache),
           _opts(opts),
           _num_rows(num_rows),
           _file_reader(std::move(file_reader)),
-          _dict_encoding_type(UNKNOWN_DICT_ENCODING) {
+          _dict_encoding_type(UNKNOWN_DICT_ENCODING),
+          _agg_state_ptr(std::move(agg_state_ptr)) {
     _meta_length = meta.length();
     _meta_type = (FieldType)meta.type();
     if (_meta_type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
@@ -645,57 +701,17 @@ Status ColumnReader::new_iterator(ColumnIterator** 
iterator) {
     } else {
         auto type = (FieldType)_meta_type;
         switch (type) {
+        case FieldType::OLAP_FIELD_TYPE_AGG_STATE: {
+            return new_agg_state_iterator(iterator);
+        }
         case FieldType::OLAP_FIELD_TYPE_STRUCT: {
-            std::vector<ColumnIterator*> sub_column_iterators;
-            size_t child_size = is_nullable() ? _sub_readers.size() - 1 : 
_sub_readers.size();
-            sub_column_iterators.reserve(child_size);
-
-            ColumnIterator* sub_column_iterator;
-            for (size_t i = 0; i < child_size; i++) {
-                
RETURN_IF_ERROR(_sub_readers[i]->new_iterator(&sub_column_iterator));
-                sub_column_iterators.push_back(sub_column_iterator);
-            }
-
-            ColumnIterator* null_iterator = nullptr;
-            if (is_nullable()) {
-                
RETURN_IF_ERROR(_sub_readers[child_size]->new_iterator(&null_iterator));
-            }
-            *iterator = new StructFileColumnIterator(this, null_iterator, 
sub_column_iterators);
-            return Status::OK();
+            return new_struct_iterator(iterator);
         }
         case FieldType::OLAP_FIELD_TYPE_ARRAY: {
-            ColumnIterator* item_iterator = nullptr;
-            RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&item_iterator));
-
-            ColumnIterator* offset_iterator = nullptr;
-            RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&offset_iterator));
-            OffsetFileColumnIterator* ofcIter = new OffsetFileColumnIterator(
-                    reinterpret_cast<FileColumnIterator*>(offset_iterator));
-
-            ColumnIterator* null_iterator = nullptr;
-            if (is_nullable()) {
-                RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&null_iterator));
-            }
-            *iterator = new ArrayFileColumnIterator(this, ofcIter, 
item_iterator, null_iterator);
-            return Status::OK();
+            return new_array_iterator(iterator);
         }
         case FieldType::OLAP_FIELD_TYPE_MAP: {
-            ColumnIterator* key_iterator = nullptr;
-            RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&key_iterator));
-            ColumnIterator* val_iterator = nullptr;
-            RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&val_iterator));
-            ColumnIterator* offsets_iterator = nullptr;
-            RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&offsets_iterator));
-            OffsetFileColumnIterator* ofcIter = new OffsetFileColumnIterator(
-                    reinterpret_cast<FileColumnIterator*>(offsets_iterator));
-
-            ColumnIterator* null_iterator = nullptr;
-            if (is_nullable()) {
-                RETURN_IF_ERROR(_sub_readers[3]->new_iterator(&null_iterator));
-            }
-            *iterator = new MapFileColumnIterator(this, null_iterator, 
ofcIter, key_iterator,
-                                                  val_iterator);
-            return Status::OK();
+            return new_map_iterator(iterator);
         }
         case FieldType::OLAP_FIELD_TYPE_VARIANT: {
             *iterator = new VariantRootColumnIterator(new 
FileColumnIterator(this));
@@ -708,6 +724,78 @@ Status ColumnReader::new_iterator(ColumnIterator** 
iterator) {
     }
 }
 
+Status ColumnReader::new_agg_state_iterator(ColumnIterator** iterator) {
+    if (!_agg_state_ptr) { // meet old version ColumnMetaPB
+        *iterator = new FileColumnIterator(this);
+        return Status::OK();
+    }
+
+    const auto* agg_state_type =
+            assert_cast<const 
vectorized::DataTypeAggState*>(_agg_state_ptr.get());
+    auto type = 
agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type;
+
+    if (read_as_string(type)) {
+        *iterator = new FileColumnIterator(this);
+        return Status::OK();
+    }
+
+    return Status::InternalError("Not supported");
+}
+
+Status ColumnReader::new_array_iterator(ColumnIterator** iterator) {
+    ColumnIterator* item_iterator = nullptr;
+    RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&item_iterator));
+
+    ColumnIterator* offset_iterator = nullptr;
+    RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&offset_iterator));
+    auto* ofcIter =
+            new 
OffsetFileColumnIterator(reinterpret_cast<FileColumnIterator*>(offset_iterator));
+
+    ColumnIterator* null_iterator = nullptr;
+    if (is_nullable()) {
+        RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&null_iterator));
+    }
+    *iterator = new ArrayFileColumnIterator(this, ofcIter, item_iterator, 
null_iterator);
+    return Status::OK();
+}
+
+Status ColumnReader::new_map_iterator(ColumnIterator** iterator) {
+    ColumnIterator* key_iterator = nullptr;
+    RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&key_iterator));
+    ColumnIterator* val_iterator = nullptr;
+    RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&val_iterator));
+    ColumnIterator* offsets_iterator = nullptr;
+    RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&offsets_iterator));
+    auto* ofcIter =
+            new 
OffsetFileColumnIterator(reinterpret_cast<FileColumnIterator*>(offsets_iterator));
+
+    ColumnIterator* null_iterator = nullptr;
+    if (is_nullable()) {
+        RETURN_IF_ERROR(_sub_readers[3]->new_iterator(&null_iterator));
+    }
+    *iterator = new MapFileColumnIterator(this, null_iterator, ofcIter, 
key_iterator, val_iterator);
+    return Status::OK();
+}
+
+Status ColumnReader::new_struct_iterator(ColumnIterator** iterator) {
+    std::vector<ColumnIterator*> sub_column_iterators;
+    size_t child_size = is_nullable() ? _sub_readers.size() - 1 : 
_sub_readers.size();
+    sub_column_iterators.reserve(child_size);
+
+    ColumnIterator* sub_column_iterator;
+    for (size_t i = 0; i < child_size; i++) {
+        RETURN_IF_ERROR(_sub_readers[i]->new_iterator(&sub_column_iterator));
+        sub_column_iterators.push_back(sub_column_iterator);
+    }
+
+    ColumnIterator* null_iterator = nullptr;
+    if (is_nullable()) {
+        
RETURN_IF_ERROR(_sub_readers[child_size]->new_iterator(&null_iterator));
+    }
+    *iterator = new StructFileColumnIterator(this, null_iterator, 
sub_column_iterators);
+    return Status::OK();
+}
+
 ///====================== MapFileColumnIterator 
============================////
 MapFileColumnIterator::MapFileColumnIterator(ColumnReader* reader, 
ColumnIterator* null_iterator,
                                              OffsetFileColumnIterator* 
offsets_iterator,
@@ -1568,5 +1656,4 @@ Status VariantRootColumnIterator::read_by_rowids(const 
rowid_t* rowids, const si
     return Status::OK();
 }
 
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h 
b/be/src/olap/rowset/segment_v2/column_reader.h
index 94494b4d23c..30b7e3b3750 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -114,13 +114,28 @@ public:
     static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB& 
meta,
                          uint64_t num_rows, const io::FileReaderSPtr& 
file_reader,
                          std::unique_ptr<ColumnReader>* reader);
-
+    static Status create_array(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
+                               const io::FileReaderSPtr& file_reader,
+                               std::unique_ptr<ColumnReader>* reader);
+    static Status create_map(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
+                             const io::FileReaderSPtr& file_reader,
+                             std::unique_ptr<ColumnReader>* reader);
+    static Status create_struct(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
+                                uint64_t num_rows, const io::FileReaderSPtr& 
file_reader,
+                                std::unique_ptr<ColumnReader>* reader);
+    static Status create_agg_state(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
+                                   uint64_t num_rows, const 
io::FileReaderSPtr& file_reader,
+                                   std::unique_ptr<ColumnReader>* reader);
     enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING, 
ALL_DICT_ENCODING };
 
     virtual ~ColumnReader();
 
     // create a new column iterator. Client should delete returned iterator
     Status new_iterator(ColumnIterator** iterator);
+    Status new_array_iterator(ColumnIterator** iterator);
+    Status new_struct_iterator(ColumnIterator** iterator);
+    Status new_map_iterator(ColumnIterator** iterator);
+    Status new_agg_state_iterator(ColumnIterator** iterator);
     // Client should delete returned iterator
     Status new_bitmap_index_iterator(BitmapIndexIterator** iterator);
 
@@ -189,7 +204,7 @@ public:
 
 private:
     ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, 
uint64_t num_rows,
-                 io::FileReaderSPtr file_reader);
+                 io::FileReaderSPtr file_reader, vectorized::DataTypePtr 
agg_state_ptr = nullptr);
     Status init(const ColumnMetaPB* meta);
 
     // Read column inverted indexes into memory
@@ -259,6 +274,8 @@ private:
 
     std::vector<std::unique_ptr<ColumnReader>> _sub_readers;
 
+    vectorized::DataTypePtr _agg_state_ptr;
+
     DorisCallOnce<Status> _set_dict_encoding_type_once;
 };
 
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp 
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index dee0d520d1f..5b19f5669ac 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -17,11 +17,11 @@
 
 #include "olap/rowset/segment_v2/column_writer.h"
 
-#include <assert.h>
 #include <gen_cpp/segment_v2.pb.h>
 
 #include <algorithm>
 #include <filesystem>
+#include <memory>
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -47,11 +47,10 @@
 #include "util/faststring.h"
 #include "util/rle_encoding.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type_agg_state.h"
+#include "vec/data_types/data_type_factory.hpp"
 
-namespace doris {
-namespace segment_v2 {
-
-using strings::Substitute;
+namespace doris::segment_v2 {
 
 class NullBitmapBuilder {
 public:
@@ -88,267 +87,230 @@ private:
     RleEncoder<bool> _rle_encoder;
 };
 
+inline ScalarColumnWriter* get_null_writer(const ColumnWriterOptions& opts,
+                                           io::FileWriter* file_writer, 
uint32_t id) {
+    if (!opts.meta->is_nullable()) {
+        return nullptr;
+    }
+
+    FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
+    ColumnWriterOptions null_options;
+    null_options.meta = opts.meta->add_children_columns();
+    null_options.meta->set_column_id(id);
+    null_options.meta->set_unique_id(id);
+    null_options.meta->set_type(int(null_type));
+    null_options.meta->set_is_nullable(false);
+    null_options.meta->set_length(
+            
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_TINYINT>()->size());
+    null_options.meta->set_encoding(DEFAULT_ENCODING);
+    null_options.meta->set_compression(opts.meta->compression());
+
+    null_options.need_zone_map = false;
+    null_options.need_bloom_filter = false;
+    null_options.need_bitmap_index = false;
+
+    TabletColumn null_column =
+            TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, 
null_type, false,
+                         null_options.meta->unique_id(), 
null_options.meta->length());
+    null_column.set_name("nullable");
+    null_column.set_index_length(-1); // no short key index
+    std::unique_ptr<Field> null_field(FieldFactory::create(null_column));
+    return new ScalarColumnWriter(null_options, std::move(null_field), 
file_writer);
+}
+
+Status ColumnWriter::create_struct_writer(const ColumnWriterOptions& opts,
+                                          const TabletColumn* column, 
io::FileWriter* file_writer,
+                                          std::unique_ptr<ColumnWriter>* 
writer) {
+    // not support empty struct
+    DCHECK(column->get_subtype_count() >= 1);
+    std::vector<std::unique_ptr<ColumnWriter>> sub_column_writers;
+    sub_column_writers.reserve(column->get_subtype_count());
+    for (uint32_t i = 0; i < column->get_subtype_count(); i++) {
+        const TabletColumn& sub_column = column->get_sub_column(i);
+        RETURN_IF_ERROR(sub_column.check_valid());
+
+        // create sub writer
+        ColumnWriterOptions column_options;
+        column_options.meta = opts.meta->mutable_children_columns(i);
+        column_options.need_zone_map = false;
+        column_options.need_bloom_filter = sub_column.is_bf_column();
+        column_options.need_bitmap_index = sub_column.has_bitmap_index();
+        std::unique_ptr<ColumnWriter> sub_column_writer;
+        RETURN_IF_ERROR(
+                ColumnWriter::create(column_options, &sub_column, file_writer, 
&sub_column_writer));
+        sub_column_writers.push_back(std::move(sub_column_writer));
+    }
+
+    ScalarColumnWriter* null_writer =
+            get_null_writer(opts, file_writer, column->get_subtype_count() + 
1);
+
+    *writer = std::unique_ptr<ColumnWriter>(
+            new StructColumnWriter(opts, 
std::unique_ptr<Field>(FieldFactory::create(*column)),
+                                   null_writer, sub_column_writers));
+    return Status::OK();
+}
+
+Status ColumnWriter::create_array_writer(const ColumnWriterOptions& opts,
+                                         const TabletColumn* column, 
io::FileWriter* file_writer,
+                                         std::unique_ptr<ColumnWriter>* 
writer) {
+    DCHECK(column->get_subtype_count() == 1);
+    const TabletColumn& item_column = column->get_sub_column(0);
+    RETURN_IF_ERROR(item_column.check_valid());
+
+    // create item writer
+    ColumnWriterOptions item_options;
+    item_options.meta = opts.meta->mutable_children_columns(0);
+    item_options.need_zone_map = false;
+    item_options.need_bloom_filter = item_column.is_bf_column();
+    item_options.need_bitmap_index = item_column.has_bitmap_index();
+    std::unique_ptr<ColumnWriter> item_writer;
+    RETURN_IF_ERROR(ColumnWriter::create(item_options, &item_column, 
file_writer, &item_writer));
+
+    // create length writer
+    FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
+
+    ColumnWriterOptions length_options;
+    length_options.meta = opts.meta->add_children_columns();
+    length_options.meta->set_column_id(2);
+    length_options.meta->set_unique_id(2);
+    length_options.meta->set_type(int(length_type));
+    length_options.meta->set_is_nullable(false);
+    length_options.meta->set_length(
+            
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size());
+    length_options.meta->set_encoding(DEFAULT_ENCODING);
+    length_options.meta->set_compression(opts.meta->compression());
+
+    length_options.need_zone_map = false;
+    length_options.need_bloom_filter = false;
+    length_options.need_bitmap_index = false;
+
+    TabletColumn length_column =
+            TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, 
length_type,
+                         length_options.meta->is_nullable(), 
length_options.meta->unique_id(),
+                         length_options.meta->length());
+    length_column.set_name("length");
+    length_column.set_index_length(-1); // no short key index
+    std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column));
+    auto* length_writer =
+            new OffsetColumnWriter(length_options, std::move(bigint_field), 
file_writer);
+
+    ScalarColumnWriter* null_writer = get_null_writer(opts, file_writer, 3);
+
+    *writer = std::unique_ptr<ColumnWriter>(
+            new ArrayColumnWriter(opts, 
std::unique_ptr<Field>(FieldFactory::create(*column)),
+                                  length_writer, null_writer, 
std::move(item_writer)));
+    return Status::OK();
+}
+
+Status ColumnWriter::create_map_writer(const ColumnWriterOptions& opts, const 
TabletColumn* column,
+                                       io::FileWriter* file_writer,
+                                       std::unique_ptr<ColumnWriter>* writer) {
+    DCHECK(column->get_subtype_count() == 2);
+    // create key & value writer
+    std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
+    for (int i = 0; i < 2; ++i) {
+        const TabletColumn& item_column = column->get_sub_column(i);
+        RETURN_IF_ERROR(item_column.check_valid());
+
+        // create item writer
+        ColumnWriterOptions item_options;
+        item_options.meta = opts.meta->mutable_children_columns(i);
+        item_options.need_zone_map = false;
+        item_options.need_bloom_filter = item_column.is_bf_column();
+        item_options.need_bitmap_index = item_column.has_bitmap_index();
+        std::unique_ptr<ColumnWriter> item_writer;
+        RETURN_IF_ERROR(
+                ColumnWriter::create(item_options, &item_column, file_writer, 
&item_writer));
+        inner_writer_list.push_back(std::move(item_writer));
+    }
+
+    // create offset writer
+    FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
+
+    // Be Cautious: column unique id is used for column reader creation
+    ColumnWriterOptions length_options;
+    length_options.meta = opts.meta->add_children_columns();
+    length_options.meta->set_column_id(column->get_subtype_count() + 1);
+    length_options.meta->set_unique_id(column->get_subtype_count() + 1);
+    length_options.meta->set_type(int(length_type));
+    length_options.meta->set_is_nullable(false);
+    length_options.meta->set_length(
+            
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size());
+    length_options.meta->set_encoding(DEFAULT_ENCODING);
+    length_options.meta->set_compression(opts.meta->compression());
+
+    length_options.need_zone_map = false;
+    length_options.need_bloom_filter = false;
+    length_options.need_bitmap_index = false;
+
+    TabletColumn length_column =
+            TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, 
length_type,
+                         length_options.meta->is_nullable(), 
length_options.meta->unique_id(),
+                         length_options.meta->length());
+    length_column.set_name("length");
+    length_column.set_index_length(-1); // no short key index
+    std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column));
+    auto* length_writer =
+            new OffsetColumnWriter(length_options, std::move(bigint_field), 
file_writer);
+
+    ScalarColumnWriter* null_writer =
+            get_null_writer(opts, file_writer, column->get_subtype_count() + 
2);
+
+    *writer = std::unique_ptr<ColumnWriter>(
+            new MapColumnWriter(opts, 
std::unique_ptr<Field>(FieldFactory::create(*column)),
+                                null_writer, length_writer, 
inner_writer_list));
+
+    return Status::OK();
+}
+
+Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts,
+                                             const TabletColumn* column,
+                                             io::FileWriter* file_writer,
+                                             std::unique_ptr<ColumnWriter>* 
writer) {
+    auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(*column);
+    const auto* agg_state_type = assert_cast<const 
vectorized::DataTypeAggState*>(data_type.get());
+    auto type = 
agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type;
+    if (type == PrimitiveType::TYPE_STRING || type == 
PrimitiveType::INVALID_TYPE ||
+        type == PrimitiveType::TYPE_OBJECT) {
+        *writer = std::unique_ptr<ColumnWriter>(new ScalarColumnWriter(
+                opts, std::unique_ptr<Field>(FieldFactory::create(*column)), 
file_writer));
+    } else if (type == PrimitiveType::TYPE_ARRAY) {
+        RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, 
writer));
+    } else if (type == PrimitiveType::TYPE_MAP) {
+        RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
+    } else {
+        throw Exception(ErrorCode::INTERNAL_ERROR,
+                        "OLAP_FIELD_TYPE_AGG_STATE meet unsupported type: {}",
+                        agg_state_type->get_name());
+    }
+    return Status::OK();
+}
+
 //Todo(Amory): here should according nullable and offset and need sub to 
simply this function
 Status ColumnWriter::create(const ColumnWriterOptions& opts, const 
TabletColumn* column,
                             io::FileWriter* file_writer, 
std::unique_ptr<ColumnWriter>* writer) {
     std::unique_ptr<Field> field(FieldFactory::create(*column));
     DCHECK(field.get() != nullptr);
     if (is_scalar_type(column->type())) {
-        std::unique_ptr<ColumnWriter> writer_local = 
std::unique_ptr<ColumnWriter>(
+        *writer = std::unique_ptr<ColumnWriter>(
                 new ScalarColumnWriter(opts, std::move(field), file_writer));
-        *writer = std::move(writer_local);
         return Status::OK();
     } else {
         switch (column->type()) {
+        case FieldType::OLAP_FIELD_TYPE_AGG_STATE: {
+            RETURN_IF_ERROR(create_agg_state_writer(opts, column, file_writer, 
writer));
+            return Status::OK();
+        }
         case FieldType::OLAP_FIELD_TYPE_STRUCT: {
-            // not support empty struct
-            DCHECK(column->get_subtype_count() >= 1);
-            std::vector<std::unique_ptr<ColumnWriter>> sub_column_writers;
-            sub_column_writers.reserve(column->get_subtype_count());
-            for (uint32_t i = 0; i < column->get_subtype_count(); i++) {
-                const TabletColumn& sub_column = column->get_sub_column(i);
-
-                // create sub writer
-                ColumnWriterOptions column_options;
-                column_options.meta = opts.meta->mutable_children_columns(i);
-                column_options.need_zone_map = false;
-                column_options.need_bloom_filter = sub_column.is_bf_column();
-                column_options.need_bitmap_index = 
sub_column.has_bitmap_index();
-                if (sub_column.type() == FieldType::OLAP_FIELD_TYPE_STRUCT) {
-                    if (column_options.need_bloom_filter) {
-                        return Status::NotSupported("Do not support bloom 
filter for struct type");
-                    }
-                    if (column_options.need_bitmap_index) {
-                        return Status::NotSupported("Do not support bitmap 
index for struct type");
-                    }
-                }
-                if (sub_column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
-                    if (column_options.need_bloom_filter) {
-                        return Status::NotSupported("Do not support bloom 
filter for array type");
-                    }
-                    if (column_options.need_bitmap_index) {
-                        return Status::NotSupported("Do not support bitmap 
index for array type");
-                    }
-                }
-                std::unique_ptr<ColumnWriter> sub_column_writer;
-                RETURN_IF_ERROR(ColumnWriter::create(column_options, 
&sub_column, file_writer,
-                                                     &sub_column_writer));
-                sub_column_writers.push_back(std::move(sub_column_writer));
-            }
-
-            // if nullable, create null writer
-            ScalarColumnWriter* null_writer = nullptr;
-            if (opts.meta->is_nullable()) {
-                FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
-                ColumnWriterOptions null_options;
-                null_options.meta = opts.meta->add_children_columns();
-                null_options.meta->set_column_id(column->get_subtype_count() + 
1);
-                null_options.meta->set_unique_id(column->get_subtype_count() + 
1);
-                null_options.meta->set_type(int(null_type));
-                null_options.meta->set_is_nullable(false);
-                null_options.meta->set_length(
-                        
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_TINYINT>()->size());
-                null_options.meta->set_encoding(DEFAULT_ENCODING);
-                null_options.meta->set_compression(opts.meta->compression());
-
-                null_options.need_zone_map = false;
-                null_options.need_bloom_filter = false;
-                null_options.need_bitmap_index = false;
-
-                TabletColumn null_column =
-                        
TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, null_type,
-                                     null_options.meta->is_nullable(),
-                                     null_options.meta->unique_id(), 
null_options.meta->length());
-                null_column.set_name("nullable");
-                null_column.set_index_length(-1); // no short key index
-                std::unique_ptr<Field> 
null_field(FieldFactory::create(null_column));
-                null_writer =
-                        new ScalarColumnWriter(null_options, 
std::move(null_field), file_writer);
-            }
-
-            std::unique_ptr<ColumnWriter> writer_local =
-                    std::unique_ptr<ColumnWriter>(new StructColumnWriter(
-                            opts, std::move(field), null_writer, 
sub_column_writers));
-            *writer = std::move(writer_local);
+            RETURN_IF_ERROR(create_struct_writer(opts, column, file_writer, 
writer));
             return Status::OK();
         }
         case FieldType::OLAP_FIELD_TYPE_ARRAY: {
-            DCHECK(column->get_subtype_count() == 1);
-            const TabletColumn& item_column = column->get_sub_column(0);
-
-            // create item writer
-            ColumnWriterOptions item_options;
-            item_options.meta = opts.meta->mutable_children_columns(0);
-            item_options.need_zone_map = false;
-            item_options.need_bloom_filter = item_column.is_bf_column();
-            item_options.need_bitmap_index = item_column.has_bitmap_index();
-            if (item_column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
-                if (item_options.need_bloom_filter) {
-                    return Status::NotSupported("Do not support bloom filter 
for array type");
-                }
-                if (item_options.need_bitmap_index) {
-                    return Status::NotSupported("Do not support bitmap index 
for array type");
-                }
-            }
-            std::unique_ptr<ColumnWriter> item_writer;
-            RETURN_IF_ERROR(
-                    ColumnWriter::create(item_options, &item_column, 
file_writer, &item_writer));
-
-            // create length writer
-            FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
-
-            ColumnWriterOptions length_options;
-            length_options.meta = opts.meta->add_children_columns();
-            length_options.meta->set_column_id(2);
-            length_options.meta->set_unique_id(2);
-            length_options.meta->set_type(int(length_type));
-            length_options.meta->set_is_nullable(false);
-            length_options.meta->set_length(
-                    
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size());
-            length_options.meta->set_encoding(DEFAULT_ENCODING);
-            length_options.meta->set_compression(opts.meta->compression());
-
-            length_options.need_zone_map = false;
-            length_options.need_bloom_filter = false;
-            length_options.need_bitmap_index = false;
-
-            TabletColumn length_column =
-                    
TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
-                                 length_options.meta->is_nullable(),
-                                 length_options.meta->unique_id(), 
length_options.meta->length());
-            length_column.set_name("length");
-            length_column.set_index_length(-1); // no short key index
-            std::unique_ptr<Field> 
bigint_field(FieldFactory::create(length_column));
-            auto* length_writer =
-                    new OffsetColumnWriter(length_options, 
std::move(bigint_field), file_writer);
-
-            // if nullable, create null writer
-            ScalarColumnWriter* null_writer = nullptr;
-            if (opts.meta->is_nullable()) {
-                FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
-                ColumnWriterOptions null_options;
-                null_options.meta = opts.meta->add_children_columns();
-                null_options.meta->set_column_id(3);
-                null_options.meta->set_unique_id(3);
-                null_options.meta->set_type(int(null_type));
-                null_options.meta->set_is_nullable(false);
-                null_options.meta->set_length(
-                        
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_TINYINT>()->size());
-                null_options.meta->set_encoding(DEFAULT_ENCODING);
-                null_options.meta->set_compression(opts.meta->compression());
-
-                null_options.need_zone_map = false;
-                null_options.need_bloom_filter = false;
-                null_options.need_bitmap_index = false;
-
-                TabletColumn null_column =
-                        
TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, null_type,
-                                     length_options.meta->is_nullable(),
-                                     null_options.meta->unique_id(), 
null_options.meta->length());
-                null_column.set_name("nullable");
-                null_column.set_index_length(-1); // no short key index
-                std::unique_ptr<Field> 
null_field(FieldFactory::create(null_column));
-                null_writer =
-                        new ScalarColumnWriter(null_options, 
std::move(null_field), file_writer);
-            }
-
-            std::unique_ptr<ColumnWriter> writer_local = 
std::unique_ptr<ColumnWriter>(
-                    new ArrayColumnWriter(opts, std::move(field), 
length_writer, null_writer,
-                                          std::move(item_writer)));
-            *writer = std::move(writer_local);
+            RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, 
writer));
             return Status::OK();
         }
         case FieldType::OLAP_FIELD_TYPE_MAP: {
-            DCHECK(column->get_subtype_count() == 2);
-            // create key & value writer
-            std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
-            for (int i = 0; i < 2; ++i) {
-                const TabletColumn& item_column = column->get_sub_column(i);
-                // create item writer
-                ColumnWriterOptions item_options;
-                item_options.meta = opts.meta->mutable_children_columns(i);
-                item_options.need_zone_map = false;
-                item_options.need_bloom_filter = item_column.is_bf_column();
-                item_options.need_bitmap_index = 
item_column.has_bitmap_index();
-                if (item_column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
-                    if (item_options.need_bloom_filter) {
-                        return Status::NotSupported("Do not support bloom 
filter for map type");
-                    }
-                    if (item_options.need_bitmap_index) {
-                        return Status::NotSupported("Do not support bitmap 
index for map type");
-                    }
-                }
-                std::unique_ptr<ColumnWriter> item_writer;
-                RETURN_IF_ERROR(ColumnWriter::create(item_options, 
&item_column, file_writer,
-                                                     &item_writer));
-                inner_writer_list.push_back(std::move(item_writer));
-            }
-
-            ScalarColumnWriter* null_writer = nullptr;
-            // create offset writer
-            FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
-
-            // Be Cautious: column unique id is used for column reader creation
-            ColumnWriterOptions length_options;
-            length_options.meta = opts.meta->add_children_columns();
-            length_options.meta->set_column_id(column->get_subtype_count() + 
1);
-            length_options.meta->set_unique_id(column->get_subtype_count() + 
1);
-            length_options.meta->set_type(int(length_type));
-            length_options.meta->set_is_nullable(false);
-            length_options.meta->set_length(
-                    
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size());
-            length_options.meta->set_encoding(DEFAULT_ENCODING);
-            length_options.meta->set_compression(opts.meta->compression());
-
-            length_options.need_zone_map = false;
-            length_options.need_bloom_filter = false;
-            length_options.need_bitmap_index = false;
-
-            TabletColumn length_column =
-                    
TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
-                                 length_options.meta->is_nullable(),
-                                 length_options.meta->unique_id(), 
length_options.meta->length());
-            length_column.set_name("length");
-            length_column.set_index_length(-1); // no short key index
-            std::unique_ptr<Field> 
bigint_field(FieldFactory::create(length_column));
-            auto* length_writer =
-                    new OffsetColumnWriter(length_options, 
std::move(bigint_field), file_writer);
-
-            // create null writer
-            if (opts.meta->is_nullable()) {
-                FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
-                ColumnWriterOptions null_options;
-                null_options.meta = opts.meta->add_children_columns();
-                null_options.meta->set_column_id(column->get_subtype_count() + 
2);
-                null_options.meta->set_unique_id(column->get_subtype_count() + 
2);
-                null_options.meta->set_type(int(null_type));
-                null_options.meta->set_is_nullable(false);
-                null_options.meta->set_length(
-                        
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_TINYINT>()->size());
-                null_options.meta->set_encoding(DEFAULT_ENCODING);
-                null_options.meta->set_compression(opts.meta->compression());
-
-                null_options.need_zone_map = false;
-                null_options.need_bloom_filter = false;
-                null_options.need_bitmap_index = false;
-
-                TabletColumn null_column = TabletColumn(
-                        FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, 
null_type, false,
-                        null_options.meta->unique_id(), 
null_options.meta->length());
-                null_column.set_name("nullable");
-                null_column.set_index_length(-1); // no short key index
-                std::unique_ptr<Field> 
null_field(FieldFactory::create(null_column));
-                null_writer =
-                        new ScalarColumnWriter(null_options, 
std::move(null_field), file_writer);
-            }
-
-            // create map writer
-            std::unique_ptr<ColumnWriter> sub_column_writer;
-            std::unique_ptr<ColumnWriter> writer_local =
-                    std::unique_ptr<ColumnWriter>(new MapColumnWriter(
-                            opts, std::move(field), null_writer, 
length_writer, inner_writer_list));
-
-            *writer = std::move(writer_local);
+            RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, 
writer));
             return Status::OK();
         }
         case FieldType::OLAP_FIELD_TYPE_VARIANT: {
@@ -367,7 +329,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& 
opts, const TabletColumn*
 
 Status ColumnWriter::append_nullable(const uint8_t* is_null_bits, const void* 
data,
                                      size_t num_rows) {
-    const uint8_t* ptr = (const uint8_t*)data;
+    const auto* ptr = (const uint8_t*)data;
     BitmapIterator null_iter(is_null_bits, num_rows);
     bool is_null = false;
     size_t this_run = 0;
@@ -468,10 +430,10 @@ Status ScalarColumnWriter::init() {
     DCHECK_NE(_opts.meta->encoding(), DEFAULT_ENCODING);
     _page_builder.reset(page_builder);
     // create ordinal builder
-    _ordinal_index_builder.reset(new OrdinalIndexWriter());
+    _ordinal_index_builder = std::make_unique<OrdinalIndexWriter>();
     // create null bitmap builder
     if (is_nullable()) {
-        _null_bitmap_builder.reset(new NullBitmapBuilder());
+        _null_bitmap_builder = std::make_unique<NullBitmapBuilder>();
     }
     if (_opts.need_zone_map) {
         RETURN_IF_ERROR(ZoneMapIndexWriter::create(get_field(), 
_zone_map_index_builder));
@@ -734,7 +696,7 @@ Status ScalarColumnWriter::finish_current_page() {
     std::unique_ptr<Page> page(new Page());
     page->footer.set_type(DATA_PAGE);
     page->footer.set_uncompressed_size(Slice::compute_total_size(body));
-    auto data_page_footer = page->footer.mutable_data_page_footer();
+    auto* data_page_footer = page->footer.mutable_data_page_footer();
     data_page_footer->set_first_ordinal(_first_rowid);
     data_page_footer->set_num_values(_next_rowid - _first_rowid);
     data_page_footer->set_nullmap_size(nullmap.slice().size);
@@ -855,7 +817,7 @@ Status StructColumnWriter::append_nullable(const uint8_t* 
null_map, const uint8_
 }
 
 Status StructColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
-    auto results = reinterpret_cast<const uint64_t*>(*ptr);
+    const auto* results = reinterpret_cast<const uint64_t*>(*ptr);
     for (size_t i = 0; i < _num_sub_column_writers; ++i) {
         auto nullmap = *(results + _num_sub_column_writers + i);
         auto data = *(results + i);
@@ -922,8 +884,6 @@ Status StructColumnWriter::finish_current_page() {
     return Status::NotSupported("struct writer has no data, can not 
finish_current_page");
 }
 
-////////////////////////////////////////////////////////////////////////////////
-
 ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, 
std::unique_ptr<Field> field,
                                      OffsetColumnWriter* offset_writer,
                                      ScalarColumnWriter* null_writer,
@@ -944,7 +904,7 @@ Status ArrayColumnWriter::init() {
     }
     RETURN_IF_ERROR(_item_writer->init());
     if (_opts.need_inverted_index) {
-        auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
+        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
         if (writer != nullptr) {
             RETURN_IF_ERROR(InvertedIndexColumnWriter::create(get_field(), 
&_inverted_index_builder,
                                                               
_opts.inverted_index_file_writer,
@@ -985,7 +945,7 @@ Status ArrayColumnWriter::append_data(const uint8_t** ptr, 
size_t num_rows) {
                                              reinterpret_cast<const 
void*>(data), element_cnt));
     }
     if (_opts.need_inverted_index) {
-        auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
+        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
         // now only support nested type is scala
         if (writer != nullptr) {
             //NOTE: use array field name as index field, but item_writer size 
should be used when moving item_data_ptr
@@ -1217,5 +1177,4 @@ size_t MapColumnWriter::get_inverted_index_size() {
     return 0;
 }
 
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h 
b/be/src/olap/rowset/segment_v2/column_writer.h
index 1f60b006e58..410ae3eb768 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -89,6 +89,18 @@ class ColumnWriter {
 public:
     static Status create(const ColumnWriterOptions& opts, const TabletColumn* 
column,
                          io::FileWriter* file_writer, 
std::unique_ptr<ColumnWriter>* writer);
+    static Status create_struct_writer(const ColumnWriterOptions& opts, const 
TabletColumn* column,
+                                       io::FileWriter* file_writer,
+                                       std::unique_ptr<ColumnWriter>* writer);
+    static Status create_array_writer(const ColumnWriterOptions& opts, const 
TabletColumn* column,
+                                      io::FileWriter* file_writer,
+                                      std::unique_ptr<ColumnWriter>* writer);
+    static Status create_map_writer(const ColumnWriterOptions& opts, const 
TabletColumn* column,
+                                    io::FileWriter* file_writer,
+                                    std::unique_ptr<ColumnWriter>* writer);
+    static Status create_agg_state_writer(const ColumnWriterOptions& opts,
+                                          const TabletColumn* column, 
io::FileWriter* file_writer,
+                                          std::unique_ptr<ColumnWriter>* 
writer);
 
     explicit ColumnWriter(std::unique_ptr<Field> field, bool is_nullable)
             : _field(std::move(field)), _is_nullable(is_nullable) {}
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index f6a74112256..841c4403e9c 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -636,7 +636,7 @@ Status Segment::new_column_iterator(const TabletColumn& 
tablet_column,
         LOG(WARNING) << "different type between schema and column reader,"
                      << " column schema name: " << tablet_column.name()
                      << " column schema type: " << int(tablet_column.type())
-                     << " column reader meta type"
+                     << " column reader meta type: "
                      << 
int(_column_readers.at(tablet_column.unique_id())->get_meta_type());
         return Status::InternalError("different type between schema and column 
reader");
     }
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 3cb08bd0138..b7fa5aea794 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -172,6 +172,9 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta, 
uint32_t column_id,
     for (uint32_t i = 0; i < column.num_sparse_columns(); i++) {
         init_column_meta(meta->add_sparse_columns(), -1, 
column.sparse_column_at(i), tablet_schema);
     }
+
+    meta->set_result_is_nullable(column.get_result_is_nullable());
+    meta->set_function_name(column.get_aggregation_name());
 }
 
 Status SegmentWriter::init() {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index f5c4e927ca6..a355f99d23d 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -22,8 +22,6 @@
 #include <gen_cpp/olap_file.pb.h>
 #include <gen_cpp/segment_v2.pb.h>
 #include <parallel_hashmap/phmap.h>
-#include <stddef.h>
-#include <stdint.h>
 
 #include <map>
 #include <memory>
@@ -170,6 +168,23 @@ public:
     const std::vector<TabletColumnPtr>& sparse_columns() const;
     size_t num_sparse_columns() const { return _num_sparse_columns; }
 
+    Status check_valid() const {
+        if (type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
+            type() != FieldType::OLAP_FIELD_TYPE_STRUCT &&
+            type() != FieldType::OLAP_FIELD_TYPE_MAP) {
+            return Status::OK();
+        }
+        if (is_bf_column()) {
+            return Status::NotSupported("Do not support bloom filter index, 
type={}",
+                                        get_string_by_field_type(type()));
+        }
+        if (has_bitmap_index()) {
+            return Status::NotSupported("Do not support bitmap index, type={}",
+                                        get_string_by_field_type(type()));
+        }
+        return Status::OK();
+    }
+
 private:
     int32_t _unique_id = -1;
     std::string _col_name;
@@ -235,14 +250,14 @@ public:
     const vector<int32_t>& col_unique_ids() const { return _col_unique_ids; }
     const std::map<string, string>& properties() const { return _properties; }
     int32_t get_gram_size() const {
-        if (_properties.count("gram_size")) {
+        if (_properties.contains("gram_size")) {
             return std::stoi(_properties.at("gram_size"));
         }
 
         return 0;
     }
     int32_t get_gram_bf_size() const {
-        if (_properties.count("bf_size")) {
+        if (_properties.contains("bf_size")) {
             return std::stoi(_properties.at("bf_size"));
         }
 
diff --git a/be/src/olap/types.cpp b/be/src/olap/types.cpp
index b9a072cc5df..771223e2ca5 100644
--- a/be/src/olap/types.cpp
+++ b/be/src/olap/types.cpp
@@ -36,6 +36,7 @@ bool is_scalar_type(FieldType field_type) {
     case FieldType::OLAP_FIELD_TYPE_ARRAY:
     case FieldType::OLAP_FIELD_TYPE_MAP:
     case FieldType::OLAP_FIELD_TYPE_VARIANT:
+    case FieldType::OLAP_FIELD_TYPE_AGG_STATE:
         return false;
     default:
         return true;
@@ -50,7 +51,6 @@ bool is_olap_string_type(FieldType field_type) {
     case FieldType::OLAP_FIELD_TYPE_OBJECT:
     case FieldType::OLAP_FIELD_TYPE_STRING:
     case FieldType::OLAP_FIELD_TYPE_JSONB:
-    case FieldType::OLAP_FIELD_TYPE_AGG_STATE:
         return true;
     default:
         return false;
diff --git a/be/src/vec/data_types/data_type_factory.cpp 
b/be/src/vec/data_types/data_type_factory.cpp
index 9c40588ed31..8246977c6b0 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -73,7 +73,15 @@ DataTypePtr DataTypeFactory::create_data_type(const 
doris::Field& col_desc) {
 
 DataTypePtr DataTypeFactory::create_data_type(const TabletColumn& col_desc, 
bool is_nullable) {
     DataTypePtr nested = nullptr;
-    if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+    if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_AGG_STATE) {
+        DataTypes dataTypes;
+        for (size_t i = 0; i < col_desc.get_subtype_count(); i++) {
+            dataTypes.push_back(
+                    
DataTypeFactory::instance().create_data_type(col_desc.get_sub_column(i)));
+        }
+        nested = std::make_shared<vectorized::DataTypeAggState>(
+                dataTypes, col_desc.get_result_is_nullable(), 
col_desc.get_aggregation_name());
+    } else if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
         DCHECK(col_desc.get_subtype_count() == 1);
         nested = 
std::make_shared<DataTypeArray>(create_data_type(col_desc.get_sub_column(0)));
     } else if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_MAP) {
@@ -586,7 +594,14 @@ DataTypePtr DataTypeFactory::create_data_type(const 
PColumnMeta& pcolumn) {
 
 DataTypePtr DataTypeFactory::create_data_type(const segment_v2::ColumnMetaPB& 
pcolumn) {
     DataTypePtr nested = nullptr;
-    if (pcolumn.type() == static_cast<int>(FieldType::OLAP_FIELD_TYPE_ARRAY)) {
+    if (pcolumn.type() == 
static_cast<int>(FieldType::OLAP_FIELD_TYPE_AGG_STATE)) {
+        DataTypes data_types;
+        for (auto child : pcolumn.children_columns()) {
+            
data_types.push_back(DataTypeFactory::instance().create_data_type(child));
+        }
+        nested = std::make_shared<vectorized::DataTypeAggState>(
+                data_types, pcolumn.result_is_nullable(), 
pcolumn.function_name());
+    } else if (pcolumn.type() == 
static_cast<int>(FieldType::OLAP_FIELD_TYPE_ARRAY)) {
         // Item subcolumn and length subcolumn, for sparse columns only 
subcolumn
         DCHECK_GE(pcolumn.children_columns().size(), 1) << 
pcolumn.DebugString();
         nested = 
std::make_shared<DataTypeArray>(create_data_type(pcolumn.children_columns(0)));
@@ -598,13 +613,10 @@ DataTypePtr DataTypeFactory::create_data_type(const 
segment_v2::ColumnMetaPB& pc
     } else if (pcolumn.type() == 
static_cast<int>(FieldType::OLAP_FIELD_TYPE_STRUCT)) {
         DCHECK_GE(pcolumn.children_columns().size(), 1);
         size_t col_size = pcolumn.children_columns().size();
-        DataTypes dataTypes;
-        Strings names;
-        dataTypes.reserve(col_size);
-        names.reserve(col_size);
+        DataTypes dataTypes(col_size);
+        Strings names(col_size);
         for (size_t i = 0; i < col_size; i++) {
-            dataTypes.push_back(create_data_type(pcolumn.children_columns(i)));
-            names.push_back("");
+            dataTypes[i] = create_data_type(pcolumn.children_columns(i));
         }
         nested = std::make_shared<DataTypeStruct>(dataTypes, names);
     } else {
diff --git a/be/src/vec/olap/olap_data_convertor.cpp 
b/be/src/vec/olap/olap_data_convertor.cpp
index 86c1d2d6669..7c1010f743e 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -45,7 +45,6 @@
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_agg_state.h"
 #include "vec/data_types/data_type_array.h"
-#include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_struct.h"
@@ -74,6 +73,42 @@ void OlapBlockDataConvertor::add_column_data_convertor(const 
TabletColumn& colum
     _convertors.emplace_back(create_olap_column_data_convertor(column));
 }
 
+OlapBlockDataConvertor::OlapColumnDataConvertorBaseUPtr
+OlapBlockDataConvertor::create_map_convertor(const TabletColumn& column) {
+    const auto& key_column = column.get_sub_column(0);
+    const auto& value_column = column.get_sub_column(1);
+    return std::make_unique<OlapColumnDataConvertorMap>(key_column, 
value_column);
+}
+
+OlapBlockDataConvertor::OlapColumnDataConvertorBaseUPtr
+OlapBlockDataConvertor::create_array_convertor(const TabletColumn& column) {
+    const auto& sub_column = column.get_sub_column(0);
+    return std::make_unique<OlapColumnDataConvertorArray>(
+            create_olap_column_data_convertor(sub_column));
+}
+
+OlapBlockDataConvertor::OlapColumnDataConvertorBaseUPtr
+OlapBlockDataConvertor::create_agg_state_convertor(const TabletColumn& column) 
{
+    auto data_type = DataTypeFactory::instance().create_data_type(column);
+    const auto* agg_state_type = assert_cast<const 
vectorized::DataTypeAggState*>(data_type.get());
+    auto type = 
agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type;
+
+    // Terialized type of most functions is string, and some of them are fixed 
object.
+    // Finally, the serialized type of some special functions is 
bitmap/array/map...
+    if (type == PrimitiveType::TYPE_STRING) {
+        return std::make_unique<OlapColumnDataConvertorVarChar>(false);
+    } else if (type == PrimitiveType::TYPE_OBJECT) {
+        return std::make_unique<OlapColumnDataConvertorBitMap>();
+    } else if (type == PrimitiveType::INVALID_TYPE) {
+        // INVALID_TYPE means function's serialized type is fixed object
+        return std::make_unique<OlapColumnDataConvertorAggState>();
+    } else {
+        throw Exception(ErrorCode::INTERNAL_ERROR,
+                        "OLAP_FIELD_TYPE_AGG_STATE meet unsupported type: {}",
+                        agg_state_type->get_name());
+    }
+}
+
 OlapBlockDataConvertor::OlapColumnDataConvertorBaseUPtr
 OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& 
column) {
     switch (column.type()) {
@@ -84,18 +119,7 @@ 
OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
         return std::make_unique<OlapColumnDataConvertorQuantileState>();
     }
     case FieldType::OLAP_FIELD_TYPE_AGG_STATE: {
-        DataTypes dataTypes;
-        for (size_t i = 0; i < column.get_subtype_count(); i++) {
-            dataTypes.push_back(
-                    
DataTypeFactory::instance().create_data_type(column.get_sub_column(i)));
-        }
-        auto agg_state_type = std::make_shared<vectorized::DataTypeAggState>(
-                dataTypes, column.get_result_is_nullable(), 
column.get_aggregation_name());
-        if 
(agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type ==
-            TYPE_STRING) {
-            return std::make_unique<OlapColumnDataConvertorVarChar>(false);
-        }
-        return std::make_unique<OlapColumnDataConvertorAggState>();
+        return create_agg_state_convertor(column);
     }
     case FieldType::OLAP_FIELD_TYPE_HLL: {
         return std::make_unique<OlapColumnDataConvertorHLL>();
@@ -181,23 +205,17 @@ 
OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
         return std::make_unique<OlapColumnDataConvertorStruct>(sub_convertors);
     }
     case FieldType::OLAP_FIELD_TYPE_ARRAY: {
-        const auto& sub_column = column.get_sub_column(0);
-        return std::make_unique<OlapColumnDataConvertorArray>(
-                create_olap_column_data_convertor(sub_column));
+        return create_array_convertor(column);
     }
     case FieldType::OLAP_FIELD_TYPE_MAP: {
-        const auto& key_column = column.get_sub_column(0);
-        const auto& value_column = column.get_sub_column(1);
-        return std::make_unique<OlapColumnDataConvertorMap>(
-                create_olap_column_data_convertor(key_column),
-                create_olap_column_data_convertor(value_column));
+        return create_map_convertor(column);
     }
     default: {
-        DCHECK(false) << "Invalid type in olap data convertor:" << 
int(column.type());
-        return nullptr;
+        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid type in olap data 
convertor: {}",
+                        int(column.type()));
     }
     }
-} // namespace doris::vectorized
+}
 
 void OlapBlockDataConvertor::set_source_content(const vectorized::Block* 
block, size_t row_pos,
                                                 size_t num_rows) {
@@ -992,26 +1010,20 @@ Status 
OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap(
 
 Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap() {
     const ColumnMap* column_map = nullptr;
-    const DataTypeMap* data_type_map = nullptr;
     if (_nullmap) {
         const auto* nullable_column =
                 assert_cast<const ColumnNullable*>(_typed_column.column.get());
         column_map = assert_cast<const 
ColumnMap*>(nullable_column->get_nested_column_ptr().get());
-        data_type_map = assert_cast<const DataTypeMap*>(
-                (assert_cast<const 
DataTypeNullable*>(_typed_column.type.get())->get_nested_type())
-                        .get());
     } else {
         column_map = assert_cast<const ColumnMap*>(_typed_column.column.get());
-        data_type_map = assert_cast<const 
DataTypeMap*>(_typed_column.type.get());
     }
     assert(column_map);
-    assert(data_type_map);
 
-    return convert_to_olap(column_map, data_type_map);
+    return convert_to_olap(column_map);
 }
 
 Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
-        const ColumnMap* column_map, const DataTypeMap* data_type_map) {
+        const ColumnMap* column_map) {
     ColumnPtr key_data = column_map->get_keys_ptr();
     ColumnPtr value_data = column_map->get_values_ptr();
 
@@ -1045,11 +1057,11 @@ Status 
OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
         _offsets.push_back(column_map->offset_at(i + _row_pos) - start_offset 
+ _base_offset);
     }
     _base_offset += elem_size;
-    ColumnWithTypeAndName key_typed_column = {key_data, 
data_type_map->get_key_type(), "map.key"};
+    ColumnWithTypeAndName key_typed_column = {key_data, 
_data_type.get_key_type(), "map.key"};
     _key_convertor->set_source_column(key_typed_column, start_offset, 
elem_size);
     RETURN_IF_ERROR(_key_convertor->convert_to_olap());
 
-    ColumnWithTypeAndName value_typed_column = {value_data, 
data_type_map->get_value_type(),
+    ColumnWithTypeAndName value_typed_column = {value_data, 
_data_type.get_value_type(),
                                                 "map.value"};
     _value_convertor->set_source_column(value_typed_column, start_offset, 
elem_size);
     RETURN_IF_ERROR(_value_convertor->convert_to_olap());
diff --git a/be/src/vec/olap/olap_data_convertor.h 
b/be/src/vec/olap/olap_data_convertor.h
index 0ec720fcdc1..d05485e2bc5 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -44,6 +44,8 @@
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_object.h"
 
 namespace doris {
@@ -91,7 +93,11 @@ private:
     using OlapColumnDataConvertorBaseUPtr = 
std::unique_ptr<OlapColumnDataConvertorBase>;
     using OlapColumnDataConvertorBaseSPtr = 
std::shared_ptr<OlapColumnDataConvertorBase>;
 
-    OlapColumnDataConvertorBaseUPtr create_olap_column_data_convertor(const 
TabletColumn& column);
+    static OlapColumnDataConvertorBaseUPtr create_olap_column_data_convertor(
+            const TabletColumn& column);
+    static OlapColumnDataConvertorBaseUPtr create_map_convertor(const 
TabletColumn& column);
+    static OlapColumnDataConvertorBaseUPtr create_array_convertor(const 
TabletColumn& column);
+    static OlapColumnDataConvertorBaseUPtr create_agg_state_convertor(const 
TabletColumn& column);
 
     // accessors for different data types;
     class OlapColumnDataConvertorBase : public IOlapColumnDataAccessor {
@@ -461,10 +467,11 @@ private:
 
     class OlapColumnDataConvertorMap : public OlapColumnDataConvertorBase {
     public:
-        OlapColumnDataConvertorMap(OlapColumnDataConvertorBaseUPtr 
key_convertor,
-                                   OlapColumnDataConvertorBaseUPtr 
value_convertor)
-                : _key_convertor(std::move(key_convertor)),
-                  _value_convertor(std::move(value_convertor)) {
+        OlapColumnDataConvertorMap(const TabletColumn& key_column, const 
TabletColumn& value_column)
+                : 
_key_convertor(create_olap_column_data_convertor(key_column)),
+                  
_value_convertor(create_olap_column_data_convertor(value_column)),
+                  
_data_type(DataTypeFactory::instance().create_data_type(key_column),
+                             
DataTypeFactory::instance().create_data_type(value_column)) {
             _base_offset = 0;
             _results.resize(6); // size + offset + k_data + v_data +  
k_nullmap + v_nullmap
         }
@@ -477,12 +484,13 @@ private:
         };
 
     private:
-        Status convert_to_olap(const ColumnMap* column_map, const DataTypeMap* 
data_type_map);
+        Status convert_to_olap(const ColumnMap* column_map);
         OlapColumnDataConvertorBaseUPtr _key_convertor;
         OlapColumnDataConvertorBaseUPtr _value_convertor;
         std::vector<const void*> _results;
         PaddedPODArray<UInt64> _offsets; // map offsets in disk layout
         UInt64 _base_offset;
+        DataTypeMap _data_type;
     }; //OlapColumnDataConvertorMap
 
     class OlapColumnDataConvertorVariant : public OlapColumnDataConvertorBase {
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index ad0002697dc..ee82a5b5f1a 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -193,6 +193,9 @@ message ColumnMetaPB {
     optional int32 frac = 16; // ColumnMessag
 
     repeated ColumnMetaPB sparse_columns = 17; // sparse column within a 
variant column
+
+    optional bool result_is_nullable = 18; // used on agg_state type
+    optional string function_name = 19; // used on agg_state type
 }
 
 message PrimaryKeyIndexMetaPB {
diff --git 
a/regression-test/data/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.out
 
b/regression-test/data/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.out
new file mode 100644
index 00000000000..4e5d112ee95
--- /dev/null
+++ 
b/regression-test/data/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.out
@@ -0,0 +1,22 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_star --
+\N     4       \N      d
+-4     4       -4      d
+1      1       1       a
+2      2       2       b
+3      3       \N      c
+
+-- !select_mv --
+\N     4
+-4     4
+1      1
+2      2
+3      3
+
+-- !select_mv --
+\N     1
+-4     1
+1      2
+2      1
+3      1
+
diff --git 
a/regression-test/suites/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.groovy
 
b/regression-test/suites/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.groovy
new file mode 100644
index 00000000000..9f1f26ae9af
--- /dev/null
+++ 
b/regression-test/suites/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite ("diffrent_serialize") {
+
+    sql """ DROP TABLE IF EXISTS d_table; """
+
+    sql """
+            create table d_table(
+                k1 int null,
+                k2 int not null,
+                k3 bigint null,
+                k4 varchar(100) null
+            )
+            duplicate key (k1,k2,k3)
+            distributed BY hash(k1) buckets 3
+            properties("replication_num" = "1");
+        """
+
+    sql "insert into d_table select 1,1,1,'a';"
+    sql "insert into d_table select 2,2,2,'b';"
+    sql "insert into d_table select 3,3,null,'c';"
+
+    createMV("create materialized view mv1 as select k1,bitmap_agg(k2) from 
d_table group by k1;")
+    /*
+    createMV("create materialized view mv2 as select k1,map_agg(k2,k3) from 
d_table group by k1;")
+    createMV("create materialized view mv3 as select k1,array_agg(k2) from 
d_table group by k1;")
+    createMV("create materialized view mv4 as select k1,collect_list(k2,3) 
from d_table group by k1;")
+    createMV("create materialized view mv5 as select k1,collect_set(k2,3) from 
d_table group by k1;")
+    */
+
+    sql "insert into d_table select -4,4,-4,'d';"
+    sql "insert into d_table(k4,k2) values('d',4);"
+
+    qt_select_star "select * from d_table order by k1;"
+
+    explain {
+        sql("select k1,bitmap_to_string(bitmap_agg(k2)) from d_table group by 
k1 order by 1;")
+        contains "(mv1)"
+    }
+    qt_select_mv "select k1,bitmap_to_string(bitmap_agg(k2)) from d_table 
group by k1 order by 1;"
+
+    sql "insert into d_table select 1,1,1,'a';"
+    sql "insert into d_table select 1,2,1,'a';"
+
+    explain {
+        sql("select k1,bitmap_count(bitmap_agg(k2)) from d_table group by k1 
order by 1;")
+        contains "(mv1)"
+    }
+    qt_select_mv "select k1,bitmap_count(bitmap_agg(k2)) from d_table group by 
k1 order by 1;"
+
+/*
+    explain {
+        sql("select k1,map_agg(k2,k3) from d_table group by k1 order by 1;")
+        contains "(mv2)"
+    }
+    qt_select_mv "select k1,map_agg(k2,k3) from d_table group by k1 order by 
1;"
+
+    explain {
+        sql("select k1,array_agg(k2) from d_table group by k1 order by 1;")
+        contains "(mv3)"
+    }
+    qt_select_mv "select k1,array_agg(k2) from d_table group by k1 order by 1;"
+
+    explain {
+        sql("select k1,collect_list(k2,3) from d_table group by k1 order by 
1;")
+        contains "(mv4)"
+    }
+    qt_select_mv "select k1,collect_list(k2,3) from d_table group by k1 order 
by 1;"
+
+    explain {
+        sql("select k1,collect_set(k2,3) from d_table group by k1 order by 1;")
+        contains "(mv5)"
+    }
+    qt_select_mv "select k1,collect_set(k2,3) from d_table group by k1 order 
by 1;"
+    */
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to