This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/variant-sparse by this push:
     new 824ed73153a refactor and implement  sparse column reader and 
stats(#45492)
824ed73153a is described below

commit 824ed73153a18939d15f9b56ff6f1ca6888d0312
Author: lihangyu <lihan...@selectdb.com>
AuthorDate: Mon Dec 16 22:07:10 2024 +0800

    refactor and implement  sparse column reader and stats(#45492)
---
 be/src/olap/compaction.cpp                         |   1 +
 be/src/olap/rowset/rowset_writer_context.h         |   3 +
 be/src/olap/rowset/segment_v2/column_reader.cpp    | 146 ++++++++
 be/src/olap/rowset/segment_v2/column_reader.h      |  37 +-
 be/src/olap/rowset/segment_v2/column_writer.cpp    |   6 +-
 be/src/olap/rowset/segment_v2/column_writer.h      |   4 +-
 .../rowset/segment_v2/hierarchical_data_reader.cpp | 364 ++++++++++++++-----
 .../rowset/segment_v2/hierarchical_data_reader.h   | 197 +++--------
 be/src/olap/rowset/segment_v2/segment.cpp          | 330 ++++++------------
 be/src/olap/rowset/segment_v2/segment.h            |  31 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |   1 +
 be/src/olap/rowset/segment_v2/stream_reader.h      |   7 +-
 .../segment_v2/variant_column_writer_impl.cpp      | 113 +++++-
 .../rowset/segment_v2/variant_column_writer_impl.h |  11 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |   1 +
 be/src/vec/columns/column_object.cpp               | 387 +++++++++++++++++----
 be/src/vec/columns/column_object.h                 |  34 +-
 be/src/vec/common/schema_util.cpp                  |   6 +-
 be/src/vec/common/schema_util.h                    |   1 +
 be/src/vec/common/string_buffer.hpp                |  86 +++++
 .../data_types/serde/data_type_object_serde.cpp    |   6 +-
 gensrc/proto/segment_v2.proto                      |   2 +-
 22 files changed, 1213 insertions(+), 561 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 68ed0322a9e..1e53ddc7364 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -172,6 +172,7 @@ Status Compaction::merge_input_rowsets() {
     }
 
     RowsetWriterContext ctx;
+    ctx.input_rs_readers = input_rs_readers;
     RETURN_IF_ERROR(construct_output_rowset_writer(ctx));
 
     // write merged rows to output rowset
diff --git a/be/src/olap/rowset/rowset_writer_context.h 
b/be/src/olap/rowset/rowset_writer_context.h
index cb0fda83e60..cbdba6991ae 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -115,6 +115,9 @@ struct RowsetWriterContext {
     // For remote rowset
     std::optional<StorageResource> storage_resource;
 
+    // For collect segment statistics for compaction
+    std::vector<RowsetReaderSharedPtr> input_rs_readers;
+
     bool is_local_rowset() const { return !storage_resource; }
 
     std::string segment_path(int seg_id) const {
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index b96cf4f7e67..745ff3d93a3 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -44,6 +44,7 @@
 #include "olap/rowset/segment_v2/bloom_filter.h"
 #include "olap/rowset/segment_v2/bloom_filter_index_reader.h"
 #include "olap/rowset/segment_v2/encoding_info.h" // for EncodingInfo
+#include "olap/rowset/segment_v2/hierarchical_data_reader.h"
 #include "olap/rowset/segment_v2/inverted_index_file_reader.h"
 #include "olap/rowset/segment_v2/inverted_index_reader.h"
 #include "olap/rowset/segment_v2/page_decoder.h"
@@ -52,6 +53,7 @@
 #include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer
 #include "olap/rowset/segment_v2/row_ranges.h"
 #include "olap/rowset/segment_v2/segment.h"
+#include "olap/rowset/segment_v2/variant_column_writer_impl.h"
 #include "olap/rowset/segment_v2/zone_map_index.h"
 #include "olap/tablet_schema.h"
 #include "olap/types.h" // for TypeInfo
@@ -220,6 +222,146 @@ Status ColumnReader::create_agg_state(const 
ColumnReaderOptions& opts, const Col
                                  agg_state_type->get_name(), int(type));
 }
 
+const SubcolumnColumnReaders::Node* VariantColumnReader::get_reader_by_path(
+        const vectorized::PathInData& relative_path) const {
+    return _subcolumn_readers->find_leaf(relative_path);
+}
+
+Status VariantColumnReader::new_iterator(ColumnIterator** iterator,
+                                         const TabletColumn& target_col) {
+    // root column use unique id, leaf column use parent_unique_id
+    auto relative_path = target_col.path_info_ptr()->copy_pop_front();
+    const auto* root = _subcolumn_readers->get_root();
+    const auto* node =
+            target_col.has_path_info() ? 
_subcolumn_readers->find_exact(relative_path) : nullptr;
+
+    if (node != nullptr) {
+        if (node->is_leaf_node()) {
+            // Node contains column without any child sub columns and no 
corresponding sparse columns
+            // Direct read extracted columns
+            const auto* node = _subcolumn_readers->find_leaf(relative_path);
+            RETURN_IF_ERROR(node->data.reader->new_iterator(iterator));
+        } else {
+            // Node contains column with children columns or has corresponding 
sparse columns
+            // Create reader with hierarchical data.
+            std::unique_ptr<ColumnIterator> sparse_iter;
+            if (!_sparse_column_set_in_stats.empty()) {
+                // Sparse column exists or reached sparse size limit, read 
sparse column
+                ColumnIterator* iter;
+                RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&iter));
+                sparse_iter.reset(iter);
+            }
+            // If reading the full path of the variant, read in MERGE_ROOT 
mode, otherwise READ_DIRECT
+            HierarchicalDataReader::ReadType read_type =
+                    (relative_path == root->path) ? 
HierarchicalDataReader::ReadType::MERGE_ROOT
+                                                  : 
HierarchicalDataReader::ReadType::READ_DIRECT;
+            RETURN_IF_ERROR(HierarchicalDataReader::create(iterator, 
relative_path, node, root,
+                                                           read_type, 
std::move(sparse_iter)));
+        }
+    } else {
+        if (_sparse_column_set_in_stats.contains(StringRef 
{relative_path.get_path()}) ||
+            _sparse_column_set_in_stats.size() >
+                    VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+            // Sparse column exists or reached sparse size limit, read sparse 
column
+            ColumnIterator* inner_iter;
+            RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
+            *iterator = new SparseColumnExtractReader(relative_path.get_path(),
+                                                      
std::unique_ptr<ColumnIterator>(inner_iter));
+        } else {
+            // Sparse column does not exist and the stats limit is not reached, 
so the target path does not exist; get a default iterator
+            std::unique_ptr<ColumnIterator> iter;
+            RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &iter));
+            *iterator = iter.release();
+        }
+    }
+    return Status::OK();
+}
+
+Status VariantColumnReader::init(const ColumnReaderOptions& opts, const 
SegmentFooterPB& footer,
+                                 uint32_t column_id, uint64_t num_rows,
+                                 io::FileReaderSPtr file_reader) {
+    // init sub columns
+    _subcolumn_readers = std::make_unique<SubcolumnColumnReaders>();
+    std::unordered_map<vectorized::PathInData, uint32_t, 
vectorized::PathInData::Hash>
+            column_path_to_footer_ordinal;
+    for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) {
+        const auto& column_pb = footer.columns(ordinal);
+        // column path for accessing subcolumns of variant
+        if (column_pb.has_column_path_info()) {
+            vectorized::PathInData path;
+            path.from_protobuf(column_pb.column_path_info());
+            column_path_to_footer_ordinal.emplace(path, ordinal);
+        }
+    }
+
+    const ColumnMetaPB& self_column_pb = footer.columns(column_id);
+    for (const ColumnMetaPB& column_pb : footer.columns()) {
+        if (column_pb.unique_id() != self_column_pb.unique_id()) {
+            continue;
+        }
+        DCHECK(column_pb.has_column_path_info());
+        std::unique_ptr<ColumnReader> reader;
+        RETURN_IF_ERROR(
+                ColumnReader::create(opts, column_pb, footer.num_rows(), 
file_reader, &reader));
+        vectorized::PathInData path;
+        path.from_protobuf(column_pb.column_path_info());
+        // init sparse column
+        if (path.get_path() == SPARSE_COLUMN_PATH) {
+            RETURN_IF_ERROR(ColumnReader::create(opts, column_pb, 
footer.num_rows(), file_reader,
+                                                 &_sparse_column_reader));
+            continue;
+        }
+        // init subcolumns
+        auto relative_path = path.copy_pop_front();
+        if (_subcolumn_readers->get_root() == nullptr) {
+            _subcolumn_readers->create_root(SubcolumnReader {nullptr, 
nullptr});
+        }
+        if (relative_path.empty()) {
+            // root column
+            
_subcolumn_readers->get_mutable_root()->modify_to_scalar(SubcolumnReader {
+                    std::move(reader),
+                    
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
+        } else {
+            // check the root is already a leaf node
+            _subcolumn_readers->add(
+                    relative_path,
+                    SubcolumnReader {
+                            std::move(reader),
+                            
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
+        }
+    }
+
+    // init sparse column set in stats
+    if (self_column_pb.has_variant_statistics()) {
+        const auto& variant_stats = self_column_pb.variant_statistics();
+        for (const auto& [path, _] : 
variant_stats.sparse_column_non_null_size()) {
+            _sparse_column_set_in_stats.emplace(path.data(), path.size());
+        }
+    }
+    return Status::OK();
+}
+
+Status ColumnReader::create_variant(const ColumnReaderOptions& opts, const 
SegmentFooterPB& footer,
+                                    uint32_t column_id, uint64_t num_rows,
+                                    const io::FileReaderSPtr& file_reader,
+                                    std::unique_ptr<ColumnReader>* reader) {
+    std::unique_ptr<VariantColumnReader> reader_local(new 
VariantColumnReader());
+    RETURN_IF_ERROR(reader_local->init(opts, footer, column_id, num_rows, 
file_reader));
+    *reader = std::move(reader_local);
+    return Status::OK();
+}
+
+Status ColumnReader::create(const ColumnReaderOptions& opts, const 
SegmentFooterPB& footer,
+                            uint32_t column_id, uint64_t num_rows,
+                            const io::FileReaderSPtr& file_reader,
+                            std::unique_ptr<ColumnReader>* reader) {
+    if ((FieldType)footer.columns(column_id).type() != 
FieldType::OLAP_FIELD_TYPE_VARIANT) {
+        return ColumnReader::create(opts, footer.columns(column_id), num_rows, 
file_reader, reader);
+    }
+    // create variant column reader with extracted columns info in footer
+    return create_variant(opts, footer, column_id, num_rows, file_reader, 
reader);
+}
+
 Status ColumnReader::create(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
                             uint64_t num_rows, const io::FileReaderSPtr& 
file_reader,
                             std::unique_ptr<ColumnReader>* reader) {
@@ -706,6 +848,10 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, 
OrdinalPageIndexIterat
     return Status::OK();
 }
 
+Status ColumnReader::new_iterator(ColumnIterator** iterator, const 
TabletColumn& col) {
+    return new_iterator(iterator);
+}
+
 Status ColumnReader::new_iterator(ColumnIterator** iterator) {
     if (is_empty()) {
         *iterator = new EmptyFileColumnIterator();
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h 
b/be/src/olap/rowset/segment_v2/column_reader.h
index d72d802f977..d61393e820c 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -41,6 +41,7 @@
 #include "olap/rowset/segment_v2/page_handle.h"        // for PageHandle
 #include "olap/rowset/segment_v2/page_pointer.h"
 #include "olap/rowset/segment_v2/parsed_page.h" // for ParsedPage
+#include "olap/rowset/segment_v2/stream_reader.h"
 #include "olap/types.h"
 #include "olap/utils.h"
 #include "util/once.h"
@@ -78,6 +79,8 @@ class InvertedIndexFileReader;
 class PageDecoder;
 class RowRanges;
 class ZoneMapIndexReader;
+// struct SubcolumnReader;
+// using SubcolumnColumnReaders = vectorized::SubcolumnsTree<SubcolumnReader>;
 
 struct ColumnReaderOptions {
     // whether verify checksum when read page
@@ -112,11 +115,16 @@ struct ColumnIteratorOptions {
 // This will cache data shared by all reader
 class ColumnReader : public MetadataAdder<ColumnReader> {
 public:
+    ColumnReader() = default;
     // Create an initialized ColumnReader in *reader.
     // This should be a lightweight operation without I/O.
     static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB& 
meta,
                          uint64_t num_rows, const io::FileReaderSPtr& 
file_reader,
                          std::unique_ptr<ColumnReader>* reader);
+    static Status create(const ColumnReaderOptions& opts, const 
SegmentFooterPB& footer,
+                         uint32_t column_id, uint64_t num_rows,
+                         const io::FileReaderSPtr& file_reader,
+                         std::unique_ptr<ColumnReader>* reader);
     static Status create_array(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
                                const io::FileReaderSPtr& file_reader,
                                std::unique_ptr<ColumnReader>* reader);
@@ -129,11 +137,16 @@ public:
     static Status create_agg_state(const ColumnReaderOptions& opts, const 
ColumnMetaPB& meta,
                                    uint64_t num_rows, const 
io::FileReaderSPtr& file_reader,
                                    std::unique_ptr<ColumnReader>* reader);
+    static Status create_variant(const ColumnReaderOptions& opts, const 
SegmentFooterPB& footer,
+                                 uint32_t column_id, uint64_t num_rows,
+                                 const io::FileReaderSPtr& file_reader,
+                                 std::unique_ptr<ColumnReader>* reader);
     enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING, 
ALL_DICT_ENCODING };
 
-    virtual ~ColumnReader();
+    ~ColumnReader() override;
 
     // create a new column iterator. Client should delete returned iterator
+    virtual Status new_iterator(ColumnIterator** iterator, const TabletColumn& 
col);
     Status new_iterator(ColumnIterator** iterator);
     Status new_array_iterator(ColumnIterator** iterator);
     Status new_struct_iterator(ColumnIterator** iterator);
@@ -283,6 +296,28 @@ private:
     DorisCallOnce<Status> _set_dict_encoding_type_once;
 };
 
+class VariantColumnReader : public ColumnReader {
+public:
+    VariantColumnReader() = default;
+
+    Status init(const ColumnReaderOptions& opts, const SegmentFooterPB& 
footer, uint32_t column_id,
+                uint64_t num_rows, io::FileReaderSPtr file_reader);
+    Status new_iterator(ColumnIterator** iterator, const TabletColumn& col) 
override;
+
+    const SubcolumnColumnReaders::Node* get_reader_by_path(
+            const vectorized::PathInData& relative_path) const;
+
+    ~VariantColumnReader() override = default;
+
+private:
+    std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers;
+    std::unique_ptr<ColumnReader> _sparse_column_reader;
+    // Some sparse column record in stats, use StringRef to reduce memory 
usage,
+    // notice: make sure the ref is not released before the ColumnReader is 
destructed,
+    // used to decide whether to read from sparse column
+    std::unordered_set<StringRef> _sparse_column_set_in_stats;
+};
+
 // Base iterator to read one column data
 class ColumnIterator {
 public:
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp 
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index e3cd3b17144..895589d1cd3 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -1168,7 +1168,11 @@ VariantColumnWriter::VariantColumnWriter(const 
ColumnWriterOptions& opts,
                                          const TabletColumn* column, 
std::unique_ptr<Field> field)
         : ColumnWriter(std::move(field), opts.meta->is_nullable()) {
     _impl = std::make_unique<VariantColumnWriterImpl>(opts, column);
-};
+}
+
+Status VariantColumnWriter::init() {
+    return _impl->init();
+}
 
 Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
     _next_rowid += num_rows;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h 
b/be/src/olap/rowset/segment_v2/column_writer.h
index b664332ea8e..53d7c5d0234 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -73,6 +73,8 @@ struct ColumnWriterOptions {
     io::FileWriter* file_writer = nullptr;
     CompressionTypePB compression_type = UNKNOWN_COMPRESSION;
     RowsetWriterContext* rowset_ctx = nullptr;
+    // For collect segment statistics for compaction
+    std::vector<RowsetReaderSharedPtr> input_rs_readers;
     std::string to_string() const {
         std::stringstream ss;
         ss << std::boolalpha << "meta=" << meta->DebugString()
@@ -480,7 +482,7 @@ public:
 
     ~VariantColumnWriter() override = default;
 
-    Status init() override { return Status::OK(); }
+    Status init() override;
 
     Status append_data(const uint8_t** ptr, size_t num_rows) override;
 
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp 
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
index c85e4b429ad..2b8e58d47f1 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -23,6 +23,7 @@
 #include "io/io_common.h"
 #include "olap/rowset/segment_v2/column_reader.h"
 #include "vec/columns/column.h"
+#include "vec/columns/column_map.h"
 #include "vec/columns/column_object.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/schema_util.h"
@@ -30,14 +31,12 @@
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/json/path_in_data.h"
 
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
 
-Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
-                                      vectorized::PathInData path,
+Status HierarchicalDataReader::create(ColumnIterator** reader, 
vectorized::PathInData path,
                                       const SubcolumnColumnReaders::Node* node,
-                                      const SubcolumnColumnReaders::Node* root,
-                                      ReadType read_type) {
+                                      const SubcolumnColumnReaders::Node* 
root, ReadType read_type,
+                                      std::unique_ptr<ColumnIterator>&& 
sparse_reader) {
     // None leave node need merge with root
     auto* stream_iter = new HierarchicalDataReader(path);
     std::vector<const SubcolumnColumnReaders::Node*> leaves;
@@ -54,14 +53,21 @@ Status 
HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
     // Eg. {"a" : "b" : {"c" : 1}}, access the `a.b` path and merge with root 
path so that
     // we could make sure the data could be fully merged, since some column 
may not be extracted but remains in root
     // like {"a" : "b" : {"e" : 1.1}} in jsonb format
-    if (read_type == ReadType::MERGE_SPARSE) {
+    if (read_type == ReadType::MERGE_ROOT) {
         ColumnIterator* it;
         RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
         stream_iter->set_root(std::make_unique<SubstreamIterator>(
                 root->data.file_column_type->create_column(), 
std::unique_ptr<ColumnIterator>(it),
                 root->data.file_column_type));
     }
-    reader->reset(stream_iter);
+    // need read from sparse column
+    if (sparse_reader) {
+        vectorized::MutableColumnPtr sparse_column =
+                vectorized::ColumnObject::create_sparse_column_fn();
+        stream_iter->_sparse_column_reader = 
std::make_unique<SubstreamIterator>(
+                std::move(sparse_column), std::move(sparse_reader), nullptr);
+    };
+    *reader = stream_iter;
 
     return Status::OK();
 }
@@ -104,7 +110,7 @@ Status HierarchicalDataReader::next_batch(size_t* n, 
vectorized::MutableColumnPt
                 CHECK(reader.inited);
                 RETURN_IF_ERROR(reader.iterator->next_batch(n, reader.column, 
has_null));
                 VLOG_DEBUG << fmt::format("{} next_batch {} rows, type={}", 
path.get_path(), *n,
-                                          type->get_name());
+                                          type ? type->get_name() : "null");
                 reader.rows_read += *n;
                 return Status::OK();
             },
@@ -119,7 +125,7 @@ Status HierarchicalDataReader::read_by_rowids(const 
rowid_t* rowids, const size_
                 CHECK(reader.inited);
                 RETURN_IF_ERROR(reader.iterator->read_by_rowids(rowids, count, 
reader.column));
                 VLOG_DEBUG << fmt::format("{} read_by_rowids {} rows, 
type={}", path.get_path(),
-                                          count, type->get_name());
+                                          count, type ? type->get_name() : 
"null");
                 reader.rows_read += count;
                 return Status::OK();
             },
@@ -150,95 +156,295 @@ ordinal_t HierarchicalDataReader::get_current_ordinal() 
const {
     return (*_substream_reader.begin())->data.iterator->get_current_ordinal();
 }
 
-Status ExtractReader::init(const ColumnIteratorOptions& opts) {
-    if (!_root_reader->inited) {
-        RETURN_IF_ERROR(_root_reader->iterator->init(opts));
-        _root_reader->inited = true;
+Status HierarchicalDataReader::_process_sub_columns(
+        vectorized::ColumnObject& container_variant,
+        const vectorized::PathsWithColumnAndType& non_nested_subcolumns) {
+    for (const auto& entry : non_nested_subcolumns) {
+        DCHECK(!entry.path.has_nested_part());
+        bool add = container_variant.add_sub_column(entry.path, 
entry.column->assume_mutable(),
+                                                    entry.type);
+        if (!add) {
+            return Status::InternalError("Duplicated {}, type {}", 
entry.path.get_path(),
+                                         entry.type->get_name());
+        }
     }
     return Status::OK();
 }
 
-Status ExtractReader::seek_to_first() {
-    LOG(FATAL) << "Not implemented";
-    __builtin_unreachable();
+Status HierarchicalDataReader::_process_nested_columns(
+        vectorized::ColumnObject& container_variant,
+        const std::map<vectorized::PathInData, 
vectorized::PathsWithColumnAndType>&
+                nested_subcolumns) {
+    using namespace vectorized;
+    // Iterate nested subcolumns and flatten them; each entry contains the 
nested subcolumns of the same nested parent.
+    // First we pick the first subcolumn as the base array and use its offset 
info. Then we flatten all nested subcolumns
+    // into a new object column and wrap it with an array column using the 
first element's offsets. The wrapped array column
+    // will have the type ColumnObject::NESTED_TYPE, which is 
Nullable<ColumnArray<NULLABLE(ColumnObject)>>.
+    for (const auto& entry : nested_subcolumns) {
+        MutableColumnPtr nested_object = ColumnObject::create(true, false);
+        const auto* base_array =
+                
check_and_get_column<ColumnArray>(remove_nullable(entry.second[0].column));
+        MutableColumnPtr offset = 
base_array->get_offsets_ptr()->assume_mutable();
+        auto* nested_object_ptr = 
assert_cast<ColumnObject*>(nested_object.get());
+        // flatten nested arrays
+        for (const auto& subcolumn : entry.second) {
+            const auto& column = subcolumn.column;
+            const auto& type = subcolumn.type;
+            if (!remove_nullable(column)->is_column_array()) {
+                return Status::InvalidArgument(
+                        "Meet none array column when flatten nested array, 
path {}, type {}",
+                        subcolumn.path.get_path(), subcolumn.type->get_name());
+            }
+            const auto* target_array =
+                    
check_and_get_column<ColumnArray>(remove_nullable(subcolumn.column).get());
+#ifndef NDEBUG
+            if (!base_array->has_equal_offsets(*target_array)) {
+                return Status::InvalidArgument(
+                        "Meet none equal offsets array when flatten nested 
array, path {}, "
+                        "type {}",
+                        subcolumn.path.get_path(), subcolumn.type->get_name());
+            }
+#endif
+            MutableColumnPtr flattend_column = 
target_array->get_data_ptr()->assume_mutable();
+            DataTypePtr flattend_type =
+                    
check_and_get_data_type<DataTypeArray>(remove_nullable(type).get())
+                            ->get_nested_type();
+            // add sub path without parent prefix
+            nested_object_ptr->add_sub_column(
+                    
subcolumn.path.copy_pop_nfront(entry.first.get_parts().size()),
+                    std::move(flattend_column), std::move(flattend_type));
+        }
+        nested_object = 
make_nullable(nested_object->get_ptr())->assume_mutable();
+        auto array =
+                make_nullable(ColumnArray::create(std::move(nested_object), 
std::move(offset)));
+        PathInDataBuilder builder;
+        // add parent prefix
+        builder.append(entry.first.get_parts(), false);
+        PathInData parent_path = builder.build();
+        // unset nested parts
+        parent_path.unset_nested();
+        DCHECK(!parent_path.has_nested_part());
+        container_variant.add_sub_column(parent_path, array->assume_mutable(),
+                                         ColumnObject::NESTED_TYPE);
+    }
+    return Status::OK();
 }
 
-Status ExtractReader::seek_to_ordinal(ordinal_t ord) {
-    CHECK(_root_reader->inited);
-    return _root_reader->iterator->seek_to_ordinal(ord);
-}
+Status HierarchicalDataReader::_init_container(vectorized::MutableColumnPtr& 
container,
+                                               size_t nrows) {
+    using namespace vectorized;
+    // build variant as container
+    container = ColumnObject::create(true, false);
+    auto& container_variant = assert_cast<ColumnObject&>(*container);
 
-Status ExtractReader::extract_to(vectorized::MutableColumnPtr& dst, size_t 
nrows) {
-    DCHECK(_root_reader);
-    DCHECK(_root_reader->inited);
-    vectorized::ColumnNullable* nullable_column = nullptr;
-    if (dst->is_nullable()) {
-        nullable_column = assert_cast<vectorized::ColumnNullable*>(dst.get());
+    // add root first
+    if (_path.get_parts().empty() && _root_reader) {
+        auto& root_var =
+                _root_reader->column->is_nullable()
+                        ? assert_cast<vectorized::ColumnObject&>(
+                                  
assert_cast<vectorized::ColumnNullable&>(*_root_reader->column)
+                                          .get_nested_column())
+                        : 
assert_cast<vectorized::ColumnObject&>(*_root_reader->column);
+        auto column = root_var.get_root();
+        auto type = root_var.get_root_type();
+        container_variant.add_sub_column({}, std::move(column), type);
     }
-    auto& variant =
-            nullable_column == nullptr
-                    ? assert_cast<vectorized::ColumnObject&>(*dst)
-                    : 
assert_cast<vectorized::ColumnObject&>(nullable_column->get_nested_column());
-    const auto& root =
-            _root_reader->column->is_nullable()
-                    ? assert_cast<vectorized::ColumnObject&>(
-                              
assert_cast<vectorized::ColumnNullable&>(*_root_reader->column)
-                                      .get_nested_column())
-                    : assert_cast<const 
vectorized::ColumnObject&>(*_root_reader->column);
-    // extract root value with path, we can't modify the original root column
-    // since some other column may depend on it.
-    vectorized::MutableColumnPtr extracted_column;
-    RETURN_IF_ERROR(root.extract_root( // trim the root name, eg. v.a.b -> a.b
-            _col.path_info_ptr()->copy_pop_front(), extracted_column));
-
-    if (_target_type_hint != nullptr) {
-        variant.create_root(_target_type_hint, 
_target_type_hint->create_column());
+    // parent path -> subcolumns
+    std::map<PathInData, PathsWithColumnAndType> nested_subcolumns;
+    PathsWithColumnAndType non_nested_subcolumns;
+    RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
+        MutableColumnPtr column = node.data.column->get_ptr();
+        PathInData relative_path = 
node.path.copy_pop_nfront(_path.get_parts().size());
+
+        if (node.path.has_nested_part()) {
+            
CHECK_EQ(getTypeName(remove_nullable(node.data.type)->get_type_id()),
+                     getTypeName(TypeIndex::Array));
+            PathInData parent_path =
+                    
node.path.get_nested_prefix_path().copy_pop_nfront(_path.get_parts().size());
+            nested_subcolumns[parent_path].emplace_back(relative_path, 
column->get_ptr(),
+                                                        node.data.type);
+        } else {
+            non_nested_subcolumns.emplace_back(relative_path, 
column->get_ptr(), node.data.type);
+        }
+        return Status::OK();
+    }));
+
+    RETURN_IF_ERROR(_process_sub_columns(container_variant, 
non_nested_subcolumns));
+
+    RETURN_IF_ERROR(_process_nested_columns(container_variant, 
nested_subcolumns));
+
+    RETURN_IF_ERROR(_process_sparse_column(container_variant, nrows));
+    return Status::OK();
+}
+
+// Return sub-path by specified prefix.
+// For example, for prefix a.b:
+// a.b.c.d -> c.d, a.b.c -> c
+static std::string_view get_sub_path(const std::string_view& path, const 
std::string_view& prefix) {
+    return path.substr(prefix.size() + 1);
+}
+
+Status 
HierarchicalDataReader::_process_sparse_column(vectorized::ColumnObject& 
container_variant,
+                                                      size_t nrows) {
+    using namespace vectorized;
+    if (!_sparse_column_reader) {
+        
container_variant.get_sparse_column()->assume_mutable()->insert_many_defaults(nrows);
+        return Status::OK();
     }
-    if (variant.empty() || variant.is_null_root()) {
-        variant.create_root(root.get_root_type(), std::move(extracted_column));
+    // process sparse column
+    if (_path.get_parts().empty()) {
+        // directly use sparse column if access root
+        
container_variant.set_sparse_column(_sparse_column_reader->column->get_ptr());
     } else {
-        vectorized::ColumnPtr cast_column;
-        const auto& expected_type = variant.get_root_type();
-        RETURN_IF_ERROR(vectorized::schema_util::cast_column(
-                {extracted_column->get_ptr(),
-                 vectorized::make_nullable(
-                         
std::make_shared<vectorized::ColumnObject::MostCommonType>()),
-                 ""},
-                expected_type, &cast_column));
-        variant.get_root()->insert_range_from(*cast_column, 0, nrows);
-        // variant.set_num_rows(variant.get_root()->size());
+        const auto& offsets =
+                assert_cast<const 
ColumnMap&>(*_sparse_column_reader->column).get_offsets();
+        /// Check if there is no data in shared data in current range.
+        if (offsets.back() == offsets[-1]) {
+            
container_variant.get_sparse_column()->assume_mutable()->insert_many_defaults(nrows);
+        } else {
+            // Read for variant sparse column
+            // Example path: a.b
+            // data: a.b.c : int|123
+            //       a.b.d : string|"456"
+            //       a.e.d : string|"789"
+            // then the extracted sparse column will be:
+            // c : int|123
+            // d : string|"456"
+            const auto& sparse_data_map =
+                    assert_cast<const 
ColumnMap&>(*_sparse_column_reader->column);
+            const auto& src_sparse_data_offsets = 
sparse_data_map.get_offsets();
+            const auto& src_sparse_data_paths =
+                    assert_cast<const 
ColumnString&>(sparse_data_map.get_keys());
+            const auto& src_sparse_data_values =
+                    assert_cast<const 
ColumnString&>(sparse_data_map.get_values());
+
+            auto& sparse_data_offsets =
+                    assert_cast<ColumnMap&>(
+                            
*container_variant.get_sparse_column()->assume_mutable())
+                            .get_offsets();
+            auto [sparse_data_paths, sparse_data_values] =
+                    container_variant.get_sparse_data_paths_and_values();
+            StringRef prefix_ref(_path.get_path());
+            std::string_view path_prefix(prefix_ref.data, prefix_ref.size);
+            for (size_t i = 0; i != src_sparse_data_offsets.size(); ++i) {
+                size_t start = src_sparse_data_offsets[ssize_t(i) - 1];
+                size_t end = src_sparse_data_offsets[ssize_t(i)];
+                size_t lower_bound_index =
+                        
vectorized::ColumnObject::find_path_lower_bound_in_sparse_data(
+                                prefix_ref, src_sparse_data_paths, start, end);
+                for (; lower_bound_index != end; ++lower_bound_index) {
+                    auto path_ref = 
src_sparse_data_paths.get_data_at(lower_bound_index);
+                    std::string_view path(path_ref.data, path_ref.size);
+                    if (!path.starts_with(path_prefix)) {
+                        break;
+                    }
+                    // Don't include path that is equal to the prefix.
+                    if (path.size() != path_prefix.size()) {
+                        auto sub_path = get_sub_path(path, path_prefix);
+                        sparse_data_paths->insert_data(sub_path.data(), 
sub_path.size());
+                        
sparse_data_values->insert_from(src_sparse_data_values, lower_bound_index);
+                    }
+                }
+                sparse_data_offsets.push_back(sparse_data_paths->size());
+            }
+        }
     }
-    if (dst->is_nullable()) {
-        // fill nullmap
-        vectorized::ColumnUInt8& dst_null_map =
-                
assert_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column();
-        vectorized::ColumnUInt8& src_null_map =
-                
assert_cast<vectorized::ColumnNullable&>(*variant.get_root()).get_null_map_column();
-        dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size());
+    return Status::OK();
+}
+
+Status HierarchicalDataReader::_init_null_map_and_clear_columns(
+        vectorized::MutableColumnPtr& container, vectorized::MutableColumnPtr& 
dst, size_t nrows) {
+    using namespace vectorized;
+    // clear data in nodes
+    RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
+        node.data.column->clear();
+        return Status::OK();
+    }));
+    container->clear();
+    _sparse_column_reader->column->clear();
+    if (_root_reader) {
+        if (_root_reader->column->is_nullable()) {
+            // fill nullmap
+            DCHECK(dst->is_nullable());
+            ColumnUInt8& dst_null_map = 
assert_cast<ColumnNullable&>(*dst).get_null_map_column();
+            ColumnUInt8& src_null_map =
+                    
assert_cast<ColumnNullable&>(*_root_reader->column).get_null_map_column();
+            dst_null_map.insert_range_from(src_null_map, 0, 
src_null_map.size());
+            // clear nullmap and inner data
+            src_null_map.clear();
+            assert_cast<ColumnObject&>(
+                    
assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column())
+                    .clear_column_data();
+        } else {
+            auto& root_column = 
assert_cast<ColumnObject&>(*_root_reader->column);
+            root_column.clear_column_data();
+        }
+    } else {
+        if (dst->is_nullable()) {
+            // No nullable info exists in hierarchical data, fill nullmap with 
all non-null
+            ColumnUInt8& dst_null_map = 
assert_cast<ColumnNullable&>(*dst).get_null_map_column();
+            auto fake_nullable_column = ColumnUInt8::create(nrows, 0);
+            dst_null_map.insert_range_from(*fake_nullable_column, 0, nrows);
+        }
     }
-    _root_reader->column->clear();
-#ifndef NDEBUG
-    variant.check_consistency();
-#endif
     return Status::OK();
 }
 
-Status ExtractReader::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, 
bool* has_null) {
-    RETURN_IF_ERROR(_root_reader->iterator->next_batch(n, 
_root_reader->column));
-    RETURN_IF_ERROR(extract_to(dst, *n));
+Status SparseColumnExtractReader::init(const ColumnIteratorOptions& opts) {
+    return _sparse_column_reader->init(opts);
+}
+
+Status SparseColumnExtractReader::seek_to_first() {
+    return _sparse_column_reader->seek_to_first();
+}
+
+Status SparseColumnExtractReader::seek_to_ordinal(ordinal_t ord) {
+    return _sparse_column_reader->seek_to_ordinal(ord);
+}
+
+void 
SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr& dst) 
{
+    vectorized::ColumnObject& var =
+            dst->is_nullable()
+                    ? assert_cast<vectorized::ColumnObject&>(
+                              
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
+                    : assert_cast<vectorized::ColumnObject&>(*dst);
+    DCHECK(!var.is_null_root());
+    vectorized::ColumnObject::fill_path_olumn_from_sparse_data(
+            *var.get_subcolumn({}) /*root*/, StringRef {_path.data(), 
_path.size()},
+            _sparse_column->get_ptr(), 0, _sparse_column->size());
+    _sparse_column->clear();
+}
+
+Status SparseColumnExtractReader::next_batch(size_t* n, 
vectorized::MutableColumnPtr& dst,
+                                             bool* has_null) {
+    _sparse_column->clear();
+    RETURN_IF_ERROR(_sparse_column_reader->next_batch(n, _sparse_column, 
has_null));
+    const auto& offsets = assert_cast<const 
vectorized::ColumnMap&>(*_sparse_column).get_offsets();
+    // Check if we don't have any paths in shared data in current range.
+    if (offsets.back() == offsets[-1]) {
+        dst->insert_many_defaults(*n);
+    } else {
+        _fill_path_column(dst);
+    }
     return Status::OK();
 }
 
-Status ExtractReader::read_by_rowids(const rowid_t* rowids, const size_t count,
-                                     vectorized::MutableColumnPtr& dst) {
-    RETURN_IF_ERROR(_root_reader->iterator->read_by_rowids(rowids, count, 
_root_reader->column));
-    RETURN_IF_ERROR(extract_to(dst, count));
+Status SparseColumnExtractReader::read_by_rowids(const rowid_t* rowids, const 
size_t count,
+                                                 vectorized::MutableColumnPtr& 
dst) {
+    _sparse_column->clear();
+    RETURN_IF_ERROR(_sparse_column_reader->read_by_rowids(rowids, count, 
_sparse_column));
+    const auto& offsets = assert_cast<const 
vectorized::ColumnMap&>(*_sparse_column).get_offsets();
+    // Check if we don't have any paths in shared data in current range.
+    if (offsets.back() == offsets[-1]) {
+        dst->insert_many_defaults(count);
+    } else {
+        _fill_path_column(dst);
+    }
     return Status::OK();
 }
 
-ordinal_t ExtractReader::get_current_ordinal() const {
-    return _root_reader->iterator->get_current_ordinal();
+ordinal_t SparseColumnExtractReader::get_current_ordinal() const {
+    return _sparse_column_reader->get_current_ordinal();
 }
 
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h 
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index 6c8ced89cd2..5d58f666f62 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -18,8 +18,11 @@
 #pragma once
 
 #include <memory>
+#include <string_view>
 #include <unordered_map>
+#include <utility>
 
+#include "common/status.h"
 #include "io/io_common.h"
 #include "olap/field.h"
 #include "olap/iterators.h"
@@ -49,13 +52,14 @@ namespace doris::segment_v2 {
 class HierarchicalDataReader : public ColumnIterator {
 public:
     // Currently two types of read, merge sparse columns with root columns, or 
read directly
-    enum class ReadType { MERGE_SPARSE, READ_DIRECT };
+    enum class ReadType { MERGE_ROOT, READ_DIRECT };
 
     HierarchicalDataReader(const vectorized::PathInData& path) : _path(path) {}
 
-    static Status create(std::unique_ptr<ColumnIterator>* reader, 
vectorized::PathInData path,
+    static Status create(ColumnIterator** reader, vectorized::PathInData path,
                          const SubcolumnColumnReaders::Node* target_node,
-                         const SubcolumnColumnReaders::Node* root, ReadType 
read_type);
+                         const SubcolumnColumnReaders::Node* root, ReadType 
read_type,
+                         std::unique_ptr<ColumnIterator>&& sparse_reader);
 
     Status init(const ColumnIteratorOptions& opts) override;
 
@@ -77,6 +81,7 @@ public:
 private:
     SubstreamReaderTree _substream_reader;
     std::unique_ptr<SubstreamIterator> _root_reader;
+    std::unique_ptr<SubstreamIterator> _sparse_column_reader;
     size_t _rows_read = 0;
     vectorized::PathInData _path;
 
@@ -87,6 +92,29 @@ private:
         }
         return Status::OK();
     }
+
+    Status _process_sub_columns(vectorized::ColumnObject& container_variant,
+                                const vectorized::PathsWithColumnAndType& 
non_nested_subcolumns);
+
+    Status _process_nested_columns(
+            vectorized::ColumnObject& container_variant,
+            const std::map<vectorized::PathInData, 
vectorized::PathsWithColumnAndType>&
+                    nested_subcolumns);
+
+    Status _process_sparse_column(vectorized::ColumnObject& container_variant, 
size_t nrows);
+
+    // 1. add root column
+    // 2. collect path for subcolumns and nested subcolumns
+    // 3. init container with subcolumns
+    // 4. init container with nested subcolumns
+    // 5. init container with sparse column
+    Status _init_container(vectorized::MutableColumnPtr& container, size_t 
nrows);
+
+    // clear all subcolumns' column data for next batch read
+    // set null map for nullable column
+    Status _init_null_map_and_clear_columns(vectorized::MutableColumnPtr& 
container,
+                                            vectorized::MutableColumnPtr& dst, 
size_t nrows);
+
     // process read
     template <typename ReadFunction>
     Status process_read(ReadFunction&& read_func, 
vectorized::MutableColumnPtr& dst, size_t nrows) {
@@ -112,162 +140,36 @@ private:
             return Status::OK();
         }));
 
-        // build variant as container
-        auto container = ColumnObject::create(true, false);
-        auto& container_variant = assert_cast<ColumnObject&>(*container);
-
-        // add root first
-        if (_path.get_parts().empty() && _root_reader) {
-            auto& root_var =
-                    _root_reader->column->is_nullable()
-                            ? assert_cast<vectorized::ColumnObject&>(
-                                      assert_cast<vectorized::ColumnNullable&>(
-                                              *_root_reader->column)
-                                              .get_nested_column())
-                            : 
assert_cast<vectorized::ColumnObject&>(*_root_reader->column);
-            auto column = root_var.get_root();
-            auto type = root_var.get_root_type();
-            container_variant.add_sub_column({}, std::move(column), type);
+        // read sparse column
+        if (_sparse_column_reader) {
+            RETURN_IF_ERROR(read_func(*_sparse_column_reader, {}, nullptr));
         }
-        // parent path -> subcolumns
-        std::map<PathInData, PathsWithColumnAndType> nested_subcolumns;
-        PathsWithColumnAndType non_nested_subcolumns;
-        RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
-            MutableColumnPtr column = node.data.column->get_ptr();
-            PathInData relative_path = 
node.path.copy_pop_nfront(_path.get_parts().size());
-
-            if (node.path.has_nested_part()) {
-                
CHECK_EQ(getTypeName(remove_nullable(node.data.type)->get_type_id()),
-                         getTypeName(TypeIndex::Array));
-                PathInData parent_path = 
node.path.get_nested_prefix_path().copy_pop_nfront(
-                        _path.get_parts().size());
-                nested_subcolumns[parent_path].emplace_back(relative_path, 
column->get_ptr(),
-                                                            node.data.type);
-            } else {
-                non_nested_subcolumns.emplace_back(relative_path, 
column->get_ptr(),
-                                                   node.data.type);
-            }
-            return Status::OK();
-        }));
 
-        for (auto& entry : non_nested_subcolumns) {
-            DCHECK(!entry.path.has_nested_part());
-            bool add = container_variant.add_sub_column(entry.path, 
entry.column->assume_mutable(),
-                                                        entry.type);
-            if (!add) {
-                return Status::InternalError("Duplicated {}, type {}", 
entry.path.get_path(),
-                                             entry.type->get_name());
-            }
-        }
-        // Iterate nested subcolumns and flatten them, the entry contains the 
nested subcolumns of the same nested parent
-        // first we pick the first subcolumn as base array and using it's 
offset info. Then we flatten all nested subcolumns
-        // into a new object column and wrap it with array column using the 
first element offsets.The wrapped array column
-        // will type the type of ColumnObject::NESTED_TYPE, whih is 
Nullable<ColumnArray<NULLABLE(ColumnObject)>>.
-        for (auto& entry : nested_subcolumns) {
-            MutableColumnPtr nested_object = ColumnObject::create(true, false);
-            const auto* base_array =
-                    
check_and_get_column<ColumnArray>(remove_nullable(entry.second[0].column));
-            MutableColumnPtr offset = 
base_array->get_offsets_ptr()->assume_mutable();
-            auto* nested_object_ptr = 
assert_cast<ColumnObject*>(nested_object.get());
-            // flatten nested arrays
-            for (const auto& subcolumn : entry.second) {
-                const auto& column = subcolumn.column;
-                const auto& type = subcolumn.type;
-                if (!remove_nullable(column)->is_column_array()) {
-                    return Status::InvalidArgument(
-                            "Meet none array column when flatten nested array, 
path {}, type {}",
-                            subcolumn.path.get_path(), 
subcolumn.type->get_name());
-                }
-                const auto* target_array =
-                        
check_and_get_column<ColumnArray>(remove_nullable(subcolumn.column).get());
-#ifndef NDEBUG
-                if (!base_array->has_equal_offsets(*target_array)) {
-                    return Status::InvalidArgument(
-                            "Meet none equal offsets array when flatten nested 
array, path {}, "
-                            "type {}",
-                            subcolumn.path.get_path(), 
subcolumn.type->get_name());
-                }
-#endif
-                MutableColumnPtr flattend_column = 
target_array->get_data_ptr()->assume_mutable();
-                DataTypePtr flattend_type =
-                        
check_and_get_data_type<DataTypeArray>(remove_nullable(type).get())
-                                ->get_nested_type();
-                // add sub path without parent prefix
-                nested_object_ptr->add_sub_column(
-                        
subcolumn.path.copy_pop_nfront(entry.first.get_parts().size()),
-                        std::move(flattend_column), std::move(flattend_type));
-            }
-            nested_object = 
make_nullable(nested_object->get_ptr())->assume_mutable();
-            auto array =
-                    
make_nullable(ColumnArray::create(std::move(nested_object), std::move(offset)));
-            PathInDataBuilder builder;
-            // add parent prefix
-            builder.append(entry.first.get_parts(), false);
-            PathInData parent_path = builder.build();
-            // unset nested parts
-            parent_path.unset_nested();
-            DCHECK(!parent_path.has_nested_part());
-            container_variant.add_sub_column(parent_path, 
array->assume_mutable(),
-                                             ColumnObject::NESTED_TYPE);
-        }
+        MutableColumnPtr container;
+        RETURN_IF_ERROR(_init_container(container, nrows));
+        auto& container_variant = assert_cast<ColumnObject&>(*container);
 
-        // TODO select v:b -> v.b / v.b.c but v.d maybe in v
-        // copy container variant to dst variant, todo avoid copy
         variant.insert_range_from(container_variant, 0, nrows);
 
-        // variant.set_num_rows(nrows);
         _rows_read += nrows;
         variant.finalize();
 #ifndef NDEBUG
         variant.check_consistency();
 #endif
-        // clear data in nodes
-        RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
-            node.data.column->clear();
-            return Status::OK();
-        }));
-        container->clear();
-        if (_root_reader) {
-            if (_root_reader->column->is_nullable()) {
-                // fill nullmap
-                DCHECK(dst->is_nullable());
-                ColumnUInt8& dst_null_map =
-                        
assert_cast<ColumnNullable&>(*dst).get_null_map_column();
-                ColumnUInt8& src_null_map =
-                        
assert_cast<ColumnNullable&>(*_root_reader->column).get_null_map_column();
-                dst_null_map.insert_range_from(src_null_map, 0, 
src_null_map.size());
-                // clear nullmap and inner data
-                src_null_map.clear();
-                assert_cast<ColumnObject&>(
-                        
assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column())
-                        .clear_column_data();
-            } else {
-                ColumnObject& root_column = 
assert_cast<ColumnObject&>(*_root_reader->column);
-                root_column.clear_column_data();
-            }
-        } else {
-            if (dst->is_nullable()) {
-                // No nullable info exist in hirearchical data, fill nullmap 
with all none null
-                ColumnUInt8& dst_null_map =
-                        
assert_cast<ColumnNullable&>(*dst).get_null_map_column();
-                auto fake_nullable_column = ColumnUInt8::create(nrows, 0);
-                dst_null_map.insert_range_from(*fake_nullable_column, 0, 
nrows);
-            }
-        }
+        RETURN_IF_ERROR(_init_null_map_and_clear_columns(container, dst, 
nrows));
 
         return Status::OK();
     }
 };
 
