This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 385589c92b1 [fix](regression)Fix unstable compaction related cases (#46920) (#47003) 385589c92b1 is described below commit 385589c92b1920c41394a1e0b6f35913d107c9d3 Author: qiye <l...@selectdb.com> AuthorDate: Wed Jan 15 13:42:04 2025 +0800 [fix](regression)Fix unstable compaction related cases (#46920) (#47003) bp #46920 --- regression-test/plugins/plugin_compaction.groovy | 10 ++- ...st_skip_index_compaction_fault_injection.groovy | 86 ++-------------------- ..._index_change_with_cumulative_compaction.groovy | 33 +++++---- 3 files changed, 33 insertions(+), 96 deletions(-) diff --git a/regression-test/plugins/plugin_compaction.groovy b/regression-test/plugins/plugin_compaction.groovy index eaefa9a10d3..45dd99a97a3 100644 --- a/regression-test/plugins/plugin_compaction.groovy +++ b/regression-test/plugins/plugin_compaction.groovy @@ -57,8 +57,7 @@ Suite.metaClass.be_run_full_compaction_by_table_id = { String ip, String port, S } logger.info("Added 'be_run_full_compaction' function to Suite") - -Suite.metaClass.trigger_and_wait_compaction = { String table_name, String compaction_type, int timeout_seconds=300 -> +Suite.metaClass.trigger_and_wait_compaction = { String table_name, String compaction_type, int timeout_seconds=300, String[] ignored_errors=[] -> if (!(compaction_type in ["cumulative", "base", "full"])) { throw new IllegalArgumentException("invalid compaction type: ${compaction_type}, supported types: cumulative, base, full") } @@ -102,12 +101,15 @@ Suite.metaClass.trigger_and_wait_compaction = { String table_name, String compac assert exit_code == 0: "trigger compaction failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}" def trigger_status = parseJson(stdout.trim()) if (trigger_status.status.toLowerCase() != "success") { - if (trigger_status.status.toLowerCase() == "already_exist") { + def status_lower = trigger_status.status.toLowerCase() + if (status_lower == "already_exist") { triggered_tablets.add(tablet) // compaction already in queue, treat it as successfully triggered } else if (!auto_compaction_disabled) { // ignore the error if auto compaction enabled - } else if (trigger_status.status.contains("E-2000")) { + } else if (status_lower.contains("e-2000")) { // ignore this tablet compaction. + } else if (ignored_errors.any { error -> status_lower.contains(error.toLowerCase()) }) { + // ignore this tablet compaction if the error is in the ignored_errors list } else { throw new Exception("trigger compaction failed, be host: ${be_host}, tablet id: ${tablet.TabletId}, status: ${trigger_status.status}") } diff --git a/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy index 52d8d481c8d..ee18ffb7304 100644 --- a/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy @@ -19,8 +19,8 @@ import org.codehaus.groovy.runtime.IOGroovyMethods suite("test_skip_index_compaction_fault_injection", "nonConcurrent") { def isCloudMode = isCloudMode() + // branch-2.1 only support index compaction with index_format_v1 def tableName1 = "test_skip_index_compaction_fault_injection_1" - def tableName2 = "test_skip_index_compaction_fault_injection_2" def backendId_to_backendIP = [:] def backendId_to_backendHttpPort = [:] getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); @@ -46,29 +46,8 @@ suite("test_skip_index_compaction_fault_injection", "nonConcurrent") { ); """ - sql "DROP TABLE IF EXISTS ${tableName2}" - sql """ - CREATE TABLE ${tableName2} ( - `@timestamp` int(11) NULL COMMENT "", - `clientip` varchar(20) NULL COMMENT "", - `request` text NULL COMMENT "", - `status` int(11) NULL COMMENT "", - `size` int(11) NULL COMMENT "", - INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '', - INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '' - ) ENGINE=OLAP - DUPLICATE KEY(`@timestamp`) - COMMENT "OLAP" - DISTRIBUTED BY RANDOM BUCKETS 1 - PROPERTIES ( - "replication_allocation" = "tag.location.default: 1", - "disable_auto_compaction" = "true", - "inverted_index_storage_format" = "V2" - ); - """ - boolean disableAutoCompaction = false - + def set_be_config = { key, value -> for (String backend_id: backendId_to_backendIP.keySet()) { def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) @@ -76,49 +55,6 @@ suite("test_skip_index_compaction_fault_injection", "nonConcurrent") { } } - def trigger_full_compaction_on_tablets = { tablets -> - for (def tablet : tablets) { - String tablet_id = tablet.TabletId - String backend_id = tablet.BackendId - int times = 1 - - String compactionStatus; - do{ - def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) - logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) - ++times - sleep(2000) - compactionStatus = parseJson(out.trim()).status.toLowerCase(); - } while (compactionStatus!="success" && times<=10 && compactionStatus!="e-6010") - - - if (compactionStatus == "fail") { - assertEquals(disableAutoCompaction, false) - logger.info("Compaction was done automatically!") - } - if (disableAutoCompaction && compactionStatus!="e-6010") { - assertEquals("success", compactionStatus) - } - } - } - - def wait_full_compaction_done = { tablets -> - for (def tablet in tablets) { - boolean running = true - do { - Thread.sleep(1000) - String tablet_id = tablet.TabletId - String backend_id = tablet.BackendId - def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) - logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def compactionStatus = parseJson(out.trim()) - assertEquals("success", compactionStatus.status.toLowerCase()) - running = compactionStatus.run_status - } while (running) - } - } - def get_rowset_count = { tablets -> int rowsetCount = 0 for (def tablet in tablets) { @@ -148,7 +84,7 @@ suite("test_skip_index_compaction_fault_injection", "nonConcurrent") { } } - def run_test = { tableName -> + def run_test = { tableName -> sql """ INSERT INTO ${tableName} VALUES (1, "40.135.0.0", "GET /images/hm_bg.jpg HTTP/1.0", 1, 2); """ sql """ INSERT INTO ${tableName} VALUES (2, "40.135.0.0", "GET /images/hm_bg.jpg HTTP/1.0", 1, 2); """ sql """ INSERT INTO ${tableName} VALUES (3, "40.135.0.0", "GET /images/hm_bg.jpg HTTP/1.0", 1, 2); """ @@ -178,15 +114,13 @@ suite("test_skip_index_compaction_fault_injection", "nonConcurrent") { assert (rowsetCount == 11 * replicaNum) // first - trigger_full_compaction_on_tablets.call(tablets) - wait_full_compaction_done.call(tablets) + trigger_and_wait_compaction(tableName, "full", 300, new String[]{"e-6010"}) rowsetCount = get_rowset_count.call(tablets); assert (rowsetCount == 11 * replicaNum) // second - trigger_full_compaction_on_tablets.call(tablets) - wait_full_compaction_done.call(tablets) + trigger_and_wait_compaction(tableName, "full", 300, new String[]{"e-6010"}) rowsetCount = get_rowset_count.call(tablets); if (isCloudMode) { @@ -202,7 +136,7 @@ suite("test_skip_index_compaction_fault_injection", "nonConcurrent") { String backend_id; backend_id = backendId_to_backendIP.keySet()[0] def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) - + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) assertEquals(code, 0) def configList = parseJson(out.trim()) @@ -226,15 +160,9 @@ suite("test_skip_index_compaction_fault_injection", "nonConcurrent") { GetDebugPoint().disableDebugPointForAllBEs("Compaction::open_inverted_index_file_reader") } - // try { - // GetDebugPoint().enableDebugPointForAllBEs("Compaction::open_inverted_index_file_writer") - // run_test.call(tableName2) - // } finally { - // GetDebugPoint().disableDebugPointForAllBEs("Compaction::open_inverted_index_file_writer") - // } } finally { if (has_update_be_config) { set_be_config.call("inverted_index_compaction_enable", invertedIndexCompactionEnable.toString()) } } -} \ No newline at end of file +} diff --git a/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy b/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy index ce12d1ede0c..4276edfdedf 100644 --- a/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy +++ b/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy @@ -40,21 +40,25 @@ suite("test_index_change_with_cumulative_compaction", "nonConcurrent") { assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout") } - def trigger_compaction_with_retry = {table_name, compaction_type = "cumulative", max_retries = 10, delay_ms = 2000 -> - def retry_count = 0 - while (true) { - try { - trigger_and_wait_compaction(table_name, compaction_type) - return // Success - } catch (Exception e) { - retry_count++ - if (retry_count >= max_retries) { - throw new Exception("Failed to complete ${compaction_type} compaction after ${max_retries} attempts", e) + def wait_for_build_index_on_partition_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}";""" + def expected_finished_num = alter_res.size(); + def finished_num = 0; + for (int i = 0; i < expected_finished_num; i++) { + logger.info(table_name + " build index job state: " + alter_res[i][7] + i) + if (alter_res[i][7] == "FINISHED") { + ++finished_num; } - logger.warn("Compaction attempt ${retry_count} failed: ${e.getMessage()}") - Thread.sleep(delay_ms) } + if (finished_num == expected_finished_num) { + logger.info(table_name + " all build index jobs finished, detail: " + alter_res) + break + } + useTime = t + sleep(delta_time) } + assertTrue(useTime <= OpTimeout, "wait_for_latest_build_index_on_partition_finish timeout") } try { @@ -162,12 +166,15 @@ suite("test_index_change_with_cumulative_compaction", "nonConcurrent") { // build index if (!isCloudMode()) { sql "build index idx_user_id on ${tableName}" + wait_for_build_index_on_partition_finish(tableName, timeout) sql "build index idx_date on ${tableName}" + wait_for_build_index_on_partition_finish(tableName, timeout) sql "build index idx_city on ${tableName}" + wait_for_build_index_on_partition_finish(tableName, timeout) } // trigger compactions for all tablets in ${tableName} - trigger_compaction_with_retry(tableName, "cumulative") + trigger_and_wait_compaction(tableName, "cumulative") int rowCount = 0 for (def tablet in tablets) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org