This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 81101fc1c5 [enhancement](alter) Make alter job more robust by ignoring some task failure (#10719) 81101fc1c5 is described below commit 81101fc1c58100a05c16966b1d2b3f408df2a81f Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Mon Jul 11 12:16:48 2022 +0800 [enhancement](alter) Make alter job more robust by ignoring some task failure (#10719) Co-authored-by: caiconghui1 <caicongh...@jd.com> --- docs/en/docs/admin-manual/config/fe-config.md | 4 +-- docs/zh-CN/docs/admin-manual/config/fe-config.md | 4 +-- .../java/org/apache/doris/alter/RollupJobV2.java | 34 ++++++++++++++++------ .../org/apache/doris/alter/SchemaChangeJobV2.java | 34 +++++++++++++++------- .../main/java/org/apache/doris/common/Config.java | 4 +-- 5 files changed, 55 insertions(+), 25 deletions(-) diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index 21e9fe5623..13c6e8374f 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -2084,11 +2084,11 @@ Whether to allow multiple replicas of the same tablet to be distributed on the s ### min_version_count_indicate_replica_compaction_too_slow -Default: 300 +Default: 200 Dynamically configured: true -Only for Master FE: true +Only for Master FE: false The version count threshold used to judge whether replica compaction is too slow diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index 5aa42bd793..bd09d69e53 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -2129,11 +2129,11 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清 ### `min_version_count_indicate_replica_compaction_too_slow` -默认值:300 +默认值:200 是否可以动态配置:true -是否为 Master FE 节点独有的配置项:true +是否为 Master FE 节点独有的配置项:false 版本计数阈值,用来判断副本做 compaction 的速度是否太慢 diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 79ec2d4107..4fe0c9854e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -131,6 +131,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { // save all create rollup tasks private AgentBatchTask rollupBatchTask = new AgentBatchTask(); + // save failed task after retry three times, tabletId -> agentTask + private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap(); public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName, @@ -152,10 +154,6 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { this.origStmt = origStmt; } - private RollupJobV2() { - super(JobType.ROLLUP); - } - public void addTabletIdMap(long partitionId, long rollupTabletId, long baseTabletId) { Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap .computeIfAbsent(partitionId, k -> Maps.newHashMap()); @@ -425,13 +423,27 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { } catch (MetaNotFoundException e) { throw new AlterCancelException(e.getMessage()); } - if (!rollupBatchTask.isFinished()) { LOG.info("rollup tasks not finished. job: {}", jobId); List<AgentTask> tasks = rollupBatchTask.getUnfinishedTasks(2000); for (AgentTask task : tasks) { if (task.getFailedTimes() >= 3) { - throw new AlterCancelException("rollup task failed after try three times: " + task.getErrorMsg()); + task.setFinished(true); + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); + LOG.warn("rollup task failed after try three times: " + task.getErrorMsg()); + if (!failedAgentTasks.containsKey(task.getTabletId())) { + failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task)); + } else { + failedAgentTasks.get(task.getTabletId()).add(task); + } + int expectSucceedTaskNum = tbl.getPartitionInfo() + .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum(); + int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size(); + if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) { + throw new AlterCancelException( + "rollup tasks failed on same tablet reach threshold " + + failedAgentTasks.get(task.getTabletId())); + } } } return; @@ -444,6 +456,12 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { tbl.writeLockOrAlterCancelException(); try { Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); + TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); + for (List<AgentTask> tasks : failedAgentTasks.values()) { + for (AgentTask task : tasks) { + invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true); + } + } for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) { long partitionId = entry.getKey(); Partition partition = tbl.getPartition(partitionId); @@ -454,14 +472,12 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { long visiableVersion = partition.getVisibleVersion(); short expectReplicationNum = tbl.getPartitionInfo().getReplicaAllocation( partitionId).getTotalReplicaNum(); - - MaterializedIndex rollupIndex = entry.getValue(); for (Tablet rollupTablet : rollupIndex.getTablets()) { List<Replica> replicas = rollupTablet.getReplicas(); int healthyReplicaNum = 0; for (Replica replica : replicas) { - if (replica.getLastFailedVersion() < 0 + if (!replica.isBad() && replica.getLastFailedVersion() < 0 && replica.checkVersionCatchUp(visiableVersion, false)) { healthyReplicaNum++; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index d7a0662bb2..1c112d6d28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -129,15 +129,13 @@ public class SchemaChangeJobV2 extends AlterJobV2 { // save all schema change tasks private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask(); + // save failed task after retry three times, tabletId -> agentTask + private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap(); public SchemaChangeJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs) { super(jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs); } - private SchemaChangeJobV2() { - super(JobType.SCHEMA_CHANGE); - } - public void addTabletIdMap(long partitionId, long shadowIdxId, long shadowTabletId, long originTabletId) { Map<Long, Long> tabletMap = partitionIndexTabletMap.get(partitionId, shadowIdxId); if (tabletMap == null) { @@ -494,14 +492,25 @@ public class SchemaChangeJobV2 extends AlterJobV2 { List<AgentTask> tasks = schemaChangeBatchTask.getUnfinishedTasks(2000); for (AgentTask task : tasks) { if (task.getFailedTimes() >= 3) { - throw new AlterCancelException("schema change task failed after try three times: " - + task.getErrorMsg()); + task.setFinished(true); + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); + LOG.warn("schema change task failed after try three times: " + task.getErrorMsg()); + if (!failedAgentTasks.containsKey(task.getTabletId())) { + failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task)); + } else { + failedAgentTasks.get(task.getTabletId()).add(task); + } + int expectSucceedTaskNum = tbl.getPartitionInfo() + .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum(); + int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size(); + if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) { + throw new AlterCancelException("schema change tasks failed on same tablet reach threshold " + + failedAgentTasks.get(task.getTabletId())); + } } } return; } - - /* * all tasks are finished. check the integrity. * we just check whether all new replicas are healthy. @@ -509,7 +518,12 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.writeLockOrAlterCancelException(); try { Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); - + TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); + for (List<AgentTask> tasks : failedAgentTasks.values()) { + for (AgentTask task : tasks) { + invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true); + } + } for (long partitionId : partitionIndexMap.rowKeySet()) { Partition partition = tbl.getPartition(partitionId); Preconditions.checkNotNull(partition, partitionId); @@ -526,7 +540,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { List<Replica> replicas = shadowTablet.getReplicas(); int healthyReplicaNum = 0; for (Replica replica : replicas) { - if (replica.getLastFailedVersion() < 0 + if (!replica.isBad() && replica.getLastFailedVersion() < 0 && replica.checkVersionCatchUp(visiableVersion, false)) { healthyReplicaNum++; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index c64de8ea6b..4f9231516d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1583,8 +1583,8 @@ public class Config extends ConfigBase { /** * The version count threshold used to judge whether replica compaction is too slow */ - @ConfField(mutable = true, masterOnly = true) - public static int min_version_count_indicate_replica_compaction_too_slow = 300; + @ConfField(mutable = true) + public static int min_version_count_indicate_replica_compaction_too_slow = 200; /** * The valid ratio threshold of the difference between the version count of the slowest replicaand the fastest --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org