-// Extract from root column of variant, since root column of variant
-// encodes sparse columns that are not materialized
-class ExtractReader : public ColumnIterator {
+// Extract path from sparse column
+class SparseColumnExtractReader : public ColumnIterator {
 public:
-    ExtractReader(const TabletColumn& col, 
std::unique_ptr<SubstreamIterator>&& root_reader,
-                  vectorized::DataTypePtr target_type_hint)
-            : _col(col),
-              _root_reader(std::move(root_reader)),
-              _target_type_hint(target_type_hint) {}
+    SparseColumnExtractReader(std::string path,
+                              std::unique_ptr<ColumnIterator>&& 
sparse_column_reader)
+            : _path(std::move(path)), 
_sparse_column_reader(std::move(sparse_column_reader)) {
+        _sparse_column = vectorized::ColumnObject::create_sparse_column_fn();
+    }
 
     Status init(const ColumnIteratorOptions& opts) override;
 
@@ -283,12 +185,11 @@ public:
     ordinal_t get_current_ordinal() const override;
 
 private:
-    Status extract_to(vectorized::MutableColumnPtr& dst, size_t nrows);
-
-    TabletColumn _col;
+    void _fill_path_column(vectorized::MutableColumnPtr& dst);
+    vectorized::MutableColumnPtr _sparse_column;
+    std::string _path;
     // may shared among different column iterators
-    std::unique_ptr<SubstreamIterator> _root_reader;
-    vectorized::DataTypePtr _target_type_hint;
+    std::unique_ptr<ColumnIterator> _sparse_column_reader;
 };
 
 } // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index a50ada112f9..441e839e6ef 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -43,7 +43,6 @@
 #include "olap/rowset/rowset_reader_context.h"
 #include "olap/rowset/segment_v2/column_reader.h"
 #include "olap/rowset/segment_v2/empty_segment_iterator.h"
