This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a6667d02afe54b4f4cb87d23ee21680c3b0104b6
Author: lihangyu <lihan...@selectdb.com>
AuthorDate: Tue Feb 18 16:28:35 2025 +0800

    support downgrade (#47987)
---
 be/src/olap/compaction.cpp                         |   6 +-
 be/src/olap/rowset/segment_v2/column_reader.cpp    | 253 ++++++++++++++-------
 be/src/olap/rowset/segment_v2/column_reader.h      |  80 ++++---
 be/src/olap/rowset/segment_v2/column_writer.cpp    |   8 +
 .../rowset/segment_v2/hierarchical_data_reader.cpp |  19 +-
 .../rowset/segment_v2/hierarchical_data_reader.h   |   3 +-
 be/src/olap/rowset/segment_v2/segment.cpp          |   3 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  19 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  41 ++--
 be/src/vec/columns/column_object.cpp               |  65 +++++-
 be/src/vec/columns/column_object.h                 |   2 +
 be/src/vec/data_types/data_type_factory.cpp        |   4 +
 be/src/vec/data_types/data_type_object.cpp         |   4 +-
 be/src/vec/data_types/get_least_supertype.cpp      |   5 +
 be/src/vec/exprs/table_function/vexplode.cpp       |  30 +++
 .../vec/functions/array/function_array_utils.cpp   |   4 +-
 be/src/vec/olap/olap_data_convertor.cpp            |  57 ++++-
 be/src/vec/olap/olap_data_convertor.h              |  19 ++
 regression-test/data/variant_p0/nested.out         | Bin 14599 -> 16112 bytes
 regression-test/suites/variant_p0/nested.groovy    |  59 ++++-
 .../suites/variant_p0/update/load.groovy           |   4 +-
 21 files changed, 512 insertions(+), 173 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 190970a745f..1e53ddc7364 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -340,8 +340,7 @@ void CompactionMixin::build_basic_info() {
     std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
     std::transform(_input_rowsets.begin(), _input_rowsets.end(), 
rowset_metas.begin(),
                    [](const RowsetSharedPtr& rowset) { return 
rowset->rowset_meta(); });
-    _cur_tablet_schema = 
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas)
-                                 ->copy_without_variant_extracted_columns();
+    _cur_tablet_schema = 
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
 }
 
 bool CompactionMixin::handle_ordered_data_compaction() {
@@ -1105,8 +1104,7 @@ void CloudCompactionMixin::build_basic_info() {
     std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
     std::transform(_input_rowsets.begin(), _input_rowsets.end(), 
rowset_metas.begin(),
                    [](const RowsetSharedPtr& rowset) { return 
rowset->rowset_meta(); });
-    _cur_tablet_schema = 
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas)
-                                 ->copy_without_variant_extracted_columns();
+    _cur_tablet_schema = 
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
 }
 
 int64_t CloudCompactionMixin::get_compaction_permits() {
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index ce026fe3935..0d8e68c03d1 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -27,6 +27,7 @@
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
 #include "common/exception.h"
 #include "common/status.h"
 #include "io/fs/file_reader.h"
@@ -267,14 +268,108 @@ Status 
VariantColumnReader::_create_hierarchical_reader(ColumnIterator** reader,
     return Status::OK();
 }
 
-Status VariantColumnReader::new_iterator(ColumnIterator** iterator,
-                                         const TabletColumn& target_col) {
+// Tells whether `target_col` must be read in the legacy "flat leaves" style.
+//
+// That style is only used by compaction-like readers (base, cumulative, cold data, segment and
+// full compaction, plus checksum reads). For those readers it applies when
+//   * the column is a subcolumn of the variant (its path relative to the root is not empty), or
+//   * the column is the variant root and variant_max_subcolumns_count() <= 0.
+// Keeping the historical read path here prevents data amplification during compaction.
+bool VariantColumnReader::_read_flat_leaves(ReaderType type, const TabletColumn& target_col) {
+    // Path of the target column with the leading (root) part removed.
+    auto path_in_variant = target_col.path_info_ptr()->copy_pop_front();
+
+    const bool compaction_like_reader = type == ReaderType::READER_BASE_COMPACTION ||
+                                        type == ReaderType::READER_CUMULATIVE_COMPACTION ||
+                                        type == ReaderType::READER_COLD_DATA_COMPACTION ||
+                                        type == ReaderType::READER_SEGMENT_COMPACTION ||
+                                        type == ReaderType::READER_FULL_COMPACTION ||
+                                        type == ReaderType::READER_CHECKSUM;
+    if (!compaction_like_reader) {
+        return false;
+    }
+
+    // Subcolumns always take the legacy path under compaction.
+    if (!path_in_variant.empty()) {
+        return true;
+    }
+
+    // Root column: legacy path only when the variant keeps no subcolumns.
+    return target_col.variant_max_subcolumns_count() <= 0;
+}
+
+// Creates the iterator used for a nested subcolumn that has no reader of its own.
+//
+// The closest existing node for the column's path (relative to the variant root) is looked up.
+// When that node lives under a nested ancestor, an iterator over one leaf with a nested part in
+// its path is handed to DefaultNestedColumnIterator together with that leaf's file column type,
+// so the sibling can be used to fill the target nested column. Otherwise the
+// DefaultNestedColumnIterator is created without a sibling.
+//
+// On success `*iterator` holds a newly allocated iterator; the caller is expected to take
+// ownership of it. Returns an error if the sibling iterator cannot be created or if no nested
+// leaf is found below the matched node.
+Status VariantColumnReader::_new_default_iter_with_same_nested(ColumnIterator** iterator,
+                                                               const TabletColumn& tablet_column) {
+    auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
+    // We find node that represents the same Nested type as path.
+    const auto* parent = _subcolumn_readers->find_best_match(relative_path);
+    VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path() << " parent "
+               << (parent ? parent->path.get_path() : "nullptr") << ", parent is nested "
+               << (parent ? parent->is_nested() : false) << ", type "
+               << TabletColumn::get_string_by_field_type(tablet_column.type());
+    // Find its common parent with a nested part.
+    // Why not use parent->path->has_nested_part? Because parent may not be a leaf node, and a
+    // non-leaf node may not contain path info.
+    // Example:
+    // {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" : "a@b"}}]}}
+    // nested node path          : payload.commits(NESTED)
+    // tablet_column path_info   : payload.commits.issue.id(SCALAR)
+    // parent path node          : payload.commits.issue(TUPLE)
+    // leaf path_info            : payload.commits.issue.email(SCALAR)
+    if (parent && SubcolumnColumnReaders::find_parent(
+                          parent, [](const auto& node) { return node.is_nested(); })) {
+        /// Find any leaf of Nested subcolumn.
+        const auto* leaf = SubcolumnColumnReaders::find_leaf(
+                parent, [](const auto& node) { return node.path.has_nested_part(); });
+        // An assert would vanish in release builds and leave a null dereference below,
+        // so report the broken invariant as an error instead.
+        if (leaf == nullptr) {
+            return Status::InternalError("no nested leaf found for variant subcolumn {}",
+                                         tablet_column.path_info_ptr()->get_path());
+        }
+        ColumnIterator* sibling_iter_ptr = nullptr;
+        RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr));
+        std::unique_ptr<ColumnIterator> sibling_iter(sibling_iter_ptr);
+        *iterator = new DefaultNestedColumnIterator(std::move(sibling_iter),
+                                                    leaf->data.file_column_type);
+    } else {
+        *iterator = new DefaultNestedColumnIterator(nullptr, nullptr);
+    }
+    return Status::OK();
+}
+
+// Creates an iterator for `target_col` using the legacy "flat leaves" layout.
+//
+// Compaction reads the flat leaf nodes directly to prevent data amplification.
+//   * No leaf reader for the path: a nested subcolumn is filled from a sibling of the same
+//     nested parent, any other column gets the segment's default iterator.
+//   * Leaf found and the relative path is empty (variant root): the root data is wrapped in a
+//     VariantRootColumnIterator.
+//   * Otherwise: the leaf reader's own iterator is returned.
+//
+// On success `*iterator` holds a newly allocated iterator; the caller is expected to take
+// ownership of it.
+Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator** iterator,
+                                                           const TabletColumn& target_col) {
+    auto relative_path = target_col.path_info_ptr()->copy_pop_front();
+    // compaction need to read flat leaves nodes data to prevent from amplification
+    const auto* node =
+            target_col.has_path_info() ? _subcolumn_readers->find_leaf(relative_path) : nullptr;
+    if (!node) {
+        if (target_col.is_nested_subcolumn()) {
+            // using the sibling of the nested column to fill the target nested column
+            RETURN_IF_ERROR(_new_default_iter_with_same_nested(iterator, target_col));
+        } else {
+            std::unique_ptr<ColumnIterator> it;
+            RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &it));
+            *iterator = it.release();
+        }
+        return Status::OK();
+    }
+    if (relative_path.empty()) {
+        // root path, use VariantRootColumnIterator (which takes ownership of the inner iterator)
+        *iterator =
+                new VariantRootColumnIterator(new FileColumnIterator(node->data.reader.get()));
+        return Status::OK();
+    }
+    return node->data.reader->new_iterator(iterator);
+}
+
+Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const 
TabletColumn& target_col,
+                                         const StorageReadOptions* opt) {
     // root column use unique id, leaf column use parent_unique_id
     auto relative_path = target_col.path_info_ptr()->copy_pop_front();
     const auto* root = _subcolumn_readers->get_root();
     const auto* node =
             target_col.has_path_info() ? 
_subcolumn_readers->find_exact(relative_path) : nullptr;
 
+    if (opt != nullptr && _read_flat_leaves(opt->io_ctx.reader_type, 
target_col)) {
+        // original path, compaction with wide schema
+        return _new_iterator_with_flat_leaves(iterator, target_col);
+    }
+
     if (node != nullptr) {
         // relative_path means the root node, should always use 
HierarchicalDataReader
         if (node->is_leaf_node() && !relative_path.empty()) {
@@ -907,7 +1002,8 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, 
OrdinalPageIndexIterat
     return Status::OK();
 }
 
-Status ColumnReader::new_iterator(ColumnIterator** iterator, const 
TabletColumn& col) {
+Status ColumnReader::new_iterator(ColumnIterator** iterator, const 
TabletColumn& col,
+                                  const StorageReadOptions* opt) {
     return new_iterator(iterator);
 }
 
@@ -934,12 +1030,12 @@ Status ColumnReader::new_iterator(ColumnIterator** 
iterator) {
         case FieldType::OLAP_FIELD_TYPE_MAP: {
             return new_map_iterator(iterator);
         }
-        case FieldType::OLAP_FIELD_TYPE_VARIANT: {
-            // read from root data
-            // *iterator = new VariantRootColumnIterator(new 
FileColumnIterator(this));
-            *iterator = new FileColumnIterator(this);
-            return Status::OK();
-        }
+        // case FieldType::OLAP_FIELD_TYPE_VARIANT: {
+        //     // read from root data
+        //     *iterator = new VariantRootColumnIterator(new 
FileColumnIterator(this));
+        //     // *iterator = new FileColumnIterator(this);
+        //     return Status::OK();
+        // }
         default:
             return Status::NotSupported("unsupported type to create iterator: 
{}",
                                         std::to_string(int(type)));
@@ -1799,75 +1895,76 @@ void 
DefaultValueColumnIterator::_insert_many_default(vectorized::MutableColumnP
     }
 }
 
-// Status VariantRootColumnIterator::_process_root_column(
-//         vectorized::MutableColumnPtr& dst, vectorized::MutableColumnPtr& 
root_column,
-//         const vectorized::DataTypePtr& most_common_type) {
-//     auto& obj =
-//             dst->is_nullable()
-//                     ? assert_cast<vectorized::ColumnObject&>(
-//                               
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
-//                     : assert_cast<vectorized::ColumnObject&>(*dst);
-//
-//     // fill nullmap
-//     if (root_column->is_nullable() && dst->is_nullable()) {
-//         vectorized::ColumnUInt8& dst_null_map =
-//                 
assert_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column();
-//         vectorized::ColumnUInt8& src_null_map =
-//                 
assert_cast<vectorized::ColumnNullable&>(*root_column).get_null_map_column();
-//         dst_null_map.insert_range_from(src_null_map, 0, 
src_null_map.size());
-//     }
-//
-//     // add root column to a tmp object column
-//     auto tmp = vectorized::ColumnObject::create(true, false);
-//     auto& tmp_obj = assert_cast<vectorized::ColumnObject&>(*tmp);
-//     tmp_obj.add_sub_column({}, std::move(root_column), most_common_type);
-//
-//     // merge tmp object column to dst
-//     obj.insert_range_from(*tmp, 0, tmp_obj.rows());
-//
-//     // finalize object if needed
-//     if (!obj.is_finalized()) {
-//         obj.finalize();
-//     }
-//
-// #ifndef NDEBUG
-//     obj.check_consistency();
-// #endif
-//
-//     return Status::OK();
-// }
-//
-// Status VariantRootColumnIterator::next_batch(size_t* n, 
vectorized::MutableColumnPtr& dst,
-//                                              bool* has_null) {
-//     // read root column
-//     auto& obj =
-//             dst->is_nullable()
-//                     ? assert_cast<vectorized::ColumnObject&>(
-//                               
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
-//                     : assert_cast<vectorized::ColumnObject&>(*dst);
-//
-//     auto most_common_type = obj.get_most_common_type();
-//     auto root_column = most_common_type->create_column();
-//     RETURN_IF_ERROR(_inner_iter->next_batch(n, root_column, has_null));
-//
-//     return _process_root_column(dst, root_column, most_common_type);
-// }
-//
-// Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, 
const size_t count,
-//                                                  
vectorized::MutableColumnPtr& dst) {
-//     // read root column
-//     auto& obj =
-//             dst->is_nullable()
-//                     ? assert_cast<vectorized::ColumnObject&>(
-//                               
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
-//                     : assert_cast<vectorized::ColumnObject&>(*dst);
-//
-//     auto most_common_type = obj.get_most_common_type();
-//     auto root_column = most_common_type->create_column();
-//     RETURN_IF_ERROR(_inner_iter->read_by_rowids(rowids, count, 
root_column));
-//
-//     return _process_root_column(dst, root_column, most_common_type);
-// }
+// Appends freshly read root values to the variant column stored in `dst`.
+//
+// `dst` is either a ColumnObject or a Nullable wrapper around one. `root_column` holds the
+// values that were just read and is consumed (moved from) by this call; `most_common_type`
+// is the type it was created with.
+Status VariantRootColumnIterator::_process_root_column(
+        vectorized::MutableColumnPtr& dst, vectorized::MutableColumnPtr& root_column,
+        const vectorized::DataTypePtr& most_common_type) {
+    using vectorized::ColumnNullable;
+    using vectorized::ColumnObject;
+    using vectorized::ColumnUInt8;
+
+    // Resolve the ColumnObject that actually receives the data.
+    const bool dst_nullable = dst->is_nullable();
+    ColumnObject* variant = nullptr;
+    if (dst_nullable) {
+        variant = &assert_cast<ColumnObject&>(
+                assert_cast<ColumnNullable&>(*dst).get_nested_column());
+    } else {
+        variant = &assert_cast<ColumnObject&>(*dst);
+    }
+
+    // Carry the null map over when both sides are nullable.
+    if (root_column->is_nullable() && dst_nullable) {
+        ColumnUInt8& target_nulls = assert_cast<ColumnNullable&>(*dst).get_null_map_column();
+        ColumnUInt8& source_nulls =
+                assert_cast<ColumnNullable&>(*root_column).get_null_map_column();
+        target_nulls.insert_range_from(source_nulls, 0, source_nulls.size());
+    }
+
+    // Wrap the root values into a temporary object column; the size is read before the move.
+    auto staging = ColumnObject::create(0, root_column->size());
+    auto& staging_obj = assert_cast<ColumnObject&>(*staging);
+    staging_obj.add_sub_column({}, std::move(root_column), most_common_type);
+
+    // Merge the temporary object column into the destination.
+    variant->insert_range_from(*staging, 0, staging_obj.rows());
+
+    // Finalize the destination if the merge left it unfinalized.
+    if (!variant->is_finalized()) {
+        variant->finalize();
+    }
+
+#ifndef NDEBUG
+    variant->check_consistency();
+#endif
+
+    return Status::OK();
+}
+
+// Reads the next batch from the inner iterator and appends it to the variant column in `dst`.
+// `n` and `has_null` are forwarded unchanged to the inner iterator.
+Status VariantRootColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
+                                             bool* has_null) {
+    // Locate the ColumnObject behind dst, unwrapping a Nullable wrapper when present.
+    vectorized::ColumnObject* variant = nullptr;
+    if (dst->is_nullable()) {
+        auto& nullable_dst = assert_cast<vectorized::ColumnNullable&>(*dst);
+        variant = &assert_cast<vectorized::ColumnObject&>(nullable_dst.get_nested_column());
+    } else {
+        variant = &assert_cast<vectorized::ColumnObject&>(*dst);
+    }
+
+    // The root values are read into a scratch column of the variant's most common type.
+    auto root_type = variant->get_most_common_type();
+    auto root_values = root_type->create_column();
+    RETURN_IF_ERROR(_inner_iter->next_batch(n, root_values, has_null));
+
+    return _process_root_column(dst, root_values, root_type);
+}
+
+// Reads the rows identified by `rowids[0..count)` from the inner iterator and appends them to
+// the variant column in `dst`.
+Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
+                                                 vectorized::MutableColumnPtr& dst) {
+    // Locate the ColumnObject behind dst, unwrapping a Nullable wrapper when present.
+    vectorized::ColumnObject* variant = nullptr;
+    if (dst->is_nullable()) {
+        auto& nullable_dst = assert_cast<vectorized::ColumnNullable&>(*dst);
+        variant = &assert_cast<vectorized::ColumnObject&>(nullable_dst.get_nested_column());
+    } else {
+        variant = &assert_cast<vectorized::ColumnObject&>(*dst);
+    }
+
+    // The root values are read into a scratch column of the variant's most common type.
+    auto root_type = variant->get_most_common_type();
+    auto root_values = root_type->create_column();
+    RETURN_IF_ERROR(_inner_iter->read_by_rowids(rowids, count, root_values));
+
+    return _process_root_column(dst, root_values, root_type);
+}
 
 Status DefaultNestedColumnIterator::next_batch(size_t* n, 
vectorized::MutableColumnPtr& dst) {
     bool has_null = false;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h 
b/be/src/olap/rowset/segment_v2/column_reader.h
index 9ccf85e3ca8..8ca6cf240df 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -145,7 +145,8 @@ public:
     ~ColumnReader() override;
 
     // create a new column iterator. Client should delete returned iterator
-    virtual Status new_iterator(ColumnIterator** iterator, const TabletColumn& 
col);
+    virtual Status new_iterator(ColumnIterator** iterator, const TabletColumn& 
col,
+                                const StorageReadOptions*);
     Status new_iterator(ColumnIterator** iterator);
     Status new_array_iterator(ColumnIterator** iterator);
     Status new_struct_iterator(ColumnIterator** iterator);
@@ -301,7 +302,9 @@ public:
 
     Status init(const ColumnReaderOptions& opts, const SegmentFooterPB& 
footer, uint32_t column_id,
                 uint64_t num_rows, io::FileReaderSPtr file_reader);
-    Status new_iterator(ColumnIterator** iterator, const TabletColumn& col) 
override;
+
+    Status new_iterator(ColumnIterator** iterator, const TabletColumn& col,
+                        const StorageReadOptions* opt) override;
 
     const SubcolumnColumnReaders::Node* get_reader_by_path(
             const vectorized::PathInData& relative_path) const;
@@ -315,6 +318,11 @@ public:
     int64_t get_metadata_size() const override;
 
 private:
+    bool _read_flat_leaves(ReaderType type, const TabletColumn& target_col);
+    // init for compaction read
+    Status _new_default_iter_with_same_nested(ColumnIterator** iterator, const 
TabletColumn& col);
+    Status _new_iterator_with_flat_leaves(ColumnIterator** iterator, const 
TabletColumn& col);
+
     Status _create_hierarchical_reader(ColumnIterator** reader, 
vectorized::PathInData path,
                                        const SubcolumnColumnReaders::Node* 
node,
                                        const SubcolumnColumnReaders::Node* 
root);
@@ -666,40 +674,40 @@ private:
     int32_t _segment_id = 0;
 };
 
