This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d94ad1874ce branch-3.1: [fix](variant) fix the reading core caused by inserting nested column and scalar column in variant sub-column #53083 (#53977)
d94ad1874ce is described below

commit d94ad1874ce2566dbc5594a932ba0470889163dd
Author: amory <[email protected]>
AuthorDate: Thu Jul 31 14:30:22 2025 +0800

    branch-3.1: [fix](variant) fix the reading core caused by inserting nested column and scalar column in variant sub-column #53083 (#53977)
    
    backport: #53083
---
 be/src/cloud/cloud_meta_mgr.cpp                    |  13 +-
 be/src/cloud/schema_cloud_dictionary_cache.cpp     |  36 ++-
 be/src/olap/base_tablet.cpp                        |   4 +-
 be/src/olap/rowset/beta_rowset_writer.cpp          |   7 +-
 be/src/olap/rowset/beta_rowset_writer.h            |   2 +-
 be/src/service/internal_service.cpp                |  18 +-
 be/src/vec/common/schema_util.cpp                  |  99 ++++--
 be/src/vec/common/schema_util.h                    |  27 +-
 be/src/vec/json/parse2column.cpp                   |  10 +
 .../cloud/test_schema_cloud_dictionary_cache.cpp   |  54 ++++
 be/test/vec/common/schema_util_test.cpp            | 334 ++++++++++++++++++++-
 .../apache/doris/datasource/InternalCatalog.java   |  10 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  15 +
 gensrc/thrift/AgentService.thrift                  |   2 +-
 regression-test/data/variant_p0/nested/load.out    | Bin 0 -> 7378 bytes
 regression-test/data/variant_p0/nested/sql/q01.out | Bin 0 -> 377 bytes
 .../suites/variant_p0/delete_update.groovy         |   1 +
 regression-test/suites/variant_p0/nested.groovy    |   1 +
 .../suites/variant_p0/nested/load.groovy           | 198 ++++++++++++
 .../suites/variant_p0/nested/sql/q01.sql           |  13 +
 regression-test/suites/variant_p0/nested2.groovy   |  11 +-
 .../suites/variant_p0/predefine/load.groovy        |   1 +
 .../test_double_write_when_schema_change.groovy    |   1 +
 23 files changed, 801 insertions(+), 56 deletions(-)
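
The crash scenario this patch targets, sketched from its regression and unit tests (the table name t is illustrative; the shipped test uses var_nested_load_conflict): with flatten-nested enabled, writing a nested array and a scalar object under the same variant sub-path used to produce segments that crashed on read, and is now rejected with an "Ambiguous paths" data quality error.

    -- assumes: set disable_variant_flatten_nested = false; and a variant table created with
    -- "variant_enable_flatten_nested" = "true" in its properties
    insert into t values (1, '{"nested": [{"a": 2.5, "b": "123.1"}]}'); -- v['nested'] is an array of objects
    insert into t values (2, '{"nested": {"a": 2.5, "b": "123.1"}}');   -- same path, scalar object: ambiguous, now rejected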

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index d26df9dbf30..f478a833a07 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1011,11 +1011,18 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
     // Replace schema dictionary keys based on the rowset's index ID to maintain schema consistency.
     CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
     // if not enable dict cache, then directly return true to avoid refresh
-    bool replaced =
+    Status replaced_st =
             config::variant_use_cloud_schema_dict_cache
                     ? engine.get_schema_cloud_dictionary_cache().replace_schema_to_dict_keys(
                               rs_meta_pb.index_id(), req.mutable_rowset_meta())
-                    : true;
+                    : Status::OK();
+    // if the replaced_st is not ok and also not NotFound, then we just return the replaced_st
+    VLOG_DEBUG << "replace schema to dict keys, replaced_st: " << replaced_st.to_string()
+               << ", replaced_st.is<ErrorCode::NOT_FOUND>(): "
+               << replaced_st.is<ErrorCode::NOT_FOUND>();
+    if (!replaced_st.ok() && !replaced_st.is<ErrorCode::NOT_FOUND>()) {
+        return replaced_st;
+    }
     Status st = retry_rpc("commit rowset", req, &resp, &MetaService_Stub::commit_rowset);
     if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
         if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
@@ -1029,7 +1036,7 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
     // If dictionary replacement fails, it may indicate that the local schema dictionary is outdated.
     // Refreshing the dictionary here ensures that the rowset metadata is updated with the latest schema definitions,
     // which is critical for maintaining consistency between the rowset and its corresponding schema.
-    if (!replaced) {
+    if (replaced_st.is<ErrorCode::NOT_FOUND>()) {
         RETURN_IF_ERROR(
                 engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
     }
diff --git a/be/src/cloud/schema_cloud_dictionary_cache.cpp b/be/src/cloud/schema_cloud_dictionary_cache.cpp
index 25f0b232702..9fdde420ecb 100644
--- a/be/src/cloud/schema_cloud_dictionary_cache.cpp
+++ b/be/src/cloud/schema_cloud_dictionary_cache.cpp
@@ -19,6 +19,7 @@
 
 #include <fmt/core.h>
 #include <gen_cpp/olap_file.pb.h>
+#include <vec/common/schema_util.h>
 
 #include <functional>
 #include <memory>
@@ -62,6 +63,27 @@ SchemaCloudDictionarySPtr SchemaCloudDictionaryCache::_lookup(int64_t index_id)
     return dict;
 }
 
+Status check_path_amibigus(const SchemaCloudDictionary& schema, RowsetMetaCloudPB* rowset_meta) {
+    // if enable_variant_flatten_nested is false, then we don't need to check path ambiguity
+    if (!rowset_meta->tablet_schema().enable_variant_flatten_nested()) {
+        return Status::OK();
+    }
+    // try to get all the paths in the rowset meta
+    vectorized::PathsInData all_paths;
+    for (const auto& column : rowset_meta->tablet_schema().column()) {
+        vectorized::PathInData path_in_data;
+        path_in_data.from_protobuf(column.column_path_info());
+        all_paths.push_back(path_in_data);
+    }
+    // try to get all the paths in the schema dict
+    for (const auto& [_, column] : schema.column_dict()) {
+        vectorized::PathInData path_in_data;
+        path_in_data.from_protobuf(column.column_path_info());
+        all_paths.push_back(path_in_data);
+    }
+    RETURN_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(all_paths));
+    return Status::OK();
+}
 /**
  * Processes dictionary entries by matching items from the given item map.
  * It maps items to their dictionary keys, then adds these keys to the rowset metadata.
@@ -101,7 +123,7 @@ Status process_dictionary(SchemaCloudDictionary& dict,
         return output;
     };
 
-    google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+    google::protobuf::RepeatedPtrField<ItemPB> none_extracted_items;
     std::unordered_map<std::string, int> reversed_dict;
     for (const auto& [key, val] : item_dict) {
         reversed_dict[serialize_fn(val)] = key;
@@ -110,7 +132,7 @@ Status process_dictionary(SchemaCloudDictionary& dict,
     for (const auto& item : items) {
         if (filter(item)) {
             // Filter none extended items, mainly extended columns and extended indexes
-            *none_ext_items.Add() = item;
+            *none_extracted_items.Add() = item;
             continue;
         }
         const std::string serialized_key = serialize_fn(item);
@@ -127,7 +149,7 @@ Status process_dictionary(SchemaCloudDictionary& dict,
     }
     // clear extended items to prevent writing them to fdb
     if (result != nullptr) {
-        result->Swap(&none_ext_items);
+        result->Swap(&none_extracted_items);
     }
     return Status::OK();
 }
@@ -137,11 +159,15 @@ Status SchemaCloudDictionaryCache::replace_schema_to_dict_keys(int64_t index_id,
     if (!rowset_meta->has_variant_type_in_schema()) {
         return Status::OK();
     }
+    // first attempt to get dict from cache
     auto dict = _lookup(index_id);
     if (!dict) {
-        g_schema_dict_cache_miss_count << 1;
-        return Status::NotFound<false>("Not found dict {}", index_id);
+        // if the dict is not found in the cache, then refresh it from the remote meta service
+        RETURN_IF_ERROR(refresh_dict(index_id, &dict));
     }
+    // here we should have the dict
+    DCHECK(dict);
+    RETURN_IF_ERROR(check_path_amibigus(*dict, rowset_meta));
     auto* dict_list = rowset_meta->mutable_schema_dict_key_list();
     // Process column dictionary: add keys for non-extended columns.
     auto column_filter = [&](const doris::ColumnPB& col) -> bool { return col.unique_id() >= 0; };
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 48bb54f593b..3cc6bd821ea 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -215,11 +215,13 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_
     CHECK(_max_version_schema->schema_version() >= update_schema->schema_version());
     TabletSchemaSPtr final_schema;
     bool check_column_size = true;
+    VLOG_DEBUG << "dump _max_version_schema: " << _max_version_schema->dump_full_schema();
+    VLOG_DEBUG << "dump update_schema: " << update_schema->dump_full_schema();
     RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
             {_max_version_schema, update_schema}, _max_version_schema, final_schema,
             check_column_size));
     _max_version_schema = final_schema;
-    VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_structure();
+    VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_full_schema();
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 393d5bc1d37..af053f62fae 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -863,13 +863,13 @@ int64_t BetaRowsetWriter::_num_seg() const {
 // Eg. rowset schema:       A(int),    B(float),  C(int), D(int)
 // _tabelt->tablet_schema:  A(bigint), B(double)
 //  => update_schema:       A(bigint), B(double), C(int), D(int)
-void BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
+Status BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
     std::lock_guard<std::mutex> lock(*(_context.schema_lock));
     TabletSchemaSPtr update_schema;
     if (_context.merged_tablet_schema == nullptr) {
         _context.merged_tablet_schema = _context.tablet_schema;
     }
-    static_cast<void>(vectorized::schema_util::get_least_common_schema(
+    RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
             {_context.merged_tablet_schema, flush_schema}, nullptr, update_schema));
     CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
             << "Rowset merge schema columns count is " << update_schema->num_columns()
@@ -878,6 +878,7 @@ void BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
             << " flush_schema: " << flush_schema->dump_structure();
     _context.merged_tablet_schema.swap(update_schema);
     VLOG_DEBUG << "dump rs schema: " << _context.tablet_schema->dump_structure();
+    return Status::OK();
 }
 
 Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) {
@@ -1092,7 +1093,7 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStati
     }
     // tablet schema updated
     if (flush_schema != nullptr) {
-        update_rowset_schema(flush_schema);
+        RETURN_IF_ERROR(update_rowset_schema(flush_schema));
     }
     if (_context.mow_context != nullptr) {
         // ensure that the segment file writing is complete
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index d32fd8d2384..78c39caf568 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -198,7 +198,7 @@ public:
     }
 
 private:
-    void update_rowset_schema(TabletSchemaSPtr flush_schema);
+    Status update_rowset_schema(TabletSchemaSPtr flush_schema);
     // build a tmp rowset for load segment to calc delete_bitmap
     // for this segment
 protected:
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index f62c66176c3..d33a8240a95 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1181,8 +1181,13 @@ void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcControlle
             if (!schemas.empty() && st.ok()) {
                 // merge all
                 TabletSchemaSPtr merged_schema;
-                static_cast<void>(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
-                                                                                   merged_schema));
+                st = vectorized::schema_util::get_least_common_schema(schemas, nullptr,
+                                                                      merged_schema);
+                if (!st.ok()) {
+                    LOG(WARNING) << "Failed to get least common schema: " << st.to_string();
+                    st = Status::InternalError("Failed to get least common schema: {}",
+                                               st.to_string());
+                }
                 VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure();
                 merged_schema->reserve_extracted_columns();
                 merged_schema->to_schema_pb(response->mutable_merged_schema());
@@ -1223,8 +1228,13 @@ void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcControlle
                 if (!tablet_schemas.empty()) {
                     // merge all
                     TabletSchemaSPtr merged_schema;
-                    static_cast<void>(vectorized::schema_util::get_least_common_schema(
-                            tablet_schemas, nullptr, merged_schema));
+                    st = vectorized::schema_util::get_least_common_schema(tablet_schemas, nullptr,
+                                                                          merged_schema);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "Failed to get least common schema: " << st.to_string();
+                        st = Status::InternalError("Failed to get least common schema: {}",
+                                                   st.to_string());
+                    }
                     merged_schema->to_schema_pb(response->mutable_merged_schema());
                     VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure();
                 }
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index 93868c8ffe8..3dc6c4f69b6 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -286,11 +286,65 @@ TabletColumn get_column_by_type(const vectorized::DataTypePtr& data_type, const
     return result;
 }
 
-void update_least_schema_internal(const std::map<PathInData, DataTypes>& subcolumns_types,
-                                  TabletSchemaSPtr& common_schema, bool update_sparse_column,
-                                  int32_t variant_col_unique_id,
-                                  const std::map<std::string, TabletColumnPtr>& typed_columns,
-                                  std::set<PathInData>* path_set) {
+// check whether two paths with the same prefix have a different structure
+static bool has_different_structure_in_same_path(const PathInData::Parts& lhs,
+                                                 const PathInData::Parts& rhs) {
+    if (lhs.size() != rhs.size()) {
+        return false; // different size means different structure
+    }
+    // Since we group by path string, lhs and rhs must have the same size and keys
+    // We only need to check if they have different nested structure
+    for (size_t i = 0; i < lhs.size(); ++i) {
+        if (lhs[i] != rhs[i]) {
+            VLOG_DEBUG << fmt::format(
+                    "Check different structure: {} vs {}, lhs[i].is_nested: {}, rhs[i].is_nested: "
+                    "{}",
+                    lhs[i].key, rhs[i].key, lhs[i].is_nested, rhs[i].is_nested);
+            return true;
+        }
+    }
+    return false;
+}
+
+Status check_variant_has_no_ambiguous_paths(const PathsInData& tuple_paths) {
+    // Group paths by their string representation to reduce comparisons
+    std::unordered_map<std::string, std::vector<size_t>> path_groups;
+
+    for (size_t i = 0; i < tuple_paths.size(); ++i) {
+        // same path should have same structure, so we group them by path
+        path_groups[tuple_paths[i].get_path()].push_back(i);
+        // print part of tuple_paths[i]
+        VLOG_DEBUG << "tuple_paths[i]: " << tuple_paths[i].get_path();
+    }
+
+    // Only compare paths within the same group
+    for (const auto& [path_str, indices] : path_groups) {
+        if (indices.size() <= 1) {
+            continue; // No conflicts possible
+        }
+
+        // Compare all pairs within this group
+        for (size_t i = 0; i < indices.size(); ++i) {
+            for (size_t j = 0; j < i; ++j) {
+                if (has_different_structure_in_same_path(tuple_paths[indices[i]].get_parts(),
+                                                         tuple_paths[indices[j]].get_parts())) {
+                    return Status::DataQualityError(
+                            "Ambiguous paths: {} vs {} with different nested part {} vs {}",
+                            tuple_paths[indices[i]].get_path(), tuple_paths[indices[j]].get_path(),
+                            tuple_paths[indices[i]].has_nested_part(),
+                            tuple_paths[indices[j]].has_nested_part());
+                }
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status update_least_schema_internal(const std::map<PathInData, DataTypes>& subcolumns_types,
+                                    TabletSchemaSPtr& common_schema, bool update_sparse_column,
+                                    int32_t variant_col_unique_id,
+                                    const std::map<std::string, TabletColumnPtr>& typed_columns,
+                                    std::set<PathInData>* path_set) {
     PathsInData tuple_paths;
     DataTypes tuple_types;
     CHECK(common_schema.use_count() == 1);
@@ -351,11 +405,12 @@ void update_least_schema_internal(const std::map<PathInData, DataTypes>& subcolu
             path_set->insert(tuple_paths[i]);
         }
     }
+    return Status::OK();
 }
 
-void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
-                                TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id,
-                                std::set<PathInData>* path_set) {
+Status update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+                                  TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id,
+                                  std::set<PathInData>* path_set) {
     std::map<std::string, TabletColumnPtr> typed_columns;
     for (const TabletColumnPtr& col :
          common_schema->column_by_uid(variant_col_unique_id).get_sub_columns()) {
@@ -363,6 +418,10 @@ void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
     }
     // Types of subcolumns by path from all tuples.
     std::map<PathInData, DataTypes> subcolumns_types;
+
+    // Collect all paths first to enable batch checking
+    std::vector<PathInData> all_paths;
+
     for (const TabletSchemaSPtr& schema : schemas) {
         for (const TabletColumnPtr& col : schema->columns()) {
             // Get subcolumns of this variant
@@ -370,9 +429,14 @@ void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
                 col->parent_unique_id() == variant_col_unique_id) {
                 subcolumns_types[*col->path_info_ptr()].emplace_back(
                         DataTypeFactory::instance().create_data_type(*col, col->is_nullable()));
+                all_paths.push_back(*col->path_info_ptr());
             }
         }
     }
+
+    // Batch check for conflicts
+    RETURN_IF_ERROR(check_variant_has_no_ambiguous_paths(all_paths));
+
     for (const TabletSchemaSPtr& schema : schemas) {
         if (schema->field_index(variant_col_unique_id) == -1) {
             // maybe dropped
@@ -390,13 +454,13 @@ void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
             }
         }
     }
-    update_least_schema_internal(subcolumns_types, common_schema, false, variant_col_unique_id,
-                                 typed_columns, path_set);
+    return update_least_schema_internal(subcolumns_types, common_schema, false,
+                                        variant_col_unique_id, typed_columns, path_set);
 }
 
-void update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas,
-                                TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id,
-                                const std::set<PathInData>& path_set) {
+Status update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas,
+                                  TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id,
+                                  const std::set<PathInData>& path_set) {
     std::map<std::string, TabletColumnPtr> typed_columns;
     for (const TabletColumnPtr& col :
          common_schema->column_by_uid(variant_col_unique_id).get_sub_columns()) {
@@ -420,8 +484,8 @@ void update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas,
             }
         }
     }
-    update_least_schema_internal(subcolumns_types, common_schema, true, variant_col_unique_id,
-                                 typed_columns);
+    return update_least_schema_internal(subcolumns_types, common_schema, true,
+                                        variant_col_unique_id, typed_columns);
 }
 
 void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
@@ -485,7 +549,6 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
                                const TabletSchemaSPtr& base_schema, TabletSchemaSPtr& output_schema,
                                bool check_schema_size) {
     std::vector<int32_t> variant_column_unique_id;
-
     // Construct a schema excluding the extracted columns and gather unique identifiers for variants.
     // Ensure that the output schema also excludes these extracted columns. This approach prevents
     // duplicated paths following the update_least_common_schema process.
@@ -532,9 +595,9 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
         std::set<PathInData> path_set;
         // 1. cast extracted column to common type
         // path set is used to record the paths of those sparse columns that have been merged into the extracted columns, eg: v:b
-        update_least_common_schema(schemas, output_schema, unique_id, &path_set);
+        RETURN_IF_ERROR(update_least_common_schema(schemas, output_schema, unique_id, &path_set));
         // 2. cast sparse column to common type, exclude the columns from the path set
-        update_least_sparse_column(schemas, output_schema, unique_id, path_set);
+        RETURN_IF_ERROR(update_least_sparse_column(schemas, output_schema, unique_id, path_set));
     }
 
     inherit_column_attributes(output_schema);
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 5854ae01d38..fc5698bf966 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -104,6 +104,11 @@ Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
                              const ParseConfig& config);
 // Status encode_variant_sparse_subcolumns(ColumnObject& column);
 
+// check if the tuple_paths contain ambiguous paths
+// situation:
+// return an error if there exists a prefix with matched names, but not matched structure (is Nested, number of dimensions).
+Status check_variant_has_no_ambiguous_paths(const std::vector<PathInData>& paths);
+
 // Pick the tablet schema with the highest schema version as the reference.
 // Then update all variant columns to their least common types.
 // Return the final merged schema as common schema.
@@ -114,13 +119,13 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
 
 // Get least common types for extracted columns which have Path info,
 // with a specified variant column's unique id
-void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
-                                TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id,
-                                std::set<PathInData>* path_set);
+Status update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+                                  TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id,
+                                  std::set<PathInData>* path_set);
 
-void update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas,
-                                TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id,
-                                const std::set<PathInData>& path_set);
+Status update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas,
+                                  TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id,
+                                  const std::set<PathInData>& path_set);
 
 // inherit attributes like index/agg info from it's parent column
 void inherit_column_attributes(TabletSchemaSPtr& schema);
@@ -194,11 +199,11 @@ void get_compaction_subcolumns(TabletSchema::PathsSetInfo& paths_set_info,
 
 size_t get_size_of_interger(TypeIndex type);
 
-void update_least_schema_internal(const std::map<PathInData, DataTypes>& subcolumns_types,
-                                  TabletSchemaSPtr& common_schema, bool update_sparse_column,
-                                  int32_t variant_col_unique_id,
-                                  const std::map<std::string, TabletColumnPtr>& typed_columns,
-                                  std::set<PathInData>* path_set = nullptr);
+Status update_least_schema_internal(const std::map<PathInData, DataTypes>& subcolumns_types,
+                                    TabletSchemaSPtr& common_schema, bool update_sparse_column,
+                                    int32_t variant_col_unique_id,
+                                    const std::map<std::string, TabletColumnPtr>& typed_columns,
+                                    std::set<PathInData>* path_set = nullptr);
 
 Status get_compaction_typed_columns(const TabletSchemaSPtr& target,
                                     const std::unordered_set<std::string>& typed_paths,
diff --git a/be/src/vec/json/parse2column.cpp b/be/src/vec/json/parse2column.cpp
index a5163e3f0a5..229a0f12fbf 100644
--- a/be/src/vec/json/parse2column.cpp
+++ b/be/src/vec/json/parse2column.cpp
@@ -155,6 +155,16 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length,
     auto& [paths, values] = *result;
     assert(paths.size() == values.size());
     size_t old_num_rows = column_object.rows();
+    if (config.enable_flatten_nested) {
+        // here we should check the paths in variant and paths in result,
+        // if two paths with the same prefix have different structure, we should throw an exception
+        std::vector<PathInData> check_paths;
+        for (const auto& entry : column_object.get_subcolumns()) {
+            check_paths.push_back(entry->path);
+        }
+        check_paths.insert(check_paths.end(), paths.begin(), paths.end());
+        THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths));
+    }
     for (size_t i = 0; i < paths.size(); ++i) {
         FieldInfo field_info;
         schema_util::get_field_info(values[i], &field_info);
diff --git a/be/test/cloud/test_schema_cloud_dictionary_cache.cpp b/be/test/cloud/test_schema_cloud_dictionary_cache.cpp
index 3d05eb67e45..0fc4fd0c3f5 100644
--- a/be/test/cloud/test_schema_cloud_dictionary_cache.cpp
+++ b/be/test/cloud/test_schema_cloud_dictionary_cache.cpp
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "cloud/schema_cloud_dictionary_cache.cpp"
 #include "cloud/schema_cloud_dictionary_cache.h"
 #include "gen_cpp/olap_file.pb.h"
 #include "gtest/gtest.h"
+#include "vec/json/path_in_data.h"
 
 namespace doris {
 
@@ -175,4 +177,56 @@ TEST(SchemaCloudDictionaryCacheTest, ReplaceDictKeysToSchema_RefreshFailure) {
     EXPECT_FALSE(st.ok());
 }
 
+// Test case 5: replace_schema_to_dict_keys with tablet_schema.enable_variant_flatten_nested = true
+TEST(SchemaCloudDictionaryCacheTest, ProcessDictionary_VariantPathConflict_Throws) {
+    SchemaCloudDictionarySPtr dict = std::make_shared<SchemaCloudDictionary>();
+    // construct two variant columns with same unique_id but different path_info
+    auto& col_dict = *dict->mutable_column_dict();
+    ColumnPB* col1 = &(col_dict)[101];
+    col1->set_unique_id(101);
+    vectorized::PathInDataBuilder builder1;
+    builder1.append("v", false).append("nested", true).append("a", false);
+    vectorized::PathInData path_in_data1 = builder1.build();
+    segment_v2::ColumnPathInfo path_info1;
+    path_in_data1.to_protobuf(&path_info1, 0);
+    col1->mutable_column_path_info()->CopyFrom(path_info1);
+    {
+        RowsetMetaCloudPB rs_meta;
+        rs_meta.set_has_variant_type_in_schema(true);
+        auto* schema = rs_meta.mutable_tablet_schema();
+        schema->set_enable_variant_flatten_nested(true);
+        // add two columns with same key but different is_nested value
+        auto* col_schema1 = schema->add_column();
+        col_schema1->set_unique_id(101);
+        // create pathIndata with same key but different is_nested value
+        vectorized::PathInDataBuilder builder3;
+        builder3.append("v", false).append("nested", false).append("a", false);
+        vectorized::PathInData path_in_data3 = builder3.build();
+        segment_v2::ColumnPathInfo path_info3;
+        path_in_data3.to_protobuf(&path_info3, 0);
+        col_schema1->mutable_column_path_info()->CopyFrom(path_info3);
+        auto st = check_path_amibigus(*dict, &rs_meta);
+        EXPECT_FALSE(st.ok());
+        EXPECT_EQ(st.code(), TStatusCode::DATA_QUALITY_ERROR);
+    }
+
+    {
+        RowsetMetaCloudPB rs_meta;
+        rs_meta.set_has_variant_type_in_schema(true);
+        auto* schema = rs_meta.mutable_tablet_schema();
+        // add two columns with same key but same is_nested value
+        auto* col_schema3 = schema->add_column();
+        col_schema3->set_unique_id(101);
+        vectorized::PathInDataBuilder builder5;
+        builder5.append("v", false).append("nested", true).append("a", false);
+        vectorized::PathInData path_in_data5 = builder5.build();
+        segment_v2::ColumnPathInfo path_info5;
+        path_in_data5.to_protobuf(&path_info5, 0);
+        col_schema3->mutable_column_path_info()->CopyFrom(path_info5);
+        // assert no exception
+        auto st = check_path_amibigus(*dict, &rs_meta);
+        EXPECT_TRUE(st.ok()) << st.to_string();
+    }
+}
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/test/vec/common/schema_util_test.cpp b/be/test/vec/common/schema_util_test.cpp
index b1ca2e5d18d..3fa9037c72e 100644
--- a/be/test/vec/common/schema_util_test.cpp
+++ b/be/test/vec/common/schema_util_test.cpp
@@ -26,14 +26,20 @@
 #include "testutil/variant_util.h"
 #include "vec/columns/column_nothing.h"
 #include "vec/columns/column_object.h"
+#include "vec/columns/column_string.h"
 #include "vec/common/schema_util.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/field.h"
 #include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_date_time.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/data_types/data_type_ipv4.h"
 #include "vec/data_types/data_type_nothing.h"
 #include "vec/data_types/data_type_object.h"
+#include "vec/data_types/data_type_string.h"
 #include "vec/data_types/data_type_time_v2.h"
+#include "vec/json/json_parser.h"
 
 using namespace doris::vectorized;
 
@@ -1051,7 +1057,9 @@ TEST_F(SchemaUtilTest, TestUpdateLeastSchemaInternal) {
     subcolumns_types[single_path] = {std::make_shared<DataTypeString>()};
 
     std::map<std::string, TabletColumnPtr> typed_columns;
-    schema_util::update_least_schema_internal(subcolumns_types, schema, false, 1, typed_columns);
+    auto status = schema_util::update_least_schema_internal(subcolumns_types, schema, false, 1,
+                                                            typed_columns);
+    EXPECT_TRUE(status.ok());
 
     // Check results
     EXPECT_EQ(schema->num_columns(), 4); // base + 3 subcolumns
@@ -1103,7 +1111,8 @@ TEST_F(SchemaUtilTest, TestUpdateLeastCommonSchema) {
     result_schema->append_column(variant_col);
 
     std::set<PathInData> path_set;
-    schema_util::update_least_common_schema(schemas, result_schema, 1, &path_set);
+    auto status = schema_util::update_least_common_schema(schemas, result_schema, 1, &path_set);
+    EXPECT_TRUE(status.ok());
 
     // Check results
     EXPECT_EQ(result_schema->num_columns(), 2); // variant + subcolumn
@@ -1187,7 +1196,8 @@ TEST_F(SchemaUtilTest, TestUpdateLeastCommonSchema2) {
     // 1. schema->field_index(variant_col_unique_id) == -1 branch (via schema1)
     // 2. The for loop over sparse_columns() (via schema2)
     // 3. subcolumns_types.find(*col->path_info_ptr()) != subcolumns_types.end() branch
-    schema_util::update_least_common_schema(schemas, common_schema, 1, &path_set);
+    auto status = schema_util::update_least_common_schema(schemas, common_schema, 1, &path_set);
+    EXPECT_TRUE(status.ok());
 
     // Verify results
     const auto& result_variant = common_schema->column_by_uid(1);
@@ -1288,7 +1298,8 @@ TEST_F(SchemaUtilTest, TestUpdateLeastCommonSchema3) {
     // 1. schema->field_index(variant_col_unique_id) == -1 branch (via schema1)
     // 2. The for loop over sparse_columns() (via schema2)
     // 3. path_set.find(*col->path_info_ptr()) == path_set.end() branch (via sparse_col4)
-    schema_util::update_least_common_schema(schemas, common_schema, 1, &path_set);
+    auto status = schema_util::update_least_common_schema(schemas, common_schema, 1, &path_set);
+    EXPECT_TRUE(status.ok());
 
     // Verify that only sparse columns not in path_set are kept
     const auto& result_variant = common_schema->column_by_uid(1);
@@ -1347,7 +1358,8 @@ TEST_F(SchemaUtilTest, TestUpdateLeastSparseColumn) {
     std::set<PathInData> path_set;
     path_set.insert(PathInData("test_variant.other_path")); // This path should be excluded
 
-    schema_util::update_least_sparse_column(schemas, result_schema, 1, path_set);
+    auto status = schema_util::update_least_sparse_column(schemas, result_schema, 1, path_set);
+    EXPECT_TRUE(status.ok());
 
     // Check results : why 0?
     EXPECT_EQ(result_schema->column_by_uid(1).sparse_columns().size(), 0);
@@ -1382,7 +1394,8 @@ TEST_F(SchemaUtilTest, TestUpdateLeastSparseColumn2) {
     result_schema->append_column(variant2);
 
     // This should handle the case where sparse_columns is empty
-    schema_util::update_least_sparse_column(schemas, result_schema, 1, path_set);
+    auto status = schema_util::update_least_sparse_column(schemas, result_schema, 1, path_set);
+    EXPECT_TRUE(status.ok());
     EXPECT_EQ(result_schema->column_by_uid(1).sparse_columns().size(), 0);
 
     // Test case 3: schema has variant column with sparse columns but empty path_set
@@ -1396,7 +1409,8 @@ TEST_F(SchemaUtilTest, TestUpdateLeastSparseColumn2) {
     // dropped Variant Col
 
     std::set<PathInData> empty_path_set;
-    schema_util::update_least_sparse_column(schemas, result_schema, 1, empty_path_set);
+    status = schema_util::update_least_sparse_column(schemas, result_schema, 1, empty_path_set);
+    EXPECT_TRUE(status.ok());
     EXPECT_EQ(result_schema->column_by_uid(1).sparse_columns().size(), 0);
 }
 
@@ -1471,7 +1485,8 @@ TEST_F(SchemaUtilTest, TestUpdateLeastSparseColumn3) {
     // 1. schema->field_index(variant_col_unique_id) == -1 branch (via schema1)
     // 2. The for loop over sparse_columns() (via schema2)
     // 3. path_set.find(*col->path_info_ptr()) == path_set.end() branch (via sparse_col4)
-    schema_util::update_least_sparse_column(schemas, common_schema, 1, path_set);
+    auto status = schema_util::update_least_sparse_column(schemas, common_schema, 1, path_set);
+    EXPECT_TRUE(status.ok());
 
     // Verify that only sparse columns not in path_set are kept
     const auto& result_variant = common_schema->column_by_uid(1);
@@ -1889,3 +1904,306 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns_advanced) {
         }
     }
 }
+
+// Test has_different_structure_in_same_path function indirectly through check_variant_has_no_ambiguous_paths
+TEST_F(SchemaUtilTest, has_different_structure_in_same_path_indirect) {
+    // Test case 1: Same structure and same length - should not detect ambiguity
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("a", false).append("b", false).append("c", false);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("a", false).append("b", false).append("c", false);
+        paths.emplace_back(builder2.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    // Test case 2: Different keys at same position - should not detect ambiguity (different keys)
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("a", false).append("b", false).append("c", false);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("a", false).append("d", false).append("c", false);
+        paths.emplace_back(builder2.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    // Test case 3: Same keys but different nested structure - should detect ambiguity
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("a", false).append("b", true);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("a", false).append("b", false);
+        paths.emplace_back(builder2.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_FALSE(status.ok());
+        EXPECT_TRUE(status.to_string().find("Ambiguous paths") != std::string::npos);
+    }
+
+    // Test case 4: Same keys but different anonymous array levels - should detect ambiguity
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("a", true).append("b", false);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("a", false).append("b", false);
+        paths.emplace_back(builder2.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_FALSE(status.ok());
+        EXPECT_TRUE(status.to_string().find("Ambiguous paths") != std::string::npos);
+    }
+
+    // Test case 5: Same keys but different nested and anonymous levels - should detect ambiguity
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("a", true).append("b", true);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("a", false).append("b", false);
+        paths.emplace_back(builder2.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_FALSE(status.ok());
+        EXPECT_TRUE(status.to_string().find("Ambiguous paths") != std::string::npos);
+    }
+
+    // Test case 6: Different lengths - should not detect ambiguity (new behavior: only check same length paths)
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("a", false).append("b", false).append("c", false);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("a", false).append("b", false);
+        paths.emplace_back(builder2.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    // Test case 7: Different lengths with structure difference - should not detect ambiguity
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("a", false).append("b", true).append("c", false);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("a", false).append("b", false);
+        paths.emplace_back(builder2.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    // Test case 8: Complex nested structure difference with same length - should detect ambiguity
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("user", false).append("address", 
true).append("street", false);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("user", false).append("address", 
false).append("street", false);
+        paths.emplace_back(builder2.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_FALSE(status.ok());
+        EXPECT_TRUE(status.to_string().find("Ambiguous paths") != std::string::npos);
+    }
+
+    // Test case 9: Multiple paths with different lengths - should not detect ambiguity
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("config", false).append("database", 
false).append("host", false);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("config", false).append("database", false);
+        paths.emplace_back(builder2.build());
+
+        vectorized::PathInDataBuilder builder3;
+        builder3.append("config", false);
+        paths.emplace_back(builder3.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    // Test case 10: Empty paths - should not detect ambiguity
+    {
+        vectorized::PathsInData paths;
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    // Test case 11: Single path - should not detect ambiguity
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("single", false).append("path", false);
+        paths.emplace_back(builder1.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    // Test case 12: we have path like '{"a.b": "UPPER CASE", "a.c": "lower case", "a" : {"b" : 123}, "a" : {"c" : 456}}'
+    {
+        vectorized::PathsInData paths;
+        vectorized::PathInDataBuilder builder1;
+        builder1.append("a", false).append("b", false);
+        paths.emplace_back(builder1.build());
+
+        vectorized::PathInDataBuilder builder2;
+        builder2.append("a.b", false);
+        paths.emplace_back(builder2.build());
+
+        auto status = vectorized::schema_util::check_variant_has_no_ambiguous_paths(paths);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+}
+
+// Test check_path_conflicts_with_existing function indirectly through update_least_common_schema
+TEST_F(SchemaUtilTest, check_path_conflicts_with_existing) {
+    // Test case 1: No conflicts - should succeed
+    {
+        TabletSchemaPB schema_pb;
+        schema_pb.set_keys_type(KeysType::DUP_KEYS);
+
+        // Create a variant column
+        construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1,
+                         "VARIANT", "v1", IndexType::INVERTED);
+
+        TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+        tablet_schema->init_from_pb(schema_pb);
+        std::vector<TabletColumn> subcolumns;
+
+        // Add subcolumns with different paths
+        construct_subcolumn(tablet_schema, FieldType::OLAP_FIELD_TYPE_STRING, 1, "v1.name",
+                            &subcolumns);
+        construct_subcolumn(tablet_schema, FieldType::OLAP_FIELD_TYPE_INT, 1, "v1.age",
+                            &subcolumns);
+
+        std::vector<TabletSchemaSPtr> schemas = {tablet_schema};
+        TabletSchemaSPtr output_schema;
+
+        auto status = vectorized::schema_util::get_least_common_schema(schemas, nullptr,
+                                                                       output_schema, false);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    // Test case 2: Conflicts with same path but different structure - should fail
+    {
+        TabletSchemaPB schema_pb;
+        schema_pb.set_keys_type(KeysType::DUP_KEYS);
+
+        // Create a variant column
+        construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1,
+                         "VARIANT", "v1", IndexType::INVERTED);
+
+        TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+        tablet_schema->init_from_pb(schema_pb);
+
+        // Add subcolumns with same path but different structure
+        // This would require creating paths with different nested structure
+        // For now, we'll test the basic functionality
+
+        std::vector<TabletSchemaSPtr> schemas = {tablet_schema};
+        TabletSchemaSPtr output_schema;
+
+        auto status = vectorized::schema_util::get_least_common_schema(schemas, nullptr,
+                                                                       output_schema, false);
+        // This should succeed since we don't have conflicting paths in this simple case
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    // Test case 3: Multiple schemas with conflicting paths - should fail
+    {
+        // Create first schema
+        TabletSchemaPB schema_pb1;
+        schema_pb1.set_keys_type(KeysType::DUP_KEYS);
+        construct_column(schema_pb1.add_column(), schema_pb1.add_index(), 10001, "v1_index", 1,
+                         "VARIANT", "v1", IndexType::INVERTED);
+
+        TabletSchemaSPtr tablet_schema1 = std::make_shared<TabletSchema>();
+        tablet_schema1->init_from_pb(schema_pb1);
+        std::vector<TabletColumn> subcolumns;
+        construct_subcolumn(tablet_schema1, FieldType::OLAP_FIELD_TYPE_STRING, 1, "v1.address",
+                            &subcolumns);
+
+        // Create second schema with same path but different structure
+        TabletSchemaPB schema_pb2;
+        schema_pb2.set_keys_type(KeysType::DUP_KEYS);
+        construct_column(schema_pb2.add_column(), schema_pb2.add_index(), 10001, "v1_index", 1,
+                         "VARIANT", "v1", IndexType::INVERTED);
+
+        TabletSchemaSPtr tablet_schema2 = std::make_shared<TabletSchema>();
+        tablet_schema2->init_from_pb(schema_pb2);
+        std::vector<TabletColumn> subcolumns2;
+        construct_subcolumn(tablet_schema2, FieldType::OLAP_FIELD_TYPE_INT, 1, "v1.address",
+                            &subcolumns2);
+
+        std::vector<TabletSchemaSPtr> schemas = {tablet_schema1, tablet_schema2};
+        TabletSchemaSPtr output_schema;
+
+        auto status = vectorized::schema_util::get_least_common_schema(schemas, nullptr,
+                                                                       output_schema, false);
+        // This should succeed since the paths are the same and we're just checking for structure conflicts
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+}
+
+TEST_F(SchemaUtilTest, parse_variant_columns_ambiguous_paths) {
+    using namespace doris::vectorized;
+    // Prepare the string column with two rows
+    auto string_col = ColumnString::create();
+    auto field1 = vectorized::Field(String("{\"nested\": [{\"a\": 2.5, \"b\": \"123.1\"}]}"));
+    auto field2 = vectorized::Field(String("{\"nested\": {\"a\": 2.5, \"b\": \"123.1\"}}"));
+    string_col->insert(field1);
+    string_col->insert(field2);
+    auto string_type = std::make_shared<DataTypeString>();
+
+    // Prepare the variant column with the string column as root
+    vectorized::ColumnObject::Subcolumns dynamic_subcolumns;
+    dynamic_subcolumns.create_root(
+            vectorized::ColumnObject::Subcolumn(string_col->assume_mutable(), string_type, true));
+
+    auto variant_col = ColumnObject::create(0, std::move(dynamic_subcolumns));
+    auto variant_type = std::make_shared<DataTypeObject>();
+
+    // Construct the block
+    Block block;
+    block.insert(
+            vectorized::ColumnWithTypeAndName(variant_col->assume_mutable(), variant_type, "v"));
+
+    // The variant column is at index 0
+    std::vector<int> variant_pos = {0};
+    ParseConfig config;
+    config.enable_flatten_nested = true;
+
+    // Should throw due to ambiguous paths
+    Status st = schema_util::parse_variant_columns(block, variant_pos, config);
+    EXPECT_FALSE(st.ok());
+    EXPECT_TRUE(st.to_string().find("Ambiguous paths") != std::string::npos);
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 713e8a0f772..3d37142b8c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -2666,6 +2666,16 @@ public class InternalCatalog implements CatalogIf<Database> {
         boolean variantEnableFlattenNested  = false;
         try {
             variantEnableFlattenNested = PropertyAnalyzer.analyzeVariantFlattenNested(properties);
+            // only if session variable: disable_variant_flatten_nested = false and
+            // table property: variant_enable_flatten_nested = true
+            // we can enable variant flatten nested, otherwise throw an error
+            if (ctx != null && !ctx.getSessionVariable().getDisableVariantFlattenNested()
+                    && variantEnableFlattenNested) {
+                olapTable.setVariantEnableFlattenNested(variantEnableFlattenNested);
+            } else if (variantEnableFlattenNested) {
+                throw new DdlException("If you want to enable variant flatten nested, "
+                        + "please set session variable: disable_variant_flatten_nested = false");
+            }
         } catch (AnalysisException e) {
             throw new DdlException(e.getMessage());
         }
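
A usage sketch of the gate added above, mirroring the regression test shipped in this patch rather than prescribing new behavior: creating a table with the variant_enable_flatten_nested property requires first lowering the new session variable.

    -- with the default disable_variant_flatten_nested = true, the CREATE TABLE below fails with:
    -- "If you want to enable variant flatten nested, please set session variable: disable_variant_flatten_nested = false"
    set disable_variant_flatten_nested = false;
    create table var_nested_load_conflict (k bigint, v variant)
    duplicate key(k) distributed by hash(k) buckets 1
    properties("replication_num" = "1", "variant_enable_flatten_nested" = "true");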
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index fb4805ce9ab..39199a16496 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -693,6 +693,10 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String DISABLE_INVERTED_INDEX_V1_FOR_VARIANT = "disable_inverted_index_v1_for_variant";
 
+    // disable variant flatten nested as session variable, default is true,
+    // which means variant flatten nested is disabled when creating a table
+    public static final String DISABLE_VARIANT_FLATTEN_NESTED = "disable_variant_flatten_nested";
+
     // CLOUD_VARIABLES_BEGIN
     public static final String CLOUD_CLUSTER = "cloud_cluster";
     public static final String DISABLE_EMPTY_PARTITION_PRUNE = "disable_empty_partition_prune";
@@ -1307,6 +1311,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = DISABLE_INVERTED_INDEX_V1_FOR_VARIANT, needForward = true)
     private boolean disableInvertedIndexV1ForVaraint = true;
 
+    @VariableMgr.VarAttr(name = DISABLE_VARIANT_FLATTEN_NESTED, needForward = true)
+    private boolean disableVariantFlattenNested = true;
+
     public int getBeNumberForTest() {
         return beNumberForTest;
     }
@@ -4926,6 +4933,14 @@ public class SessionVariable implements Serializable, Writable {
         return disableInvertedIndexV1ForVaraint;
     }
 
+    public void setDisableVariantFlattenNested(boolean disableVariantFlattenNested) {
+        this.disableVariantFlattenNested = disableVariantFlattenNested;
+    }
+
+    public boolean getDisableVariantFlattenNested() {
+        return disableVariantFlattenNested;
+    }
+
     public void checkSqlConvertorFeatures(String features) {
         if (Strings.isNullOrEmpty(features)) {
             return;
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index a86bd3c84ef..5bda66105f4 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -48,7 +48,7 @@ struct TTabletSchema {
     // col unique id for row store column
     20: optional list<i32> row_store_col_cids
     21: optional i64 row_store_page_size = 16384
-    22: optional bool variant_enable_flatten_nested = false 
+    22: optional bool variant_enable_flatten_nested = false
     23: optional i64 storage_page_size = 65536
     24: optional i64 storage_dict_page_size = 262144
 }
diff --git a/regression-test/data/variant_p0/nested/load.out b/regression-test/data/variant_p0/nested/load.out
new file mode 100644
index 00000000000..24965dce8cd
Binary files /dev/null and b/regression-test/data/variant_p0/nested/load.out differ
diff --git a/regression-test/data/variant_p0/nested/sql/q01.out b/regression-test/data/variant_p0/nested/sql/q01.out
new file mode 100644
index 00000000000..ea77db963fe
Binary files /dev/null and b/regression-test/data/variant_p0/nested/sql/q01.out differ
diff --git a/regression-test/suites/variant_p0/delete_update.groovy b/regression-test/suites/variant_p0/delete_update.groovy
index 89a98cc2a26..357e6614ab8 100644
--- a/regression-test/suites/variant_p0/delete_update.groovy
+++ b/regression-test/suites/variant_p0/delete_update.groovy
@@ -21,6 +21,7 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
     // MOR
     def table_name = "var_delete_update"
     sql "DROP TABLE IF EXISTS ${table_name}"
+    sql """ set disable_variant_flatten_nested = false """
     sql """
         CREATE TABLE IF NOT EXISTS ${table_name} (
             k bigint,
diff --git a/regression-test/suites/variant_p0/nested.groovy b/regression-test/suites/variant_p0/nested.groovy
index 818dd507a12..934e771b451 100644
--- a/regression-test/suites/variant_p0/nested.groovy
+++ b/regression-test/suites/variant_p0/nested.groovy
@@ -20,6 +20,7 @@ suite("regression_test_variant_nested", "p0"){
         def table_name = "var_nested"
         sql "DROP TABLE IF EXISTS ${table_name}"
         def disable_auto_compaction = Math.random() < 0.5 ? "true" : "false"
+        sql "set disable_variant_flatten_nested = false"
         sql """
                 CREATE TABLE IF NOT EXISTS ${table_name} (
                     k bigint,
diff --git a/regression-test/suites/variant_p0/nested/load.groovy b/regression-test/suites/variant_p0/nested/load.groovy
new file mode 100644
index 00000000000..36d1fdbe836
--- /dev/null
+++ b/regression-test/suites/variant_p0/nested/load.groovy
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// this test is used to test loading of nested arrays
+suite("variant_nested_type_load", "p0"){
+
+    try {
+
+        // create a table with a conflicting variant: insert nested subcolumn
and scalar subcolumn data under the same path
+        def table_name = "var_nested_load_conflict"
+        sql "DROP TABLE IF EXISTS ${table_name}"
+        sql """set describe_extend_variant_column = true"""
+
+        // set disable_variant_flatten_nested = true to disable variant
flatten nested, which is the default behavior
+        sql """ set disable_variant_flatten_nested = true """
+        test {
+            sql """
+                    CREATE TABLE IF NOT EXISTS ${table_name} (
+                        k bigint,
+                        v variant
+                    )
+                    DUPLICATE KEY(`k`)
+                    DISTRIBUTED BY HASH(k) BUCKETS 1 -- 1 bucket to actually
trigger compaction in the conflict case
+                    properties("replication_num" = "1", 
"disable_auto_compaction" = "false", "variant_enable_flatten_nested" = "true");
+                """
+            exception "If you want to enable variant flatten nested, please 
set session variable"
+        }
+        
+
+        // set disable_variant_flatten_nested = false to enable variant 
flatten nested
+        sql """ set disable_variant_flatten_nested = false """
+        sql """
+                    CREATE TABLE IF NOT EXISTS ${table_name} (
+                        k bigint,
+                        v variant
+                    )
+                    DUPLICATE KEY(`k`)
+                    DISTRIBUTED BY HASH(k) BUCKETS 1 -- 1 bucket to actually
trigger compaction in the conflict case
+                    properties("replication_num" = "1", 
"disable_auto_compaction" = "false", "variant_enable_flatten_nested" = "true");
+                """
+        sql """ insert into ${table_name} values (1, '{"nested": [{"a": 1, 
"c": 1.1}, {"b": "1"}]}'); """ 
+        
+        def desc_table = { tn ->
+            sql """ set describe_extend_variant_column = true """
+            sql """ select * from ${tn} order by k """
+            qt_sql_desc """ desc ${tn} """
+        }
+
+        def sql_select_batch = { tn ->
+            qt_sql_0 """select * from ${tn} order by k"""
+
+            qt_sql_1 """select v['nested']['a'] from ${tn} order by k"""
+            qt_sql_2 """select v['nested']['b'] from ${tn} order by k"""
+            qt_sql_3 """select v['nested']['c'] from ${tn} order by k"""
+
+            qt_sql_4 """select v['nested'] from ${tn} order by k"""
+        }
+
+        def sql_test_cast_to_array = { tn ->
+            // test cast to array<int> 
+            qt_sql_8 """select cast(v['nested']['a'] as array<int>), 
size(cast(v['nested']['a'] as array<int>)) from ${tn} order by k"""
+            qt_sql_9 """select cast(v['nested']['b'] as array<int>), 
size(cast(v['nested']['b'] as array<int>)) from ${tn} order by k"""
+            qt_sql_10 """select cast(v['nested']['c'] as array<int>), 
size(cast(v['nested']['c'] as array<int>)) from ${tn} order by k"""
+
+            // test cast to array<string> 
+            qt_sql_11 """select cast(v['nested']['a'] as array<string>), 
size(cast(v['nested']['a'] as array<string>)) from ${tn} order by k"""
+            qt_sql_12 """select cast(v['nested']['b'] as array<string>), 
size(cast(v['nested']['b'] as array<string>)) from ${tn} order by k"""
+            qt_sql_13 """select cast(v['nested']['c'] as array<string>), 
size(cast(v['nested']['c'] as array<string>)) from ${tn} order by k"""
+
+            // test cast to array<double> 
+            qt_sql_14 """select cast(v['nested']['a'] as array<double>), 
size(cast(v['nested']['a'] as array<double>)) from ${tn} order by k"""
+            qt_sql_15 """select cast(v['nested']['b'] as array<double>), 
size(cast(v['nested']['b'] as array<double>)) from ${tn} order by k"""
+            qt_sql_16 """select cast(v['nested']['c'] as array<double>), 
size(cast(v['nested']['c'] as array<double>)) from ${tn} order by k"""
+
+        }
+
+        def sql_test_cast_to_scalar = { tn ->
+            qt_sql_17 """select cast(v['nested']['a'] as int), 
cast(v['nested']['b'] as int), cast(v['nested']['c'] as int) from ${tn} order 
by k"""
+            qt_sql_18 """select cast(v['nested']['a'] as string), 
cast(v['nested']['b'] as string), cast(v['nested']['c'] as string) from ${tn} 
order by k"""
+            qt_sql_19 """select cast(v['nested']['a'] as double), 
cast(v['nested']['b'] as double), cast(v['nested']['c'] as double) from ${tn} 
order by k"""
+        }
+
+        /// insert an array of objects for a, b, c
+        // insert a structure conflict in one row
+        //  a, b, c are Nested arrays
+        def table_name_1 = "var_nested_load_no_conflict"
+        sql "DROP TABLE IF EXISTS ${table_name_1}"
+        sql """set describe_extend_variant_column = true"""
+        sql """
+                CREATE TABLE IF NOT EXISTS ${table_name_1} (
+                    k bigint,
+                    v variant<properties("variant_max_subcolumns_count" = "0")>
+                )
+                DUPLICATE KEY(`k`)
+                DISTRIBUTED BY HASH(k) BUCKETS 1 -- 1 bucket to actually
trigger compaction in the conflict case
+                properties("replication_num" = "1", "disable_auto_compaction" 
= "false", "variant_enable_flatten_nested" = "true");
+            """
+        // insert an array of objects for a, b, c first
+        // then insert a structure conflict in one row
+        //  a, b, c are Nested arrays
+        sql """
+            insert into ${table_name_1} values (1, '{"nested": [{"a": 1, "c": 
1.1}, {"b": "1"}]}'); 
+            """
+        sql_select_batch(table_name_1)
+        sql_test_cast_to_array(table_name_1)
+        sql_test_cast_to_scalar(table_name_1)
+        // insert a structure conflict in one row
+        test {
+            sql """
+                insert into ${table_name_1} values (2, '{"nested": {"a": 2.5, 
"b": "123.1"}}');
+                """
+            exception "Ambiguous paths"
+        }
+        // insert more combinations of data for a, b, c
+        sql """
+            insert into ${table_name_1} values (3, '{"nested": [{"a": 2.5, 
"b": "123.1"}]}');
+            """
+        sql """
+            insert into ${table_name_1} values (4, '{"nested": [{"a": 2.5, 
"b": 123.1}]}');
+            """
+        sql """
+            insert into ${table_name_1} values (5, '{"nested": [{"a": 2.5, 
"c": "123.1"}, {"b": "123.1"}]}');
+            """
+        sql """
+            insert into ${table_name_1} values (6, '{"nested": [{"a": 2.5}, 
{"b": 123.1}]}');
+            """
+        sql """
+            insert into ${table_name_1} values (7, '{"nested": [{"a": 2.5}, 
{"c": 123.1}, {"b": "123.1"}]}');
+            """
+        sql_select_batch(table_name_1)
+        sql_test_cast_to_array(table_name_1)
+        sql_test_cast_to_scalar(table_name_1)
+        // trigger compaction and wait for it to finish
+        trigger_and_wait_compaction("${table_name_1}", "full")
+        sql_select_batch(table_name_1)
+        sql_test_cast_to_array(table_name_1)    
+        sql_test_cast_to_scalar(table_name_1)
+
+        // drop table
+        sql """ drop table ${table_name_1} """
+        sql """ create table ${table_name_1} (k bigint, v 
variant<properties("variant_max_subcolumns_count" = "0")>) duplicate key(k) 
distributed by hash(k) buckets 1 properties("replication_num" = "1", 
"disable_auto_compaction" = "false", "variant_enable_flatten_nested" = "true") 
"""
+        // insert scalar data first, then insert a structure conflict in one row
+        sql """
+            insert into ${table_name_1} values (1, '{"nested": {"a": 2.5, "b": 
"123.1"}}');
+            """
+        sql_select_batch(table_name_1)
+        sql_test_cast_to_array(table_name_1)
+        sql_test_cast_to_scalar(table_name_1)
+        // insert a structure conflict in one row: an array of objects for a, b, c
+        test {
+            sql """
+                insert into ${table_name_1} values (2, '{"nested": [{"a": 2.5, 
"b": "123.1"}]}');
+                """
+            exception "Ambiguous paths"
+        }
+        // insert more combinations of scalar data for a, b, c
+        sql """
+            insert into ${table_name_1} values (3, '{"nested": {"a": 2.5, "b": 
123.1}}');
+            """
+        sql """
+            insert into ${table_name_1} values (4, '{"nested": {"a": 2.5, "c": 
"123.1"}}');
+            """
+        sql """
+            insert into ${table_name_1} values (5, '{"nested": {"a": 2.5, "c": 
123.1}}');
+            """
+        sql """
+            insert into ${table_name_1} values (6, '{"nested": {"a": 2.5, "c": 
"123.1"}}');
+            """
+        sql """
+            insert into ${table_name_1} values (7, '{"nested": {"a": 2.5, "b": 
"123.1", "c": 123.1}}');
+            """
+        sql_select_batch(table_name_1)
+        sql_test_cast_to_array(table_name_1)
+        sql_test_cast_to_scalar(table_name_1)
+        // trigger compaction and wait for it to finish
+        trigger_and_wait_compaction("${table_name_1}", "full")
+        sql_select_batch(table_name_1)
+        sql_test_cast_to_array(table_name_1)    
+        sql_test_cast_to_scalar(table_name_1)
+
+    } finally {
+    }
+
+}
diff --git a/regression-test/suites/variant_p0/nested/sql/q01.sql 
b/regression-test/suites/variant_p0/nested/sql/q01.sql
new file mode 100644
index 00000000000..71ee81428ed
--- /dev/null
+++ b/regression-test/suites/variant_p0/nested/sql/q01.sql
@@ -0,0 +1,13 @@
+-- TABLES: var_nested_load_conflict
+select v['nested']['a'] from var_nested_load_conflict order by k;
+select v['nested']['b'] from var_nested_load_conflict order by k;
+select v['nested']['c'] from var_nested_load_conflict order by k;
+select v['nested'] from var_nested_load_conflict order by k;
+
+select cast(v['nested']['a'] as array<int>), size(cast(v['nested']['a'] as 
array<int>)) from var_nested_load_conflict order by k;
+select cast(v['nested']['b'] as array<int>), size(cast(v['nested']['b'] as 
array<int>)) from var_nested_load_conflict order by k;
+select cast(v['nested']['c'] as array<int>), size(cast(v['nested']['c'] as 
array<int>)) from var_nested_load_conflict order by k;
+
+select cast(v['nested']['a'] as array<string>), size(cast(v['nested']['a'] as 
array<string>)) from var_nested_load_conflict order by k;
+select cast(v['nested']['b'] as array<string>), size(cast(v['nested']['b'] as 
array<string>)) from var_nested_load_conflict order by k;
+select cast(v['nested']['c'] as array<string>), size(cast(v['nested']['c'] as 
array<string>)) from var_nested_load_conflict order by k; 
\ No newline at end of file
diff --git a/regression-test/suites/variant_p0/nested2.groovy 
b/regression-test/suites/variant_p0/nested2.groovy
index 73180128bfb..b8f2d7de2a2 100644
--- a/regression-test/suites/variant_p0/nested2.groovy
+++ b/regression-test/suites/variant_p0/nested2.groovy
@@ -24,6 +24,7 @@ suite("variant_nested_type_conflict", "p0"){
         sql "DROP TABLE IF EXISTS ${table_name}"
         sql """set describe_extend_variant_column = true"""
 
+        sql """ set disable_variant_flatten_nested = false """
         sql """
                 CREATE TABLE IF NOT EXISTS ${table_name} (
                     k bigint,
@@ -67,6 +68,14 @@ suite("variant_nested_type_conflict", "p0"){
                 """
             exception "Nesting of array in Nested array within variant 
subcolumns is currently not supported."
         }
+
+        // insert a batch with different structures in the same path
+        test {
+            sql """
+                insert into ${table_name} values (3, '{"nested": [{"a": 2.5, 
"b": "123.1"}]}'),  (4, '{"nested": {"a": 2.5, "b": "123.1"}}');
+                """
+            exception "Ambiguous paths"
+        }
         /// insert a array of object for a, b, c 
         // insert type conflict in multiple rows
         sql """
@@ -117,7 +126,7 @@ suite("variant_nested_type_conflict", "p0"){
             select * from ${table_name} order by k limit 1;
             """
         qt_sql_desc_4 """
-               select variant_type(v) from ${table_name} order by k 
+                       select variant_type(v) from ${table_name} order by k 
             """
         // now select for a, b, c
         sql_select_batch()
diff --git a/regression-test/suites/variant_p0/predefine/load.groovy 
b/regression-test/suites/variant_p0/predefine/load.groovy
index b645daa4b51..5b4cc703305 100644
--- a/regression-test/suites/variant_p0/predefine/load.groovy
+++ b/regression-test/suites/variant_p0/predefine/load.groovy
@@ -55,6 +55,7 @@ suite("regression_test_variant_predefine_schema", "p0"){
     qt_sql """select * from test_predefine  where  v1['dt'] is not null order 
by id limit 10;"""
 
     sql """DROP TABLE IF EXISTS test_predefine1"""
+    sql """ set disable_variant_flatten_nested = false """
     sql """
         CREATE TABLE `test_predefine1` (
             `id` bigint NOT NULL,
diff --git 
a/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
 
b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
index 5c9d85fb8ed..a8b78bdd258 100644
--- 
a/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
+++ 
b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
@@ -57,6 +57,7 @@ suite("double_write_schema_change_with_variant", 
"nonConcurrent") {
 
     def table_name = "github_events"
     sql """DROP TABLE IF EXISTS ${table_name}"""
+    sql "set disable_variant_flatten_nested = false"
     sql """
         CREATE TABLE IF NOT EXISTS ${table_name} (
             k bigint,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