-#include "olap/rowset/segment_v2/hierarchical_data_reader.h"
 #include "olap/rowset/segment_v2/indexed_column_reader.h"
 #include "olap/rowset/segment_v2/inverted_index_file_reader.h"
 #include "olap/rowset/segment_v2/page_io.h"
@@ -51,6 +50,7 @@
 #include "olap/rowset/segment_v2/segment_iterator.h"
 #include "olap/rowset/segment_v2/segment_writer.h" // k_segment_magic_length
 #include "olap/rowset/segment_v2/stream_reader.h"
+#include "olap/rowset/segment_v2/variant_column_writer_impl.h"
 #include "olap/schema.h"
 #include "olap/short_key_index.h"
 #include "olap/tablet_schema.h"
@@ -201,9 +201,24 @@ Status Segment::_open() {
     // 0.01 comes from PrimaryKeyIndexBuilder::init
     _meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8;
 
+    uint32_t ordinal = 0;
+    for (const auto& column_meta : _footer_pb->columns()) {
+        // unique_id < 0 means this column is an extracted column from a variant
+        if (static_cast<int>(column_meta.unique_id()) >= 0) {
+            _column_id_to_footer_ordinal[column_meta.unique_id()] = ordinal++;
+        }
+    }
     return Status::OK();
 }
 