-// class VariantRootColumnIterator : public ColumnIterator {
-// public:
-//     VariantRootColumnIterator() = delete;
-//
-//     explicit VariantRootColumnIterator(FileColumnIterator* iter) { 
_inner_iter.reset(iter); }
-//
-//     ~VariantRootColumnIterator() override = default;
-//
-//     Status init(const ColumnIteratorOptions& opts) override { return 
_inner_iter->init(opts); }
-//
-//     Status seek_to_first() override { return _inner_iter->seek_to_first(); }
-//
-//     Status seek_to_ordinal(ordinal_t ord_idx) override {
-//         return _inner_iter->seek_to_ordinal(ord_idx);
-//     }
-//
-//     Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
-//         bool has_null;
-//         return next_batch(n, dst, &has_null);
-//     }
-//
-//     Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* 
has_null) override;
-//
-//     Status read_by_rowids(const rowid_t* rowids, const size_t count,
-//                           vectorized::MutableColumnPtr& dst) override;
-//
-//     ordinal_t get_current_ordinal() const override { return 
_inner_iter->get_current_ordinal(); }
-//
-// private:
-//     Status _process_root_column(vectorized::MutableColumnPtr& dst,
-//                                 vectorized::MutableColumnPtr& root_column,
-//                                 const vectorized::DataTypePtr& 
most_common_type);
-//     std::unique_ptr<FileColumnIterator> _inner_iter;
-// };
+// Iterator that reads the root data of a variant column through a FileColumnIterator and
+// appends what it reads to a variant (ColumnObject) destination column.
+//
+// Initialization, seeking and ordinal queries are forwarded to the inner iterator as-is.
+class VariantRootColumnIterator : public ColumnIterator {
+public:
+    VariantRootColumnIterator() = delete;
+
+    // Takes ownership of `iter`; it is destroyed together with this object.
+    explicit VariantRootColumnIterator(FileColumnIterator* iter) { _inner_iter.reset(iter); }
+
+    ~VariantRootColumnIterator() override = default;
+
+    Status init(const ColumnIteratorOptions& opts) override { return _inner_iter->init(opts); }
+
+    Status seek_to_first() override { return _inner_iter->seek_to_first(); }
+
+    Status seek_to_ordinal(ordinal_t ord_idx) override {
+        return _inner_iter->seek_to_ordinal(ord_idx);
+    }
+
+    // Convenience overload for callers that do not need the has_null flag.
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
+        // Initialized so the flag never carries an indeterminate value into the callee.
+        bool has_null = false;
+        return next_batch(n, dst, &has_null);
+    }
+
+    // Reads up to *n rows of root data and appends them to the variant column in `dst`.
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;
+
+    // Reads the rows identified by rowids[0..count) and appends them to the variant column
+    // in `dst`.
+    Status read_by_rowids(const rowid_t* rowids, const size_t count,
+                          vectorized::MutableColumnPtr& dst) override;
+
+    ordinal_t get_current_ordinal() const override { return _inner_iter->get_current_ordinal(); }
+
+private:
+    // Merges `root_column` (created with `most_common_type`) into the variant column in `dst`.
+    Status _process_root_column(vectorized::MutableColumnPtr& dst,
+                                vectorized::MutableColumnPtr& root_column,
+                                const vectorized::DataTypePtr& most_common_type);
+    // Owned iterator over the root data stored in the segment file.
+    std::unique_ptr<FileColumnIterator> _inner_iter;
+};
 
 // This iterator is used to read default value column
 class DefaultValueColumnIterator : public ColumnIterator {
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp 
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 895589d1cd3..3f001cb5671 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -329,6 +329,14 @@ Status ColumnWriter::create(const ColumnWriterOptions& 
opts, const TabletColumn*
             return Status::OK();
         }
         case FieldType::OLAP_FIELD_TYPE_VARIANT: {
+            if (column->variant_max_subcolumns_count() <= 0) {
+                // Use ScalarColumnWriter to write it's only root data
+                std::unique_ptr<ColumnWriter> writer_local = 
std::unique_ptr<ColumnWriter>(
+                        new ScalarColumnWriter(opts, std::move(field), 
file_writer));
+                *writer = std::move(writer_local);
+                return Status::OK();
+            }
+            // Process columns with sparse column
             RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, 
writer));
             return Status::OK();
         }
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp 
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
index b625c3b2d8a..9d0d7e3379b 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -57,11 +57,13 @@ Status HierarchicalDataReader::create(ColumnIterator** 
reader, vectorized::PathI
         // we could make sure the data could be fully merged, since some 
column may not be extracted but remains in root
         // like {"a" : "b" : {"e" : 1.1}} in jsonb format
         if (read_type == ReadType::MERGE_ROOT) {
-            ColumnIterator* it;
-            RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
+            // ColumnIterator* it;
+            // RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
             stream_iter->set_root(std::make_unique<SubstreamIterator>(
                     root->data.file_column_type->create_column(),
-                    std::unique_ptr<ColumnIterator>(it), 
root->data.file_column_type));
+                    std::unique_ptr<ColumnIterator>(
+                            new FileColumnIterator(root->data.reader.get())),
+                    root->data.file_column_type));
         }
     }
 
