This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 48935c14e2c [Improvement](variant) limit the column size on tablet 
schema (#27399) (#27785)
48935c14e2c is described below

commit 48935c14e2c835a20aba61f4eac02453aa7d6c9d
Author: lihangyu <15605149...@163.com>
AuthorDate: Mon Dec 4 14:47:36 2023 +0800

    [Improvement](variant) limit the column size on tablet schema (#27399) 
(#27785)
    
    1. limit the number of columns in a merged tablet schema (default 2048)
    2. fix get_inverted_index returning nullptr when a variant subcolumn's unique id is -1,
    by falling back to its parent's unique id instead
    3. avoid adding the same subcolumn path to the tablet schema more than once
    4. assign unique id -1 to columns extracted from variants
---
 be/src/common/config.cpp                           |  2 +
 be/src/common/config.h                             |  3 ++
 be/src/olap/base_tablet.cpp                        | 11 ++--
 be/src/olap/base_tablet.h                          |  2 +-
 be/src/olap/rowset/beta_rowset_writer.cpp          |  5 +-
 be/src/olap/rowset/segment_creator.cpp             |  7 +--
 be/src/olap/rowset/segment_v2/segment.cpp          |  5 +-
 be/src/olap/rowset_builder.cpp                     |  2 +-
 be/src/olap/tablet.cpp                             |  3 +-
 be/src/olap/tablet_schema.cpp                      |  3 +-
 be/src/vec/common/schema_util.cpp                  | 56 +++++++++++---------
 be/src/vec/common/schema_util.h                    |  5 +-
 .../suites/variant_p0/column_size_limit.groovy     | 59 ++++++++++++++++++++++
 regression-test/suites/variant_p0/load.groovy      | 10 ++--
 14 files changed, 128 insertions(+), 45 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 293ba3c0282..8efd18248e7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1128,6 +1128,8 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");
 
 DEFINE_Bool(enable_snapshot_action, "false");
 
+DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ddc47c678c6..1b9fd62a3ad 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1212,6 +1212,9 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms);
 // whether to enable /api/snapshot api
 DECLARE_Bool(enable_snapshot_action);
 
+// The max columns size for a tablet schema
+DECLARE_mInt32(variant_max_merged_tablet_schema_size);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index db17e68706a..011a901ba5b 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -19,6 +19,7 @@
 
 #include <fmt/format.h>
 
+#include "olap/tablet_fwd.h"
 #include "olap/tablet_schema_cache.h"
 #include "util/doris_metrics.h"
 #include "vec/common/schema_util.h"
@@ -66,13 +67,17 @@ void BaseTablet::update_max_version_schema(const 
TabletSchemaSPtr& tablet_schema
     }
 }
 
-void BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& 
update_schema) {
+Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& 
update_schema) {
     std::lock_guard wrlock(_meta_lock);
     CHECK(_max_version_schema->schema_version() >= 
update_schema->schema_version());
-    auto final_schema = vectorized::schema_util::get_least_common_schema(
-            {_max_version_schema, update_schema}, _max_version_schema);
+    TabletSchemaSPtr final_schema;
+    bool check_column_size = true;
+    RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+            {_max_version_schema, update_schema}, _max_version_schema, 
final_schema,
+            check_column_size));
     _max_version_schema = final_schema;
     VLOG_DEBUG << "dump updated tablet schema: " << 
final_schema->dump_structure();
+    return Status::OK();
 }
 
 } /* namespace doris */
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index b3fc9f8b9b7..2fa494b420a 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -65,7 +65,7 @@ public:
 
     void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
 
