This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 52d2cf8ed69 [fix] (inverted index v2) fix inverted index format is lost during a schema change (#36059) 52d2cf8ed69 is described below commit 52d2cf8ed69125f2746dce1eb135015d25236a59 Author: Sun Chenyang <csun5...@gmail.com> AuthorDate: Sun Jun 9 07:57:16 2024 +0800 [fix] (inverted index v2) fix inverted index format is lost during a schema change (#36059) inverted index format is lost during a schema change or replica recovery --- .../org/apache/doris/alter/SchemaChangeJobV2.java | 2 +- .../java/org/apache/doris/backup/RestoreJob.java | 2 +- .../org/apache/doris/master/ReportHandler.java | 2 + .../test_schema_change_storage_format.groovy | 149 +++++++++++++++++++++ 4 files changed, 153 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 8a75e32ddbc..57ae326374a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -282,7 +282,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { if (this.storageFormat != null) { createReplicaTask.setStorageFormat(this.storageFormat); } - + createReplicaTask.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat()); batchTask.addTask(createReplicaTask); } // end for rollupReplicas } // end for rollupTablets diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 57861551c3a..70a0ed94c17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1104,7 +1104,7 @@ public class RestoreJob extends AbstractJob { localTbl.getTimeSeriesCompactionLevelThreshold(), localTbl.storeRowColumn(), binlogConfig); - + task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat()); task.setInRestoreMode(true); batchTask.addTask(task); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index c0ad4183d45..27213652d46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -884,6 +884,8 @@ public class ReportHandler extends Daemon { binlogConfig); createReplicaTask.setIsRecoverTask(true); + createReplicaTask.setInvertedIndexStorageFormat(olapTable + .getInvertedIndexStorageFormat()); createReplicaBatchTask.addTask(createReplicaTask); } else { // just set this replica as bad diff --git a/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy b/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy new file mode 100644 index 00000000000..0b50896c21f --- /dev/null +++ b/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_schema_change_storge_format", "p0") { + + def calc_file_crc_on_tablet = { ip, port, tablet -> + return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet)) + } + def set_be_config = { key, value -> + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def table_name = "github_events" + sql """DROP TABLE IF EXISTS ${table_name}""" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant, + change_column double, + INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 1 + properties("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "6294967296") + load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""") + load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""") + load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""") + + def getJobState = { indexName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def wait_for_schema_change = { -> + int max_try_time = 3000 + while (max_try_time--){ + String result = getJobState(table_name) + if (result == "FINISHED") { + sleep(3000) + break + } else { + if (result == "RUNNING") { + sleep(3000) + } + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + } + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + tablets = sql_return_maparray """ show tablets from ${table_name}; """ + String tablet_id = tablets[0].TabletId + String backend_id = tablets[0].BackendId + String ip = backendId_to_backendIP.get(backend_id) + String port = backendId_to_backendHttpPort.get(backend_id) + def (code_0, out_0, err_0) = calc_file_crc_on_tablet(ip, port, tablet_id) + logger.info("Run calc_file_crc_on_tablet: code=" + code_0 + ", out=" + out_0 + ", err=" + err_0) + assertTrue(code_0 == 0) + assertTrue(out_0.contains("crc_value")) + assertTrue(out_0.contains("used_time_ms")) + assertEquals("0", parseJson(out_0.trim()).start_version) + assertEquals("5", parseJson(out_0.trim()).end_version) + assertEquals("5", parseJson(out_0.trim()).rowset_count) + // inverted index format = v2, 4 segments + 4 inverted index file + assertEquals("8", parseJson(out_0.trim()).file_count) + + sql """ ALTER TABLE ${table_name} modify COLUMN change_column text""" + wait_for_schema_change.call() + + tablets = sql_return_maparray """ show tablets from ${table_name}; """ + tablet_id = tablets[0].TabletId + backend_id = tablets[0].BackendId + ip = backendId_to_backendIP.get(backend_id) + port = backendId_to_backendHttpPort.get(backend_id) + def (code_1, out_1, err_1) = calc_file_crc_on_tablet(ip, port, tablet_id) + logger.info("Run calc_file_crc_on_tablet: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertTrue(code_1 == 0) + assertTrue(out_1.contains("crc_value")) + assertTrue(out_1.contains("used_time_ms")) + assertEquals("0", parseJson(out_1.trim()).start_version) + assertEquals("5", parseJson(out_1.trim()).end_version) + assertEquals("5", parseJson(out_1.trim()).rowset_count) + // inverted index format = v2, 4 segments + 4 inverted index file + assertEquals("8", parseJson(out_1.trim()).file_count) + + // sql """ALTER TABLE ${table_name} drop index idx_var""" + // double_write.call() + // qt_sql "select v['type'], v['id'], v['created_at'] from ${table_name} where cast(v['id'] as bigint) != 25061216922 order by k, cast(v['id'] as bigint) limit 10" + + + set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "2147483648") +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org