@@ -187,17 +189,18 @@ Status HierarchicalDataReader::_process_sub_columns(
 Status HierarchicalDataReader::_process_nested_columns(
         vectorized::ColumnObject& container_variant,
         const std::map<vectorized::PathInData, 
vectorized::PathsWithColumnAndType>&
-                nested_subcolumns) {
+                nested_subcolumns,
+        size_t nrows) {
     using namespace vectorized;
     // Iterate nested subcolumns and flatten them, the entry contains the 
nested subcolumns of the same nested parent
     // first we pick the first subcolumn as base array and using it's offset 
info. Then we flatten all nested subcolumns
     // into a new object column and wrap it with array column using the first 
element offsets.The wrapped array column
     // will type the type of ColumnObject::NESTED_TYPE, whih is 
Nullable<ColumnArray<NULLABLE(ColumnObject)>>.
     for (const auto& entry : nested_subcolumns) {
-        MutableColumnPtr nested_object =
-                ColumnObject::create(container_variant.max_subcolumns_count());
         const auto* base_array =
-                
check_and_get_column<ColumnArray>(remove_nullable(entry.second[0].column));
+                
check_and_get_column<ColumnArray>(*remove_nullable(entry.second[0].column));
+        MutableColumnPtr nested_object = ColumnObject::create(
+                container_variant.max_subcolumns_count(), 
base_array->get_data().size());
         MutableColumnPtr offset = 
base_array->get_offsets_ptr()->assume_mutable();
         auto* nested_object_ptr = 
assert_cast<ColumnObject*>(nested_object.get());
         // flatten nested arrays
@@ -296,7 +299,7 @@ Status 
HierarchicalDataReader::_init_container(vectorized::MutableColumnPtr& con
 
     RETURN_IF_ERROR(_process_sub_columns(container_variant, 
non_nested_subcolumns));
 
-    RETURN_IF_ERROR(_process_nested_columns(container_variant, 
nested_subcolumns));
+    RETURN_IF_ERROR(_process_nested_columns(container_variant, 
nested_subcolumns, nrows));
 
     RETURN_IF_ERROR(_process_sparse_column(container_variant, nrows));
     container_variant.set_num_rows(nrows);
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h 
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index af9a584fbc1..a99c15bb12a 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -100,7 +100,8 @@ private:
     Status _process_nested_columns(
             vectorized::ColumnObject& container_variant,
             const std::map<vectorized::PathInData, 
vectorized::PathsWithColumnAndType>&
-                    nested_subcolumns);
+                    nested_subcolumns,
+            size_t nrows);
 
     Status _process_sparse_column(vectorized::ColumnObject& container_variant, 
size_t nrows);
 
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index f76a5e1b5e7..6e0317d8c72 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -801,10 +801,11 @@ Status Segment::new_column_iterator(const TabletColumn& 
tablet_column,
     }
     // init iterator by unique id
     ColumnIterator* it;
-    RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it, 
tablet_column));
+    RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it, 
tablet_column, opt));
     iter->reset(it);
 
     if (config::enable_column_type_check && !tablet_column.is_agg_state_type() 
&&
+        !tablet_column.has_path_info() &&
         tablet_column.type() != 
_column_readers.at(unique_id)->get_meta_type()) {
         LOG(WARNING) << "different type between schema and column reader,"
                      << " column schema name: " << tablet_column.name()
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 4cd91d22a3c..52a0be59716 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -21,6 +21,8 @@
 #include <gen_cpp/segment_v2.pb.h>
 #include <parallel_hashmap/phmap.h>
 
+#include <algorithm>
+
 // IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "cloud/config.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
@@ -361,7 +363,10 @@ void SegmentWriter::_maybe_invalid_row_cache(const 
std::string& key) {
 // 3. merge current columns info(contains extracted columns) with previous 
merged_tablet_schema
 //    which will be used to contruct the new schema for rowset
 Status SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block& 
data) {
-    if (_tablet_schema->num_variant_columns() == 0) {
+    if (_tablet_schema->num_variant_columns() == 0 ||
+        // need to handle sparse columns if variant_max_subcolumns_count > 0
+        std::any_of(_tablet_schema->columns().begin(), 
_tablet_schema->columns().end(),
+                    [](const auto& col) { return 
col->variant_max_subcolumns_count() > 0; })) {
         return Status::OK();
     }
     size_t column_id = _tablet_schema->num_columns();
@@ -707,7 +712,7 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
             << ") not equal to segment writer's num rows written(" << 
_num_rows_written << ")";
     _olap_data_convertor->clear_source_content();
 
-    // RETURN_IF_ERROR(append_block_with_variant_subcolumns(full_block));
+    RETURN_IF_ERROR(append_block_with_variant_subcolumns(full_block));
     return Status::OK();
 }
 
@@ -819,11 +824,11 @@ Status SegmentWriter::append_block(const 
vectorized::Block* block, size_t row_po
         }
     }
 
-    // if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
-    //     _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
-    //     RETURN_IF_ERROR(
-    //             
append_block_with_variant_subcolumns(*const_cast<vectorized::Block*>(block)));
-    // }
+    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
+        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
+        RETURN_IF_ERROR(
+                
append_block_with_variant_subcolumns(*const_cast<vectorized::Block*>(block)));
+    }
 
     _num_rows_written += num_rows;
     _olap_data_convertor->clear_source_content();
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index c06a568fb95..649bdb636a5 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -164,10 +164,6 @@ void 
VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t colum
     for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
         _init_column_meta(meta->add_children_columns(), column_id, 
column.get_sub_column(i));
     }
