w41ter commented on code in PR #33415:
URL: https://github.com/apache/doris/pull/33415#discussion_r1558802079
##########
be/src/service/backend_service.cpp:
##########
@@ -307,41 +307,81 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
     std::vector<uint64_t> segment_index_file_sizes;
     std::vector<std::string> segment_index_file_names;
     auto tablet_schema = rowset_meta->tablet_schema();
-    for (const auto& index : tablet_schema->indexes()) {
-        if (index.index_type() != IndexType::INVERTED) {
-            continue;
+    if (tablet_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
+        for (const auto& index : tablet_schema->indexes()) {
+            if (index.index_type() != IndexType::INVERTED) {
+                continue;
+            }
+            auto index_id = index.index_id();
+            for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
+                auto get_segment_index_file_size_url = fmt::format(
+                        "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
+                        "}",
+                        binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
+                        remote_rowset_id, segment_index, index_id);
+                uint64_t segment_index_file_size;
+                auto get_segment_index_file_size_cb =
+                        [&get_segment_index_file_size_url,
+                         &segment_index_file_size](HttpClient* client) {
+                            RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+                            client->set_timeout_ms(kMaxTimeoutMs);
+                            RETURN_IF_ERROR(client->head());
+                            return client->get_content_length(&segment_index_file_size);
+                        };
+                auto index_file = InvertedIndexDescriptor::inverted_index_file_path(
+                        local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index,
+                        index_id, index.get_index_suffix());
+                segment_index_file_names.push_back(index_file);
+
+                status = HttpClient::execute_with_retry(max_retry, 1,
+                                                        get_segment_index_file_size_cb);
+                if (!status.ok()) {
+                    LOG(WARNING) << "failed to get segment file size from "
+                                 << get_segment_index_file_size_url
+                                 << ", status=" << status.to_string();
+                    status.to_thrift(&tstatus);
+                    return;
+                }
+
+                segment_index_file_sizes.push_back(segment_index_file_size);
+                segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+            }
         }
-        auto index_id = index.index_id();
+    } else {
         for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
-            auto get_segment_index_file_size_url = fmt::format(
-                    "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
-                    "}",
-                    binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
-                    remote_rowset_id, segment_index, index_id);
-            uint64_t segment_index_file_size;
-            auto get_segment_index_file_size_cb = [&get_segment_index_file_size_url,
-                                                   &segment_index_file_size](HttpClient* client) {
-                RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
-                client->set_timeout_ms(kMaxTimeoutMs);
-                RETURN_IF_ERROR(client->head());
-                return client->get_content_length(&segment_index_file_size);
-            };
-            auto index_file = InvertedIndexDescriptor::inverted_index_file_path(
-                    local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index, index_id,
-                    index.get_index_suffix());
-            segment_index_file_names.push_back(index_file);
-
-            status = HttpClient::execute_with_retry(max_retry, 1, get_segment_index_file_size_cb);
-            if (!status.ok()) {
-                LOG(WARNING) << "failed to get segment file size from "
-                             << get_segment_index_file_size_url
-                             << ", status=" << status.to_string();
-                status.to_thrift(&tstatus);
-                return;
-            }
+            if (tablet_schema->has_inverted_index()) {
+                auto get_segment_index_file_size_url = fmt::format(
+                        "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
+                        "}",
+                        binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
+                        remote_rowset_id, segment_index, -1);
+                uint64_t segment_index_file_size;
+                auto get_segment_index_file_size_cb =
+                        [&get_segment_index_file_size_url,
+                         &segment_index_file_size](HttpClient* client) {
+                            RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+                            client->set_timeout_ms(kMaxTimeoutMs);
+                            RETURN_IF_ERROR(client->head());
+                            return client->get_content_length(&segment_index_file_size);
+                        };
+                auto local_segment_path = BetaRowset::segment_file_path(
+                        local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index);
+                auto index_file = InvertedIndexDescriptor::get_index_file_name(local_segment_path);
+                segment_index_file_names.push_back(index_file);
+
+                status = HttpClient::execute_with_retry(max_retry, 1,
+                                                        get_segment_index_file_size_cb);
+                if (!status.ok()) {
+                    LOG(WARNING) << "failed to get segment file size from "
+                                 << get_segment_index_file_size_url
+                                 << ", status=" << status.to_string();
+                    status.to_thrift(&tstatus);
+                    return;
+                }
 
-            segment_index_file_sizes.push_back(segment_index_file_size);
-            segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+                segment_index_file_sizes.push_back(segment_index_file_size);
+                segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));

Review Comment:
   It looks like the only differences are the URL and the index file name; how about changing it to the following code?
   ```c++
   auto get_segment_index_file_size_url =
           (tablet_schema->get_inverted_index_storage_format() == V1)
                   ? fmt::format(xxxx, index_id)
                   : fmt::format(xxxx, -1);
   ```



##########
be/src/olap/tablet.cpp:
##########
@@ -2411,14 +2411,24 @@ std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
 std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
                                                std::string_view segment_index,
                                                std::string_view index_id) const {
-    // TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
-    return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, index_id);
+    auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format();
+    if (format == doris::InvertedIndexStorageFormatPB::V1) {
+        return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index,
+                           index_id);
+    } else {
+        return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, segment_index);
+    }
 }
 
 std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
                                                int64_t index_id) const {
-    // TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
-    return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, index_id);
+    auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format();
+    if (format == doris::InvertedIndexStorageFormatPB::V1) {
+        return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index,
+                           index_id);
+    } else {

Review Comment:
   Consider adding a check:
   ```c++
   DCHECK(index_id == -1);
   ```



##########
regression-test/suites/ccr_syncer_p1/inverted_index/test_backup_restore.groovy:
##########
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_index") {
+
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case test_backup_restore")
+        return
+    }
+
+    def insert_data = { tableName ->
+        [
+            """INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""",
+            """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""",
+            """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);"""
+        ]
+    }
+
+    def sqls = { tableName ->
+        [
+            """ select * from ${tableName} order by id, name, hobbies, score """,
+            """ select * from ${tableName} where name match "andy" order by id, name, hobbies, score """,
+            """ select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """,
+            """ select * from ${tableName} where score < 100 order by id, name, hobbies, score """
+        ]
+    }
+
+    def run_sql = { tableName ->
+        sqls(tableName).each { sqlStatement ->
+            def target_res = target_sql sqlStatement
+            def res = sql sqlStatement
+            assertEquals(res, target_res)
+        }
+    }
+
+    def create_table_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true"
+        );
+        """
+    }
+
+    def create_table_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "inverted_index_storage_format" = "V2"
+        );
+        """
+    }
+
+    def create_table_mow_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "enable_unique_key_merge_on_write" = "true");
+        """
+    }
+
+    def create_table_mow_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    def run_test = { create_table, tableName ->
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql create_table
+
+        logger.info("=== Test 1: Common backup and restore ===")
+        def snapshotName = "snapshot_test_" + tableName
+        insert_data(tableName).each { sqlStatement ->
+            sql sqlStatement
+        }
+        sql " sync "
+        def res = sql "SELECT * FROM ${tableName}"
+        if (tableName.contains("mow")) {
+            assertEquals(res.size(), insert_data(tableName).size() / 2 as Integer)
+        } else {
+            assertEquals(res.size(), insert_data(tableName).size())
+        }
+
+        sql """
+            BACKUP SNAPSHOT ${context.dbName}.${snapshotName}
+            TO `__keep_on_local__`
+            ON (${tableName})
+            PROPERTIES ("type" = "full")
+        """
+        while (syncer.checkSnapshotFinish() == false) {
+            Thread.sleep(3000)
+        }

Review Comment:
   ```suggestion
           syncer.waitAllSnapshotFinish()
   ```



##########
regression-test/suites/ccr_syncer_p1/inverted_index/test_backup_restore.groovy:
##########
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +suite("test_backup_restore_index") { + + def syncer = getSyncer() + if (!syncer.checkEnableFeatureBinlog()) { + logger.info("fe enable_feature_binlog is false, skip case test_backup_restore") + return + } + + def insert_data = { tableName -> + [ + """INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100);""", + """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""", + """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""", + """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""", + """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""", + """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);""" + ] + } + + def sqls = { tableName -> + [ + """ select * from ${tableName} order by id, name, hobbies, score """, + """ select * from ${tableName} where name match "andy" order by id, name, hobbies, score """, + """ select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """, + """ select * from ${tableName} where score < 100 order by id, name, hobbies, score """ + ] + } + + def run_sql = { tableName -> + sqls(tableName).each { sqlStatement -> + def target_res = target_sql sqlStatement + def res = sql sqlStatement + assertEquals(res, target_res) + } + } + + def create_table_v1 = { tableName -> + """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `hobbies` text NULL, + `score` int(11) NULL, + index index_name (name) using inverted, + index index_hobbies (hobbies) using inverted properties("parser"="english"), + index index_score (score) using inverted + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "binlog.enable" = "true" + ); + """ + } + + def create_table_v2 = { tableName -> + """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `hobbies` text NULL, + `score` int(11) NULL, + index index_name (name) using inverted, + index index_hobbies (hobbies) using inverted properties("parser"="english"), + index index_score (score) using inverted + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "binlog.enable" = "true", + "inverted_index_storage_format" = "V2" + ); + """ + } + + def create_table_mow_v1 = { tableName -> + """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `hobbies` text NULL, + `score` int(11) NULL, + index index_name (name) using inverted, + index index_hobbies (hobbies) using inverted properties("parser"="english"), + index index_score (score) using inverted + ) ENGINE=OLAP + UNIQUE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "binlog.enable" = "true", + "enable_unique_key_merge_on_write" = "true"); + """ + } + + def create_table_mow_v2 = { tableName -> + """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `hobbies` text NULL, + `score` int(11) NULL, + index index_name (name) using inverted, + index index_hobbies (hobbies) using inverted properties("parser"="english"), + index index_score (score) using inverted + ) ENGINE=OLAP + UNIQUE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "binlog.enable" = "true", + "enable_unique_key_merge_on_write" = "true", + "inverted_index_storage_format" = "V2"); + """ + } + + def run_test 
= { create_table, tableName -> + sql "DROP TABLE IF EXISTS ${tableName}" + sql create_table + + logger.info("=== Test 1: Common backup and restore ===") + def snapshotName = "snapshot_test_" + tableName + insert_data(tableName).each { sqlStatement -> + sql sqlStatement + } + sql " sync " + def res = sql "SELECT * FROM ${tableName}" + if (tableName.contains("mow")) { + assertEquals(res.size(), insert_data(tableName).size() / 2 as Integer) + } else { + assertEquals(res.size(), insert_data(tableName).size()) + } + + sql """ + BACKUP SNAPSHOT ${context.dbName}.${snapshotName} + TO `__keep_on_local__` + ON (${tableName}) + PROPERTIES ("type" = "full") + """ + while (syncer.checkSnapshotFinish() == false) { + Thread.sleep(3000) + } + assertTrue(syncer.getSnapshot("${snapshotName}", "${tableName}")) + assertTrue(syncer.restoreSnapshot(true)) + while (syncer.checkRestoreFinish() == false) { + Thread.sleep(3000) + } Review Comment: ```suggestion syncer.waitTargetRestoreFinish() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
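
Editor's note: for illustration only, below is one hedged sketch of the deduplication suggested in the first review comment on be/src/service/backend_service.cpp, which observed that only the URL's index id and the local index file name differ between the V1 and V2 branches. It goes slightly beyond the reviewer's one-line ternary by factoring the shared body into a small lambda, because the two branches also iterate differently (V1 loops over inverted indexes and segments, V2 only over segments). The names `fetch_one_index_file`, `url`, `file_size`, and `head_cb` are hypothetical; every other identifier is taken from the quoted `_ingest_binlog` hunk and is assumed to be in scope. This is a sketch, not the PR's actual code.

```c++
// Hedged sketch only -- assumes the surrounding context of _ingest_binlog() exactly as
// quoted above: binlog_api_url, request, remote_rowset_id, num_segments, max_retry,
// kMaxTimeoutMs, local_tablet, rowset_meta, tablet_schema, status, tstatus, and the
// segment_index_file_{names,sizes,urls} vectors.
auto fetch_one_index_file = [&](int64_t segment_index, int64_t index_id,
                                std::string index_file) -> bool {
    // Only the index id in the URL (real id for V1, -1 for V2) and the file name differ.
    auto url = fmt::format(
            "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={}",
            binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
            remote_rowset_id, segment_index, index_id);
    uint64_t file_size = 0;
    auto head_cb = [&url, &file_size](HttpClient* client) {
        RETURN_IF_ERROR(client->init(url));
        client->set_timeout_ms(kMaxTimeoutMs);
        RETURN_IF_ERROR(client->head());
        return client->get_content_length(&file_size);
    };
    segment_index_file_names.push_back(std::move(index_file));
    status = HttpClient::execute_with_retry(max_retry, 1, head_cb);
    if (!status.ok()) {
        LOG(WARNING) << "failed to get segment index file size from " << url
                     << ", status=" << status.to_string();
        status.to_thrift(&tstatus);
        return false;
    }
    segment_index_file_sizes.push_back(file_size);
    segment_index_file_urls.push_back(std::move(url));
    return true;
};

if (tablet_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
    for (const auto& index : tablet_schema->indexes()) {
        if (index.index_type() != IndexType::INVERTED) {
            continue;
        }
        for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
            // V1: one index file per (segment, index id).
            auto index_file = InvertedIndexDescriptor::inverted_index_file_path(
                    local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index,
                    index.index_id(), index.get_index_suffix());
            if (!fetch_one_index_file(segment_index, index.index_id(), index_file)) {
                return;
            }
        }
    }
} else if (tablet_schema->has_inverted_index()) {
    for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
        // V2: one merged index file per segment, so the URL carries index id -1.
        auto local_segment_path = BetaRowset::segment_file_path(
                local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index);
        auto index_file = InvertedIndexDescriptor::get_index_file_name(local_segment_path);
        if (!fetch_one_index_file(segment_index, /*index_id=*/-1, index_file)) {
            return;
        }
    }
}
```

The lambda keeps the HTTP HEAD request, retry, and error handling in one place, so the V1 and V2 branches only decide how to iterate and how to name the file, which is the intent of the reviewer's suggestion.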