+const ColumnMetaPB* Segment::get_column_meta(int32_t unique_id) const {
+    auto it = _column_id_to_footer_ordinal.find(unique_id);
+    if (it == _column_id_to_footer_ordinal.end()) {
+        return nullptr;
+    }
+    return &_footer_pb->columns(it->second);
+}
+
 Status Segment::_open_inverted_index() {
     _inverted_index_file_reader = std::make_shared<InvertedIndexFileReader>(
             _fs,
@@ -233,7 +248,8 @@ Status Segment::new_iterator(SchemaSPtr schema, const 
StorageReadOptions& read_o
         if (col.is_extracted_column()) {
             auto relative_path = col.path_info_ptr()->copy_pop_front();
             int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : 
col.parent_unique_id();
-            const auto* node = 
_sub_column_tree[unique_id].find_exact(relative_path);
+            const auto* node = 
((VariantColumnReader*)(_column_readers.at(unique_id).get()))
+                                       ->get_reader_by_path(relative_path);
             reader = node != nullptr ? node->data.reader.get() : nullptr;
         } else {
             reader = _column_readers.contains(col.unique_id())
@@ -558,20 +574,17 @@ vectorized::DataTypePtr Segment::get_data_type_of(const 
ColumnIdentifier& identi
         auto relative_path = identifier.path->copy_pop_front();
         int32_t unique_id =
                 identifier.unique_id > 0 ? identifier.unique_id : 
identifier.parent_unique_id;
-        const auto* node = _sub_column_tree.contains(unique_id)
-                                   ? 
_sub_column_tree.at(unique_id).find_leaf(relative_path)
+        const auto* node = _column_readers.contains(unique_id)
+                                   ? 
((VariantColumnReader*)(_column_readers.at(unique_id).get()))
+                                             
->get_reader_by_path(relative_path)
                                    : nullptr;
-        const auto* sparse_node =
-                _sparse_column_tree.contains(unique_id)
-                        ? 
_sparse_column_tree.at(unique_id).find_exact(relative_path)
-                        : nullptr;
         if (node) {
-            if (read_flat_leaves || (node->children.empty() && sparse_node == 
nullptr)) {
+            if (read_flat_leaves || (node->children.empty())) {
                 return node->data.file_column_type;
             }
         }
         // missing in storage, treat it using input data type
-        if (read_flat_leaves && !node && !sparse_node) {
+        if (read_flat_leaves && !node) {
             return nullptr;
         }
         // it contains children or column missing in storage, so treat it as 
variant
@@ -592,28 +605,11 @@ Status Segment::_create_column_readers_once() {
 }
 
 Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
-    std::unordered_map<uint32_t, uint32_t> column_id_to_footer_ordinal;
-    std::unordered_map<vectorized::PathInData, uint32_t, 
vectorized::PathInData::Hash>
-            column_path_to_footer_ordinal;
-    for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) {
-        const auto& column_pb = footer.columns(ordinal);
-        // column path for accessing subcolumns of variant
-        if (column_pb.has_column_path_info()) {
-            vectorized::PathInData path;
-            path.from_protobuf(column_pb.column_path_info());
-            column_path_to_footer_ordinal.emplace(path, ordinal);
-        }
-        // unique_id is unsigned, -1 meaning no unique id(e.g. an extracted 
column from variant)
-        if (static_cast<int>(column_pb.unique_id()) >= 0) {
-            // unique id
-            column_id_to_footer_ordinal.emplace(column_pb.unique_id(), 
ordinal);
-        }
-    }
     // init by unique_id
     for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns(); 
++ordinal) {
         const auto& column = _tablet_schema->column(ordinal);
-        auto iter = column_id_to_footer_ordinal.find(column.unique_id());
-        if (iter == column_id_to_footer_ordinal.end()) {
+        auto iter = _column_id_to_footer_ordinal.find(column.unique_id());
+        if (iter == _column_id_to_footer_ordinal.end()) {
             continue;
         }
 
@@ -622,40 +618,40 @@ Status Segment::_create_column_readers(const 
SegmentFooterPB& footer) {
                 .be_exec_version = _be_exec_version,
         };
         std::unique_ptr<ColumnReader> reader;
-        RETURN_IF_ERROR(ColumnReader::create(opts, 
footer.columns(iter->second), footer.num_rows(),
+        RETURN_IF_ERROR(ColumnReader::create(opts, footer, iter->second, 
footer.num_rows(),
                                              _file_reader, &reader));
         _column_readers.emplace(column.unique_id(), std::move(reader));
     }
 
-    for (const auto& [path, ordinal] : column_path_to_footer_ordinal) {
-        const ColumnMetaPB& column_pb = footer.columns(ordinal);
-        ColumnReaderOptions opts {
-                .kept_in_memory = _tablet_schema->is_in_memory(),
-                .be_exec_version = _be_exec_version,
-        };
-        std::unique_ptr<ColumnReader> reader;
-        RETURN_IF_ERROR(
-                ColumnReader::create(opts, column_pb, footer.num_rows(), 
_file_reader, &reader));
-        int32_t unique_id = column_pb.unique_id();
-        auto relative_path = path.copy_pop_front();
-        if (_sub_column_tree[unique_id].get_root() == nullptr) {
-            _sub_column_tree[unique_id].create_root(SubcolumnReader {nullptr, 
nullptr});
-        }
-        if (relative_path.empty()) {
-            // root column
-            
_sub_column_tree[unique_id].get_mutable_root()->modify_to_scalar(SubcolumnReader
 {
-                    std::move(reader),
-                    
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
-        } else {
-            // check the root is already a leaf node
-            // 
DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty());
-            _sub_column_tree[unique_id].add(
-                    relative_path,
-                    SubcolumnReader {
-                            std::move(reader),
-                            
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
-        }
-    }
+    // for (const auto& [path, ordinal] : column_path_to_footer_ordinal) {
+    //     const ColumnMetaPB& column_pb = footer.columns(ordinal);
+    //     ColumnReaderOptions opts {
+    //             .kept_in_memory = _tablet_schema->is_in_memory(),
+    //             .be_exec_version = _be_exec_version,
+    //     };
+    //     std::unique_ptr<ColumnReader> reader;
+    //     RETURN_IF_ERROR(
+    //             ColumnReader::create(opts, column_pb, footer.num_rows(), 
_file_reader, &reader));
+    //     int32_t unique_id = column_pb.unique_id();
+    //     auto relative_path = path.copy_pop_front();
+    //     if (_sub_column_tree[unique_id].get_root() == nullptr) {
+    //         _sub_column_tree[unique_id].create_root(SubcolumnReader 
{nullptr, nullptr});
+    //     }
+    //     if (relative_path.empty()) {
+    //         // root column
+    //         
_sub_column_tree[unique_id].get_mutable_root()->modify_to_scalar(SubcolumnReader
 {
+    //                 std::move(reader),
+    //                 
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
+    //     } else {
+    //         // check the root is already a leaf node
+    //         // 
DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty());
+    //         _sub_column_tree[unique_id].add(
+    //                 relative_path,
+    //                 SubcolumnReader {
+    //                         std::move(reader),
+    //                         
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
+    //     }
+    // }
 
     // compability reason use tablet schema
     // init by column path
@@ -716,8 +712,8 @@ Status Segment::_create_column_readers(const 
SegmentFooterPB& footer) {
     return Status::OK();
 }
 
-static Status new_default_iterator(const TabletColumn& tablet_column,
-                                   std::unique_ptr<ColumnIterator>* iter) {
+Status Segment::new_default_iterator(const TabletColumn& tablet_column,
+                                     std::unique_ptr<ColumnIterator>* iter) {
     if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
         return Status::InternalError(
                 "invalid nonexistent column without default value. 
column_uid={}, column_name={}, "
@@ -736,147 +732,48 @@ static Status new_default_iterator(const TabletColumn& 
tablet_column,
     return Status::OK();
 }
 
-Status Segment::_new_iterator_with_variant_root(const TabletColumn& 
tablet_column,
-                                                
std::unique_ptr<ColumnIterator>* iter,
-                                                const 
SubcolumnColumnReaders::Node* root,
-                                                vectorized::DataTypePtr 
target_type_hint) {
-    ColumnIterator* it;
-    RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
-    auto* stream_iter = new ExtractReader(
-            tablet_column,
-            
std::make_unique<SubstreamIterator>(root->data.file_column_type->create_column(),
-                                                
std::unique_ptr<ColumnIterator>(it),
-                                                root->data.file_column_type),
-            target_type_hint);
-    iter->reset(stream_iter);
-    return Status::OK();
-}
-
-Status Segment::new_column_iterator_with_path(const TabletColumn& 
tablet_column,
-                                              std::unique_ptr<ColumnIterator>* 
iter,
-                                              const StorageReadOptions* opt) {
-    // root column use unique id, leaf column use parent_unique_id
-    int32_t unique_id = tablet_column.unique_id() > 0 ? 
tablet_column.unique_id()
-                                                      : 
tablet_column.parent_unique_id();
-    if (!_sub_column_tree.contains(unique_id)) {
-        // No such variant column in this segment, get a default one
-        RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
-        return Status::OK();
-    }
-    auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
-    const auto* root = _sub_column_tree[unique_id].get_root();
-    const auto* node = tablet_column.has_path_info()
-                               ? 
_sub_column_tree[unique_id].find_exact(relative_path)
-                               : nullptr;
-    const auto* sparse_node =
-            tablet_column.has_path_info() && 
_sparse_column_tree.contains(unique_id)
-                    ? _sparse_column_tree[unique_id].find_exact(relative_path)
-                    : nullptr;
-    // // Currently only compaction and checksum need to read flat leaves
-    // // They both use tablet_schema_with_merged_max_schema_version as read 
schema
-    // auto type_to_read_flat_leaves = [](ReaderType type) {
-    //     return type == ReaderType::READER_BASE_COMPACTION ||
-    //            type == ReaderType::READER_CUMULATIVE_COMPACTION ||
-    //            type == ReaderType::READER_COLD_DATA_COMPACTION ||
-    //            type == ReaderType::READER_SEGMENT_COMPACTION ||
-    //            type == ReaderType::READER_FULL_COMPACTION || type == 
ReaderType::READER_CHECKSUM;
-    // };
-
-    // // find the sibling of the nested column to fill the target nested 
column
-    // auto new_default_iter_with_same_nested = [&](const TabletColumn& 
tablet_column,
-    //                                              
std::unique_ptr<ColumnIterator>* iter) {
-    //     // We find node that represents the same Nested type as path.
-    //     const auto* parent = 
_sub_column_tree[unique_id].find_best_match(relative_path);
-    //     VLOG_DEBUG << "find with path " << 
tablet_column.path_info_ptr()->get_path() << " parent "
-    //                << (parent ? parent->path.get_path() : "nullptr") << ", 
type "
-    //                << ", parent is nested " << (parent ? 
parent->is_nested() : false) << ", "
-    //                << 
TabletColumn::get_string_by_field_type(tablet_column.type());
-    //     // find it's common parent with nested part
-    //     // why not use parent->path->has_nested_part? because parent may 
not be a leaf node
-    //     // none leaf node may not contain path info
-    //     // Example:
-    //     // {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" : 
"a@b"}}]}}
-    //     // nested node path          : payload.commits(NESTED)
-    //     // tablet_column path_info   : payload.commits.issue.id(SCALAR)
-    //     // parent path node          : payload.commits.issue(TUPLE)
-    //     // leaf path_info            : payload.commits.issue.email(SCALAR)
-    //     if (parent && SubcolumnColumnReaders::find_parent(
-    //                           parent, [](const auto& node) { return 
node.is_nested(); })) {
-    //         /// Find any leaf of Nested subcolumn.
-    //         const auto* leaf = SubcolumnColumnReaders::find_leaf(
-    //                 parent, [](const auto& node) { return 
node.path.has_nested_part(); });
-    //         assert(leaf);
-    //         std::unique_ptr<ColumnIterator> sibling_iter;
-    //         ColumnIterator* sibling_iter_ptr;
-    //         
RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr));
-    //         sibling_iter.reset(sibling_iter_ptr);
-    //         *iter = 
std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter),
-    //                                                               
leaf->data.file_column_type);
-    //     } else {
-    //         *iter = std::make_unique<DefaultNestedColumnIterator>(nullptr, 
nullptr);
-    //     }
-    //     return Status::OK();
-    // };
-
-    // if (opt != nullptr && 
type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
-    //     // compaction need to read flat leaves nodes data to prevent from 
amplification
-    //     const auto* node = tablet_column.has_path_info()
-    //                                ? 
_sub_column_tree[unique_id].find_leaf(relative_path)
-    //                                : nullptr;
-    //     if (!node) {
-    //         // sparse_columns have this path, read from root
-    //         if (sparse_node != nullptr && sparse_node->is_leaf_node()) {
-    //             RETURN_IF_ERROR(_new_iterator_with_variant_root(
-    //                     tablet_column, iter, root, 
sparse_node->data.file_column_type));
-    //         } else {
-    //             if (tablet_column.is_nested_subcolumn()) {
-    //                 // using the sibling of the nested column to fill the 
target nested column
-    //                 
RETURN_IF_ERROR(new_default_iter_with_same_nested(tablet_column, iter));
-    //             } else {
-    //                 RETURN_IF_ERROR(new_default_iterator(tablet_column, 
iter));
-    //             }
-    //         }
-    //         return Status::OK();
-    //     }
-    //     ColumnIterator* it;
-    //     RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
-    //     iter->reset(it);
-    //     return Status::OK();
-    // }
-
-    if (node != nullptr) {
-        if (node->is_leaf_node() && sparse_node == nullptr) {
-            // Node contains column without any child sub columns and no 
corresponding sparse columns
-            // Direct read extracted columns
-            const auto* node = 
_sub_column_tree[unique_id].find_leaf(relative_path);
-            ColumnIterator* it;
-            RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
-            iter->reset(it);
-        } else {
-            // Node contains column with children columns or has correspoding 
sparse columns
-            // Create reader with hirachical data.
-            // If sparse column exists or read the full path of variant read 
in MERGE_SPARSE, otherwise READ_DIRECT
-            HierarchicalDataReader::ReadType read_type =
-                    (relative_path == root->path) || sparse_node != nullptr
-                            ? HierarchicalDataReader::ReadType::MERGE_SPARSE
-                            : HierarchicalDataReader::ReadType::READ_DIRECT;
-            RETURN_IF_ERROR(
-                    HierarchicalDataReader::create(iter, relative_path, node, 
root, read_type));
-        }
-    } else {
-        // No such node, read from either sparse column or default column
-        if (sparse_node != nullptr) {
-            // sparse columns have this path, read from root
-            RETURN_IF_ERROR(_new_iterator_with_variant_root(tablet_column, 
iter, root,
-                                                            
sparse_node->data.file_column_type));
-        } else {
-            // No such variant column in this segment, get a default one
-            RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
-        }
-    }
-
-    return Status::OK();
-}
+// Status Segment::new_column_iterator_with_path(const TabletColumn& 
tablet_column,
+//                                               
std::unique_ptr<ColumnIterator>* iter,
+//                                               const StorageReadOptions* 
opt) {
+//     // root column use unique id, leaf column use parent_unique_id
+//     int32_t unique_id = tablet_column.unique_id() > 0 ? 
tablet_column.unique_id()
+//                                                       : 
tablet_column.parent_unique_id();
+//     if (!_sub_column_tree.contains(unique_id)) {
+//         // No such variant column in this segment, get a default one
+//         RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
+//         return Status::OK();
+//     }
+//     auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
+//     const auto* root = _sub_column_tree[unique_id].get_root();
+//     const auto* node = tablet_column.has_path_info()
+//                                ? 
_sub_column_tree[unique_id].find_exact(relative_path)
+//                                : nullptr;
+//
+//     if (node != nullptr) {
+//         if (node->is_leaf_node()) {
+//             // Node contains column without any child sub columns and no 
corresponding sparse columns
+//             // Direct read extracted columns
+//             const auto* node = 
_sub_column_tree[unique_id].find_leaf(relative_path);
+//             ColumnIterator* it;
+//             RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
+//             iter->reset(it);
+//         } else {
+//             // Node contains column with children columns or has corresponding sparse columns
+//             // Create reader with hierarchical data.
+//             // If sparse column exists or read the full path of variant 
read in MERGE_ROOT, otherwise READ_DIRECT
+//             HierarchicalDataReader::ReadType read_type =
+//                     (relative_path == root->path) ? 
HierarchicalDataReader::ReadType::MERGE_ROOT
+//                                                   : 
HierarchicalDataReader::ReadType::READ_DIRECT;
+//             RETURN_IF_ERROR(
+//                     HierarchicalDataReader::create(iter, relative_path, 
node, root, read_type));
+//         }
+//     } else {
+//         // No such node, read from sparse column
+//         // TODO test if in VariantStatisticsPB.sparse_column_non_null_size, 
otherwise generate a default iterator
+//     }
+//
+//     return Status::OK();
+// }
 
 // Not use cid anymore, for example original table schema is colA int, then 