-    // add sparse column to footer
-    for (uint32_t i = 0; i < column.num_sparse_columns(); i++) {
-        _init_column_meta(meta->add_sparse_columns(), -1, 
column.sparse_column_at(i));
-    }
     
meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
 }
 
@@ -991,7 +987,9 @@ Status VerticalSegmentWriter::batch_block(const 
vectorized::Block* block, size_t
 // 3. merge current columns info(contains extracted columns) with previous 
merged_tablet_schema
 //    which will be used to contruct the new schema for rowset
 Status 
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) 
{
-    if (_tablet_schema->num_variant_columns() == 0) {
+    if (_tablet_schema->num_variant_columns() == 0 ||
+        std::any_of(_tablet_schema->columns().begin(), 
_tablet_schema->columns().end(),
+                    [](const auto& col) { return 
col->variant_max_subcolumns_count() > 0; })) {
         return Status::OK();
     }
     size_t column_id = _tablet_schema->num_columns();
@@ -1102,10 +1100,10 @@ Status VerticalSegmentWriter::write_batch() {
                 RETURN_IF_ERROR(_append_block_with_partial_content(data, 
full_block));
             }
         }
-        // for (auto& data : _batched_blocks) {
-        //     RowsInBlock full_rows_block {&full_block, data.row_pos, 
data.num_rows};
-        //     
RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block));
-        // }
+        for (auto& data : _batched_blocks) {
+            RowsInBlock full_rows_block {&full_block, data.row_pos, 
data.num_rows};
+            
RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block));
+        }
         for (auto& column_writer : _column_writers) {
             RETURN_IF_ERROR(column_writer->finish());
             RETURN_IF_ERROR(column_writer->write_data());
@@ -1170,18 +1168,19 @@ Status VerticalSegmentWriter::write_batch() {
         _num_rows_written += data.num_rows;
     }
 
-    // if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
-    //     _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
-    //     size_t original_writers_cnt = _column_writers.size();
-    //     // handle variant dynamic sub columns
-    //     for (auto& data : _batched_blocks) {
-    //         RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data));
-    //     }
-    //     for (size_t i = original_writers_cnt; i < _column_writers.size(); 
++i) {
-    //         RETURN_IF_ERROR(_column_writers[i]->finish());
-    //         RETURN_IF_ERROR(_column_writers[i]->write_data());
-    //     }
-    // }
+    // no sparse columns, need to flatten
+    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
+        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
+        size_t original_writers_cnt = _column_writers.size();
+        // handle variant dynamic sub columns
+        for (auto& data : _batched_blocks) {
+            RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data));
+        }
+        for (size_t i = original_writers_cnt; i < _column_writers.size(); ++i) 
{
+            RETURN_IF_ERROR(_column_writers[i]->finish());
+            RETURN_IF_ERROR(_column_writers[i]->write_data());
+        }
+    }
 
     _batched_blocks.clear();
     return Status::OK();
