This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 971aa72dea5 [fix](schema change) follow fe set sc fail replicas as bad #33569 (#34040)

971aa72dea5 is described below

commit 971aa72dea5e93c90572b7d892abad564fd4169d
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Sat Apr 27 13:59:22 2024 +0800

    [fix](schema change) follow fe set sc fail replicas as bad #33569 (#34040)
---
 be/src/olap/schema_change.cpp                      | 11 ++-
 .../java/org/apache/doris/alter/AlterHandler.java  |  1 +
 .../java/org/apache/doris/alter/AlterJobV2.java    |  6 ++
 .../java/org/apache/doris/alter/RollupJobV2.java   | 27 ++++---
 .../org/apache/doris/alter/SchemaChangeJobV2.java  | 27 ++++---
 .../test_schema_change_fail.groovy                 | 90 ++++++++++++++++++++++
 6 files changed, 139 insertions(+), 23 deletions(-)
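
In short, the FE side stops tracking failed ALTER tasks in the transient failedAgentTasks map (tabletId -> agentTask, which carried no @SerializedName and so was never persisted) and instead records them in a new AlterJobV2 field failedTabletBackends (tabletId -> backend ids). That field is serialized with the job and copied onto the in-memory job during replay in AlterHandler, so a follower FE can mark the same replicas as bad once the job finishes. Below is a minimal standalone sketch of that bookkeeping; FailedReplicaTracker, onTaskFailed and badBackends are illustrative names for this note only, not Doris classes or methods:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch only: per-tablet record of backends whose ALTER task failed.
    class FailedReplicaTracker {
        // tabletId -> failed backend ids (mirrors AlterJobV2.failedTabletBackends)
        private final Map<Long, List<Long>> failedTabletBackends = new HashMap<>();

        // Returns false when so many replicas of this tablet have failed that a
        // majority can no longer succeed, i.e. the alter job should be cancelled.
        boolean onTaskFailed(long tabletId, long backendId, int totalReplicaNum) {
            List<Long> failed = failedTabletBackends.computeIfAbsent(tabletId, k -> new ArrayList<>());
            failed.add(backendId);
            // same threshold as the patch: remaining replicas must still form a majority
            return totalReplicaNum - failed.size() >= totalReplicaNum / 2 + 1;
        }

        // Backends whose replica of this tablet should be set bad when the job finishes.
        List<Long> badBackends(long tabletId) {
            return failedTabletBackends.getOrDefault(tabletId, new ArrayList<>());
        }
    }

    class FailedReplicaTrackerDemo {
        public static void main(String[] args) {
            FailedReplicaTracker tracker = new FailedReplicaTracker();
            // 3 replicas: one failure still leaves a majority (3 - 1 >= 3 / 2 + 1)
            System.out.println(tracker.onTaskFailed(1001L, 10L, 3));  // true: keep going
            // a second failure on the same tablet breaks the majority
            System.out.println(tracker.onTaskFailed(1001L, 11L, 3));  // false: cancel the job
            System.out.println(tracker.badBackends(1001L));           // [10, 11]
        }
    }
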
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 84ecbb10c60..4ea1ccbc3ff 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -790,11 +790,20 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
     do {
         RowsetSharedPtr max_rowset;
         // get history data to be converted and it will check if there is hold in base tablet
-        if (!_get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset)) {
+        res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset);
+        if (!res) {
             LOG(WARNING) << "fail to get version to be changed. res=" << res;
             break;
         }
 
+        DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.alter_fail", {
+            res = Status::InternalError(
+                    "inject alter tablet failed. base_tablet={}, new_tablet={}",
+                    request.base_tablet_id, request.new_tablet_id);
+            LOG(WARNING) << "inject error. res=" << res;
+            break;
+        });
+
         // should check the max_version >= request.alter_version, if not the convert is useless
         if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) {
             res = Status::InternalError(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index 7ab2f8732d0..dbc43059e5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -269,6 +269,7 @@ public abstract class AlterHandler extends MasterDaemon {
             alterJob.replay(alterJob);
             alterJobsV2.put(alterJob.getJobId(), alterJob);
         } else {
+            existingJob.failedTabletBackends = alterJob.failedTabletBackends;
             existingJob.replay(alterJob);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index 29bf3d994ed..78f34c2cccd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -36,6 +37,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /*
  * Version 2 of AlterJob, for replacing the old version of AlterJob.
@@ -90,6 +92,10 @@ public abstract class AlterJobV2 implements Writable {
     @SerializedName(value = "rawSql")
     protected String rawSql;
 
+    // save failed task after retry three times, tablet -> backends
+    @SerializedName(value = "failedTabletBackends")
+    protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
+
     public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long tableId, String tableName,
             long timeoutMs) {
         this.rawSql = rawSql;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 482944acf8b..c127bd4b8d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -140,8 +140,6 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
     // save all create rollup tasks
     private AgentBatchTask rollupBatchTask = new AgentBatchTask();
-    // save failed task after retry three times, tabletId -> agentTask
-    private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
 
     private Analyzer analyzer;
 
@@ -529,17 +527,18 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
                 task.setFinished(true);
                 AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
                 LOG.warn("rollup task failed: " + task.getErrorMsg());
-                if (!failedAgentTasks.containsKey(task.getTabletId())) {
-                    failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task));
-                } else {
-                    failedAgentTasks.get(task.getTabletId()).add(task);
+                List<Long> failedBackends = failedTabletBackends.get(task.getTabletId());
+                if (failedBackends == null) {
+                    failedBackends = Lists.newArrayList();
+                    failedTabletBackends.put(task.getTabletId(), failedBackends);
                 }
+                failedBackends.add(task.getBackendId());
                 int expectSucceedTaskNum = tbl.getPartitionInfo()
                         .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
-                int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size();
+                int failedTaskCount = failedBackends.size();
                 if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
                     throw new AlterCancelException("rollup tasks failed on same tablet reach threshold "
-                            + failedAgentTasks.get(task.getTabletId()) + ", reason=" + task.getErrorMsg());
+                            + failedTaskCount + ", reason=" + task.getErrorMsg());
                 }
             }
         }
@@ -554,9 +553,11 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
         try {
             Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
             TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
-            for (List<AgentTask> tasks : failedAgentTasks.values()) {
-                for (AgentTask task : tasks) {
-                    invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true);
+            for (Map.Entry<Long, List<Long>> entry : failedTabletBackends.entrySet()) {
+                long tabletId = entry.getKey();
+                List<Long> failedBackends = entry.getValue();
+                for (long backendId : failedBackends) {
+                    invertedIndex.getReplica(tabletId, backendId).setBad(true);
                 }
             }
             for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
@@ -606,8 +607,12 @@
             MaterializedIndex rollupIndex = partition.getIndex(rollupIndexId);
             Preconditions.checkNotNull(rollupIndex, rollupIndexId);
             for (Tablet tablet : rollupIndex.getTablets()) {
+                List<Long> failedBackends = failedTabletBackends.get(tablet.getId());
                 for (Replica replica : tablet.getReplicas()) {
                     replica.setState(ReplicaState.NORMAL);
+                    if (failedBackends != null && failedBackends.contains(replica.getBackendId())) {
+                        replica.setBad(true);
+                    }
                 }
             }
             partition.visualiseShadowIndex(rollupIndexId, false);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index c1ecc202d79..d8c12066df6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -134,8 +134,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     // save all schema change tasks
     private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
-    // save failed task after retry three times, tabletId -> agentTask
-    private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
 
     private SchemaChangeJobV2() {
         super(JobType.SCHEMA_CHANGE);
     }
@@ -520,17 +518,18 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                 task.setFinished(true);
                 AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
                 LOG.warn("schema change task failed: " + task.getErrorMsg());
-                if (!failedAgentTasks.containsKey(task.getTabletId())) {
-                    failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task));
-                } else {
-                    failedAgentTasks.get(task.getTabletId()).add(task);
+                List<Long> failedBackends = failedTabletBackends.get(task.getTabletId());
+                if (failedBackends == null) {
+                    failedBackends = Lists.newArrayList();
+                    failedTabletBackends.put(task.getTabletId(), failedBackends);
                 }
+                failedBackends.add(task.getBackendId());
                 int expectSucceedTaskNum = tbl.getPartitionInfo()
                         .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
-                int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size();
+                int failedTaskCount = failedBackends.size();
                 if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
                     throw new AlterCancelException("schema change tasks failed on same tablet reach threshold "
-                            + failedAgentTasks.get(task.getTabletId()));
+                            + failedTaskCount);
                 }
             }
         }
@@ -545,9 +544,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         try {
             Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
             TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
-            for (List<AgentTask> tasks : failedAgentTasks.values()) {
-                for (AgentTask task : tasks) {
-                    invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true);
+            for (Map.Entry<Long, List<Long>> entry : failedTabletBackends.entrySet()) {
+                long tabletId = entry.getKey();
+                List<Long> failedBackends = entry.getValue();
+                for (long backendId : failedBackends) {
+                    invertedIndex.getReplica(tabletId, backendId).setBad(true);
                 }
             }
             for (long partitionId : partitionIndexMap.rowKeySet()) {
@@ -622,8 +623,12 @@
 
                 // set replica state
                 for (Tablet tablet : shadowIdx.getTablets()) {
+                    List<Long> failedBackends = failedTabletBackends.get(tablet.getId());
                     for (Replica replica : tablet.getReplicas()) {
                         replica.setState(ReplicaState.NORMAL);
+                        if (failedBackends != null && failedBackends.contains(replica.getBackendId())) {
+                            replica.setBad(true);
+                        }
                     }
                 }
 
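
The new regression test below exercises this end to end: it enables the SchemaChangeJob.process_alter_tablet.alter_fail debug point on one backend, runs a schema change on a table replicated across all backends, and then checks that the job still reaches FINISHED and that the injected backend's replica shows up as bad both on the FE the suite is connected to and on a follower FE. The arithmetic behind that expectation follows from the threshold above: with replication_num of at least 3, expectSucceedTaskNum / 2 + 1 is 2 (integer division), and a single failed task leaves 3 - 1 = 2 healthy replicas, so the majority check still passes and the job finishes instead of being cancelled.
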
diff --git a/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy b/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy
new file mode 100644
index 00000000000..8dc65365c39
--- /dev/null
+++ b/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite('test_schema_change_fail', 'p0,p2,nonConcurrent') {
+    def frontends = sql_return_maparray('show frontends')
+    def backends = sql_return_maparray('show backends')
+    def forceReplicaNum = getFeConfig('force_olap_table_replication_num').toInteger()
+    if (frontends.size() < 2 || backends.size() < 3 || forceReplicaNum == 1 || forceReplicaNum == 2) {
+        return
+    }
+
+    def tbl = 'test_schema_change_fail'
+
+    def beId = backends[0].BackendId.toLong()
+    def beHost = backends[0].Host
+    def beHttpPort = backends[0].HttpPort.toInteger()
+    def injectName = 'SchemaChangeJob.process_alter_tablet.alter_fail'
+
+    def checkReplicaBad = { ->
+        def tabletId = sql_return_maparray("SHOW TABLETS FROM ${tbl}")[0].TabletId.toLong()
+        def replicas = sql_return_maparray(sql_return_maparray("SHOW TABLET ${tabletId}").DetailCmd)
+        assertEquals(backends.size(), replicas.size())
+        for (def replica : replicas) {
+            if (replica.BackendId.toLong() == beId) {
+                assertEquals(true, replica.IsBad.toBoolean())
+            }
+        }
+    }
+
+    def followFe = frontends.stream().filter(fe -> !fe.IsMaster.toBoolean()).findFirst().orElse(null)
+    def followFeUrl = "jdbc:mysql://${followFe.Host}:${followFe.QueryPort}/?useLocalSessionState=false&allowLoadLocalInfile=false"
+    followFeUrl = context.config.buildUrlWithDb(followFeUrl, context.dbName)
+
+    sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+    sql """
+        CREATE TABLE ${tbl}
+        (
+            `a` TINYINT NOT NULL,
+            `b` TINYINT NULL
+        )
+        UNIQUE KEY (`a`)
+        DISTRIBUTED BY HASH(`a`) BUCKETS 1
+        PROPERTIES
+        (
+            'replication_num' = '${backends.size()}',
+            'light_schema_change' = 'false'
+        )
+    """
+
+    sql "INSERT INTO ${tbl} VALUES (1, 2), (3, 4)"
+
+    try {
+        DebugPoint.enableDebugPoint(beHost, beHttpPort, NodeType.BE, injectName)
+        setFeConfig('disable_tablet_scheduler', true)
+
+        sleep(1000)
+        sql "ALTER TABLE ${tbl} MODIFY COLUMN b DOUBLE"
+        sleep(5 * 1000)
+
+        def jobs = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE TableName = '${tbl}' ORDER BY CreateTime DESC LIMIT 1"
+        assertEquals(1, jobs.size())
+        assertEquals('FINISHED', jobs[0].State)
+
+        checkReplicaBad()
+        connect('root', '', followFeUrl) {
+            checkReplicaBad()
+        }
+    } finally {
+        DebugPoint.disableDebugPoint(beHost, beHttpPort, NodeType.BE, injectName)
+        setFeConfig('disable_tablet_scheduler', false)
+        sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org