user do following actions
 // 1.add column b
@@ -894,46 +791,43 @@ Status Segment::new_column_iterator(const TabletColumn& 
tablet_column,
     RETURN_IF_ERROR(_create_column_readers_once());
 
     // init column iterator by path info
-    if (tablet_column.has_path_info() || tablet_column.is_variant_type()) {
-        return new_column_iterator_with_path(tablet_column, iter, opt);
-    }
+    // if (tablet_column.has_path_info() || tablet_column.is_variant_type()) {
+    //     return new_column_iterator_with_path(tablet_column, iter, opt);
+    // }
+
+    // For compatibility reasons, unique_id may be less than 0 for a variant extracted column
+    int32_t unique_id = tablet_column.unique_id() > 0 ? 
tablet_column.unique_id()
+                                                      : 
tablet_column.parent_unique_id();
     // init default iterator
-    if (!_column_readers.contains(tablet_column.unique_id())) {
+    if (!_column_readers.contains(unique_id)) {
         RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
         return Status::OK();
     }
     // init iterator by unique id
     ColumnIterator* it;
-    
RETURN_IF_ERROR(_column_readers.at(tablet_column.unique_id())->new_iterator(&it));
+    RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it, 
tablet_column));
     iter->reset(it);
 
     if (config::enable_column_type_check && !tablet_column.is_agg_state_type() 
&&
-        tablet_column.type() != 
_column_readers.at(tablet_column.unique_id())->get_meta_type()) {
+        tablet_column.type() != 
_column_readers.at(unique_id)->get_meta_type()) {
         LOG(WARNING) << "different type between schema and column reader,"
                      << " column schema name: " << tablet_column.name()
                      << " column schema type: " << int(tablet_column.type())
                      << " column reader meta type: "
-                     << 
int(_column_readers.at(tablet_column.unique_id())->get_meta_type());
+                     << int(_column_readers.at(unique_id)->get_meta_type());
         return Status::InternalError("different type between schema and column 
reader");
     }
     return Status::OK();
 }
 