diff --git a/be/src/vec/columns/column_object.cpp 
b/be/src/vec/columns/column_object.cpp
index 28dbc59f383..b68dddf90dd 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -60,7 +60,10 @@
 #include "vec/common/schema_util.h"
 #include "vec/common/string_buffer.hpp"
 #include "vec/core/column_with_type_and_name.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
 #include "vec/data_types/convert_field_to_type.h"
+#include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_nothing.h"
@@ -77,6 +80,9 @@ namespace doris::vectorized {
 namespace {
 
 DataTypePtr create_array_of_type(TypeIndex type, size_t num_dimensions, bool 
is_nullable) {
+    if (type == TypeIndex::Nothing) {
+        return std::make_shared<DataTypeNothing>();
+    }
     if (type == ColumnObject::MOST_COMMON_TYPE_ID) {
         // JSONB type MUST NOT wrapped in ARRAY column, it should be top level.
         // So we ignored num_dimensions.
@@ -365,6 +371,8 @@ ColumnObject::Subcolumn::Subcolumn(MutableColumnPtr&& 
data_, DataTypePtr type, b
     data.push_back(std::move(data_));
     data_types.push_back(type);
     data_serdes.push_back(generate_data_serdes(type, is_root));
+    DCHECK_EQ(data.size(), data_types.size());
+    DCHECK_EQ(data.size(), data_serdes.size());
 }
 
 ColumnObject::Subcolumn::Subcolumn(size_t size_, bool is_nullable_, bool 
is_root_)
@@ -408,6 +416,8 @@ void 
ColumnObject::Subcolumn::add_new_column_part(DataTypePtr type) {
     least_common_type = LeastCommonType {type, is_root};
     data_types.push_back(type);
     data_serdes.push_back(generate_data_serdes(type, is_root));
+    DCHECK_EQ(data.size(), data_types.size());
+    DCHECK_EQ(data.size(), data_serdes.size());
 }
 
 void ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
@@ -519,6 +529,7 @@ ColumnObject::Subcolumn 
ColumnObject::Subcolumn::clone_with_default_values(
                 new_subcolumn.data[i], field_info.scalar_type_id, 
field_info.num_dimensions);
         new_subcolumn.data_types[i] = 
create_array_of_type(field_info.scalar_type_id,
                                                            
field_info.num_dimensions, is_nullable);
+        new_subcolumn.data_serdes[i] = 
generate_data_serdes(new_subcolumn.data_types[i], false);
     }
 
     return new_subcolumn;
@@ -700,6 +711,9 @@ void ColumnObject::Subcolumn::finalize(FinalizeMode mode) {
     data_types = {std::move(to_type)};
     data_serdes = {(generate_data_serdes(data_types[0], is_root))};
 
+    DCHECK_EQ(data.size(), data_types.size());
+    DCHECK_EQ(data.size(), data_serdes.size());
+
     num_of_defaults_in_prefix = 0;
 }
 
@@ -741,6 +755,7 @@ void ColumnObject::Subcolumn::pop_back(size_t n) {
     size_t sz = data.size() - num_removed;
     data.resize(sz);
     data_types.resize(sz);
+    data_serdes.resize(sz);
     num_of_defaults_in_prefix -= n;
 }
 
@@ -849,7 +864,7 @@ void ColumnObject::check_consistency() const {
                                serialized_sparse_column->size());
     }
 
-#ifdef NDEBUG
+#ifndef NDEBUG
     bool error = false;
     auto [path, value] = get_sparse_data_paths_and_values();
 
@@ -1292,7 +1307,6 @@ void ColumnObject::add_nested_subcolumn(const PathInData& 
key, const FieldInfo&
                 "Required size of subcolumn {} ({}) is inconsistent with 
column size ({})",
                 key.get_path(), new_size, num_rows);
     }
-    ENABLE_CHECK_CONSISTENCY(this);
 }
 
 void ColumnObject::set_num_rows(size_t n) {
@@ -1731,15 +1745,52 @@ struct Prefix {
     bool root_is_first_flag = true;
 };
 
+// skip empty nested json:
+// 1. nested array with only nulls, e.g. [null, null]. TODO: find a better way 
to distinguish an array null value from a real null value.
+// 2. type is nothing
+bool ColumnObject::Subcolumn::is_empty_nested(size_t row) const {
+    TypeIndex base_type_id = least_common_type.get_base_type_id();
+    const DataTypePtr& type = least_common_type.get();
+    // check if it is empty nested json array, then skip
+    if (base_type_id == TypeIndex::VARIANT) {
+        DCHECK(type->equals(*ColumnObject::NESTED_TYPE));
+        Field field;
+        get(row, field);
+        if (field.get_type() == Field::Types::Array) {
+            const auto& array = field.get<Array>();
+            bool only_nulls_inside = true;
+            for (const auto& elem : array) {
+                if (elem.get_type() != Field::Types::Null) {
+                    only_nulls_inside = false;
+                    break;
+                }
+            }
+            // if only nulls then skip
+            return only_nulls_inside;
+        }
+    }
+    // skip nothing type
+    if (base_type_id == TypeIndex::Nothing) {
+        return true;
+    }
+    return false;
+}
+
 bool ColumnObject::is_visible_root_value(size_t nrow) const {
     if (is_null_root()) {
         return false;
     }
-    if (subcolumns.get_root()->data.is_null_at(nrow)) {
+    const auto* root = subcolumns.get_root();
+    if (root->data.is_null_at(nrow)) {
         return false;
     }
-    int ind = nrow - subcolumns.get_root()->data.num_of_defaults_in_prefix;
-    for (const auto& part : subcolumns.get_root()->data.data) {
+    if (root->data.least_common_type.get_base_type_id() == TypeIndex::VARIANT) 
{
+        // nested field
+        return !root->data.is_empty_nested(nrow);
+    }
+    size_t ind = nrow - root->data.num_of_defaults_in_prefix;
+    // null value is treated as empty json. TODO: find a better way to distinguish empty 
json and null json.
+    for (const auto& part : root->data.data) {
         if (ind < part->size()) {
             return !part->get_data_at(ind).empty();
         }
@@ -1776,6 +1827,10 @@ Status 
ColumnObject::serialize_one_row_to_json_format(int64_t row_num, BufferWri
         if (subcolumn->data.is_root) {
             continue;
         }
+        // skip empty nested value
+        if (subcolumn->data.is_empty_nested(row_num)) {
+            continue;
+        }
         /// We consider null value and absence of the path in a row as 
equivalent cases, because we cannot actually distinguish them.
         /// So, we don't output null values at all.
         if (!subcolumn->data.is_null_at(row_num)) {
diff --git a/be/src/vec/columns/column_object.h 
b/be/src/vec/columns/column_object.h
index fcd275b89d5..272ce541562 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -194,6 +194,8 @@ public:
 
         friend class ColumnObject;
 
+        bool is_empty_nested(size_t row) const;
+
     private:
         class LeastCommonType {
         public:
diff --git a/be/src/vec/data_types/data_type_factory.cpp 
b/be/src/vec/data_types/data_type_factory.cpp
index cb0fb452bfe..5681a9d0443 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -345,6 +345,10 @@ DataTypePtr DataTypeFactory::create_data_type(const 
TypeIndex& type_index, bool
     case TypeIndex::Time:
         nested = std::make_shared<vectorized::DataTypeTimeV2>();
         break;
+    case TypeIndex::VARIANT:
+        // only in nested type which is Array<ColumnObject>
+        nested = std::make_shared<vectorized::DataTypeObject>(0);
+        break;
     default:
         throw doris::Exception(ErrorCode::INTERNAL_ERROR, "invalid typeindex: 
{}",
                                getTypeName(type_index));
diff --git a/be/src/vec/data_types/data_type_object.cpp 
b/be/src/vec/data_types/data_type_object.cpp
index 551c30bd3c7..fef6b2cbcef 100644
--- a/be/src/vec/data_types/data_type_object.cpp
+++ b/be/src/vec/data_types/data_type_object.cpp
@@ -53,8 +53,8 @@ bool DataTypeObject::equals(const IDataType& rhs) const {
     auto rhs_type = typeid_cast<const DataTypeObject*>(&rhs);
     if (rhs_type && _max_subcolumns_count != 
rhs_type->variant_max_subcolumns_count()) {
         VLOG_DEBUG << "_max_subcolumns_count is" << _max_subcolumns_count
-                  << "rhs_type->variant_max_subcolumns_count()"
-                  << rhs_type->variant_max_subcolumns_count();
+                   << "rhs_type->variant_max_subcolumns_count()"
+                   << rhs_type->variant_max_subcolumns_count();
         return false;
     }
     return rhs_type && _max_subcolumns_count == 
rhs_type->variant_max_subcolumns_count();
diff --git a/be/src/vec/data_types/get_least_supertype.cpp 
b/be/src/vec/data_types/get_least_supertype.cpp
index a0f27482b5a..499b87ee87b 100644
--- a/be/src/vec/data_types/get_least_supertype.cpp
+++ b/be/src/vec/data_types/get_least_supertype.cpp
@@ -281,6 +281,11 @@ void get_least_supertype_jsonb(const TypeIndexSet& types, 
DataTypePtr* type) {
             *type = std::make_shared<DataTypeJsonb>();
             return;
         }
+        if (which.is_variant_type()) {
+            // only in nested type which is Array<ColumnObject>
+            *type = std::make_shared<DataTypeObject>(0);
+            return;
+        }
         if (which.is_date_v2()) {
             *type = std::make_shared<DataTypeDateV2>();
             return;
diff --git a/be/src/vec/exprs/table_function/vexplode.cpp 
b/be/src/vec/exprs/table_function/vexplode.cpp
index feef58cd277..8a6e91e84e0 100644
--- a/be/src/vec/exprs/table_function/vexplode.cpp
+++ b/be/src/vec/exprs/table_function/vexplode.cpp
@@ -37,6 +37,36 @@ VExplodeTableFunction::VExplodeTableFunction() {
     _fn_name = "vexplode";
 }
 
+Status VExplodeTableFunction::_process_init_variant(Block* block, int 
value_column_idx) {
+    // explode variant array
+    auto& variant_column = *assert_cast<ColumnObject*>(
+            remove_nullable(block->get_by_position(value_column_idx)
+                                    .column->convert_to_full_column_if_const())
+                    ->assume_mutable()
+                    .get());
+    variant_column.finalize();
+    _detail.output_as_variant = true;
+    if (!variant_column.is_null_root()) {
+        _array_column = variant_column.get_root();
+        // We need to wrap the output nested column within a variant column.
+        // Otherwise the type is mismatched
+        const auto* array_type = check_and_get_data_type<DataTypeArray>(
+                remove_nullable(variant_column.get_root_type()).get());
+        if (array_type == nullptr) {
+            return Status::NotSupported("explode not support none array type 
{}",
+                                        
variant_column.get_root_type()->get_name());
+        }
+        _detail.nested_type = array_type->get_nested_type();
+    } else {
+        // null root, use nothing type
+        _array_column = 
ColumnNullable::create(ColumnArray::create(ColumnNothing::create(0)),
+                                               ColumnUInt8::create(0));
+        
_array_column->assume_mutable()->insert_many_defaults(variant_column.size());
+        _detail.nested_type = std::make_shared<DataTypeNothing>();
+    }
+    return Status::OK();
+}
+
 Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) {
     CHECK(_expr_context->root()->children().size() == 1)
             << "VExplodeTableFunction only support 1 child but has "
diff --git a/be/src/vec/functions/array/function_array_utils.cpp 
b/be/src/vec/functions/array/function_array_utils.cpp
index 89e1d9b860b..dfe9cda5174 100644
--- a/be/src/vec/functions/array/function_array_utils.cpp
+++ b/be/src/vec/functions/array/function_array_utils.cpp
@@ -55,9 +55,7 @@ bool extract_column_array_info(const IColumn& src, 
ColumnArrayExecutionData& dat
     if (data.output_as_variant &&
         !WhichDataType(remove_nullable(data.nested_type)).is_variant_type()) {
         // set variant root column/type to from column/type
-        const auto& data_type_object =
-                assert_cast<const 
DataTypeObject&>(*remove_nullable(data.nested_type));
-        auto variant = 
ColumnObject::create(data_type_object.variant_max_subcolumns_count());
+        auto variant = ColumnObject::create(0);
         variant->create_root(data.nested_type, 
make_nullable(data.nested_col)->assume_mutable());
         data.nested_col = variant->get_ptr();
     }
diff --git a/be/src/vec/olap/olap_data_convertor.cpp 
b/be/src/vec/olap/olap_data_convertor.cpp
index a35109d6575..099f9af080c 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -212,7 +212,10 @@ 
OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
         return 
std::make_unique<OlapColumnDataConvertorSimple<vectorized::Float64>>();
     }
     case FieldType::OLAP_FIELD_TYPE_VARIANT: {
-        return std::make_unique<OlapColumnDataConvertorVariant>();
+        if (column.variant_max_subcolumns_count() > 0) {
+            return std::make_unique<OlapColumnDataConvertorVariant>();
+        }
+        return std::make_unique<OlapColumnDataConvertorVariantRoot>();
     }
     case FieldType::OLAP_FIELD_TYPE_STRUCT: {
         return create_struct_convertor(column);
@@ -1096,6 +1099,58 @@ Status 
OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
     return Status::OK();
 }
 
+void 
OlapBlockDataConvertor::OlapColumnDataConvertorVariantRoot::set_source_column(
+        const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t 
num_rows) {
+    // set
+    const ColumnNullable* nullable_column = nullptr;
+    if (typed_column.column->is_nullable()) {
+        nullable_column = assert_cast<const 
ColumnNullable*>(typed_column.column.get());
+        _nullmap = nullable_column->get_null_map_data().data();
+    }
+    const auto& variant =
+            nullable_column == nullptr
+                    ? assert_cast<const 
vectorized::ColumnObject&>(*typed_column.column)
+                    : assert_cast<const vectorized::ColumnObject&>(
+                              nullable_column->get_nested_column());
+    if (variant.is_null_root()) {
+        auto root_type = 
make_nullable(std::make_shared<ColumnObject::MostCommonType>());
+        auto root_col = root_type->create_column();
+        root_col->insert_many_defaults(variant.rows());
+        const_cast<ColumnObject&>(variant).create_root(root_type, 
std::move(root_col));
+        variant.check_consistency();
+    }
+    // ensure data finalized
+    _source_column_ptr = &const_cast<ColumnObject&>(variant);
+    
static_cast<void>(_source_column_ptr->finalize(ColumnObject::FinalizeMode::WRITE_MODE));
+    _root_data_convertor = 
std::make_unique<OlapColumnDataConvertorVarChar>(true);
+    // Make sure the root node is jsonb storage type
+    auto expected_root_type = 
make_nullable(std::make_shared<ColumnObject::MostCommonType>());
+    _source_column_ptr->ensure_root_node_type(expected_root_type);
+    _root_data_convertor->set_source_column(
+            {_source_column_ptr->get_root()->get_ptr(), nullptr, ""}, row_pos, 
num_rows);
+    
OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column,
 row_pos,
+                                                                           
num_rows);
+}
+
+// convert root data
+Status 
OlapBlockDataConvertor::OlapColumnDataConvertorVariantRoot::convert_to_olap() {
+#ifndef NDEBUG
+    _source_column_ptr->check_consistency();
+#endif
+    const auto* nullable = assert_cast<const 
ColumnNullable*>(_source_column_ptr->get_root().get());
+    const auto* root_column = assert_cast<const 
ColumnString*>(&nullable->get_nested_column());
+    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, 
root_column));
+    return Status::OK();
+}
+
+const void* 
OlapBlockDataConvertor::OlapColumnDataConvertorVariantRoot::get_data() const {
+    return _root_data_convertor->get_data();
+}
+const void* 
OlapBlockDataConvertor::OlapColumnDataConvertorVariantRoot::get_data_at(
+        size_t offset) const {
+    return _root_data_convertor->get_data_at(offset);
+}
+
 void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column(
         const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t 
num_rows) {
     // set
diff --git a/be/src/vec/olap/olap_data_convertor.h 
b/be/src/vec/olap/olap_data_convertor.h
index 3c21eb4fc51..f64f9d4464f 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -498,6 +498,25 @@ private:
         DataTypeMap _data_type;
     }; //OlapColumnDataConvertorMap
 
+    class OlapColumnDataConvertorVariantRoot : public 
OlapColumnDataConvertorBase {
+    public:
+        OlapColumnDataConvertorVariantRoot() = default;
+
+        void set_source_column(const ColumnWithTypeAndName& typed_column, 
size_t row_pos,
+                               size_t num_rows) override;
+        Status convert_to_olap() override;
+
+        const void* get_data() const override;
+        const void* get_data_at(size_t offset) const override;
+
+    private:
+        // // encodes sparsed columns
+        // const ColumnString* _root_data_column;
+        // // _nullmap contains null info for this variant
+        std::unique_ptr<OlapColumnDataConvertorVarChar> _root_data_convertor;
+        ColumnObject* _source_column_ptr;
+    };
+
     class OlapColumnDataConvertorVariant : public OlapColumnDataConvertorBase {
     public:
         OlapColumnDataConvertorVariant() = default;
diff --git a/regression-test/data/variant_p0/nested.out 
b/regression-test/data/variant_p0/nested.out
index 2c105a68778..c9045a2e600 100644
Binary files a/regression-test/data/variant_p0/nested.out and 
b/regression-test/data/variant_p0/nested.out differ
diff --git a/regression-test/suites/variant_p0/nested.groovy 
b/regression-test/suites/variant_p0/nested.groovy
index 90728df2532..6b8afdaabf9 100644
--- a/regression-test/suites/variant_p0/nested.groovy
+++ b/regression-test/suites/variant_p0/nested.groovy
@@ -32,7 +32,7 @@ suite("regression_test_variant_nested", "p0"){
                 )
                 DUPLICATE KEY(`k`)
                 DISTRIBUTED BY HASH(k) BUCKETS 4
-                properties("replication_num" = "1", "disable_auto_compaction" 
= "false", "variant_enable_flatten_nested" = "true");
+                properties("replication_num" = "1", "disable_auto_compaction" 
= "false", "variant_enable_flatten_nested" = "true", 
"variant_max_subcolumns_count" = "0");
             """
         sql """
             insert into var_nested values (1, '{"xx" : 10}');
@@ -159,18 +159,69 @@ 
parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level
                 )
                 UNIQUE KEY(`k`)
                 DISTRIBUTED BY HASH(k) BUCKETS 1
-                properties("replication_num" = "1", "disable_auto_compaction" 
= "false", "enable_unique_key_merge_on_write" = "true", 
"variant_enable_flatten_nested" = "true");
+                properties("replication_num" = "1", "disable_auto_compaction" 
= "false", "enable_unique_key_merge_on_write" = "true", 
"variant_enable_flatten_nested" = "true", "variant_max_subcolumns_count" = "0");
             """
         sql """insert into var_nested2 select * from var_nested order by k 
limit 1024"""
         qt_sql """select  
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res
 [...]
-        qt_sql """select v['nested'] from var_nested2 where k < 10 order by k 
limit 10;"""
-        // explode variant array
+        qt_sql """select v['nested'] from var_nested2 where k < 10 and 
length(v['nested']) > 3 order by k limit 10;"""
+        // 0. normal explode variant array
         order_qt_explode_sql """select count(),cast(vv['xx'] as int) from 
var_nested lateral view explode_variant_array(v['nested']) tmp as vv where 
vv['xx'] = 10 group by cast(vv['xx'] as int)"""
         sql """truncate table var_nested2"""
         sql """insert into var_nested2 values(1119111, 
'{"eventId":1,"firstName":"Name1","lastName":"Surname1","body":{"phoneNumbers":[{"number":"5550219210","type":"GSM","callLimit":5},{"number":"02124713252","type":"HOME","callLimit":3},{"number":"05550219211","callLimit":2,"type":"WORK"}]}}
 ')"""
         order_qt_explode_sql """select v['eventId'], phone_numbers from 
var_nested2 lateral view explode_variant_array(v['body']['phoneNumbers']) tmp1 
as phone_numbers
 where phone_numbers['type'] = 'GSM' OR phone_numbers['type'] = 'HOME' and 
phone_numbers['callLimit'] > 2;"""
+
+        // test array_function
+        sql "DROP TABLE IF EXISTS var_nested_array_agg"
+        sql """
+                CREATE TABLE IF NOT EXISTS var_nested_array_agg(
+                    k bigint,
+                    v variant
+                )
+                UNIQUE KEY(`k`)
+                DISTRIBUTED BY HASH(k) BUCKETS 1
+                properties("replication_num" = "1", "disable_auto_compaction" 
= "false", "enable_unique_key_merge_on_write" = "true", 
"variant_enable_flatten_nested" = "true", "variant_max_subcolumns_count" = "0");
+            """
+        sql "insert into var_nested_array_agg select * from var_nested"
+        // 1. array_contains
+        qt_sql "select * from var_nested_array_agg where 
array_contains(cast(v['nested']['xx'] as array<int>), 10) order by k limit 10"
+        // 2. array_agg scalar
+        sql "select k, array_agg(cast(v['nested'] as text))  from 
var_nested_array_agg group by k limit 10"
+
+        // test explode_variant_array with abnormal cases
+        sql "DROP TABLE IF EXISTS var_nested_explode_variant_with_abnomal"
+        sql """
+                CREATE TABLE IF NOT EXISTS 
var_nested_explode_variant_with_abnomal(
+                    k bigint,
+                    v variant
+                )
+                UNIQUE KEY(`k`)
+                DISTRIBUTED BY HASH(k) BUCKETS 1
+                properties("replication_num" = "1", "disable_auto_compaction" 
= "false", "enable_unique_key_merge_on_write" = "true", 
"variant_enable_flatten_nested" = "true",  "variant_max_subcolumns_count" = 
"0");
+            """
+        sql "insert into var_nested_explode_variant_with_abnomal select * from 
var_nested"
+        // 1. v['nested']['x'] is null root
+        order_qt_explode_sql """select count(),cast(vv as int) from 
var_nested_explode_variant_with_abnomal lateral view 
explode_variant_array(v['nested']['x']) tmp as vv where vv = 10 group by 
cast(vv as int)"""
+        // 2. v['nested']['xx'] is normal array
+        order_qt_explode_sql """select count(),cast(vv as int) from 
var_nested_explode_variant_with_abnomal lateral view 
explode_variant_array(v['nested']['xx']) tmp as vv where vv = 10 group by 
cast(vv as int)"""
+        // 3. v['xx'] is a non-array scalar type
+        test {
+            sql """select count(),cast(vv as int) from 
var_nested_explode_variant_with_abnomal lateral view 
explode_variant_array(v['xx']) tmp as vv where vv = 10 group by cast(vv as 
int)"""
+            exception("explode not support none array type")
+        }
+        // 4. v['k1'] is json scalar type 
+        test {
+            sql """select count(),cast(vv as int) from 
var_nested_explode_variant_with_abnomal lateral view 
explode_variant_array(v['k1']) tmp as vv where vv = 10 group by cast(vv as 
int)"""
+            exception("explode not support none array type")
+        }
+        // 5. toplevel nested array
+        sql "truncate table var_nested_explode_variant_with_abnomal"
+        sql """insert into var_nested_explode_variant_with_abnomal values(1, 
'[{"a" : 10}, {"b" : "20", "c" :1024, "a" : 11}]')"""
+        sql """insert into var_nested_explode_variant_with_abnomal values(2, 
'[{"a" : 10}, {"b" : "20", "a" : 150}]')"""
+        order_qt_explode_sql """select count(),cast(vv as int) from 
var_nested_explode_variant_with_abnomal lateral view 
explode_variant_array(v['a']) tmp as vv where vv = 10 group by cast(vv as 
int)"""
+        // FIXME after refactor
+        // order_qt_explode_sql """select count(),cast(vv as int) from 
var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v) 
tmp as vv where vv['a'] = 10 group by cast(vv as int)"""
     } finally {
         // reset flags
     }
diff --git a/regression-test/suites/variant_p0/update/load.groovy 
b/regression-test/suites/variant_p0/update/load.groovy
index a857a912da3..f0703cf6e85 100644
--- a/regression-test/suites/variant_p0/update/load.groovy
+++ b/regression-test/suites/variant_p0/update/load.groovy
@@ -58,7 +58,7 @@ suite("update_test_load", "p0") {
         )
         DUPLICATE KEY(`k`)
         DISTRIBUTED BY HASH(k) BUCKETS 6
-        properties("replication_num" = "1", "disable_auto_compaction" = 
"true");
+        properties("replication_num" = "1", "disable_auto_compaction" = 
"true", "variant_max_subcolumns_count" = "0");
     """
 
     for (int i = 0; i < 10; i++) {
@@ -109,7 +109,7 @@ suite("update_test_load", "p0") {
             )
             DUPLICATE KEY(`k`)
             DISTRIBUTED BY HASH(k) BUCKETS 6
-            properties("replication_num" = "1", "disable_auto_compaction" = 
"true");
+            properties("replication_num" = "1", "disable_auto_compaction" = 
"true", "variant_max_subcolumns_count" = "0");
         """
 
         for (int i = 0; i < 10; i++) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to