This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 81101fc1c5 [enhancement](alter) Make alter job more robust by ignoring some task failure (#10719)
81101fc1c5 is described below

commit 81101fc1c58100a05c16966b1d2b3f408df2a81f
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Mon Jul 11 12:16:48 2022 +0800

    [enhancement](alter) Make alter job more robust by ignoring some task failure (#10719)
    
    Co-authored-by: caiconghui1 <caicongh...@jd.com>
---
 docs/en/docs/admin-manual/config/fe-config.md      |  4 +--
 docs/zh-CN/docs/admin-manual/config/fe-config.md   |  4 +--
 .../java/org/apache/doris/alter/RollupJobV2.java   | 34 ++++++++++++++++------
 .../org/apache/doris/alter/SchemaChangeJobV2.java  | 34 +++++++++++++++-------
 .../main/java/org/apache/doris/common/Config.java  |  4 +--
 5 files changed, 55 insertions(+), 25 deletions(-)

diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md
index 21e9fe5623..13c6e8374f 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2084,11 +2084,11 @@ Whether to allow multiple replicas of the same tablet 
to be distributed on the s
 
 ### min_version_count_indicate_replica_compaction_too_slow
 
-Default: 300
+Default: 200
 
 Dynamically configured: true
 
-Only for Master FE: true
+Only for Master FE: false
 
 The version count threshold used to judge whether replica compaction is too slow
 
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 5aa42bd793..bd09d69e53 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2129,11 +2129,11 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
 
 ### `min_version_count_indicate_replica_compaction_too_slow`
 
-默认值:300
+默认值:200
 
 是否可以动态配置:true
 
-是否为 Master FE 节点独有的配置项:true
+是否为 Master FE 节点独有的配置项:false
 
 版本计数阈值,用来判断副本做 compaction 的速度是否太慢
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 79ec2d4107..4fe0c9854e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -131,6 +131,8 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
 
     // save all create rollup tasks
     private AgentBatchTask rollupBatchTask = new AgentBatchTask();
+    // save failed task after retry three times, tabletId -> agentTask
+    private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
 
     public RollupJobV2(long jobId, long dbId, long tableId, String tableName, 
long timeoutMs,
             long baseIndexId, long rollupIndexId, String baseIndexName, String 
rollupIndexName,
@@ -152,10 +154,6 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
         this.origStmt = origStmt;
     }
 
-    private RollupJobV2() {
-        super(JobType.ROLLUP);
-    }
-
     public void addTabletIdMap(long partitionId, long rollupTabletId, long 
baseTabletId) {
         Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap
                 .computeIfAbsent(partitionId, k -> Maps.newHashMap());
@@ -425,13 +423,27 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
         } catch (MetaNotFoundException e) {
             throw new AlterCancelException(e.getMessage());
         }
-
         if (!rollupBatchTask.isFinished()) {
             LOG.info("rollup tasks not finished. job: {}", jobId);
             List<AgentTask> tasks = rollupBatchTask.getUnfinishedTasks(2000);
             for (AgentTask task : tasks) {
                 if (task.getFailedTimes() >= 3) {
-                    throw new AlterCancelException("rollup task failed after 
try three times: " + task.getErrorMsg());
+                    task.setFinished(true);
+                    AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.ALTER, task.getSignature());
+                    LOG.warn("rollup task failed after try three times: " + 
task.getErrorMsg());
+                    if (!failedAgentTasks.containsKey(task.getTabletId())) {
+                        failedAgentTasks.put(task.getTabletId(), 
Lists.newArrayList(task));
+                    } else {
+                        failedAgentTasks.get(task.getTabletId()).add(task);
+                    }
+                    int expectSucceedTaskNum = tbl.getPartitionInfo()
+                            
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+                    int failedTaskCount = 
failedAgentTasks.get(task.getTabletId()).size();
+                    if (expectSucceedTaskNum - failedTaskCount < 
expectSucceedTaskNum / 2 + 1) {
+                        throw new AlterCancelException(
+                                "rollup tasks failed on same tablet reach 
threshold "
+                                        + 
failedAgentTasks.get(task.getTabletId()));
+                    }
                 }
             }
             return;
@@ -444,6 +456,12 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
         tbl.writeLockOrAlterCancelException();
         try {
             Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
+            TabletInvertedIndex invertedIndex = 
Catalog.getCurrentInvertedIndex();
+            for (List<AgentTask> tasks : failedAgentTasks.values()) {
+                for (AgentTask task : tasks) {
+                    invertedIndex.getReplica(task.getTabletId(), 
task.getBackendId()).setBad(true);
+                }
+            }
             for (Map.Entry<Long, MaterializedIndex> entry : 
this.partitionIdToRollupIndex.entrySet()) {
                 long partitionId = entry.getKey();
                 Partition partition = tbl.getPartition(partitionId);
@@ -454,14 +472,12 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                 long visiableVersion = partition.getVisibleVersion();
                 short expectReplicationNum = 
tbl.getPartitionInfo().getReplicaAllocation(
                         partitionId).getTotalReplicaNum();
-
-
                 MaterializedIndex rollupIndex = entry.getValue();
                 for (Tablet rollupTablet : rollupIndex.getTablets()) {
                     List<Replica> replicas = rollupTablet.getReplicas();
                     int healthyReplicaNum = 0;
                     for (Replica replica : replicas) {
-                        if (replica.getLastFailedVersion() < 0
+                        if (!replica.isBad() && replica.getLastFailedVersion() 
< 0
                                 && 
replica.checkVersionCatchUp(visiableVersion, false)) {
                             healthyReplicaNum++;
                         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index d7a0662bb2..1c112d6d28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -129,15 +129,13 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
     // save all schema change tasks
     private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
+    // save failed task after retry three times, tabletId -> agentTask
+    private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
 
     public SchemaChangeJobV2(long jobId, long dbId, long tableId, String 
tableName, long timeoutMs) {
         super(jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, 
timeoutMs);
     }
 
-    private SchemaChangeJobV2() {
-        super(JobType.SCHEMA_CHANGE);
-    }
-
     public void addTabletIdMap(long partitionId, long shadowIdxId, long 
shadowTabletId, long originTabletId) {
         Map<Long, Long> tabletMap = partitionIndexTabletMap.get(partitionId, 
shadowIdxId);
         if (tabletMap == null) {
@@ -494,14 +492,25 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             List<AgentTask> tasks = 
schemaChangeBatchTask.getUnfinishedTasks(2000);
             for (AgentTask task : tasks) {
                 if (task.getFailedTimes() >= 3) {
-                    throw new AlterCancelException("schema change task failed 
after try three times: "
-                            + task.getErrorMsg());
+                    task.setFinished(true);
+                    AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.ALTER, task.getSignature());
+                    LOG.warn("schema change task failed after try three times: 
" + task.getErrorMsg());
+                    if (!failedAgentTasks.containsKey(task.getTabletId())) {
+                        failedAgentTasks.put(task.getTabletId(), 
Lists.newArrayList(task));
+                    } else {
+                        failedAgentTasks.get(task.getTabletId()).add(task);
+                    }
+                    int expectSucceedTaskNum = tbl.getPartitionInfo()
+                            
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+                    int failedTaskCount = 
failedAgentTasks.get(task.getTabletId()).size();
+                    if (expectSucceedTaskNum - failedTaskCount < 
expectSucceedTaskNum / 2 + 1) {
+                        throw new AlterCancelException("schema change tasks 
failed on same tablet reach threshold "
+                                    + 
failedAgentTasks.get(task.getTabletId()));
+                    }
                 }
             }
             return;
         }
-
-
         /*
          * all tasks are finished. check the integrity.
          * we just check whether all new replicas are healthy.
@@ -509,7 +518,12 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         tbl.writeLockOrAlterCancelException();
         try {
             Preconditions.checkState(tbl.getState() == 
OlapTableState.SCHEMA_CHANGE);
-
+            TabletInvertedIndex invertedIndex = 
Catalog.getCurrentInvertedIndex();
+            for (List<AgentTask> tasks : failedAgentTasks.values()) {
+                for (AgentTask task : tasks) {
+                    invertedIndex.getReplica(task.getTabletId(), 
task.getBackendId()).setBad(true);
+                }
+            }
             for (long partitionId : partitionIndexMap.rowKeySet()) {
                 Partition partition = tbl.getPartition(partitionId);
                 Preconditions.checkNotNull(partition, partitionId);
@@ -526,7 +540,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                         List<Replica> replicas = shadowTablet.getReplicas();
                         int healthyReplicaNum = 0;
                         for (Replica replica : replicas) {
-                            if (replica.getLastFailedVersion() < 0
+                            if (!replica.isBad() && 
replica.getLastFailedVersion() < 0
                                     && 
replica.checkVersionCatchUp(visiableVersion, false)) {
                                 healthyReplicaNum++;
                             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index c64de8ea6b..4f9231516d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1583,8 +1583,8 @@ public class Config extends ConfigBase {
     /**
      *  The version count threshold used to judge whether replica compaction 
is too slow
      */
-    @ConfField(mutable = true, masterOnly = true)
-    public static int min_version_count_indicate_replica_compaction_too_slow = 
300;
+    @ConfField(mutable = true)
+    public static int min_version_count_indicate_replica_compaction_too_slow = 
200;
 
     /**
      * The valid ratio threshold of the difference between the version count 
of the slowest replicaand the fastest


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to