-Status Segment::new_column_iterator(int32_t unique_id, 
std::unique_ptr<ColumnIterator>* iter) {
-    RETURN_IF_ERROR(_create_column_readers_once());
-    ColumnIterator* it;
-    RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it));
-    iter->reset(it);
-    return Status::OK();
-}
-
 ColumnReader* Segment::_get_column_reader(const TabletColumn& col) {
     // init column iterator by path info
     if (col.has_path_info() || col.is_variant_type()) {
         auto relative_path = col.path_info_ptr()->copy_pop_front();
         int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : 
col.parent_unique_id();
         const auto* node = col.has_path_info()
-                                   ? 
_sub_column_tree[unique_id].find_exact(relative_path)
+                                   ? 
((VariantColumnReader*)(_column_readers.at(unique_id).get()))
+                                             
->get_reader_by_path(relative_path)
                                    : nullptr;
         if (node != nullptr) {
             return node->data.reader.get();
diff --git a/be/src/olap/rowset/segment_v2/segment.h 
b/be/src/olap/rowset/segment_v2/segment.h
index bc5ab1e1fdc..877f74ae1c3 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -37,14 +37,12 @@
 #include "olap/olap_common.h"
 #include "olap/rowset/segment_v2/column_reader.h" // ColumnReader
 #include "olap/rowset/segment_v2/page_handle.h"
-#include "olap/rowset/segment_v2/stream_reader.h"
 #include "olap/schema.h"
 #include "olap/tablet_schema.h"
 #include "runtime/descriptors.h"
 #include "util/once.h"
 #include "util/slice.h"
 #include "vec/columns/column.h"
-#include "vec/columns/subcolumn_tree.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/json/path_in_data.h"
@@ -107,11 +105,9 @@ public:
                                std::unique_ptr<ColumnIterator>* iter,
                                const StorageReadOptions* opt);
 
-    Status new_column_iterator_with_path(const TabletColumn& tablet_column,
-                                         std::unique_ptr<ColumnIterator>* iter,
-                                         const StorageReadOptions* opt);
-
-    Status new_column_iterator(int32_t unique_id, 
std::unique_ptr<ColumnIterator>* iter);
+    // Status new_column_iterator_with_path(const TabletColumn& tablet_column,
+    //                                      std::unique_ptr<ColumnIterator>* 
iter,
+    //                                      const StorageReadOptions* opt);
 
     Status new_bitmap_index_iterator(const TabletColumn& tablet_column,
                                      std::unique_ptr<BitmapIndexIterator>* 
iter);
@@ -120,7 +116,8 @@ public:
                                        const TabletIndex* index_meta,
                                        const StorageReadOptions& read_options,
                                        std::unique_ptr<InvertedIndexIterator>* 
iter);
-
+    static Status new_default_iterator(const TabletColumn& tablet_column,
+                                       std::unique_ptr<ColumnIterator>* iter);
     const ShortKeyIndexDecoder* get_short_key_index() const {
         DCHECK(_load_index_once.has_called() && 
_load_index_once.stored_result().ok());
         return _sk_index_decoder.get();
@@ -211,6 +208,8 @@ public:
 
     const TabletSchemaSPtr& tablet_schema() { return _tablet_schema; }
 
+    const ColumnMetaPB* get_column_meta(int32_t unique_id) const;
+
 private:
     DISALLOW_COPY_AND_ASSIGN(Segment);
     Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr 
tablet_schema,
@@ -226,11 +225,6 @@ private:
     Status _load_pk_bloom_filter();
     ColumnReader* _get_column_reader(const TabletColumn& col);
 
-    // Get Iterator which will read variant root column and extract with paths 
and types info
-    Status _new_iterator_with_variant_root(const TabletColumn& tablet_column,
-                                           std::unique_ptr<ColumnIterator>* 
iter,
-                                           const SubcolumnColumnReaders::Node* 
root,
-                                           vectorized::DataTypePtr 
target_type_hint);
     Status _write_error_file(size_t file_size, size_t offset, size_t 
bytes_read, char* data,
                              io::IOContext& io_ctx);
 
@@ -269,15 +263,6 @@ private:
     // map column unique id ---> it's inner data type
     std::map<int32_t, std::shared_ptr<const vectorized::IDataType>> 
_file_column_types;
 
-    // Each node in the tree represents the sub column reader and type
-    // for variants.
-    // map column unique id --> it's sub column readers
-    std::map<int32_t, SubcolumnColumnReaders> _sub_column_tree;
-
-    // each sprase column's path and types info
-    // map column unique id --> it's sparse sub column readers
-    std::map<int32_t, SubcolumnColumnReaders> _sparse_column_tree;
-
     // used to guarantee that short key index will be loaded at most once in a 
thread-safe way
     DorisCallOnce<Status> _load_index_once;
     // used to guarantee that primary key bloom filter will be loaded at most 
once in a thread-safe way
@@ -303,6 +288,8 @@ private:
 
     int _be_exec_version = BeExecVersionManager::get_newest_version();
     OlapReaderStatistics* _pk_index_load_stats = nullptr;
+    // unique_id -> idx in footer.columns()
+    std::unordered_map<int32_t, uint32_t> _column_id_to_footer_ordinal;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 1bfcfbb999b..b76acf68978 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -285,6 +285,7 @@ Status SegmentWriter::_create_column_writer(uint32_t cid, 
const TabletColumn& co
     opts.file_writer = _file_writer;
     opts.compression_type = _opts.compression_type;
     opts.footer = &_footer;
+    opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers;
 
     std::unique_ptr<ColumnWriter> writer;
     RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, 
&writer));
diff --git a/be/src/olap/rowset/segment_v2/stream_reader.h 
b/be/src/olap/rowset/segment_v2/stream_reader.h
index 9aac3c0f232..5b71e00101f 100644
--- a/be/src/olap/rowset/segment_v2/stream_reader.h
+++ b/be/src/olap/rowset/segment_v2/stream_reader.h
@@ -19,9 +19,14 @@
 
 #include <memory>
 
-#include "olap/rowset/segment_v2/column_reader.h"
+// #include "olap/rowset/segment_v2/column_reader.h"
+#include "vec/columns/column.h"
+#include "vec/columns/subcolumn_tree.h"
+#include "vec/data_types/data_type.h"
 
 namespace doris::segment_v2 {
+class ColumnIterator;
+class ColumnReader;
 
 // This file Defined ColumnIterator and ColumnReader for reading variant 
subcolumns. The types from read schema and from storage are
 // different, so we need to wrap the ColumnIterator from execution phase and 
storage column reading phase.And we also
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp 
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index 958df5780bd..761cbec8c49 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -16,9 +16,14 @@
 // under the License.
 #include "olap/rowset/segment_v2/variant_column_writer_impl.h"
 
+#include <gen_cpp/segment_v2.pb.h>
+
 #include "common/status.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset_fwd.h"
 #include "olap/rowset/rowset_writer_context.h"
 #include "olap/rowset/segment_v2/column_writer.h"
+#include "olap/segment_loader.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_object.h"
@@ -31,11 +36,90 @@ namespace doris::segment_v2 {
 VariantColumnWriterImpl::VariantColumnWriterImpl(const ColumnWriterOptions& 
opts,
                                                  const TabletColumn* column) {
     _opts = opts;
-    _column = vectorized::ColumnObject::create(true, false);
-    if (column->is_nullable()) {
+    _tablet_column = column;
+}
+
+Status VariantColumnWriterImpl::init() {
+    // calculate stats info
+    std::set<std::string> dynamic_paths;
+    RETURN_IF_ERROR(_get_subcolumn_paths_from_stats(dynamic_paths));
+    if (dynamic_paths.empty()) {
+        _column = vectorized::ColumnObject::create(true, false);
+    } else {
+        vectorized::ColumnObject::Subcolumns dynamic_subcolumns;
+        for (const auto& path : dynamic_paths) {
+            dynamic_subcolumns.add(vectorized::PathInData(path),
+                                   vectorized::ColumnObject::Subcolumn {0, 
true});
+        }
+        _column = 
vectorized::ColumnObject::create(std::move(dynamic_subcolumns), true);
+    }
+    if (_tablet_column->is_nullable()) {
         _null_column = vectorized::ColumnUInt8::create(0);
     }
-    _tablet_column = column;
+    return Status::OK();
+}
+
+Status 
VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::string>& 
paths) {
+    std::unordered_map<std::string, size_t> 
path_to_total_number_of_non_null_values;
+
+    // Merge and collect all stats info from all input rowsets/segments
+    for (RowsetReaderSharedPtr reader : _opts.input_rs_readers) {
+        SegmentCacheHandle segment_cache;
+        RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
+                std::static_pointer_cast<BetaRowset>(reader->rowset()), 
&segment_cache));
+        for (const auto& segment : segment_cache.get_segments()) {
+            const auto* column_meta_pb = 
segment->get_column_meta(_tablet_column->unique_id());
+            if (!column_meta_pb) {
+                continue;
+            }
+            if (!column_meta_pb->has_variant_statistics()) {
+                continue;
+            }
+            const VariantStatisticsPB& source_statistics = 
column_meta_pb->variant_statistics();
+            for (const auto& [path, size] : 
source_statistics.subcolumn_non_null_size()) {
+                auto it = path_to_total_number_of_non_null_values.find(path);
+                if (it == path_to_total_number_of_non_null_values.end()) {
+                    it = path_to_total_number_of_non_null_values.emplace(path, 
0).first;
+                }
+                it->second += size;
+            }
+            for (const auto& [path, size] : 
source_statistics.sparse_column_non_null_size()) {
+                auto it = path_to_total_number_of_non_null_values.find(path);
+                if (it == path_to_total_number_of_non_null_values.end()) {
+                    it = path_to_total_number_of_non_null_values.emplace(path, 
0).first;
+                }
+                it->second += size;
+            }
+        }
+    }
+    // Check if the number of all dynamic paths exceeds the limit.
+    if (path_to_total_number_of_non_null_values.size() > 
vectorized::ColumnObject::MAX_SUBCOLUMNS) {
+        // Sort paths by total number of non null values.
+        std::vector<std::pair<size_t, std::string_view>> paths_with_sizes;
+        
paths_with_sizes.reserve(path_to_total_number_of_non_null_values.size());
+        for (const auto& [path, size] : 
path_to_total_number_of_non_null_values) {
+            paths_with_sizes.emplace_back(size, path);
+        }
+        std::sort(paths_with_sizes.begin(), paths_with_sizes.end(), 
std::greater());
+
+        // Fill dynamic_paths with first max_dynamic_paths paths in sorted 
list.
+        for (const auto& [size, path] : paths_with_sizes) {
+            if (paths.size() < vectorized::ColumnObject::MAX_SUBCOLUMNS) {
+                paths.emplace(path);
+            }
+            // // todo : Add all remaining paths into shared data statistics 
until we reach its max size;
+            // else if (new_statistics.sparse_data_paths_statistics.size() < 
Statistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+            //     new_statistics.sparse_data_paths_statistics.emplace(path, 
size);
+            // }
+        }
+    } else {
+        // Use all dynamic paths from all source columns.
+        for (const auto& [path, _] : path_to_total_number_of_non_null_values) {
+            paths.emplace(path);
+        }
+    }
+
+    return Status::OK();
 }
 
 Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* 
ptr,
@@ -92,7 +176,6 @@ Status 
VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt
                                                     .parent_unique_id = 
_tablet_column->unique_id(),
                                                     .path_info = full_path});
     };
-    
_statistics._subcolumns_non_null_size.reserve(ptr->get_subcolumns().size());
     // convert sub column data from engine format to storage layer format
     for (const auto& entry :
          
vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) {
@@ -121,7 +204,8 @@ Status 
VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt
         _subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows);
 
+        // get statistics
-        
_statistics._subcolumns_non_null_size.push_back(entry->data.get_non_null_value_size());
+        _statistics._subcolumns_non_null_size.emplace(entry->path.get_path(),
+                                                      
entry->data.get_non_null_value_size());
     }
     return Status::OK();
 }
@@ -163,7 +247,7 @@ Status VariantColumnWriterImpl::_process_sparse_column(
             it != _statistics._sparse_column_non_null_size.end()) {
             ++it->second;
         } else if (_statistics._sparse_column_non_null_size.size() <
-                   VariantStatistics::MAX_SHARED_DATA_STATISTICS_SIZE) {
+                   VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
             _statistics._sparse_column_non_null_size.emplace(path, 1);
         }
     }
@@ -173,7 +257,22 @@ Status VariantColumnWriterImpl::_process_sparse_column(
 }
 
 void VariantStatistics::to_pb(VariantStatisticsPB* stats) const {
-    // TODO
+    // Serialize subcolumn stats: path -> number of non-null values.
+    // NOTE: must iterate _subcolumns_non_null_size here (not the sparse map).
+    for (const auto& [path, value] : _subcolumns_non_null_size) {
+        stats->mutable_subcolumn_non_null_size()->emplace(path.to_string(), value);
+    }
+    // Serialize sparse-column stats: path -> number of non-null values.
+    for (const auto& [path, value] : _sparse_column_non_null_size) {
+        stats->mutable_sparse_column_non_null_size()->emplace(path.to_string(), value);
+    }
+}
+
+void VariantStatistics::from_pb(const VariantStatisticsPB& stats) {
+    // NOTE: the StringRef keys point into the protobuf's string storage, so `stats`
+    // must outlive this object. TODO: store owned copies instead of references.
+    for (const auto& [path, value] : stats.subcolumn_non_null_size()) {
+        _subcolumns_non_null_size[StringRef(path.data(), path.size())] = value;
+    }
+    for (const auto& [path, value] : stats.sparse_column_non_null_size()) {
+        _sparse_column_non_null_size[StringRef(path.data(), path.size())] = 
value;
+    }
 }
 
 Status VariantColumnWriterImpl::finalize() {
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h 
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
index 87f67e7b1ef..66c5269e7ce 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
@@ -36,18 +36,20 @@ class ColumnWriter;
 class ScalarColumnWriter;
 
 struct VariantStatistics {
-    constexpr static size_t MAX_SHARED_DATA_STATISTICS_SIZE = 10000;
-    std::vector<size_t> _subcolumns_non_null_size;
+    // Once this limit is reached, stop collecting statistics for sparse data
+    constexpr static size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000;
+    std::map<StringRef, size_t> _subcolumns_non_null_size;
     std::map<StringRef, size_t> _sparse_column_non_null_size;
 
     void to_pb(VariantStatisticsPB* stats) const;
+    void from_pb(const VariantStatisticsPB& stats);
 };
 
 class VariantColumnWriterImpl {
 public:
     VariantColumnWriterImpl(const ColumnWriterOptions& opts, const 
TabletColumn* column);
     Status finalize();
-
+    Status init();
     bool is_finalized() const;
 
     Status append_data(const uint8_t** ptr, size_t num_rows);
@@ -65,6 +67,9 @@ public:
 private:
     void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const 
TabletColumn& column);
 
+    // subcolumn path from variant stats info to distinguish from sparse column
+    Status _get_subcolumn_paths_from_stats(std::set<std::string>& paths);
+
     Status _create_column_writer(uint32_t cid, const TabletColumn& column,
                                  const TabletColumn& parent_column,
                                  const TabletSchemaSPtr& tablet_schema);
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 089dac218fe..6abc2f5c16e 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -280,6 +280,7 @@ Status 
VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
     opts.file_writer = _file_writer;
     opts.compression_type = _opts.compression_type;
     opts.footer = &_footer;
+    opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers;
 
     std::unique_ptr<ColumnWriter> writer;
     RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, 
&writer));
diff --git a/be/src/vec/columns/column_object.cpp 
b/be/src/vec/columns/column_object.cpp
index 9d6e260724b..11130e628e9 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -356,6 +356,7 @@ ColumnObject::Subcolumn::Subcolumn(MutableColumnPtr&& 
data_, DataTypePtr type, b
         : least_common_type(type), is_nullable(is_nullable_), 
is_root(is_root_) {
     data.push_back(std::move(data_));
     data_types.push_back(type);
+    data_serdes.push_back(type->get_serde());
 }
 
 ColumnObject::Subcolumn::Subcolumn(size_t size_, bool is_nullable_, bool 
is_root_)
@@ -398,6 +399,7 @@ void 
ColumnObject::Subcolumn::add_new_column_part(DataTypePtr type) {
     data.push_back(type->create_column());
     least_common_type = LeastCommonType {type};
     data_types.push_back(type);
+    data_serdes.push_back(type->get_serde());
 }
 
 void ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
@@ -800,6 +802,9 @@ ColumnObject::ColumnObject(bool is_nullable_, bool 
create_root_)
     }
 }
 
+ColumnObject::ColumnObject(MutableColumnPtr&& sparse_column)
+        : is_nullable(true), 
serialized_sparse_column(std::move(sparse_column)) {}
+
 ColumnObject::ColumnObject(bool is_nullable_, DataTypePtr type, 
MutableColumnPtr&& column)
         : is_nullable(is_nullable_) {
     add_sub_column({}, std::move(column), type);
@@ -957,6 +962,27 @@ void ColumnObject::insert_default() {
     ++num_rows;
 }
 
+bool ColumnObject::Subcolumn::is_null_at(size_t n) const {
+    if (least_common_type.get_base_type_id() == TypeIndex::Nothing) {
+        return true;
+    }
+    size_t ind = n;
+    if (ind < num_of_defaults_in_prefix) {
+        return true;
+    }
+
+    ind -= num_of_defaults_in_prefix;
+    for (const auto& part : data) {
+        if (ind < part->size()) {
+            return assert_cast<const ColumnNullable&>(*part).is_null_at(ind);
+        }
+        ind -= part->size();
+    }
+
+    throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting 
field is out of range",
+                           n);
+}
+
 void ColumnObject::Subcolumn::get(size_t n, Field& res) const {
     if (least_common_type.get_base_type_id() == TypeIndex::Nothing) {
         res = Null();
@@ -1023,8 +1049,9 @@ void 
ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std:
                 is_null = false;
                 // insert key
                 key->insert_data(path.data(), path.size());
+                const auto& part_type_serde = data_serdes[i];
                 // insert value
-                data_types[i]->get_serde()->write_one_cell_to_binary(*part, 
value, row);
+                part_type_serde->write_one_cell_to_binary(*part, value, row);
             }
             return;
         }
