cosven commented on code in PR #49204: URL: https://github.com/apache/doris/pull/49204#discussion_r2030527777
########## regression-test/suites/fault_injection_p0/cloud/test_delete_bitmap_lock_with_restart.groovy: ########## @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_delete_bitmap_lock_with_restart", "docker") { + if (!isCloudMode()) { + return + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.setFeNum(1) + options.setBeNum(1) + options.enableDebugPoints() + options.cloudMode = true + + def customFeConfig1 = [meta_service_rpc_retry_times: 5] + def tableName = "tbl_basic" + def do_stream_load = { + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + } + //1. 
load + docker(options) { + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(10) NULL, + `score` int(11) NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + do_stream_load() + GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.handle.inject_sleep", [percent: "1.0", sleep: "15"]) + Thread.startDaemon { + do_stream_load() + } + // 1. load + restart fe + cluster.restartFrontends() + def now = System.currentTimeMillis() + do_stream_load() + def time_cost = System.currentTimeMillis() - now + log.info("time_cost(ms): ${time_cost}") + assertTrue(time_cost > 30000, "wait time should bigger than 30s") + + // 2. load + restart be + + Thread.startDaemon { + do_stream_load() + } + cluster.restartBackends() + now = System.currentTimeMillis() + do_stream_load() + time_cost = System.currentTimeMillis() - now + log.info("time_cost(ms): ${time_cost}") + assertTrue(time_cost < 10000, "wait time should bigger than 10s") + } + //2. 
compaction + options.beConfigs += [ + 'delete_bitmap_lock_expiration_seconds=60', + ] + docker(options) { + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + + def getTabletStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id -> + if (compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) + } + } + def waitForCompaction = { be_host, be_http_port, tablet_id -> + boolean running = true + do { + Thread.sleep(100) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + if (code == 0) { + def 
compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } else { + break + } + } while (running) + } + + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(10) NULL, + `score` int(11) NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "AAA", 15);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2, "BBB", 25);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3, "CCC", 35);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (4, "DDD", 45);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (5, "EEE", 55);""" + + GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep", [percent: "1.0", sleep: "10"]) + def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}" + logger.info("tablets: " + tablets) + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + String trigger_backend_id = tablet.BackendId + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], + "cumulative", tablet_id).contains("Success")) + + } + // 1. 
compaction + restart fe + cluster.restartFrontends() + def now = System.currentTimeMillis() + do_stream_load() + def time_cost = System.currentTimeMillis() - now + log.info("time_cost(ms): ${time_cost}") + assertTrue(time_cost < 10000, "wait time should less than 10s") + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + } + sleep(30000) + context.reconnectFe() + // 2. compaction + restart be + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "AAA", 15);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2, "BBB", 25);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3, "CCC", 35);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (4, "DDD", 45);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (5, "EEE", 55);""" + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + String trigger_backend_id = tablet.BackendId + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], + "cumulative", tablet_id).contains("Success")) + + } + cluster.restartBackends() + now = System.currentTimeMillis() + do_stream_load() + time_cost = System.currentTimeMillis() - now + log.info("time_cost(ms): ${time_cost}") + assertTrue(time_cost > 10000, "wait time should less than 10s") + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + 
waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + } + } + //3. sc + docker(options) { + def getJobState = { + def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE TableName='${tableName}' ORDER BY createtime DESC LIMIT 1" + assert res.size() == 1 + log.info("res:" + res[0].State) + return res[0].State + } + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(10) NULL, + `score` int(11) NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "AAA", 15);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2, "BBB", 25);""" + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3, "CCC", 35);""" + GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep", [percent: "1.0", sleep: "10"]) + sql "alter table ${tableName} modify column score varchar(100);" + // 1. 
sc + restart fe + cluster.restartFrontends() + context.reconnectFe() + for (int i = 0; i < 30; i++) { + log.info("i: ${i}") + try { + def now = System.currentTimeMillis() + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "AAA", 15);""" + def time_cost = System.currentTimeMillis() - now + log.info("time_cost(ms): ${time_cost}") + assertTrue(time_cost < 10000, "wait time should less than 10s") + break + } catch (Exception e) { + log.info("Exception:" + e) + Thread.sleep(2000) + } + } + int max_try_time = 30 + while (max_try_time--) { + def result = getJobState(tableName) + if (result == "FINISHED" || result == "CANCELLED") { + break + } else { + Thread.sleep(1000) + } + } + // 2. sc + restart be + sql "alter table ${tableName} modify column score varchar(200);" + cluster.restartBackends() + def now = System.currentTimeMillis() + do_stream_load() + def time_cost = System.currentTimeMillis() - now + log.info("time_cost(ms): ${time_cost}") + assertTrue(time_cost > 10000, "wait time should less than 10s") Review Comment:  `do_stream_load` asserts that the duration is less than 10s. Here, the test case asserts that the `time_cost` is bigger than 10s. Can this test case pass? ########## regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_delete_bitmap_lock_case.groovy: ########## @@ -0,0 +1,784 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_cloud_mow_delete_bitmap_lock_case", "nonConcurrent") { + if (!isCloudMode()) { + return + } + GetDebugPoint().clearDebugPointsForAllFEs() + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = 
backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def customFeConfig1 = [calculate_delete_bitmap_task_timeout_seconds: 2, meta_service_rpc_retry_times: 5] + def customFeConfig2 = [delete_bitmap_lock_expiration_seconds: 2, meta_service_rpc_retry_times: 5] + def customFeConfig3 = [mow_calculate_delete_bitmap_retry_times: 1] + def customFeConfig4 = [calculate_delete_bitmap_task_timeout_seconds: 2, mow_calculate_delete_bitmap_retry_times: 1] + def customFeConfig5 = [meta_service_rpc_retry_times: 5] + def tableName = "tbl_basic" + String[][] backends = sql """ show backends """ + assertTrue(backends.size() > 0) + String backendId; + def backendIdToBackendIP = [:] + def backendIdToBackendBrpcPort = [:] + for (String[] backend in backends) { + if (backend[9].equals("true")) { + backendIdToBackendIP.put(backend[0], backend[1]) + backendIdToBackendBrpcPort.put(backend[0], backend[5]) + } + } + + backendId = backendIdToBackendIP.keySet()[0] + def getMetricsMethod = { check_func -> + httpTest { + endpoint backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendBrpcPort.get(backendId) + uri "/brpc_metrics" + op "get" + check check_func + } + } + + int total_retry = 0; + int last_total_retry = -1; + + def getTotalRetry = { + getMetricsMethod.call() { respCode, body -> + logger.info("get total retry resp Code {}", "${respCode}".toString()) + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def strs = out.split('\n') + for (String line in strs) { + if (line.startsWith("stream_load_commit_retry_counter")) { + logger.info("find: 
{}", line) + total_retry = line.replaceAll("stream_load_commit_retry_counter ", "").toInteger() + if (last_total_retry < 0) { + last_total_retry = total_retry + } + break + } + } + } + } + + def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id -> + if (compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) + } + } + + def getTabletStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + def waitForCompaction = { be_host, be_http_port, tablet_id -> + boolean running = true + do { + Thread.sleep(100) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + 
assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + def do_stream_load = { + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + } + + def do_insert_into = { + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "Emily", 25),(2, "Benjamin", 35);""" + } + + def getAlterTableState = { table_name -> + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${table_name}' ORDER BY createtime DESC LIMIT 1 """ + time 600 + } + return true + } + + def waitForSC = { + Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> { + def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE TableName='${tableName}' ORDER BY createtime DESC LIMIT 1" + assert res.size() == 1 + if (res[0].State == "FINISHED" || res[0].State == "CANCELLED") { + return true; + } + return false; + }); + } + + try { + GetDebugPoint().enableDebugPointForAllFEs('FE.mow.check.lock.release', null) + getTotalRetry.call() + log.info("last_total_retry:" + last_total_retry) + // store the original value + get_be_param("mow_stream_load_commit_retry_times") + set_be_param("mow_stream_load_commit_retry_times", "2") + // create table + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(10) NULL, + `score` int(11) NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + // 1.test 
normal load, lock is released normally, retry times is 0 + // 1.1 first load success + try { + GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load0.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + } + qt_sql1 """ select * from ${tableName} order by id""" + + getTotalRetry.call() + assertEquals(last_total_retry, total_retry) + // 1.2 second load success + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load1.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql2 """ select * from ${tableName} order by id""" + + getTotalRetry.call() + assertEquals(last_total_retry, total_retry) + + + //2. 
test commit fail, lock is released normally, will not retry + // 2.1 first load will fail on fe commit phase + GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null) + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load2.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("FE.mow.commit.exception")) + } + } + qt_sql3 """ select * from ${tableName} order by id""" + + // commit fail is not DELETE_BITMAP_LOCK_ERR will not retry + getTotalRetry.call() + assertEquals(last_total_retry, total_retry) + + // 2.2 second load will success because of removing exception injection + GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception') + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load2.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql4 """ select * from ${tableName} order by id""" + getTotalRetry.call() + assertEquals(last_total_retry, total_retry) + + // 3. 
test update delete bitmap fail, lock is released normally, will retry + setFeConfigTemporary(customFeConfig2) { + // 3.1 first load will fail on calculate delete bitmap timeout + GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail") + + def now = System.currentTimeMillis() + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load3.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("update delete bitmap failed")) + } + } + def time_cost = System.currentTimeMillis() - now + getTotalRetry.call() + assertEquals(last_total_retry + 2, total_retry) + qt_sql5 """ select * from ${tableName} order by id""" + + // 3.2 second load will success because of removing timeout simulation + GetDebugPoint().disableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load3.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + getTotalRetry.call() + assertEquals(last_total_retry + 2, total_retry) + qt_sql6 """ select * from ${tableName} order by id""" + } + + //4. 
test wait fe lock timeout, will retry + setFeConfigTemporary(customFeConfig1) { + get_be_param("txn_commit_rpc_timeout_ms") + set_be_param("txn_commit_rpc_timeout_ms", "10000") + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout", [sleep_time: 5]) + // 4.1 first load will fail, because of waiting for fe lock timeout + def now = System.currentTimeMillis() + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load4.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("get table cloud commit lock timeout")) + } + } + def time_cost = System.currentTimeMillis() - now + getTotalRetry.call() + assertEquals(last_total_retry + 4, total_retry) + assertTrue(time_cost > 10000, "wait time should bigger than total retry interval") + qt_sql7 """ select * from ${tableName} order by id""" + + // 4.2 second load will success because of removing timeout simulation + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load4.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + getTotalRetry.call() + assertEquals(last_total_retry + 4, total_retry) + qt_sql8 """ select * from ${tableName} order by id""" + reset_be_param("txn_commit_rpc_timeout_ms") + } + //5. 
test wait delete bitmap lock timeout, lock is released normally, will retry + GetDebugPoint().enableDebugPointForAllFEs("FE.mow.get_delete_bitmap_lock.fail") + // 5.1 first load will fail, because of waiting for delete bitmap lock timeout + setFeConfigTemporary(customFeConfig1) { + def now = System.currentTimeMillis() + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load5.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("test get_delete_bitmap_lock fail")) + } + } + def time_cost = System.currentTimeMillis() - now + getTotalRetry.call() + assertEquals(last_total_retry + 6, total_retry) + qt_sql9 """ select * from ${tableName} order by id""" + + // 5.2 second load will success because of removing timeout simulation + GetDebugPoint().disableDebugPointForAllFEs("FE.mow.get_delete_bitmap_lock.fail") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load5.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + getTotalRetry.call() + assertEquals(last_total_retry + 6, total_retry) + qt_sql10 """ select * from ${tableName} order by id""" + } + + //6.test calculate delete bitmap task timeout, after retry, will succeed + setFeConfigTemporary(customFeConfig1) { + // 6.1 first load will retry because of calculating delete bitmap timeout, and finally succeed + GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + + def now = System.currentTimeMillis() + streamLoad { + table "${tableName}" + + set 
'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load6.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + def time_cost = System.currentTimeMillis() - now + getTotalRetry.call() + assertEquals(last_total_retry + 7, total_retry) + assertTrue(time_cost > 2000, "wait time should bigger than total retry interval") + qt_sql11 """ select * from ${tableName} order by id""" + + // 6.2 second load will success and no need retry because of removing timeout simulation + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + getTotalRetry.call() + assertEquals(last_total_retry + 7, total_retry) + qt_sql12 """ select * from ${tableName} order by id""" + } + + //7. 
test parallel load + GetDebugPoint().disableDebugPointForAllFEs('FE.mow.check.lock.release') + setFeConfigTemporary(customFeConfig2) { + GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + def threads = [] + def now = System.currentTimeMillis() + for (int k = 0; k <= 1; k++) { + logger.info("start load thread:" + k) + threads.add(Thread.startDaemon { + do_stream_load() + }) + } + for (Thread th in threads) { + th.join() + } + def time_cost = System.currentTimeMillis() - now + log.info("time_cost(ms): ${time_cost}") + assertTrue(time_cost > 6000, "wait time should bigger than 6s") + + threads = [] + now = System.currentTimeMillis() + for (int k = 0; k <= 1; k++) { + logger.info("start insert into thread:" + k) + threads.add(Thread.startDaemon { + do_insert_into() + }) + } + for (Thread th in threads) { + th.join() + } + time_cost = System.currentTimeMillis() - now + log.info("time_cost(ms): ${time_cost}") + assertTrue(time_cost > 6000, "wait time should bigger than 6s") + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + + } + //8. test insert into timeout config + setFeConfigTemporary(customFeConfig3) { + try { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout", [sleep_time: 15]) + sql """ set global insert_visible_timeout_ms=15000; """ + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "Emily", 25),(2, "Benjamin", 35);""" Review Comment: Add an assertion to check that the insert statement fails in about 15s. ########## regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_delete_bitmap_lock_case.groovy: ########## @@ -0,0 +1,784 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_cloud_mow_delete_bitmap_lock_case", "nonConcurrent") { + if (!isCloudMode()) { + return + } + GetDebugPoint().clearDebugPointsForAllFEs() + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value 
by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def customFeConfig1 = [calculate_delete_bitmap_task_timeout_seconds: 2, meta_service_rpc_retry_times: 5] + def customFeConfig2 = [delete_bitmap_lock_expiration_seconds: 2, meta_service_rpc_retry_times: 5] + def customFeConfig3 = [mow_calculate_delete_bitmap_retry_times: 1] + def customFeConfig4 = [calculate_delete_bitmap_task_timeout_seconds: 2, mow_calculate_delete_bitmap_retry_times: 1] + def customFeConfig5 = [meta_service_rpc_retry_times: 5] + def tableName = "tbl_basic" + String[][] backends = sql """ show backends """ + assertTrue(backends.size() > 0) + String backendId; + def backendIdToBackendIP = [:] + def backendIdToBackendBrpcPort = [:] + for (String[] backend in backends) { + if (backend[9].equals("true")) { + backendIdToBackendIP.put(backend[0], backend[1]) + backendIdToBackendBrpcPort.put(backend[0], backend[5]) + } + } + + backendId = backendIdToBackendIP.keySet()[0] + def getMetricsMethod = { check_func -> + httpTest { + endpoint backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendBrpcPort.get(backendId) + uri "/brpc_metrics" + op "get" + check check_func + } + } + + int total_retry = 0; + int last_total_retry = -1; + + def getTotalRetry = { + getMetricsMethod.call() { respCode, body -> + logger.info("get total retry resp Code {}", "${respCode}".toString()) + assertEquals("${respCode}".toString(), "200") + String out = 
"${body}".toString() + def strs = out.split('\n') + for (String line in strs) { + if (line.startsWith("stream_load_commit_retry_counter")) { + logger.info("find: {}", line) + total_retry = line.replaceAll("stream_load_commit_retry_counter ", "").toInteger() + if (last_total_retry < 0) { + last_total_retry = total_retry + } + break + } + } + } + } + + def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id -> + if (compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) + } + } + + def getTabletStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + def waitForCompaction = { be_host, be_http_port, tablet_id -> + boolean running = true + do { + Thread.sleep(100) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + 
logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + def do_stream_load = { + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + } + + def do_insert_into = { + sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "Emily", 25),(2, "Benjamin", 35);""" + } + + def getAlterTableState = { table_name -> + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${table_name}' ORDER BY createtime DESC LIMIT 1 """ + time 600 + } + return true + } + + def waitForSC = { + Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> { + def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE TableName='${tableName}' ORDER BY createtime DESC LIMIT 1" + assert res.size() == 1 + if (res[0].State == "FINISHED" || res[0].State == "CANCELLED") { + return true; + } + return false; + }); + } + + try { + GetDebugPoint().enableDebugPointForAllFEs('FE.mow.check.lock.release', null) + getTotalRetry.call() + log.info("last_total_retry:" + last_total_retry) + // store the original value + get_be_param("mow_stream_load_commit_retry_times") + set_be_param("mow_stream_load_commit_retry_times", "2") + // create table + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(10) NULL, + `score` int(11) NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + 
PROPERTIES ( + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + // 1.test normal load, lock is released normally, retry times is 0 + // 1.1 first load success + try { + GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load0.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + } + qt_sql1 """ select * from ${tableName} order by id""" + + getTotalRetry.call() + assertEquals(last_total_retry, total_retry) + // 1.2 second load success + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load1.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql2 """ select * from ${tableName} order by id""" + + getTotalRetry.call() + assertEquals(last_total_retry, total_retry) + + + //2. 
test commit fail, lock is released normally, will not retry + // 2.1 first load will fail on fe commit phase + GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null) + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load2.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("FE.mow.commit.exception")) + } + } + qt_sql3 """ select * from ${tableName} order by id""" + + // commit fail is not DELETE_BITMAP_LOCK_ERR will not retry + getTotalRetry.call() + assertEquals(last_total_retry, total_retry) + + // 2.2 second load will success because of removing exception injection + GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception') + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load2.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql4 """ select * from ${tableName} order by id""" + getTotalRetry.call() + assertEquals(last_total_retry, total_retry) + + // 3. 
test update delete bitmap fail, lock is released normally, will retry + setFeConfigTemporary(customFeConfig2) { + // 3.1 first load will fail on calculate delete bitmap timeout + GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail") + + def now = System.currentTimeMillis() + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load3.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("update delete bitmap failed")) + } + } + def time_cost = System.currentTimeMillis() - now + getTotalRetry.call() + assertEquals(last_total_retry + 2, total_retry) + qt_sql5 """ select * from ${tableName} order by id""" + + // 3.2 second load will success because of removing timeout simulation + GetDebugPoint().disableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load3.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + getTotalRetry.call() + assertEquals(last_total_retry + 2, total_retry) + qt_sql6 """ select * from ${tableName} order by id""" + } + + //4. 
test wait fe lock timeout, will retry + setFeConfigTemporary(customFeConfig1) { + get_be_param("txn_commit_rpc_timeout_ms") + set_be_param("txn_commit_rpc_timeout_ms", "10000") + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout", [sleep_time: 5]) + // 4.1 first load will fail, because of waiting for fe lock timeout + def now = System.currentTimeMillis() + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load4.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("get table cloud commit lock timeout")) + } + } + def time_cost = System.currentTimeMillis() - now + getTotalRetry.call() + assertEquals(last_total_retry + 4, total_retry) + assertTrue(time_cost > 10000, "wait time should bigger than total retry interval") Review Comment: Also add an assertion to check that `time_cost` is **almost** equal to `txn_commit_rpc_timeout_ms`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org