This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push:
     new fc715acc1a7 add config variant_max_sparse_column_statistics_size and fix test (#49632)

fc715acc1a7 is described below

commit fc715acc1a70806b8fc3c1d9a97807418b703940
Author: lihangyu <lihan...@selectdb.com>
AuthorDate: Mon Mar 31 21:52:36 2025 +0800

    add config variant_max_sparse_column_statistics_size and fix test (#49632)
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   3 +
 be/src/olap/rowset/segment_v2/column_reader.cpp    |   2 +-
 .../segment_v2/variant_column_writer_impl.cpp      |   4 +-
 .../rowset/segment_v2/variant_column_writer_impl.h |   8 +-
 be/src/vec/columns/column_object.cpp               |  38 ++-----
 be/src/vec/common/schema_util.cpp                  |   8 +-
 .../variant_column_writer_reader_test.cpp          |  10 +-
 be/test/vec/common/schema_util_test.cpp            |   6 +-
 .../test_variant_compaction_with_sparse_limit.out  | Bin 0 -> 5787 bytes
 regression-test/data/variant_p0/load.out           | Bin 16080 -> 16265 bytes
 ...est_variant_compaction_with_sparse_limit.groovy | 123 +++++++++++++++++++++
 regression-test/suites/variant_p0/load.groovy      |   2 +-
 13 files changed, 157 insertions(+), 49 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b9270703d3e..58c38858a09 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1257,6 +1257,8 @@ DEFINE_Bool(enable_snapshot_action, "false");
 
 DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048");
 
+DEFINE_mInt32(variant_max_sparse_column_statistics_size, "10000");
+
 DEFINE_mBool(enable_column_type_check, "true");
 // 128 MB
 DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 780b382f103..65728fbb050 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1337,6 +1337,9 @@ DECLARE_Bool(enable_snapshot_action);
 // The max columns size for a tablet schema
 DECLARE_mInt32(variant_max_merged_tablet_schema_size);
 
+// The max sparse column statistics size for a variant column
+DECLARE_mInt32(variant_max_sparse_column_statistics_size);
+
 DECLARE_mInt64(local_exchange_buffer_mem_limit);
 
 DECLARE_mInt64(enable_debug_log_timeout_secs);
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 2edbb3a1350..881e4edaefa 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -435,7 +435,7 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const Tablet
     // which means the path maybe exist in sparse_column
     bool exceeded_sparse_column_limit = !_statistics->sparse_column_non_null_size.empty() &&
                                         _statistics->sparse_column_non_null_size.size() ==
-                                                VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE;
+                                                config::variant_max_sparse_column_statistics_size;
 
     // For compaction operations, read flat leaves, otherwise read hierarchical data
     // Since the variant subcolumns are flattened in schema_util::get_compaction_schema
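The reader-side check above treats a full statistics map as the signal that the statistics may be incomplete: once the map has grown to the configured cap, a path that is absent from the map can still exist in the sparse column. A minimal standalone sketch of that rule follows; the names are illustrative, not the actual Doris API, and `limit` stands in for `config::variant_max_sparse_column_statistics_size`:

    #include <cstddef>
    #include <map>
    #include <string>

    // Returns true when absence of a path from the statistics map no longer
    // proves the path is absent from the sparse column, so the caller must
    // fall back to reading the sparse column itself.
    bool statistics_saturated(const std::map<std::string, size_t>& sparse_non_null,
                              size_t limit) {
        return !sparse_non_null.empty() && sparse_non_null.size() == limit;
    }
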
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index ae1f2a8de61..1a8e8ed38af 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -222,7 +222,7 @@ Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::st
             paths.emplace(path);
         }
         // // todo : Add all remaining paths into shared data statistics until we reach its max size;
-        // else if (new_statistics.sparse_data_paths_statistics.size() < Statistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+        // else if (new_statistics.sparse_data_paths_statistics.size() < Statistics::config::variant_max_sparse_column_statistics_size) {
         //     new_statistics.sparse_data_paths_statistics.emplace(path, size);
         // }
     }
@@ -421,7 +421,7 @@ Status VariantColumnWriterImpl::_process_sparse_column(
             it != sparse_data_paths_statistics.end()) {
             ++it->second;
         } else if (sparse_data_paths_statistics.size() <
-                   VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+                   config::variant_max_sparse_column_statistics_size) {
             sparse_data_paths_statistics.emplace(path, 1);
         }
     }
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
index 9f67cf04505..42beb41bf8a 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
@@ -36,13 +36,7 @@ class ColumnWriter;
 class ScalarColumnWriter;
 
 struct VariantStatistics {
-    // #ifdef BE_TEST
-    //     static constexpr size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10;
-    // #else
-    //     // If reached the size of this, we should stop writing statistics for sparse data
-    //     static constexpr size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000;
-    // #endif
-    static constexpr size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000;
+    // If reached the size of this, we should stop writing statistics for sparse data
     std::map<std::string, size_t> subcolumns_non_null_size;
     std::map<std::string, size_t> sparse_column_non_null_size;
diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp
index 448ff7df4c1..7947c751246 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -888,32 +888,6 @@ void ColumnObject::check_consistency() const {
                                "unmatched sparse column:, expeted rows: {}, but meet: {}",
                                num_rows, serialized_sparse_column->size());
     }
-
-#ifndef NDEBUG
-    bool error = false;
-    auto [path, value] = get_sparse_data_paths_and_values();
-
-    auto& offsets = serialized_sparse_column_offsets();
-    for (size_t row = 0; row != num_rows; ++row) {
-        size_t offset = offsets[row - 1];
-        size_t end = offsets[row];
-        // Iterator over [path, binary value]
-        for (size_t i = offset; i != end; ++i) {
-            const StringRef sparse_path_string = path->get_data_at(i);
-            const std::string_view sparse_path(sparse_path_string);
-
-            const PathInData column_path(sparse_path);
-            if (auto* subcolumn = get_subcolumn(column_path); subcolumn != nullptr) {
-                LOG(WARNING) << "err path: " << sparse_path;
-                error = true;
-            }
-        }
-    }
-    if (error) {
-        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                               "path {} both exists in subcolumn and sparse columns");
-    }
-#endif
 }
 
 size_t ColumnObject::size() const {
@@ -1810,6 +1784,18 @@ bool ColumnObject::is_visible_root_value(size_t nrow) const {
     if (root->data.is_null_at(nrow)) {
         return false;
     }
+    for (const auto& subcolumn : subcolumns) {
+        if (subcolumn->data.is_root) {
+            continue; // Skip the root column
+        }
+
+        // If any non-root subcolumn is NOT null, set serialize_root to false and exit early
+        if (!assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(
+                     *subcolumn->data.get_finalized_column_ptr())
+                     .is_null_at(nrow)) {
+            return false;
+        }
+    }
     if (root->data.least_common_type.get_base_type_id() == TypeIndex::VARIANT) {
         // nested field
         return !root->data.is_empty_nested(nrow);
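The writer-side change in `_process_sparse_column` keeps the same bounded-counter pattern as before, only with the cap read from the runtime config instead of the removed `MAX_SPARSE_DATA_STATISTICS_SIZE` constant. A self-contained sketch of that pattern, under assumed names rather than the actual Doris types:

    #include <cstddef>
    #include <map>
    #include <string>

    // Count one occurrence of `path`, never letting the map grow past `limit`.
    void count_sparse_path(std::map<std::string, size_t>& stats,
                           const std::string& path, size_t limit) {
        if (auto it = stats.find(path); it != stats.end()) {
            ++it->second;            // known paths are always counted
        } else if (stats.size() < limit) {
            stats.emplace(path, 1);  // new paths are admitted only below the cap
        }
        // Otherwise the path is dropped: statistics may be incomplete by design.
    }
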
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index 6298daffd8f..dd715b5267f 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -716,7 +716,7 @@ Status check_path_stats(const std::vector<RowsetSharedPtr>& intputs, RowsetShare
 
         // In input rowsets, some rowsets may have statistics values exceeding the maximum limit,
         // which leads to inaccurate statistics
-        if (stats.size() > VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+        if (stats.size() > config::variant_max_sparse_column_statistics_size) {
             // When there is only one segment, we can ensure that the size of each path in output stats is accurate
             if (output->num_segments() == 1) {
                 for (const auto& [path, size] : stats) {
@@ -841,19 +841,19 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column,
             }
             // If path doesn't exist and we haven't hit the max statistics size limit,
             // add it with count 1
-            else if (count_map.size() < VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+            else if (count_map.size() < config::variant_max_sparse_column_statistics_size) {
                 count_map.emplace(sparse_path, 1);
             }
         }
     }
 
     if (stats->sparse_column_non_null_size().size() >
-        VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+        config::variant_max_sparse_column_statistics_size) {
        throw doris::Exception(
                 ErrorCode::INTERNAL_ERROR,
                 "Sparse column non null size: {} is greater than max statistics size: {}",
                 stats->sparse_column_non_null_size().size(),
-                VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE);
+                config::variant_max_sparse_column_statistics_size);
     }
 }
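During compaction, input rowsets whose statistics already reached the cap make the merged counts lower bounds rather than exact values, which is what the "inaccurate statistics" comment in `check_path_stats` is pointing at. A hedged sketch of such a capped merge; the helper name and signature are hypothetical, not the actual Doris helper:

    #include <cstddef>
    #include <map>
    #include <string>

    // Merge `in` into `out`, admitting new paths only while under `limit`.
    void merge_capped_stats(std::map<std::string, size_t>& out,
                            const std::map<std::string, size_t>& in, size_t limit) {
        for (const auto& [path, count] : in) {
            if (auto it = out.find(path); it != out.end()) {
                it->second += count;
            } else if (out.size() < limit) {
                out.emplace(path, count);
            }
            // Paths dropped here are why saturated inputs yield inexact outputs.
        }
    }
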
diff --git a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
index df8428ac877..476f1136992 100644
--- a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
+++ b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
@@ -365,15 +365,15 @@ TEST_F(VariantColumnWriterReaderTest, test_write_data_normal) {
     // 13. check statistics size == limit
     auto& variant_stats = variant_column_reader->_statistics;
     EXPECT_TRUE(variant_stats->sparse_column_non_null_size.size() <
-                VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE);
-    auto limit = VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE -
+                config::variant_max_sparse_column_statistics_size);
+    auto limit = config::variant_max_sparse_column_statistics_size -
                 variant_stats->sparse_column_non_null_size.size();
     for (int i = 0; i < limit; ++i) {
         std::string key = parent_column.name_lower_case() + ".key10" + std::to_string(i);
         variant_stats->sparse_column_non_null_size[key] = 10000;
     }
     EXPECT_TRUE(variant_stats->sparse_column_non_null_size.size() ==
-                VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE);
+                config::variant_max_sparse_column_statistics_size);
 
     st = variant_column_reader->new_iterator(&it, subcolumn, &storage_read_opts);
     EXPECT_TRUE(st.ok()) << st.msg();
@@ -607,7 +607,7 @@ TEST_F(VariantColumnWriterReaderTest, test_write_data_advanced) {
         EXPECT_EQ(value, inserted_jsonstr[i]);
     }
 
-    auto read_to_column_object = [&]() {
+    auto read_to_column_object = [&]() {
         new_column_object = ColumnObject::create(10);
         nrows = 1000;
         st = it->seek_to_ordinal(0);
@@ -641,7 +641,7 @@
     for (int row = 0; row < 1000; ++row) {
         std::string value;
         st = assert_cast<ColumnObject*>(new_column_object.get())
-                ->serialize_one_row_to_string(row, &value);
+                     ->serialize_one_row_to_string(row, &value);
         EXPECT_TRUE(st.ok()) << st.msg();
         if (value.find("nested" + key_num) != std::string::npos) {
             key_nested_count++;
diff --git a/be/test/vec/common/schema_util_test.cpp b/be/test/vec/common/schema_util_test.cpp
index ffa790ddf49..7fd7eb6877b 100644
--- a/be/test/vec/common/schema_util_test.cpp
+++ b/be/test/vec/common/schema_util_test.cpp
@@ -208,10 +208,10 @@ TEST_F(SchemaUtilTest, calculate_variant_stats) {
     // test with max size
     column_map->clear();
     const auto& key_value_counts3 = construct_column_map_with_random_values(
-            column_map, VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE, 5, "key2_");
+            column_map, config::variant_max_sparse_column_statistics_size, 5, "key2_");
     schema_util::calculate_variant_stats(*column_map, &stats, 0,
-                                         VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE);
-    EXPECT_EQ(VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE,
+                                         config::variant_max_sparse_column_statistics_size);
+    EXPECT_EQ(config::variant_max_sparse_column_statistics_size,
               stats.sparse_column_non_null_size_size());
 
     for (const auto& [path, size] : stats.sparse_column_non_null_size()) {
diff --git a/regression-test/data/fault_injection_p0/test_variant_compaction_with_sparse_limit.out b/regression-test/data/fault_injection_p0/test_variant_compaction_with_sparse_limit.out
new file mode 100644
index 00000000000..a5c4281ee98
Binary files /dev/null and b/regression-test/data/fault_injection_p0/test_variant_compaction_with_sparse_limit.out differ
diff --git a/regression-test/data/variant_p0/load.out b/regression-test/data/variant_p0/load.out
index dbdee9d7940..ecbfb38a747 100644
Binary files a/regression-test/data/variant_p0/load.out and b/regression-test/data/variant_p0/load.out differ
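Because the limit is now a runtime-mutable config (`DEFINE_mInt32`) rather than a compile-time constant, tests can shrink it without the old `#ifdef BE_TEST` workaround that the header diff deletes. A hypothetical sketch of that usage, assuming direct assignment to the config variable is allowed in BE unit tests as it is elsewhere in the codebase:

    #include <gtest/gtest.h>

    #include "common/config.h"

    // Hypothetical test: shrink the cap, exercise the writer, restore the cap.
    TEST(VariantSparseLimitExample, ShrinkAndRestore) {
        const auto saved = doris::config::variant_max_sparse_column_statistics_size;
        doris::config::variant_max_sparse_column_statistics_size = 10;
        // ... write variant data and assert the statistics map stops growing at 10 ...
        doris::config::variant_max_sparse_column_statistics_size = saved;
    }
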
diff --git a/regression-test/suites/fault_injection_p0/test_variant_compaction_with_sparse_limit.groovy b/regression-test/suites/fault_injection_p0/test_variant_compaction_with_sparse_limit.groovy
new file mode 100644
index 00000000000..4cc336a2034
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_variant_compaction_with_sparse_limit.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+
+suite("test_compaction_variant_with_sparse_limit", "nonConcurrent") {
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def set_be_config = { key, value ->
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+
+    try {
+        String backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        boolean disableAutoCompaction = true
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+            }
+        }
+
+        set_be_config("variant_max_sparse_column_statistics_size", "2")
+        def create_table = { tableName, buckets="auto", key_type="DUPLICATE" ->
+            sql "DROP TABLE IF EXISTS ${tableName}"
+            def var_def = "variant"
+            if (key_type == "AGGREGATE") {
+                var_def = "variant replace"
+            }
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    k bigint,
+                    v ${var_def}
+                )
+                ${key_type} KEY(`k`)
+                DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
+                properties("replication_num" = "1", "disable_auto_compaction" = "true");
+            """
+        }
+
+        def key_types = ["DUPLICATE", "UNIQUE", "AGGREGATE"]
+        // def key_types = ["AGGREGATE"]
+        for (int i = 0; i < key_types.size(); i++) {
+            def tableName = "simple_variant_${key_types[i]}"
+            // 1. simple cases
+            create_table.call(tableName, "1", key_types[i])
+            def insert = {
+                sql """insert into ${tableName} values (1, '{"x" : [1]}'),(13, '{"a" : 1}');"""
+                sql """insert into ${tableName} values (2, '{"a" : "1"}'),(14, '{"a" : [[[1]]]}');"""
+                sql """insert into ${tableName} values (3, '{"x" : [3]}'),(15, '{"a" : 1}')"""
+                sql """insert into ${tableName} values (4, '{"y": 1}'),(16, '{"a" : "1223"}');"""
+                sql """insert into ${tableName} values (5, '{"z" : 2.0}'),(17, '{"a" : [1]}');"""
+                sql """insert into ${tableName} values (6, '{"x" : 111}'),(18, '{"a" : ["1", 2, 1.1]}');"""
+                sql """insert into ${tableName} values (7, '{"m" : 1}'),(19, '{"a" : 1, "b" : {"c" : 1}}');"""
+                sql """insert into ${tableName} values (8, '{"l" : 2}'),(20, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
+                sql """insert into ${tableName} values (9, '{"g" : 1.11}'),(21, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
+                sql """insert into ${tableName} values (10, '{"z" : 1.1111}'),(22, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
+                sql """insert into ${tableName} values (11, '{"sala" : 0}'),(1999, '{"a" : 1, "b" : {"c" : 1}}'),(19921, '{"a" : 1, "b" : 10}');"""
+                sql """insert into ${tableName} values (12, '{"dddd" : 0.1}'),(1022, '{"a" : 1, "b" : 10}'),(1029, '{"a" : 1, "b" : {"c" : 1}}');"""
+            }
+            insert.call();
+            insert.call();
+            qt_sql_1 "SELECT * FROM ${tableName} ORDER BY k, cast(v as string); "
+            qt_sql_2 "select k, cast(v['a'] as array<int>) from ${tableName} where size(cast(v['a'] as array<int>)) > 0 order by k"
+            qt_sql_3 "select k, v['a'], cast(v['b'] as string) from ${tableName} where length(cast(v['b'] as string)) > 4 order by k"
+            qt_sql_5 "select cast(v['b'] as string), cast(v['b']['c'] as string) from ${tableName} where cast(v['b'] as string) != 'null' and cast(v['b'] as string) != '{}' order by k desc, 1, 2 limit 10;"
+
+            //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
+            def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+
+            int rowCount = 0
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
+            }
+            // assert (rowCount < 8)
+            qt_sql_11 "SELECT * FROM ${tableName} ORDER BY k, cast(v as string); "
+            qt_sql_22 "select k, cast(v['a'] as array<int>) from ${tableName} where size(cast(v['a'] as array<int>)) > 0 order by k"
+            qt_sql_33 "select k, v['a'], cast(v['b'] as string) from ${tableName} where length(cast(v['b'] as string)) > 4 order by k"
+            qt_sql_55 "select cast(v['b'] as string), cast(v['b']['c'] as string) from ${tableName} where cast(v['b'] as string) != 'null' and cast(v['b'] as string) != '{}' order by k desc limit 10;"
+        }
+
+    } finally {
+        // set back to default
+        set_be_config("variant_max_sparse_column_statistics_size", "10000")
+    }
+}
diff --git a/regression-test/suites/variant_p0/load.groovy b/regression-test/suites/variant_p0/load.groovy
index 267f86e7e57..c2e54dffe3c 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -96,7 +96,7 @@ suite("regression_test_variant", "p0"){
         sql """insert into ${table_name} values (11, '[123.1]'),(1999, '{"a" : 1, "b" : {"c" : 1}}'),(19921, '{"a" : 1, "b" : 10}');"""
         sql """insert into ${table_name} values (12, '[123.2]'),(1022, '{"a" : 1, "b" : 10}'),(1029, '{"a" : 1, "b" : {"c" : 1}}');"""
         qt_sql1 "select k, cast(v['a'] as array<int>) from ${table_name} where size(cast(v['a'] as array<int>)) > 0 order by k, cast(v['a'] as string) asc"
-        qt_sql2 "select k, cast(v as int), cast(v['b'] as string) from ${table_name} where length(cast(v['b'] as string)) > 4 order by k, cast(v as string), cast(v['b'] as string) "
+        qt_sql2 "select k, cast(v as string), cast(v['b'] as string) from ${table_name} where length(cast(v['b'] as string)) > 4 order by k, cast(v as string), cast(v['b'] as string) "
         qt_sql3 "select k, v from ${table_name} order by k, cast(v as string) limit 5"
         qt_sql4 "select v['b'], v['b']['c'], cast(v as int) from ${table_name} where cast(v['b'] as string) != 'null' and cast(v['b'] as string) is not null and cast(v['b'] as string) != '{}' order by k,cast(v as string) desc limit 10000;"
         qt_sql5 "select v['b'] from ${table_name} where cast(v['b'] as int) > 0;"

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org