@@ -1088,7 +1115,7 @@ const char* parse_binary_from_sparse_column(TypeIndex 
type, const char* data, Fi
         const size_t size = *reinterpret_cast<const size_t*>(data);
         data += sizeof(size_t);
         res = Array(size);
-        vectorized::Array& array = res.get<Array>();
+        auto& array = res.get<Array>();
         info_res.num_dimensions++;
         for (size_t i = 0; i < size; ++i) {
             const uint8_t is_null = *reinterpret_cast<const uint8_t*>(data++);
@@ -1097,7 +1124,7 @@ const char* parse_binary_from_sparse_column(TypeIndex 
type, const char* data, Fi
                 continue;
             }
             Field nested_field;
-            const TypeIndex nested_type =
+            const auto nested_type =
                     assert_cast<const TypeIndex>(*reinterpret_cast<const 
uint8_t*>(data++));
             data = parse_binary_from_sparse_column(nested_type, data, 
nested_field, info_res);
             array.emplace_back(std::move(nested_field));
@@ -1113,7 +1140,7 @@ const char* parse_binary_from_sparse_column(TypeIndex 
type, const char* data, Fi
 }
 
 std::pair<Field, FieldInfo> ColumnObject::deserialize_from_sparse_column(const 
ColumnString* value,
-                                                                         
size_t row) const {
+                                                                         
size_t row) {
     const auto& data_ref = value->get_data_at(row);
     const char* data = data_ref.data;
     DCHECK(data_ref.size > 0);
@@ -1132,7 +1159,7 @@ std::pair<Field, FieldInfo> 
ColumnObject::deserialize_from_sparse_column(const C
     }
 
     DCHECK(data_ref.size > 1);
-    const TypeIndex type = assert_cast<const 
TypeIndex>(*reinterpret_cast<const uint8_t*>(data++));
+    const auto type = assert_cast<const TypeIndex>(*reinterpret_cast<const 
uint8_t*>(data++));
     info_res.scalar_type_id = type;
     Field res;
     const char* end = parse_binary_from_sparse_column(type, data, res, 
info_res);
@@ -1171,7 +1198,7 @@ void ColumnObject::get(size_t n, Field& res) const {
     // Iterator over [path, binary value]
     for (size_t i = offset; i != end; ++i) {
         const StringRef path_data = path->get_data_at(i);
-        const auto& data = deserialize_from_sparse_column(value, i);
+        const auto& data = ColumnObject::deserialize_from_sparse_column(value, 
i);
         object.try_emplace(std::string(path_data.data, path_data.size), 
data.first);
     }
 
@@ -1360,7 +1387,8 @@ void 
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
             const PathInData column_path(src_sparse_path);
             if (auto* subcolumn = get_subcolumn(column_path); subcolumn != 
nullptr) {
                 // Deserialize binary value into subcolumn from src serialized 
sparse column data.
-                const auto& data = 
src.deserialize_from_sparse_column(src_sparse_column_values, i);
+                const auto& data =
+                        
ColumnObject::deserialize_from_sparse_column(src_sparse_column_values, i);
                 subcolumn->insert(data.first, data.second);
             } else {
                 // Before inserting this path into sparse column check if we 
need to
@@ -1386,7 +1414,7 @@ void 
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
             }
         }
 
-        // Insert remaining dynamic paths from 
src_dynamic_paths_for_shared_data.
+        // Insert remaining dynamic paths from 
src_dynamic_paths_for_sparse_data.
         while (sorted_src_subcolumn_for_sparse_column_idx <
                sorted_src_subcolumn_for_sparse_column_size) {
             auto& [src_path, src_subcolumn] = 
sorted_src_subcolumn_for_sparse_column
@@ -1452,6 +1480,37 @@ const ColumnObject::Subcolumn* 
ColumnObject::get_subcolumn(const PathInData& key
     return &node->data;
 }
 
+size_t ColumnObject::Subcolumn::serialize_text_json(size_t n, BufferWritable& 
output) const {
+    if (least_common_type.get_base_type_id() == TypeIndex::Nothing) {
+        output.write(DataTypeSerDe::NULL_IN_COMPLEX_TYPE.data(),
+                     DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size());
+        return DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size();
+    }
+
+    size_t ind = n;
+    if (ind < num_of_defaults_in_prefix) {
+        output.write(DataTypeSerDe::NULL_IN_COMPLEX_TYPE.data(),
+                     DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size());
+        return DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size();
+    }
+
+    ind -= num_of_defaults_in_prefix;
+    DataTypeSerDe::FormatOptions opt;
+    for (size_t i = 0; i < data.size(); ++i) {
+        const auto& part = data[i];
+        const auto& part_type_serde = data_serdes[i];
+
+        if (ind < part->size()) {
+            return part_type_serde->serialize_one_cell_to_json(*part, ind, 
output, opt);
+        }
+
+        ind -= part->size();
+    }
+
+    throw doris::Exception(ErrorCode::OUT_OF_BOUND,
+                           "Index ({}) for serializing JSON is out of range", 
n);
+}
+
 const ColumnObject::Subcolumn* ColumnObject::get_subcolumn_with_cache(const 
PathInData& key,
                                                                       size_t 
key_index) const {
     // Optimization by caching the order of fields (which is almost always the 
same)
@@ -1717,89 +1776,203 @@ void get_json_by_column_tree(rapidjson::Value& root, 
rapidjson::Document::Alloca
 }
 
 Status ColumnObject::serialize_one_row_to_string(int64_t row, std::string* 
output) const {
-    if (!is_finalized()) {
-        const_cast<ColumnObject*>(this)->finalize();
-    }
-    rapidjson::StringBuffer buf;
-    if (is_scalar_variant()) {
+    // if (!is_finalized()) {
+    //     const_cast<ColumnObject*>(this)->finalize();
+    // }
+    if (is_scalar_variant() && is_finalized()) {
         auto type = get_root_type();
         *output = type->to_string(*get_root(), row);
         return Status::OK();
     }
-    RETURN_IF_ERROR(serialize_one_row_to_json_format(row, &buf, nullptr));
-    // TODO avoid copy
-    *output = std::string(buf.GetString(), buf.GetSize());
+    // TODO preallocate memory
+    auto tmp_col = ColumnString::create();
+    VectorBufferWriter write_buffer(*tmp_col.get());
+    RETURN_IF_ERROR(serialize_one_row_to_json_format(row, write_buffer, 
nullptr));
+    write_buffer.commit();
+    auto str_ref = tmp_col->get_data_at(0);
+    *output = std::string(str_ref.data, str_ref.size);
     return Status::OK();
 }
 
 Status ColumnObject::serialize_one_row_to_string(int64_t row, BufferWritable& 
output) const {
-    if (!is_finalized()) {
-        const_cast<ColumnObject*>(this)->finalize();
-    }
-    if (is_scalar_variant()) {
+    // if (!is_finalized()) {
+    //     const_cast<ColumnObject*>(this)->finalize();
+    // }
+    if (is_scalar_variant() && is_finalized()) {
         auto type = get_root_type();
         type->to_string(*get_root(), row, output);
         return Status::OK();
     }
-    rapidjson::StringBuffer buf;
-    RETURN_IF_ERROR(serialize_one_row_to_json_format(row, &buf, nullptr));
-    output.write(buf.GetString(), buf.GetLength());
+    RETURN_IF_ERROR(serialize_one_row_to_json_format(row, output, nullptr));
     return Status::OK();
 }
 
-Status ColumnObject::serialize_one_row_to_json_format(int64_t row, 
rapidjson::StringBuffer* output,
-                                                      bool* is_null) const {
-    CHECK(is_finalized());
-    if (subcolumns.empty()) {
-        if (is_null != nullptr) {
-            *is_null = true;
-        } else {
-            rapidjson::Value root(rapidjson::kNullType);
-            rapidjson::Writer<rapidjson::StringBuffer> writer(*output);
-            if (!root.Accept(writer)) {
-                return Status::InternalError("Failed to serialize json value");
+/// Struct that represents elements of the JSON path.
+/// "a.b.c" -> ["a", "b", "c"]
+struct PathElements {
+    explicit PathElements(const String& path) {
+        const char* start = path.data();
+        const char* end = start + path.size();
+        const char* pos = start;
+        const char* last_dot_pos = pos - 1;
+        for (pos = start; pos != end; ++pos) {
+            if (*pos == '.') {
+                elements.emplace_back(last_dot_pos + 1, size_t(pos - 
last_dot_pos - 1));
+                last_dot_pos = pos;
             }
         }
-        return Status::OK();
+
+        elements.emplace_back(last_dot_pos + 1, size_t(pos - last_dot_pos - 
1));
     }
-    CHECK(size() > row);
-    rapidjson::StringBuffer buffer;
-    rapidjson::Value root(rapidjson::kNullType);
-    if (doc_structure == nullptr) {
-        doc_structure = std::make_shared<rapidjson::Document>();
-        rapidjson::Document::AllocatorType& allocator = 
doc_structure->GetAllocator();
-        get_json_by_column_tree(*doc_structure, allocator, 
subcolumns.get_root());
+
+    size_t size() const { return elements.size(); }
+
+    std::vector<std::string_view> elements;
+};
+
+/// Struct that represents a prefix of a JSON path. Used during output of the 
JSON object.
+struct Prefix {
+    /// Shrink current prefix to the common prefix of current prefix and 
specified path.
+    /// For example, if current prefix is a.b.c.d and path is a.b.e, then 
shrink the prefix to a.b.
+    void shrink_to_common_prefix(const PathElements& path_elements) {
+        /// Don't include last element in path_elements in the prefix.
+        size_t i = 0;
+        while (i != elements.size() && i != (path_elements.elements.size() - 
1) &&
+               elements[i].first == path_elements.elements[i])
+            ++i;
+        elements.resize(i);
     }
-    if (!doc_structure->IsNull()) {
-        root.CopyFrom(*doc_structure, doc_structure->GetAllocator());
+
+    /// Check is_first flag in current object.
+    bool is_first_in_current_object() const {
+        if (elements.empty()) return root_is_first_flag;
+        return elements.back().second;
     }
-    Arena mem_pool;
-#ifndef NDEBUG
-    VLOG_DEBUG << "dump structure " << 
JsonFunctions::print_json_value(*doc_structure);
-#endif
-    for (const auto& subcolumn : subcolumns) {
-        RETURN_IF_ERROR(find_and_set_leave_value(
-                subcolumn->data.get_finalized_column_ptr(), subcolumn->path,
-                subcolumn->data.get_least_common_type_serde(),
-                subcolumn->data.get_least_common_type(),
-                subcolumn->data.least_common_type.get_base_type_id(), root,
-                doc_structure->GetAllocator(), mem_pool, row));
-        if (subcolumn->path.empty() && !root.IsObject()) {
-            // root was modified, only handle root node
-            break;
-        }
+
+    /// Set flag is_first = false in current object.
+    void set_not_first_in_current_object() {
+        if (elements.empty())
+            root_is_first_flag = false;
+        else
+            elements.back().second = false;
     }
-    compact_null_values(root, doc_structure->GetAllocator());
-    if (root.IsNull() && is_null != nullptr) {
-        // Fast path
-        *is_null = true;
-    } else {
-        output->Clear();
-        rapidjson::Writer<rapidjson::StringBuffer> writer(*output);
-        if (!root.Accept(writer)) {
-            return Status::InternalError("Failed to serialize json value");
+
+    size_t size() const { return elements.size(); }
+
+    /// Elements of the prefix: (path element, is_first flag in this prefix).
+    /// is_first flag indicates if we already serialized some key in the 
object with such prefix.
+    std::vector<std::pair<std::string_view, bool>> elements;
+    bool root_is_first_flag = true;
+};
+
+Status ColumnObject::serialize_one_row_to_json_format(int64_t row_num, 
BufferWritable& output,
+                                                      bool* is_null) const {
+    const auto& column_map = assert_cast<const 
ColumnMap&>(*serialized_sparse_column);
+    const auto& sparse_data_offsets = column_map.get_offsets();
+    const auto [sparse_data_paths, sparse_data_values] = 
get_sparse_data_paths_and_values();
+    size_t sparse_data_offset = 
sparse_data_offsets[static_cast<ssize_t>(row_num) - 1];
+    size_t sparse_data_end = 
sparse_data_offsets[static_cast<ssize_t>(row_num)];
+
+    // We need to convert the set of paths in this row to a JSON object.
+    // To do it, we first collect all the paths from current row, then we sort 
them
+    // and construct the resulting JSON object by iterating over sorted list 
of paths.
+    // For example:
+    // b.c, a.b, a.a, b.e, g, h.u.t -> a.a, a.b, b.c, b.e, g, h.u.t -> {"a" : 
{"a" : ..., "b" : ...}, "b" : {"c" : ..., "e" : ...}, "g" : ..., "h" : {"u" : 
{"t" : ...}}}.
+    std::vector<String> sorted_paths;
+    std::map<std::string, Subcolumn> subcolumn_path_map;
+    sorted_paths.reserve(get_subcolumns().size() + (sparse_data_end - 
sparse_data_offset));
+    for (const auto& subcolumn : get_subcolumns()) {
+        /// We consider null value and absence of the path in a row as 
equivalent cases, because we cannot actually distinguish them.
+        /// So, we don't output null values at all.
+        if (!subcolumn->data.is_null_at(row_num)) {
+            sorted_paths.emplace_back(subcolumn->path.get_path());
+        }
+        subcolumn_path_map.emplace(subcolumn->path.get_path(), 
subcolumn->data);
+    }
+    for (size_t i = sparse_data_offset; i != sparse_data_end; ++i) {
+        auto path = sparse_data_paths->get_data_at(i).to_string();
+        sorted_paths.emplace_back(path);
+    }
+
+    std::sort(sorted_paths.begin(), sorted_paths.end());
+
+    writeChar('{', output);
+    size_t index_in_sparse_data_values = sparse_data_offset;
+    // current_prefix represents the path of the object we are currently 
serializing keys in.
+    Prefix current_prefix;
+    for (const auto& path : sorted_paths) {
+        PathElements path_elements(path);
+        // Change prefix to common prefix between current prefix and current 
path.
+        // If prefix changed (it can only decrease), close all finished 
objects.
+        // For example:
+        // Current prefix: a.b.c.d
+        // Current path: a.b.e.f
+        // It means now we have : {..., "a" : {"b" : {"c" : {"d" : ...
+        // Common prefix will be a.b, so it means we should close objects 
a.b.c.d and a.b.c: {..., "a" : {"b" : {"c" : {"d" : ...}}
+        // and continue serializing keys in object a.b
+        size_t prev_prefix_size = current_prefix.size();
+        current_prefix.shrink_to_common_prefix(path_elements);
+        size_t prefix_size = current_prefix.size();
+        if (prefix_size != prev_prefix_size) {
+            size_t objects_to_close = prev_prefix_size - prefix_size;
+            for (size_t i = 0; i != objects_to_close; ++i) {
+                writeChar('}', output);
+            }
+        }
+
+        // Now we are inside object that has common prefix with current path.
+        // We should go inside all objects in current path.
+        // From the example above we should open object a.b.e:
+        //  {..., "a" : {"b" : {"c" : {"d" : ...}}, "e" : {
+        if (prefix_size + 1 < path_elements.size()) {
+            for (size_t i = prefix_size; i != path_elements.size() - 1; ++i) {
+                /// Write comma before the key if it's not the first key in 
this prefix.
+                if (!current_prefix.is_first_in_current_object()) {
+                    writeChar(',', output);
+                } else {
+                    current_prefix.set_not_first_in_current_object();
+                }
+
+                writeJSONString(path_elements.elements[i], output);
+                writeCString(":{", output);
+
+                // Update current prefix.
+                
current_prefix.elements.emplace_back(path_elements.elements[i], true);
+            }
+        }
+
+        // Write comma before the key if it's not the first key in this prefix.
+        if (!current_prefix.is_first_in_current_object()) {
+            writeChar(',', output);
+        } else {
+            current_prefix.set_not_first_in_current_object();
+        }
+
+        writeJSONString(path_elements.elements.back(), output);
+        writeCString(":", output);
+
+        // Serialize value of current path.
+        if (auto subcolumn_it = subcolumn_path_map.find(path);
+            subcolumn_it != subcolumn_path_map.end()) {
+            subcolumn_it->second.serialize_text_json(row_num, output);
+        } else {
+            // To serialize value stored in sparse data we should first 
deserialize it from binary format.
+            Subcolumn tmp_subcolumn(0, true);
+            const auto& data = ColumnObject::deserialize_from_sparse_column(
+                    sparse_data_values, index_in_sparse_data_values++);
+            tmp_subcolumn.insert(data.first, data.second);
+            tmp_subcolumn.serialize_text_json(0, output);
         }
     }
+
+    // Close all remaining open objects.
+    for (size_t i = 0; i != current_prefix.elements.size(); ++i) {
+        writeChar('}', output);
+    }
+    writeChar('}', output);
+#ifndef NDEBUG
+    // check if it is a valid json
+#endif
     return Status::OK();
 }
 
@@ -2126,6 +2299,8 @@ const DataTypePtr ColumnObject::NESTED_TYPE = 
std::make_shared<vectorized::DataT
         
std::make_shared<vectorized::DataTypeArray>(std::make_shared<vectorized::DataTypeNullable>(
                 std::make_shared<vectorized::DataTypeObject>())));
 
+const size_t ColumnObject::MAX_SUBCOLUMNS = 5;
+
 DataTypePtr ColumnObject::get_root_type() const {
     return subcolumns.get_root()->data.get_least_common_type();
 }
@@ -2312,4 +2487,80 @@ bool ColumnObject::try_insert_default_from_nested(const 
Subcolumns::NodePtr& ent
     return true;
 }
 
+size_t ColumnObject::find_path_lower_bound_in_sparse_data(StringRef path,
+                                                          const ColumnString& 
sparse_data_paths,
+                                                          size_t start, size_t 
end) {
+    // Simple random access iterator over values in ColumnString in specified 
range.
+    class Iterator {
+    public:
+        using difference_type = size_t;
+        using value_type = StringRef;
+        using iterator_category = std::random_access_iterator_tag;
+        using pointer = StringRef*;
+        using reference = StringRef&;
+
+        Iterator() = delete;
+        Iterator(const ColumnString* data_, size_t index_) : data(data_), 
index(index_) {}
+        Iterator(const Iterator& rhs) = default;
+        Iterator& operator=(const Iterator& rhs) = default;
+        inline Iterator& operator+=(difference_type rhs) {
+            index += rhs;
+            return *this;
+        }
+        inline StringRef operator*() const { return data->get_data_at(index); }
+
+        inline Iterator& operator++() {
+            ++index;
+            return *this;
+        }
+        inline Iterator& operator--() {
+            --index;
+            return *this;
+        }
+        inline difference_type operator-(const Iterator& rhs) const { return 
index - rhs.index; }
+
+        const ColumnString* data;
+        size_t index;
+    };
+
+    Iterator start_it(&sparse_data_paths, start);
+    Iterator end_it(&sparse_data_paths, end);
+    auto it = std::lower_bound(start_it, end_it, path);
+    return it.index;
+}
+
+void ColumnObject::fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, 
StringRef path,
+                                                    const ColumnPtr& 
sparse_data_column,
+                                                    size_t start, size_t end) {
+    const auto& sparse_data_map = assert_cast<const 
ColumnMap&>(*sparse_data_column);
+    const auto& sparse_data_offsets = sparse_data_map.get_offsets();
+    size_t first_offset = sparse_data_offsets[static_cast<ssize_t>(start) - 1];
+    size_t last_offset = sparse_data_offsets[static_cast<ssize_t>(end) - 1];
+    // Check if we have at least one row with data.
+    if (first_offset == last_offset) {
+        subcolumn.insert_many_defaults(end - start);
+        return;
+    }
+
+    const auto& sparse_data_paths = assert_cast<const 
ColumnString&>(sparse_data_map.get_keys());
+    const auto& sparse_data_values = assert_cast<const 
ColumnString&>(sparse_data_map.get_values());
+    for (size_t i = start; i != end; ++i) {
+        size_t paths_start = sparse_data_offsets[static_cast<ssize_t>(i) - 1];
+        size_t paths_end = sparse_data_offsets[static_cast<ssize_t>(i)];
+        auto lower_bound_path_index = 
ColumnObject::find_path_lower_bound_in_sparse_data(
+                path, sparse_data_paths, paths_start, paths_end);
+        if (lower_bound_path_index != paths_end &&
+            sparse_data_paths.get_data_at(lower_bound_path_index) == path) {
+            // auto value_data = 
sparse_data_values.get_data_at(lower_bound_path_index);
+            // ReadBufferFromMemory buf(value_data.data, value_data.size);
+            // dynamic_serialization->deserializeBinary(path_column, buf, 
getFormatSettings());
+            const auto& data = 
ColumnObject::deserialize_from_sparse_column(&sparse_data_values,
+                                                                            
lower_bound_path_index);
+            subcolumn.insert(data.first, data.second);
+        } else {
+            subcolumn.insert_default();
+        }
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_object.h 
b/be/src/vec/columns/column_object.h
index 72cc783caf8..b63c0c5c0d8 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -98,7 +98,7 @@ public:
     constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB;
     // Nullable(Array(Nullable(Object)))
     const static DataTypePtr NESTED_TYPE;
-    const size_t MAX_SUBCOLUMNS = 3;
+    const static size_t MAX_SUBCOLUMNS;
     // Finalize mode for subcolumns, write mode will estimate which subcolumns 
are sparse columns (too many null values inside column),
     // merge and encode them into a shared column in root column. Only affects 
in flush block to segments.
     // Otherwise read mode should be as default mode.
@@ -128,6 +128,8 @@ public:
 
         size_t get_non_null_value_size() const;
 
+        size_t serialize_text_json(size_t n, BufferWritable& output) const;
+
         const DataTypeSerDeSPtr& get_least_common_type_serde() const {
             return least_common_type.get_serde();
         }
@@ -136,6 +138,8 @@ public:
 
         void get(size_t n, Field& res) const;
 
+        bool is_null_at(size_t n) const;
+
         /// Inserts a field, which scalars can be arbitrary, but number of
         /// dimensions should be consistent with current common type.
         /// throws InvalidArgument when meet conflict types
@@ -233,6 +237,7 @@ public:
         /// and it's the supertype for all type of column from 0 to i-1.
         std::vector<WrappedPtr> data;
         std::vector<DataTypePtr> data_types;
+        std::vector<DataTypeSerDeSPtr> data_serdes;
         /// Until we insert any non-default field we don't know further
         /// least common type and we count number of defaults in prefix,
         /// which will be converted to the default type of final common type.
@@ -248,7 +253,6 @@ private:
     const bool is_nullable;
     Subcolumns subcolumns;
     size_t num_rows;
-
     // The rapidjson document format of Subcolumns tree structure
     // the leaves is null.In order to display whole document, copy
     // this structure and fill with Subcolumns sub items
@@ -268,6 +272,8 @@ public:
 
     explicit ColumnObject(bool is_nullable_, bool create_root = true);
 
+    explicit ColumnObject(MutableColumnPtr&& sparse_column);
+
     explicit ColumnObject(bool is_nullable_, DataTypePtr type, 
MutableColumnPtr&& column);
 
     // create without root, num_rows = size
@@ -292,7 +298,7 @@ public:
     Status serialize_one_row_to_string(int64_t row, BufferWritable& output) 
const;
 
     // serialize one row to json format
-    Status serialize_one_row_to_json_format(int64_t row, 
rapidjson::StringBuffer* output,
+    Status serialize_one_row_to_json_format(int64_t row, BufferWritable& 
output,
                                             bool* is_null) const;
 
     // Fill the `serialized_sparse_column`
@@ -360,11 +366,19 @@ public:
 
     Subcolumns& get_subcolumns() { return subcolumns; }
 
-    ColumnPtr get_sparse_column() {
+    ColumnPtr get_sparse_column() const {
         return serialized_sparse_column->convert_to_full_column_if_const();
     }
 
     // use sparse_subcolumns_schema to record sparse column's path info and 
type
+    static MutableColumnPtr create_sparse_column_fn() {
+        return 
vectorized::ColumnMap::create(vectorized::ColumnString::create(),
+                                             
vectorized::ColumnString::create(),
+                                             
vectorized::ColumnArray::ColumnOffsets::create());
+    }
+
+    void set_sparse_column(ColumnPtr column) { serialized_sparse_column = 
column; }
+
     Status finalize(FinalizeMode mode);
 
     /// Finalizes all subcolumns.
@@ -571,10 +585,18 @@ public:
         const auto& value = assert_cast<const 
ColumnString&>(column_map.get_values());
         return {&key, &value};
     }
+    // Insert all the data with the specified path from the sparse data into 
the subcolumn.
+    static void fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, 
StringRef path,
+                                                 const ColumnPtr& 
sparse_data_column, size_t start,
+                                                 size_t end);
+
+    static size_t find_path_lower_bound_in_sparse_data(StringRef path,
+                                                       const ColumnString& 
sparse_data_paths,
+                                                       size_t start, size_t 
end);
 
     // Deserialize the i-th row of the column from the sparse column.
-    std::pair<Field, FieldInfo> deserialize_from_sparse_column(const 
ColumnString* value,
-                                                               size_t row) 
const;
+    static std::pair<Field, FieldInfo> deserialize_from_sparse_column(const 
ColumnString* value,
+                                                                      size_t 
row);
 
 private:
     // May throw execption
diff --git a/be/src/vec/common/schema_util.cpp 
b/be/src/vec/common/schema_util.cpp
index 42f9240646f..77b3299c5b5 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -608,7 +608,7 @@ bool has_schema_index_diff(const TabletSchema* new_schema, 
const TabletSchema* o
 
 TabletColumn create_sparse_column(int32_t parent_unique_id) {
     TColumn tcolumn;
-    tcolumn.column_name = ".sparse";
+    tcolumn.column_name = SPARSE_COLUMN_PATH;
     tcolumn.col_unique_id = parent_unique_id;
     tcolumn.column_type = TColumnType {};
     tcolumn.column_type.type = TPrimitiveType::MAP;
@@ -618,7 +618,9 @@ TabletColumn create_sparse_column(int32_t parent_unique_id) 
{
     tcolumn.column_type.type = TPrimitiveType::STRING;
     tcolumn.children_column.push_back(child_tcolumn);
     tcolumn.children_column.push_back(child_tcolumn);
-    return TabletColumn {tcolumn};
+    auto res = TabletColumn {tcolumn};
+    res.set_path_info(PathInData {SPARSE_COLUMN_PATH});
+    return res;
 }
 
 } // namespace doris::vectorized::schema_util
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 0507c9e2fe6..fee6e778325 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -49,6 +49,7 @@ struct ColumnWithTypeAndName;
 } // namespace vectorized
 } // namespace doris
 
+const std::string SPARSE_COLUMN_PATH = "__DORIS_VARIANT_SPARSE__";
 namespace doris::vectorized::schema_util {
 /// Returns number of dimensions in Array type. 0 if type is not array.
 size_t get_number_of_dimensions(const IDataType& type);
diff --git a/be/src/vec/common/string_buffer.hpp 
b/be/src/vec/common/string_buffer.hpp
index 8dca6f057a2..d297d465985 100644
--- a/be/src/vec/common/string_buffer.hpp
+++ b/be/src/vec/common/string_buffer.hpp
@@ -85,6 +85,92 @@ private:
     const char* _data;
 };
 
+inline void writeChar(char x, BufferWritable& buf) {
+    buf.write(x);
+}
+
+/** Writes a C-string without creating a temporary object. If the string is a 
literal, then `strlen` is executed at the compilation stage.
+  * Use when the string is a literal.
+  */
+#define writeCString(s, buf) (buf).write((s), strlen(s))
+
+inline void writeJSONString(const char* begin, const char* end, 
BufferWritable& buf) {
+    writeChar('"', buf);
+    for (const char* it = begin; it != end; ++it) {
+        switch (*it) {
+        case '\b':
+            writeChar('\\', buf);
+            writeChar('b', buf);
+            break;
+        case '\f':
+            writeChar('\\', buf);
+            writeChar('f', buf);
+            break;
+        case '\n':
+            writeChar('\\', buf);
+            writeChar('n', buf);
+            break;
+        case '\r':
+            writeChar('\\', buf);
+            writeChar('r', buf);
+            break;
+        case '\t':
+            writeChar('\\', buf);
+            writeChar('t', buf);
+            break;
+        case '\\':
+            writeChar('\\', buf);
+            writeChar('\\', buf);
+            break;
+        case '/':
+            writeChar('/', buf);
+            break;
+        case '"':
+            writeChar('\\', buf);
+            writeChar('"', buf);
+            break;
+        default:
+            UInt8 c = *it;
+            if (c <= 0x1F) {
+                /// Escaping of ASCII control characters.
+
+                UInt8 higher_half = c >> 4;
+                UInt8 lower_half = c & 0xF;
+
+                writeCString("\\u00", buf);
+                writeChar('0' + higher_half, buf);
+
+                if (lower_half <= 9) {
+                    writeChar('0' + lower_half, buf);
+                } else {
+                    writeChar('A' + lower_half - 10, buf);
+                }
+            } else if (end - it >= 3 && it[0] == '\xE2' && it[1] == '\x80' &&
+                       (it[2] == '\xA8' || it[2] == '\xA9')) {
+                /// This is for compatibility with JavaScript, because 
unescaped line separators are prohibited in string literals,
+                ///  and these code points are alternative line separators.
+
+                if (it[2] == '\xA8') {
+                    writeCString("\\u2028", buf);
+                }
+                if (it[2] == '\xA9') {
+                    writeCString("\\u2029", buf);
+                }
+
+                /// Byte sequence is 3 bytes long. We have additional two 
bytes to skip.
+                it += 2;
+            } else {
+                writeChar(*it, buf);
+            }
+        }
+    }
+    writeChar('"', buf);
+}
+
+inline void writeJSONString(std::string_view s, BufferWritable& buf) {
+    writeJSONString(s.data(), s.data() + s.size(), buf);
+}
+
 using VectorBufferReader = BufferReadable;
 using BufferReader = BufferReadable;
 
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp 
b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index f6719437285..6c902b60589 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -58,15 +58,15 @@ Status DataTypeObjectSerDe::_write_column_to_mysql(const 
IColumn& column,
                 root->get_finalized_column(), row_buffer, row_idx, col_const, 
options));
     } else {
         // Serialize hierarchy types to json format
-        rapidjson::StringBuffer buffer;
+        std::string buffer;
         bool is_null = false;
-        if (!variant.serialize_one_row_to_json_format(row_idx, &buffer, 
&is_null)) {
+        if (!variant.serialize_one_row_to_string(row_idx, &buffer)) {
             return Status::InternalError("Invalid json format");
         }
         if (is_null) {
             row_buffer.push_null();
         } else {
-            row_buffer.push_string(buffer.GetString(), buffer.GetLength());
+            row_buffer.push_string(buffer.data(), buffer.size());
         }
     }
     return Status::OK();
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index 37a4f0a70ee..dee4a81d3bb 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -161,7 +161,7 @@ message ColumnPathInfo {
 
 message VariantStatisticsPB {
     // in the order of subcolumns in variant
-    repeated uint32 subcolumn_non_null_size = 1;
+    map<string, uint32> subcolumn_non_null_size = 1;
     map<string, uint32> sparse_column_non_null_size = 2;
 } 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to