This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 85077dcea4ab525f3f198baa9ace1bfef59642e7
Author: qiye <jianliang5...@gmail.com>
AuthorDate: Fri Jun 14 10:25:46 2024 +0800

    [improvement](build index) Make build index and clone mutually exclusive and add timeout for index change job (#35724)

    Currently an index change job and a clone task can be executed at the same
    time. If the clone task gets stuck, the index change job gets stuck as well
    and keeps retrying. To solve this, we follow the approach of alter jobs:
    make the index change job mutually exclusive with clone tasks, and introduce
    a timeout so the build index job cannot retry forever.

    The following checks and status are added in the FE:
    1. Check whether the table is stable (build index is not allowed while a
       clone is in progress):
       1.1. Every tablet is HEALTHY.
       1.2. No tablet is queued in the tablet scheduler; a queued tablet means a
            clone of that tablet is in progress.
    2. When creating the index change job, set its timeout at the same time.

    ---------

    Co-authored-by: Luennng <luen...@gmail.com>
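Before reading the patch, the change reduces to two guards on the job: a
table-stability check before any work is scheduled (so the job never races with
an in-flight clone), and a timeout taken from Config.alter_table_timeout_second
that cancels the job instead of letting it retry forever. A minimal sketch of
that control flow follows; the class and helper names are illustrative
placeholders, not the actual FE classes in the patch below.

    // Illustrative sketch only -- not the real Doris FE classes.
    class IndexChangeJobSketch {
        private final long createTimeMs = System.currentTimeMillis();
        private final long timeoutMs;   // e.g. Config.alter_table_timeout_second * 1000
        private String errMsg = "";

        IndexChangeJobSketch(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        boolean isTimeout() {
            return System.currentTimeMillis() - createTimeMs > timeoutMs;
        }

        // False while any tablet is unhealthy or still queued in the tablet
        // scheduler (i.e. a clone is in progress); the job then stays in WAITING_TXN.
        boolean checkTableStable(TableLike tbl) {
            if (!tbl.isStable()) {
                errMsg = "table is unstable";
                return false;
            }
            errMsg = "";
            return true;
        }

        void run(TableLike tbl) {
            if (isTimeout()) {
                cancel("Timeout");          // give up instead of retrying forever
                return;
            }
            if (!checkTableStable(tbl)) {
                return;                     // retry on the next scheduling round
            }
            // ... dispatch the batch of inverted index tasks to the backends ...
        }

        void cancel(String reason) {
            // mark the job CANCELLED and clean up outstanding agent tasks
        }

        interface TableLike {
            boolean isStable();
        }
    }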
---
 .../org/apache/doris/alter/IndexChangeJob.java     | 76 ++++++++++++++++-
 .../apache/doris/alter/SchemaChangeHandler.java    |  3 +-
 .../test_build_index_with_clone_fault.groovy       | 96 +++++++++++++++++++++
 .../test_build_index_with_clone_by_docker.groovy   | 97 ++++++++++++++++++++++
 4 files changed, 269 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
index 57a5f9d9ec6..4cdd96a5f21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterInvertedIndexTask;
@@ -46,6 +47,7 @@ import org.apache.doris.thrift.TTaskType;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -54,6 +56,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 public class IndexChangeJob implements Writable {
 
@@ -106,6 +109,11 @@
     private long originIndexId;
     @SerializedName(value = "invertedIndexBatchTask")
     AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
+    // save failed task after retry three times, tablet -> backends
+    @SerializedName(value = "failedTabletBackends")
+    protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
+    @SerializedName(value = "timeoutMs")
+    protected long timeoutMs = -1;
 
     public IndexChangeJob() {
         this.jobId = -1;
@@ -117,7 +125,7 @@
         this.jobState = JobState.WAITING_TXN;
     }
 
-    public IndexChangeJob(long jobId, long dbId, long tableId, String tableName) throws Exception {
+    public IndexChangeJob(long jobId, long dbId, long tableId, String tableName, long timeoutMs) throws Exception {
         this.jobId = jobId;
         this.dbId = dbId;
         this.tableId = tableId;
@@ -126,6 +134,7 @@ public class IndexChangeJob implements Writable {
         this.createTimeMs = System.currentTimeMillis();
         this.jobState = JobState.WAITING_TXN;
         this.watershedTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
+        this.timeoutMs = timeoutMs;
     }
 
     public long getJobId() {
@@ -206,6 +215,10 @@ public class IndexChangeJob implements Writable {
         this.finishedTimeMs = finishedTimeMs;
     }
 
+    public boolean isTimeout() {
+        return System.currentTimeMillis() - createTimeMs > timeoutMs;
+    }
+
     /**
      * The keyword 'synchronized' only protects 2 methods:
      * run() and cancel()
@@ -217,6 +230,10 @@ public class IndexChangeJob implements Writable {
      * db lock
      */
     public synchronized void run() {
+        if (isTimeout()) {
+            cancelImpl("Timeout");
+            return;
+        }
         try {
             switch (jobState) {
                 case WAITING_TXN:
@@ -237,6 +254,31 @@
         return cancelImpl(errMsg);
     }
 
+    /**
+     * should be called before executing the job.
+     * return false if table is not stable.
+     */
+    protected boolean checkTableStable(OlapTable tbl) throws AlterCancelException {
+        tbl.writeLockOrAlterCancelException();
+        try {
+            boolean isStable = tbl.isStable(Env.getCurrentSystemInfo(),
+                    Env.getCurrentEnv().getTabletScheduler());
+
+            if (!isStable) {
+                errMsg = "table is unstable";
+                LOG.warn("wait table {} to be stable before doing index change job", tableId);
+                return false;
+            } else {
+                // table is stable
+                LOG.info("table {} is stable, start index change job {}", tableId, jobId);
+                errMsg = "";
+                return true;
+            }
+        } finally {
+            tbl.writeUnlock();
+        }
+    }
+
     // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
     protected boolean isPreviousLoadFinished() throws AnalysisException {
         return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
@@ -265,6 +307,10 @@
             throw new AlterCancelException(e.getMessage());
         }
 
+        if (!checkTableStable(olapTable)) {
+            return;
+        }
+
         olapTable.readLock();
         try {
             List<Column> originSchemaColumns = olapTable.getSchemaByIndexId(originIndexId, true);
@@ -307,10 +353,36 @@
     protected void runRunningJob() throws AlterCancelException {
         Preconditions.checkState(jobState == JobState.RUNNING, jobState);
 
+        // must check if db or table still exist first.
+        // or if table is dropped, the tasks will never be finished,
+        // and the job will be in RUNNING state forever.
+        Database db = Env.getCurrentInternalCatalog()
+                .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
+        OlapTable tbl;
+        try {
+            tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
+        } catch (MetaNotFoundException e) {
+            throw new AlterCancelException(e.getMessage());
+        }
+
         if (!invertedIndexBatchTask.isFinished()) {
             LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId);
-            // TODO: task failed limit
+            List<AgentTask> tasks = invertedIndexBatchTask.getUnfinishedTasks(2000);
+            for (AgentTask task : tasks) {
+                if (task.getFailedTimes() > 3) {
+                    LOG.warn("alter inverted index task failed: " + task.getErrorMsg());
+                    List<Long> failedBackends = failedTabletBackends.computeIfAbsent(task.getTabletId(),
+                            k -> Lists.newArrayList());
+                    failedBackends.add(task.getBackendId());
+                    int expectSucceedTaskNum = tbl.getPartitionInfo()
+                            .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+                    int failedTaskCount = failedBackends.size();
+                    if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
+                        throw new AlterCancelException("inverted index tasks failed on same tablet reach threshold "
+                                + failedTaskCount);
+                    }
+                }
+            }
             return;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index cd1632faf8f..b214afe387e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2987,13 +2987,14 @@ public class SchemaChangeHandler extends AlterHandler {
         }
 
         try {
+            long timeoutSecond = Config.alter_table_timeout_second;
             for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
                 long originIndexId = entry.getKey();
                 for (Partition partition : olapTable.getPartitions()) {
                     // create job
                     long jobId = Env.getCurrentEnv().getNextId();
                     IndexChangeJob indexChangeJob = new IndexChangeJob(
-                            jobId, db.getId(), olapTable.getId(), olapTable.getName());
+                            jobId, db.getId(), olapTable.getId(), olapTable.getName(), timeoutSecond * 1000);
                     indexChangeJob.setOriginIndexId(originIndexId);
                     indexChangeJob.setAlterInvertedIndexInfo(isDropOp, alterIndexes);
                     long partitionId = partition.getId();
diff --git a/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy
new file mode 100644
index 00000000000..07fcfbe1a94
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_build_index_with_clone_fault_injection", "nonConcurrent"){
+    if (isCloudMode()) {
+        return
+    }
+    def backends = sql_return_maparray('show backends')
+    // if there are fewer than 2 backends, skip this case
+    if (backends.size() < 2) {
+        return
+    }
+    def timeout = 300000
+    def delta_time = 1000
+    def alter_res = "null"
+    def useTime = 0
+
+    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() == 0) {
+                logger.info(table_name + " last index job finished")
+                return "SKIPPED"
+            }
+            if (alter_res.size() > 0) {
+                def last_job_state = alter_res[alter_res.size()-1][7];
+                if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
+                    sleep(10000) // wait for the table state to change back to normal
+                    logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res)
+                    return last_job_state;
+                }
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res)
+        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout")
+        return "wait_timeout"
+    }
+
+    def tbl = 'test_build_index_with_clone'
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("EngineCloneTask.wait_clone")
+        logger.info("add debug point EngineCloneTask.wait_clone")
+        sql """ DROP TABLE IF EXISTS ${tbl} """
+        sql """
+            CREATE TABLE ${tbl} (
+                `k1` int(11) NULL,
+                `k2` int(11) NULL
+            )
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES ("replication_num" = "1")
+        """
+        for (def i = 1; i <= 5; i++) {
+            sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+        }
+
+        sql """ sync """
+
+        // get tablets and set replica status to DROP
+        def tablet = sql_return_maparray("show tablets from ${tbl}")[0]
+        sql """
+            ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "${tablet.TabletId}", "backend_id" = "${tablet.BackendId}", "status" = "drop");
+        """
+        // create index on table
+        sql """ create index idx_k2 on ${tbl}(k2) using inverted """
+        sql """ build index idx_k2 on ${tbl} """
+        // sleep 5s to wait for the build index job to report that the table is unstable
+        sleep(5000)
+        def show_build_index = sql_return_maparray("show build index where TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1")
+        assertEquals('WAITING_TXN', show_build_index[0].State)
+        assertEquals('table is unstable', show_build_index[0].Msg)
+
+        def state = wait_for_last_build_index_on_table_finish(tbl, timeout)
+        assertEquals(state, "FINISHED")
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("EngineCloneTask.wait_clone")
+    }
+}
diff --git a/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy
new file mode 100644
index 00000000000..f8478c3ea61
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import org.apache.doris.regression.suite.SuiteCluster
+
+suite("test_build_index_with_clone_by_docker"){
+    if (isCloudMode()) {
+        return
+    }
+    def timeout = 300000
+    def delta_time = 1000
+    def alter_res = "null"
+    def useTime = 0
+
+    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() == 0) {
+                logger.info(table_name + " last index job finished")
+                return "SKIPPED"
+            }
+            if (alter_res.size() > 0) {
+                def last_job_state = alter_res[alter_res.size()-1][7];
+                if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
+                    sleep(10000) // wait for the table state to change back to normal
+                    logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res)
+                    return last_job_state;
+                }
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res)
+        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout")
+        return "wait_timeout"
+    }
+
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    options.setFeNum(1)
+    options.setBeNum(3)
+    options.cloudMode = false
+    def tbl = 'test_build_index_with_clone_by_docker'
+    docker(options) {
+        cluster.injectDebugPoints(NodeType.BE, ['EngineCloneTask.wait_clone' : null])
+        sql """ DROP TABLE IF EXISTS ${tbl} """
+        sql """
+            CREATE TABLE ${tbl} (
+                `k1` int(11) NULL,
+                `k2` int(11) NULL
+            )
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES ("replication_num" = "1")
+        """
+        for (def i = 1; i <= 5; i++) {
+            sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+        }
+
+        sql """ sync """
+
+        // get tablets and set replica status to DROP
+        def tablet = sql_return_maparray("show tablets from ${tbl}")[0]
+        sql """
+            ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "${tablet.TabletId}", "backend_id" = "${tablet.BackendId}", "status" = "drop");
+        """
+        // create index on table
+        sql """ create index idx_k2 on ${tbl}(k2) using inverted """
+        sql """ build index idx_k2 on ${tbl} """
+        // sleep 5s to wait for the build index job to report that the table is unstable
+        sleep(5000)
+        def show_build_index = sql_return_maparray("show build index where TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1")
+        assertEquals('WAITING_TXN', show_build_index[0].State)
+        assertEquals('table is unstable', show_build_index[0].Msg)
+
+        def state = wait_for_last_build_index_on_table_finish(tbl, timeout)
+        assertEquals(state, "FINISHED")
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org