-    void update_by_least_common_schema(const TabletSchemaSPtr& update_schema);
+    Status update_by_least_common_schema(const TabletSchemaSPtr& 
update_schema);
 
     TabletSchemaSPtr tablet_schema() const {
         std::shared_lock rlock(_meta_lock);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index bcac89ad9a0..3b2852a283f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -561,8 +561,9 @@ bool BetaRowsetWriter::_is_segment_overlapping(
 //  => update_schema:       A(bigint), B(double), C(int), D(int)
 void BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
     std::lock_guard<std::mutex> lock(*(_context.schema_lock));
-    TabletSchemaSPtr update_schema = 
vectorized::schema_util::get_least_common_schema(
-            {_context.tablet_schema, flush_schema}, nullptr);
+    TabletSchemaSPtr update_schema;
+    static_cast<void>(vectorized::schema_util::get_least_common_schema(
+            {_context.tablet_schema, flush_schema}, nullptr, update_schema));
     CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
             << "Rowset merge schema columns count is " << 
update_schema->num_columns()
             << ", but flush_schema is larger " << flush_schema->num_columns()
diff --git a/be/src/olap/rowset/segment_creator.cpp 
b/be/src/olap/rowset/segment_creator.cpp
index 7e904478b1d..af7cec3e80e 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -136,7 +136,7 @@ Status 
SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
                                  .build();
         TabletColumn tablet_column = 
vectorized::schema_util::get_column_by_type(
                 final_data_type_from_object, column_name,
-                vectorized::schema_util::ExtraInfo {.unique_id = 
parent_variant.unique_id(),
+                vectorized::schema_util::ExtraInfo {.unique_id = -1,
                                                     .parent_unique_id = 
parent_variant.unique_id(),
                                                     .path_info = full_path});
         flush_schema->append_column(std::move(tablet_column));
@@ -194,8 +194,9 @@ Status 
SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
         // ctx.tablet_schema:  A(bigint), B(double)
         // => update_schema:   A(bigint), B(double), C(int), D(int)
         std::lock_guard<std::mutex> lock(*(_context->schema_lock));
-        TabletSchemaSPtr update_schema = 
vectorized::schema_util::get_least_common_schema(
-                {_context->tablet_schema, flush_schema}, nullptr);
+        TabletSchemaSPtr update_schema;
+        static_cast<void>(vectorized::schema_util::get_least_common_schema(
+                {_context->tablet_schema, flush_schema}, nullptr, 
update_schema));
         CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
                 << "Rowset merge schema columns count is " << 
update_schema->num_columns()
                 << ", but flush_schema is larger " << 
flush_schema->num_columns()
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index c7bda4d3adf..d3a4f3db135 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -355,13 +355,14 @@ Status Segment::_create_column_readers(const 
SegmentFooterPB& footer) {
             column_path_to_footer_ordinal;
     for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) {
         auto& column_pb = footer.columns(ordinal);
+        // column path for accessing subcolumns of variant
         if (column_pb.has_column_path_info()) {
-            // column path
             vectorized::PathInData path;
             path.from_protobuf(column_pb.column_path_info());
             column_path_to_footer_ordinal.emplace(path, ordinal);
         }
-        if (column_pb.has_unique_id()) {
+        // unique_id is unsigned, -1 meaning no unique id(e.g. an extracted 
column from variant)
+        if (static_cast<int>(column_pb.unique_id()) >= 0) {
             // unique id
             column_id_to_footer_ordinal.emplace(column_pb.unique_id(), 
ordinal);
         }
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 395d37f7150..21fbed78022 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -309,7 +309,7 @@ Status RowsetBuilder::commit_txn() {
         // _tabelt->tablet_schema:  A(bigint), B(double)
         //  => update_schema:       A(bigint), B(double), C(int), D(int)
         const RowsetWriterContext& rw_ctx = _rowset_writer->context();
-        _tablet->update_by_least_common_schema(rw_ctx.tablet_schema);
+        
RETURN_IF_ERROR(_tablet->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
     // Transfer ownership of `PendingRowsetGuard` to `TxnManager`
     Status res = storage_engine->txn_manager()->commit_txn(_req.partition_id, 
*tablet, _req.txn_id,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 21d390650ab..0d056d2c235 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -641,7 +641,8 @@ TabletSchemaSPtr 
Tablet::tablet_schema_with_merged_max_schema_version(
         std::vector<TabletSchemaSPtr> schemas;
         std::transform(rowset_metas.begin(), rowset_metas.end(), 
std::back_inserter(schemas),
                        [](const RowsetMetaSharedPtr& rs_meta) { return 
rs_meta->tablet_schema(); });
-        target_schema = 
vectorized::schema_util::get_least_common_schema(schemas, nullptr);
+        static_cast<void>(
+                vectorized::schema_util::get_least_common_schema(schemas, 
nullptr, target_schema));
         VLOG_DEBUG << "dump schema: " << target_schema->dump_structure();
     }
     return target_schema;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 85203b0b12b..9346c3573ac 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -1186,7 +1186,8 @@ const TabletIndex* 
TabletSchema::get_inverted_index(int32_t col_unique_id,
 
 const TabletIndex* TabletSchema::get_inverted_index(const TabletColumn& col) 
const {
     // TODO use more efficient impl
-    int32_t col_unique_id = col.unique_id();
+    // Use parent id if unique not assigned, this could happend when accessing 
subcolumns of variants
+    int32_t col_unique_id = col.unique_id() < 0 ? col.parent_unique_id() : 
col.unique_id();
     const std::string& suffix_path =
             !col.path_info().empty() ? 
escape_for_path_name(col.path_info().get_path()) : "";
     return get_inverted_index(col_unique_id, suffix_path);
diff --git a/be/src/vec/common/schema_util.cpp 
b/be/src/vec/common/schema_util.cpp
index 2c678f7051e..d0fbb287a14 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -310,10 +310,11 @@ void update_least_common_schema(const 
std::vector<TabletSchemaSPtr>& schemas,
         TabletColumn common_column;
         // const std::string& column_name = variant_col_name + "." + 
tuple_paths[i].get_path();
         get_column_by_type(tuple_types[i], tuple_paths[i].get_path(), 
common_column,
-                           ExtraInfo {.unique_id = variant_col_unique_id,
+                           ExtraInfo {.unique_id = -1,
                                       .parent_unique_id = 
variant_col_unique_id,
                                       .path_info = tuple_paths[i]});
-        common_schema->append_column(common_column);
+        // set ColumnType::VARIANT to occupy _field_path_to_index
+        common_schema->append_column(common_column, 
TabletSchema::ColumnType::VARIANT);
     }
 }
 
@@ -350,23 +351,21 @@ void inherit_tablet_index(TabletSchemaSPtr& schema) {
     }
 }
 
-TabletSchemaSPtr get_least_common_schema(const std::vector<TabletSchemaSPtr>& 
schemas,
-                                         const TabletSchemaSPtr& base_schema) {
-    auto output_schema = std::make_shared<TabletSchema>();
+Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+                               const TabletSchemaSPtr& base_schema, 
TabletSchemaSPtr& output_schema,
+                               bool check_schema_size) {
     std::vector<int32_t> variant_column_unique_id;
-    if (base_schema == nullptr) {
-        // Pick tablet schema with max schema version
-        auto max_version_schema =
-                *std::max_element(schemas.cbegin(), schemas.cend(),
-                                  [](const TabletSchemaSPtr a, const 
TabletSchemaSPtr b) {
-                                      return a->schema_version() < 
b->schema_version();
-                                  });
-        CHECK(max_version_schema);
-        output_schema->copy_from(*max_version_schema);
+
+    // Construct a schema excluding the extracted columns and gather unique 
identifiers for variants.
+    // Ensure that the output schema also excludes these extracted columns. 
This approach prevents
+    // duplicated paths following the update_least_common_schema process.
+    auto build_schema_without_extracted_columns = [&](const TabletSchemaSPtr& 
base_schema) {
+        output_schema = std::make_shared<TabletSchema>();
+        output_schema->copy_from(*base_schema);
         // Merge columns from other schemas
         output_schema->clear_columns();
         // Get all columns without extracted columns and collect variant col 
unique id
-        for (const TabletColumn& col : max_version_schema->columns()) {
+        for (const TabletColumn& col : base_schema->columns()) {
             if (col.is_variant_type()) {
                 variant_column_unique_id.push_back(col.unique_id());
             }
@@ -374,15 +373,19 @@ TabletSchemaSPtr get_least_common_schema(const 
std::vector<TabletSchemaSPtr>& sc
                 output_schema->append_column(col);
             }
         }
+    };
+    if (base_schema == nullptr) {
+        // Pick tablet schema with max schema version
+        auto max_version_schema =
+                *std::max_element(schemas.cbegin(), schemas.cend(),
+                                  [](const TabletSchemaSPtr a, const 
TabletSchemaSPtr b) {
+                                      return a->schema_version() < 
b->schema_version();
+                                  });
+        CHECK(max_version_schema);
+        build_schema_without_extracted_columns(max_version_schema);
     } else {
-        // use input common schema as base schema
-        // Get all columns without extracted columns and collect variant col 
unique id
-        for (const TabletColumn& col : base_schema->columns()) {
-            if (col.is_variant_type()) {
-                variant_column_unique_id.push_back(col.unique_id());
-            }
-        }
-        output_schema->copy_from(*base_schema);
+        // use input base_schema schema as base schema
+        build_schema_without_extracted_columns(base_schema);
     }
 
     for (int32_t unique_id : variant_column_unique_id) {
@@ -390,7 +393,12 @@ TabletSchemaSPtr get_least_common_schema(const 
std::vector<TabletSchemaSPtr>& sc
     }
 
     inherit_tablet_index(output_schema);
-    return output_schema;
+    if (check_schema_size &&
+        output_schema->columns().size() > 
config::variant_max_merged_tablet_schema_size) {
+        return Status::DataQualityError("Reached max column size limit {}",
+                                        
config::variant_max_merged_tablet_schema_size);
+    }
+    return Status::OK();
 }
 
 Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& 
variant_pos) {
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index d5d01b57ed9..de5778157df 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -96,8 +96,9 @@ void encode_variant_sparse_subcolumns(Block& block, const 
std::vector<int>& vari
 // Then update all variant columns to there least common types.
 // Return the final merged schema as common schema.
 // If base_schema == nullptr then, max schema version tablet schema will be 
picked as base schema
-TabletSchemaSPtr get_least_common_schema(const std::vector<TabletSchemaSPtr>& 
schemas,
-                                         const TabletSchemaSPtr& base_schema);
+Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+                               const TabletSchemaSPtr& base_schema, 
TabletSchemaSPtr& result,
+                               bool check_schema_size = false);
 
 // Get least common types for extracted columns which has Path info,
 // with a speicified variant column's unique id
diff --git a/regression-test/suites/variant_p0/column_size_limit.groovy 
b/regression-test/suites/variant_p0/column_size_limit.groovy
new file mode 100644
index 00000000000..70567d89c07
--- /dev/null
+++ b/regression-test/suites/variant_p0/column_size_limit.groovy
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonBuilder
+
+suite("regression_test_variant_column_limit", "nonConcurrent"){
+    def set_be_config = { key, value ->
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = 
update_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), key, value)
+        logger.info("update config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    }
+    def table_name = "var_column_limit"
+    sql "DROP TABLE IF EXISTS ${table_name}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (
+            k bigint,
+            v variant
+        )
+        DUPLICATE KEY(`k`)
+        DISTRIBUTED BY HASH(k) BUCKETS 1
+        properties("replication_num" = "1", "disable_auto_compaction" = 
"false");
+    """
+    try {
+        def jsonBuilder = new JsonBuilder()
+        def root = jsonBuilder {
+            // Generate 2049 fields
+            (1..2049).each { fieldNumber ->
+                "field$fieldNumber" fieldNumber
+            }
+        }
+
+        String jsonString = jsonBuilder.toPrettyString()
+        sql """insert into ${table_name} values (1, '$jsonString')"""
+    } catch(Exception ex) {
+        logger.info("""INSERT INTO ${table_name} failed: """ + ex)
+        assertTrue(ex.toString().contains("Reached max column"));
+    } finally {
+    }
+    sql """insert into ${table_name} values (1, '{"a" : 1, "b" : 2, "c" : 
3}')"""
+
+}
\ No newline at end of file
diff --git a/regression-test/suites/variant_p0/load.groovy 
b/regression-test/suites/variant_p0/load.groovy
index 2310e7605fc..57737397b49 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -75,7 +75,7 @@ suite("regression_test_variant", "variant_type"){
     }
 
     try {
-
+        set_be_config.call("variant_ratio_of_defaults_as_sparse_column", 
"0.95")
         def key_types = ["DUPLICATE", "UNIQUE"]
         for (int i = 0; i < key_types.size(); i++) {
             def table_name = "simple_variant_${key_types[i]}"
@@ -311,18 +311,17 @@ suite("regression_test_variant", "variant_type"){
         qt_sql_35_1 """select v:json.parseFailed from  logdata where 
cast(v:json.parseFailed as string) is not null and k = 162 limit 1;"""
 
         // TODO add test case that some certain columns are materialized in 
some file while others are not materilized(sparse)
-         // unique table
+        // unique table
         set_be_config.call("variant_ratio_of_defaults_as_sparse_column", 
"0.95")
-        table_name = "github_events_unique"
-        sql """DROP TABLE IF EXISTS ${table_name}"""
         table_name = "github_events"
+        sql """DROP TABLE IF EXISTS ${table_name}"""
         sql """
             CREATE TABLE IF NOT EXISTS ${table_name} (
                 k bigint,
                 v variant
             )
             UNIQUE KEY(`k`)
-            DISTRIBUTED BY HASH(k) BUCKETS 4 
+            DISTRIBUTED BY HASH(k) BUCKETS 4
             properties("replication_num" = "1", "disable_auto_compaction" = 
"true");
         """
         load_json_data.call(table_name, """${getS3Url() + 
'/regression/gharchive.m/2015-01-01-0.json'}""")
@@ -399,5 +398,6 @@ suite("regression_test_variant", "variant_type"){
     } finally {
         // reset flags
         set_be_config.call("variant_ratio_of_defaults_as_sparse_column", 
"0.95")
+        set_be_config.call("variant_max_merged_tablet_schema_size", "2048")
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to