This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 80f25cbf7553e30ca79bfd88b51f58584cf4b7b0 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Sun Apr 14 09:44:43 2024 +0800 [fix](schema change) follow fe set sc fail replicas as bad (#33569) --- be/src/olap/schema_change.cpp | 6 ++ .../java/org/apache/doris/alter/AlterHandler.java | 1 + .../java/org/apache/doris/alter/AlterJobV2.java | 5 ++ .../java/org/apache/doris/alter/RollupJobV2.java | 27 ++++--- .../org/apache/doris/alter/SchemaChangeJobV2.java | 27 ++++--- .../test_schema_change_fail.groovy | 94 ++++++++++++++++++++++ 6 files changed, 138 insertions(+), 22 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 3235220b21b..e753da43454 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -794,6 +794,12 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& break; } + DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.alter_fail", { + LOG(WARNING) << "inject alter tablet failed. base_tablet=" << request.base_tablet_id + << ", new_tablet=" << request.new_tablet_id; + break; + }); + // should check the max_version >= request.alter_version, if not the convert is useless if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { res = Status::InternalError( diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 43ad9de5ffd..030fd17452d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -269,6 +269,7 @@ public abstract class AlterHandler extends MasterDaemon { alterJob.replay(alterJob); alterJobsV2.put(alterJob.getJobId(), alterJob); } else { + existingJob.failedTabletBackends = alterJob.failedTabletBackends; existingJob.replay(alterJob); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java index c1a62023042..3574b1e6e6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -29,6 +29,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.persist.gson.GsonUtils; +import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,6 +37,7 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.IOException; import java.util.List; +import java.util.Map; /* * Version 2 of AlterJob, for replacing the old version of AlterJob. @@ -94,6 +96,9 @@ public abstract class AlterJobV2 implements Writable { @SerializedName(value = "watershedTxnId") protected long watershedTxnId = -1; + // save failed task after retry three times, tablet -> backends + @SerializedName(value = "failedTabletBackends") + protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap(); public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 61e4496a358..8ad67cfd92e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -135,8 +135,6 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { // save all create rollup tasks private AgentBatchTask rollupBatchTask = new AgentBatchTask(); - // save failed task after retry three times, tabletId -> agentTask - private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap(); private Analyzer analyzer; @@ -517,17 +515,18 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { task.setFinished(true); AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); LOG.warn("rollup task failed: " + task.getErrorMsg()); - if (!failedAgentTasks.containsKey(task.getTabletId())) { - failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task)); - } else { - failedAgentTasks.get(task.getTabletId()).add(task); + List<Long> failedBackends = failedTabletBackends.get(task.getTabletId()); + if (failedBackends == null) { + failedBackends = Lists.newArrayList(); + failedTabletBackends.put(task.getTabletId(), failedBackends); } + failedBackends.add(task.getBackendId()); int expectSucceedTaskNum = tbl.getPartitionInfo() .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum(); - int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size(); + int failedTaskCount = failedBackends.size(); if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) { throw new AlterCancelException("rollup tasks failed on same tablet reach threshold " - + failedAgentTasks.get(task.getTabletId()) + ", reason=" + task.getErrorMsg()); + + failedTaskCount + ", reason=" + task.getErrorMsg()); } } } @@ -542,9 +541,11 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { try { Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - for (List<AgentTask> tasks : failedAgentTasks.values()) { - for (AgentTask task : tasks) { - invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true); + for (Map.Entry<Long, List<Long>> entry : failedTabletBackends.entrySet()) { + long tabletId = entry.getKey(); + List<Long> failedBackends = entry.getValue(); + for (long backendId : failedBackends) { + invertedIndex.getReplica(tabletId, backendId).setBad(true); } } for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) { @@ -594,8 +595,12 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { MaterializedIndex rollupIndex = partition.getIndex(rollupIndexId); Preconditions.checkNotNull(rollupIndex, rollupIndexId); for (Tablet tablet : rollupIndex.getTablets()) { + List<Long> failedBackends = failedTabletBackends.get(tablet.getId()); for (Replica replica : tablet.getReplicas()) { replica.setState(ReplicaState.NORMAL); + if (failedBackends != null && failedBackends.contains(replica.getBackendId())) { + replica.setBad(true); + } } } partition.visualiseShadowIndex(rollupIndexId, false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 33e605c7af4..a71fa4208ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -131,8 +131,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { // save all schema change tasks private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask(); - // save failed task after retry three times, tabletId -> agentTask - private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap(); private SchemaChangeJobV2() { super(JobType.SCHEMA_CHANGE); @@ -516,17 +514,18 @@ public class SchemaChangeJobV2 extends AlterJobV2 { task.setFinished(true); AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); LOG.warn("schema change task failed: " + task.getErrorMsg()); - if (!failedAgentTasks.containsKey(task.getTabletId())) { - failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task)); - } else { - failedAgentTasks.get(task.getTabletId()).add(task); + List<Long> failedBackends = failedTabletBackends.get(task.getTabletId()); + if (failedBackends == null) { + failedBackends = Lists.newArrayList(); + failedTabletBackends.put(task.getTabletId(), failedBackends); } + failedBackends.add(task.getBackendId()); int expectSucceedTaskNum = tbl.getPartitionInfo() .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum(); - int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size(); + int failedTaskCount = failedBackends.size(); if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) { throw new AlterCancelException("schema change tasks failed on same tablet reach threshold " - + failedAgentTasks.get(task.getTabletId())); + + failedTaskCount); } } } @@ -544,9 +543,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 { try { Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - for (List<AgentTask> tasks : failedAgentTasks.values()) { - for (AgentTask task : tasks) { - invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true); + for (Map.Entry<Long, List<Long>> entry : failedTabletBackends.entrySet()) { + long tabletId = entry.getKey(); + List<Long> failedBackends = entry.getValue(); + for (long backendId : failedBackends) { + invertedIndex.getReplica(tabletId, backendId).setBad(true); } } for (long partitionId : partitionIndexMap.rowKeySet()) { @@ -620,8 +621,12 @@ public class SchemaChangeJobV2 extends AlterJobV2 { // set replica state for (Tablet tablet : shadowIdx.getTablets()) { + List<Long> failedBackends = failedTabletBackends.get(tablet.getId()); for (Replica replica : tablet.getReplicas()) { replica.setState(ReplicaState.NORMAL); + if (failedBackends != null && failedBackends.contains(replica.getBackendId())) { + replica.setBad(true); + } } } diff --git a/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy b/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy new file mode 100644 index 00000000000..f7ec1b42f0c --- /dev/null +++ b/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_schema_change_fail', 'nonConcurrent') { + if (isCloudMode()) { + return + } + + def frontends = sql_return_maparray('show frontends') + def backends = sql_return_maparray('show backends') + def forceReplicaNum = getFeConfig('force_olap_table_replication_num').toInteger() + if (frontends.size() < 2 || backends.size() < 3 || forceReplicaNum == 1) { + return + } + + def tbl = 'test_schema_change_fail' + + def beId = backends[0].BackendId.toLong() + def beHost = backends[0].Host + def beHttpPort = backends[0].HttpPort.toInteger() + def injectName = 'SchemaChangeJob.process_alter_tablet.alter_fail' + + def checkReplicaBad = { -> + def tabletId = sql_return_maparray("SHOW TABLETS FROM ${tbl}")[0].TabletId.toLong() + def replicas = sql_return_maparray(sql_return_maparray("SHOW TABLET ${tabletId}").DetailCmd) + assertEquals(backends.size(), replicas.size()) + for (def replica : replicas) { + if (replica.BackendId.toLong() == beId) { + assertEquals(true, replica.IsBad.toBoolean()) + } + } + } + + def followFe = frontends.stream().filter(fe -> !fe.IsMaster.toBoolean()).findFirst().orElse(null) + def followFeUrl = "jdbc:mysql://${followFe.Host}:${followFe.QueryPort}/?useLocalSessionState=false&allowLoadLocalInfile=false" + followFeUrl = context.config.buildUrlWithDb(followFeUrl, context.dbName) + + sql "DROP TABLE IF EXISTS ${tbl} FORCE" + sql """ + CREATE TABLE ${tbl} + ( + `a` TINYINT NOT NULL, + `b` TINYINT NULL + ) + UNIQUE KEY (`a`) + DISTRIBUTED BY HASH(`a`) BUCKETS 1 + PROPERTIES + ( + 'replication_num' = '${backends.size()}', + 'light_schema_change' = 'false' + ) + """ + + sql "INSERT INTO ${tbl} VALUES (1, 2), (3, 4)" + + try { + DebugPoint.enableDebugPoint(beHost, beHttpPort, NodeType.BE, injectName) + setFeConfig('disable_tablet_scheduler', true) + + sleep(1000) + sql "ALTER TABLE ${tbl} MODIFY COLUMN b DOUBLE" + sleep(5 * 1000) + + def jobs = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE TableName = '${tbl}' ORDER BY CreateTime DESC LIMIT 1" + assertEquals(1, jobs.size()) + assertEquals('FINISHED', jobs[0].State) + + checkReplicaBad() + connect('root', '', followFeUrl) { + checkReplicaBad() + } + } finally { + DebugPoint.disableDebugPoint(beHost, beHttpPort, NodeType.BE, injectName) + setFeConfig('disable_tablet_scheduler', false) + sql "DROP TABLE IF EXISTS ${tbl} FORCE" + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org