This is an automated email from the ASF dual-hosted git repository. jianliangqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 48935c14e2c [Improvement](variant) limit the column size on tablet schema (#27399) (#27785) 48935c14e2c is described below commit 48935c14e2c835a20aba61f4eac02453aa7d6c9d Author: lihangyu <15605149...@163.com> AuthorDate: Mon Dec 4 14:47:36 2023 +0800 [Improvement](variant) limit the column size on tablet schema (#27399) (#27785) 1. limit the column count to 2048 by default 2. fix get_inverted_index returning nullptr when the variant's unique id is -1, by using its parent unique id instead 3. avoid adding the same path subcolumn more than once in the tablet schema 4. set the unique id of extracted columns to -1 --- be/src/common/config.cpp | 2 + be/src/common/config.h | 3 ++ be/src/olap/base_tablet.cpp | 11 ++-- be/src/olap/base_tablet.h | 2 +- be/src/olap/rowset/beta_rowset_writer.cpp | 5 +- be/src/olap/rowset/segment_creator.cpp | 7 +-- be/src/olap/rowset/segment_v2/segment.cpp | 5 +- be/src/olap/rowset_builder.cpp | 2 +- be/src/olap/tablet.cpp | 3 +- be/src/olap/tablet_schema.cpp | 3 +- be/src/vec/common/schema_util.cpp | 56 +++++++++++--------- be/src/vec/common/schema_util.h | 5 +- .../suites/variant_p0/column_size_limit.groovy | 59 ++++++++++++++++++++++ regression-test/suites/variant_p0/load.groovy | 10 ++-- 14 files changed, 128 insertions(+), 45 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 293ba3c0282..8efd18248e7 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1128,6 +1128,8 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000"); DEFINE_Bool(enable_snapshot_action, "false"); +DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index ddc47c678c6..1b9fd62a3ad 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1212,6 +1212,9 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms); // whether to enable /api/snapshot api 
DECLARE_Bool(enable_snapshot_action); +// The max columns size for a tablet schema +DECLARE_mInt32(variant_max_merged_tablet_schema_size); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index db17e68706a..011a901ba5b 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -19,6 +19,7 @@ #include <fmt/format.h> +#include "olap/tablet_fwd.h" #include "olap/tablet_schema_cache.h" #include "util/doris_metrics.h" #include "vec/common/schema_util.h" @@ -66,13 +67,17 @@ void BaseTablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema } } -void BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_schema) { +Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_schema) { std::lock_guard wrlock(_meta_lock); CHECK(_max_version_schema->schema_version() >= update_schema->schema_version()); - auto final_schema = vectorized::schema_util::get_least_common_schema( - {_max_version_schema, update_schema}, _max_version_schema); + TabletSchemaSPtr final_schema; + bool check_column_size = true; + RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema( + {_max_version_schema, update_schema}, _max_version_schema, final_schema, + check_column_size)); _max_version_schema = final_schema; VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_structure(); + return Status::OK(); } } /* namespace doris */ diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index b3fc9f8b9b7..2fa494b420a 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -65,7 +65,7 @@ public: void update_max_version_schema(const TabletSchemaSPtr& tablet_schema); - void update_by_least_common_schema(const TabletSchemaSPtr& update_schema); + Status update_by_least_common_schema(const TabletSchemaSPtr& update_schema); TabletSchemaSPtr tablet_schema() const { std::shared_lock rlock(_meta_lock); diff --git 
a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index bcac89ad9a0..3b2852a283f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -561,8 +561,9 @@ bool BetaRowsetWriter::_is_segment_overlapping( // => update_schema: A(bigint), B(double), C(int), D(int) void BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) { std::lock_guard<std::mutex> lock(*(_context.schema_lock)); - TabletSchemaSPtr update_schema = vectorized::schema_util::get_least_common_schema( - {_context.tablet_schema, flush_schema}, nullptr); + TabletSchemaSPtr update_schema; + static_cast<void>(vectorized::schema_util::get_least_common_schema( + {_context.tablet_schema, flush_schema}, nullptr, update_schema)); CHECK_GE(update_schema->num_columns(), flush_schema->num_columns()) << "Rowset merge schema columns count is " << update_schema->num_columns() << ", but flush_schema is larger " << flush_schema->num_columns() diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 7e904478b1d..af7cec3e80e 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -136,7 +136,7 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, .build(); TabletColumn tablet_column = vectorized::schema_util::get_column_by_type( final_data_type_from_object, column_name, - vectorized::schema_util::ExtraInfo {.unique_id = parent_variant.unique_id(), + vectorized::schema_util::ExtraInfo {.unique_id = -1, .parent_unique_id = parent_variant.unique_id(), .path_info = full_path}); flush_schema->append_column(std::move(tablet_column)); @@ -194,8 +194,9 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, // ctx.tablet_schema: A(bigint), B(double) // => update_schema: A(bigint), B(double), C(int), D(int) std::lock_guard<std::mutex> lock(*(_context->schema_lock)); - TabletSchemaSPtr 
update_schema = vectorized::schema_util::get_least_common_schema( - {_context->tablet_schema, flush_schema}, nullptr); + TabletSchemaSPtr update_schema; + static_cast<void>(vectorized::schema_util::get_least_common_schema( + {_context->tablet_schema, flush_schema}, nullptr, update_schema)); CHECK_GE(update_schema->num_columns(), flush_schema->num_columns()) << "Rowset merge schema columns count is " << update_schema->num_columns() << ", but flush_schema is larger " << flush_schema->num_columns() diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index c7bda4d3adf..d3a4f3db135 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -355,13 +355,14 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) { column_path_to_footer_ordinal; for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) { auto& column_pb = footer.columns(ordinal); + // column path for accessing subcolumns of variant if (column_pb.has_column_path_info()) { - // column path vectorized::PathInData path; path.from_protobuf(column_pb.column_path_info()); column_path_to_footer_ordinal.emplace(path, ordinal); } - if (column_pb.has_unique_id()) { + // unique_id is unsigned, -1 meaning no unique id(e.g. 
an extracted column from variant) + if (static_cast<int>(column_pb.unique_id()) >= 0) { // unique id column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal); } diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 395d37f7150..21fbed78022 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -309,7 +309,7 @@ Status RowsetBuilder::commit_txn() { // _tabelt->tablet_schema: A(bigint), B(double) // => update_schema: A(bigint), B(double), C(int), D(int) const RowsetWriterContext& rw_ctx = _rowset_writer->context(); - _tablet->update_by_least_common_schema(rw_ctx.tablet_schema); + RETURN_IF_ERROR(_tablet->update_by_least_common_schema(rw_ctx.tablet_schema)); } // Transfer ownership of `PendingRowsetGuard` to `TxnManager` Status res = storage_engine->txn_manager()->commit_txn(_req.partition_id, *tablet, _req.txn_id, diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 21d390650ab..0d056d2c235 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -641,7 +641,8 @@ TabletSchemaSPtr Tablet::tablet_schema_with_merged_max_schema_version( std::vector<TabletSchemaSPtr> schemas; std::transform(rowset_metas.begin(), rowset_metas.end(), std::back_inserter(schemas), [](const RowsetMetaSharedPtr& rs_meta) { return rs_meta->tablet_schema(); }); - target_schema = vectorized::schema_util::get_least_common_schema(schemas, nullptr); + static_cast<void>( + vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema)); VLOG_DEBUG << "dump schema: " << target_schema->dump_structure(); } return target_schema; diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 85203b0b12b..9346c3573ac 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -1186,7 +1186,8 @@ const TabletIndex* TabletSchema::get_inverted_index(int32_t col_unique_id, const TabletIndex* TabletSchema::get_inverted_index(const TabletColumn& col) const { // 
TODO use more efficient impl - int32_t col_unique_id = col.unique_id(); + // Use parent id if unique not assigned, this could happend when accessing subcolumns of variants + int32_t col_unique_id = col.unique_id() < 0 ? col.parent_unique_id() : col.unique_id(); const std::string& suffix_path = !col.path_info().empty() ? escape_for_path_name(col.path_info().get_path()) : ""; return get_inverted_index(col_unique_id, suffix_path); diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index 2c678f7051e..d0fbb287a14 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -310,10 +310,11 @@ void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, TabletColumn common_column; // const std::string& column_name = variant_col_name + "." + tuple_paths[i].get_path(); get_column_by_type(tuple_types[i], tuple_paths[i].get_path(), common_column, - ExtraInfo {.unique_id = variant_col_unique_id, + ExtraInfo {.unique_id = -1, .parent_unique_id = variant_col_unique_id, .path_info = tuple_paths[i]}); - common_schema->append_column(common_column); + // set ColumnType::VARIANT to occupy _field_path_to_index + common_schema->append_column(common_column, TabletSchema::ColumnType::VARIANT); } } @@ -350,23 +351,21 @@ void inherit_tablet_index(TabletSchemaSPtr& schema) { } } -TabletSchemaSPtr get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, - const TabletSchemaSPtr& base_schema) { - auto output_schema = std::make_shared<TabletSchema>(); +Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, + const TabletSchemaSPtr& base_schema, TabletSchemaSPtr& output_schema, + bool check_schema_size) { std::vector<int32_t> variant_column_unique_id; - if (base_schema == nullptr) { - // Pick tablet schema with max schema version - auto max_version_schema = - *std::max_element(schemas.cbegin(), schemas.cend(), - [](const TabletSchemaSPtr a, const TabletSchemaSPtr b) { - return 
a->schema_version() < b->schema_version(); - }); - CHECK(max_version_schema); - output_schema->copy_from(*max_version_schema); + + // Construct a schema excluding the extracted columns and gather unique identifiers for variants. + // Ensure that the output schema also excludes these extracted columns. This approach prevents + // duplicated paths following the update_least_common_schema process. + auto build_schema_without_extracted_columns = [&](const TabletSchemaSPtr& base_schema) { + output_schema = std::make_shared<TabletSchema>(); + output_schema->copy_from(*base_schema); // Merge columns from other schemas output_schema->clear_columns(); // Get all columns without extracted columns and collect variant col unique id - for (const TabletColumn& col : max_version_schema->columns()) { + for (const TabletColumn& col : base_schema->columns()) { if (col.is_variant_type()) { variant_column_unique_id.push_back(col.unique_id()); } @@ -374,15 +373,19 @@ TabletSchemaSPtr get_least_common_schema(const std::vector<TabletSchemaSPtr>& sc output_schema->append_column(col); } } + }; + if (base_schema == nullptr) { + // Pick tablet schema with max schema version + auto max_version_schema = + *std::max_element(schemas.cbegin(), schemas.cend(), + [](const TabletSchemaSPtr a, const TabletSchemaSPtr b) { + return a->schema_version() < b->schema_version(); + }); + CHECK(max_version_schema); + build_schema_without_extracted_columns(max_version_schema); } else { - // use input common schema as base schema - // Get all columns without extracted columns and collect variant col unique id - for (const TabletColumn& col : base_schema->columns()) { - if (col.is_variant_type()) { - variant_column_unique_id.push_back(col.unique_id()); - } - } - output_schema->copy_from(*base_schema); + // use input base_schema schema as base schema + build_schema_without_extracted_columns(base_schema); } for (int32_t unique_id : variant_column_unique_id) { @@ -390,7 +393,12 @@ TabletSchemaSPtr 
get_least_common_schema(const std::vector<TabletSchemaSPtr>& sc } inherit_tablet_index(output_schema); - return output_schema; + if (check_schema_size && + output_schema->columns().size() > config::variant_max_merged_tablet_schema_size) { + return Status::DataQualityError("Reached max column size limit {}", + config::variant_max_merged_tablet_schema_size); + } + return Status::OK(); } Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos) { diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index d5d01b57ed9..de5778157df 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -96,8 +96,9 @@ void encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& vari // Then update all variant columns to there least common types. // Return the final merged schema as common schema. // If base_schema == nullptr then, max schema version tablet schema will be picked as base schema -TabletSchemaSPtr get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, - const TabletSchemaSPtr& base_schema); +Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, + const TabletSchemaSPtr& base_schema, TabletSchemaSPtr& result, + bool check_schema_size = false); // Get least common types for extracted columns which has Path info, // with a speicified variant column's unique id diff --git a/regression-test/suites/variant_p0/column_size_limit.groovy b/regression-test/suites/variant_p0/column_size_limit.groovy new file mode 100644 index 00000000000..70567d89c07 --- /dev/null +++ b/regression-test/suites/variant_p0/column_size_limit.groovy @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import groovy.json.JsonBuilder + +suite("regression_test_variant_column_limit", "nonConcurrent"){ + def set_be_config = { key, value -> + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + def table_name = "var_column_limit" + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 1 + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + try { + def jsonBuilder = new JsonBuilder() + def root = jsonBuilder { + // Generate 2049 fields + (1..2049).each { fieldNumber -> + "field$fieldNumber" fieldNumber + } + } + + String jsonString = jsonBuilder.toPrettyString() + sql """insert into ${table_name} values (1, '$jsonString')""" + } catch(Exception ex) { + logger.info("""INSERT INTO ${table_name} failed: """ + ex) + assertTrue(ex.toString().contains("Reached max column")); + } finally { + } + sql """insert into ${table_name} values (1, '{"a" : 1, "b" : 
2, "c" : 3}')""" + +} \ No newline at end of file diff --git a/regression-test/suites/variant_p0/load.groovy b/regression-test/suites/variant_p0/load.groovy index 2310e7605fc..57737397b49 100644 --- a/regression-test/suites/variant_p0/load.groovy +++ b/regression-test/suites/variant_p0/load.groovy @@ -75,7 +75,7 @@ suite("regression_test_variant", "variant_type"){ } try { - + set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") def key_types = ["DUPLICATE", "UNIQUE"] for (int i = 0; i < key_types.size(); i++) { def table_name = "simple_variant_${key_types[i]}" @@ -311,18 +311,17 @@ suite("regression_test_variant", "variant_type"){ qt_sql_35_1 """select v:json.parseFailed from logdata where cast(v:json.parseFailed as string) is not null and k = 162 limit 1;""" // TODO add test case that some certain columns are materialized in some file while others are not materilized(sparse) - // unique table + // unique table set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") - table_name = "github_events_unique" - sql """DROP TABLE IF EXISTS ${table_name}""" table_name = "github_events" + sql """DROP TABLE IF EXISTS ${table_name}""" sql """ CREATE TABLE IF NOT EXISTS ${table_name} ( k bigint, v variant ) UNIQUE KEY(`k`) - DISTRIBUTED BY HASH(k) BUCKETS 4 + DISTRIBUTED BY HASH(k) BUCKETS 4 properties("replication_num" = "1", "disable_auto_compaction" = "true"); """ load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") @@ -399,5 +398,6 @@ suite("regression_test_variant", "variant_type"){ } finally { // reset flags set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") + set_be_config.call("variant_max_merged_tablet_schema_size", "2048") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org