This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 971aa72dea5 [fix](schema change) follow fe set sc fail replicas as bad #33569 (#34040)
971aa72dea5 is described below

commit 971aa72dea5e93c90572b7d892abad564fd4169d
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Sat Apr 27 13:59:22 2024 +0800

    [fix](schema change) follow fe set sc fail replicas as bad #33569 (#34040)
---
 be/src/olap/schema_change.cpp                      | 11 ++-
 .../java/org/apache/doris/alter/AlterHandler.java  |  1 +
 .../java/org/apache/doris/alter/AlterJobV2.java    |  6 ++
 .../java/org/apache/doris/alter/RollupJobV2.java   | 27 ++++---
 .../org/apache/doris/alter/SchemaChangeJobV2.java  | 27 ++++---
 .../test_schema_change_fail.groovy                 | 90 ++++++++++++++++++++++
 6 files changed, 139 insertions(+), 23 deletions(-)

diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 84ecbb10c60..4ea1ccbc3ff 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -790,11 +790,20 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
         do {
             RowsetSharedPtr max_rowset;
             // get history data to be converted and it will check if there is 
hold in base tablet
-            if (!_get_versions_to_be_changed(base_tablet, 
&versions_to_be_changed, &max_rowset)) {
+            res = _get_versions_to_be_changed(base_tablet, 
&versions_to_be_changed, &max_rowset);
+            if (!res) {
                 LOG(WARNING) << "fail to get version to be changed. res=" << 
res;
                 break;
             }
 
+            DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.alter_fail", 
{
+                res = Status::InternalError(
+                        "inject alter tablet failed. base_tablet={}, 
new_tablet={}",
+                        request.base_tablet_id, request.new_tablet_id);
+                LOG(WARNING) << "inject error. res=" << res;
+                break;
+            });
+
             // should check the max_version >= request.alter_version, if not 
the convert is useless
             if (max_rowset == nullptr || max_rowset->end_version() < 
request.alter_version) {
                 res = Status::InternalError(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index 7ab2f8732d0..dbc43059e5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -269,6 +269,7 @@ public abstract class AlterHandler extends MasterDaemon {
             alterJob.replay(alterJob);
             alterJobsV2.put(alterJob.getJobId(), alterJob);
         } else {
+            existingJob.failedTabletBackends = alterJob.failedTabletBackends;
             existingJob.replay(alterJob);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index 29bf3d994ed..78f34c2cccd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -36,6 +37,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /*
  * Version 2 of AlterJob, for replacing the old version of AlterJob.
@@ -90,6 +92,10 @@ public abstract class AlterJobV2 implements Writable {
     @SerializedName(value = "rawSql")
     protected String rawSql;
 
+    // save failed task after retry three times, tablet -> backends
+    @SerializedName(value = "failedTabletBackends")
+    protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
+
     public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, 
long tableId, String tableName,
                       long timeoutMs) {
         this.rawSql = rawSql;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 482944acf8b..c127bd4b8d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -140,8 +140,6 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
 
     // save all create rollup tasks
     private AgentBatchTask rollupBatchTask = new AgentBatchTask();
-    // save failed task after retry three times, tabletId -> agentTask
-    private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
 
     private Analyzer analyzer;
 
@@ -529,17 +527,18 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                     task.setFinished(true);
                     AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.ALTER, task.getSignature());
                     LOG.warn("rollup task failed: " + task.getErrorMsg());
-                    if (!failedAgentTasks.containsKey(task.getTabletId())) {
-                        failedAgentTasks.put(task.getTabletId(), 
Lists.newArrayList(task));
-                    } else {
-                        failedAgentTasks.get(task.getTabletId()).add(task);
+                    List<Long> failedBackends = 
failedTabletBackends.get(task.getTabletId());
+                    if (failedBackends == null) {
+                        failedBackends = Lists.newArrayList();
+                        failedTabletBackends.put(task.getTabletId(), 
failedBackends);
                     }
+                    failedBackends.add(task.getBackendId());
                     int expectSucceedTaskNum = tbl.getPartitionInfo()
                             
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
-                    int failedTaskCount = 
failedAgentTasks.get(task.getTabletId()).size();
+                    int failedTaskCount = failedBackends.size();
                     if (expectSucceedTaskNum - failedTaskCount < 
expectSucceedTaskNum / 2 + 1) {
                         throw new AlterCancelException("rollup tasks failed on 
same tablet reach threshold "
-                                + failedAgentTasks.get(task.getTabletId()) + 
", reason=" + task.getErrorMsg());
+                                + failedTaskCount + ", reason=" + 
task.getErrorMsg());
                     }
                 }
             }
@@ -554,9 +553,11 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
         try {
             Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
             TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
-            for (List<AgentTask> tasks : failedAgentTasks.values()) {
-                for (AgentTask task : tasks) {
-                    invertedIndex.getReplica(task.getTabletId(), 
task.getBackendId()).setBad(true);
+            for (Map.Entry<Long, List<Long>> entry : 
failedTabletBackends.entrySet()) {
+                long tabletId = entry.getKey();
+                List<Long> failedBackends = entry.getValue();
+                for (long backendId : failedBackends) {
+                    invertedIndex.getReplica(tabletId, backendId).setBad(true);
                 }
             }
             for (Map.Entry<Long, MaterializedIndex> entry : 
this.partitionIdToRollupIndex.entrySet()) {
@@ -606,8 +607,12 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
             MaterializedIndex rollupIndex = partition.getIndex(rollupIndexId);
             Preconditions.checkNotNull(rollupIndex, rollupIndexId);
             for (Tablet tablet : rollupIndex.getTablets()) {
+                List<Long> failedBackends = 
failedTabletBackends.get(tablet.getId());
                 for (Replica replica : tablet.getReplicas()) {
                     replica.setState(ReplicaState.NORMAL);
+                    if (failedBackends != null && 
failedBackends.contains(replica.getBackendId())) {
+                        replica.setBad(true);
+                    }
                 }
             }
             partition.visualiseShadowIndex(rollupIndexId, false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index c1ecc202d79..d8c12066df6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -134,8 +134,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
     // save all schema change tasks
     private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
-    // save failed task after retry three times, tabletId -> agentTask
-    private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
 
     private SchemaChangeJobV2() {
         super(JobType.SCHEMA_CHANGE);
@@ -520,17 +518,18 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                     task.setFinished(true);
                     AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.ALTER, task.getSignature());
                     LOG.warn("schema change task failed: " + 
task.getErrorMsg());
-                    if (!failedAgentTasks.containsKey(task.getTabletId())) {
-                        failedAgentTasks.put(task.getTabletId(), 
Lists.newArrayList(task));
-                    } else {
-                        failedAgentTasks.get(task.getTabletId()).add(task);
+                    List<Long> failedBackends = 
failedTabletBackends.get(task.getTabletId());
+                    if (failedBackends == null) {
+                        failedBackends = Lists.newArrayList();
+                        failedTabletBackends.put(task.getTabletId(), 
failedBackends);
                     }
+                    failedBackends.add(task.getBackendId());
                     int expectSucceedTaskNum = tbl.getPartitionInfo()
                             
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
-                    int failedTaskCount = 
failedAgentTasks.get(task.getTabletId()).size();
+                    int failedTaskCount = failedBackends.size();
                     if (expectSucceedTaskNum - failedTaskCount < 
expectSucceedTaskNum / 2 + 1) {
                         throw new AlterCancelException("schema change tasks 
failed on same tablet reach threshold "
-                                    + 
failedAgentTasks.get(task.getTabletId()));
+                                + failedTaskCount);
                     }
                 }
             }
@@ -545,9 +544,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         try {
             Preconditions.checkState(tbl.getState() == 
OlapTableState.SCHEMA_CHANGE);
             TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
-            for (List<AgentTask> tasks : failedAgentTasks.values()) {
-                for (AgentTask task : tasks) {
-                    invertedIndex.getReplica(task.getTabletId(), 
task.getBackendId()).setBad(true);
+            for (Map.Entry<Long, List<Long>> entry : 
failedTabletBackends.entrySet()) {
+                long tabletId = entry.getKey();
+                List<Long> failedBackends = entry.getValue();
+                for (long backendId : failedBackends) {
+                    invertedIndex.getReplica(tabletId, backendId).setBad(true);
                 }
             }
             for (long partitionId : partitionIndexMap.rowKeySet()) {
@@ -622,8 +623,12 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
                 // set replica state
                 for (Tablet tablet : shadowIdx.getTablets()) {
+                    List<Long> failedBackends = 
failedTabletBackends.get(tablet.getId());
                     for (Replica replica : tablet.getReplicas()) {
                         replica.setState(ReplicaState.NORMAL);
+                        if (failedBackends != null && 
failedBackends.contains(replica.getBackendId())) {
+                            replica.setBad(true);
+                        }
                     }
                 }
 
diff --git 
a/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy 
b/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy
new file mode 100644
index 00000000000..8dc65365c39
--- /dev/null
+++ b/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite('test_schema_change_fail', 'p0,p2,nonConcurrent') { // Injects an alter failure on one BE and checks its replica is marked bad, via the default connection and via a non-master FE
+    def frontends = sql_return_maparray('show frontends')
+    def backends = sql_return_maparray('show backends')
+    def forceReplicaNum = 
getFeConfig('force_olap_table_replication_num').toInteger()
+    if (frontends.size() < 2 || backends.size() < 3 || forceReplicaNum == 1 || 
forceReplicaNum == 2) { // skip unless there are >= 2 FEs, >= 3 BEs, and replication is not forced to 1 or 2
+        return
+    }
+
+    def tbl = 'test_schema_change_fail'
+
+    def beId = backends[0].BackendId.toLong() // the failure is injected only on the first backend
+    def beHost = backends[0].Host
+    def beHttpPort = backends[0].HttpPort.toInteger()
+    def injectName = 'SchemaChangeJob.process_alter_tablet.alter_fail' // BE debug point that forces the alter tablet task to fail
+
+    def checkReplicaBad = { -> // asserts every BE holds a replica of the table's first tablet and the replica on beId is bad
+        def tabletId = sql_return_maparray("SHOW TABLETS FROM 
${tbl}")[0].TabletId.toLong()
+        def replicas = sql_return_maparray(sql_return_maparray("SHOW TABLET 
${tabletId}").DetailCmd)
+        assertEquals(backends.size(), replicas.size())
+        for (def replica : replicas) {
+            if (replica.BackendId.toLong() == beId) {
+                assertEquals(true, replica.IsBad.toBoolean())
+            }
+        }
+    }
+
+    def followFe = frontends.stream().filter(fe -> 
!fe.IsMaster.toBoolean()).findFirst().orElse(null) // first non-master FE; non-null because frontends.size() >= 2
+    def followFeUrl =  
"jdbc:mysql://${followFe.Host}:${followFe.QueryPort}/?useLocalSessionState=false&allowLoadLocalInfile=false"
+    followFeUrl = context.config.buildUrlWithDb(followFeUrl, context.dbName)
+
+    sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+    sql """
+        CREATE TABLE ${tbl}
+        (
+            `a` TINYINT NOT NULL,
+            `b` TINYINT NULL
+        )
+        UNIQUE KEY (`a`)
+        DISTRIBUTED BY HASH(`a`) BUCKETS 1
+        PROPERTIES
+        (
+            'replication_num' = '${backends.size()}',
+            'light_schema_change' = 'false'
+        )
+    """
+
+    sql "INSERT INTO ${tbl} VALUES (1, 2), (3, 4)"
+
+    try {
+        DebugPoint.enableDebugPoint(beHost, beHttpPort, NodeType.BE, 
injectName)
+        setFeConfig('disable_tablet_scheduler', true) // NOTE(review): presumably keeps the scheduler from repairing the bad replica before it is checked
+
+        sleep(1000)
+        sql "ALTER TABLE ${tbl} MODIFY COLUMN b DOUBLE"
+        sleep(5 * 1000) // NOTE(review): fixed wait; polling the job state with a timeout would be less timing-sensitive
+
+        def jobs = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE 
TableName = '${tbl}' ORDER BY CreateTime DESC LIMIT 1"
+        assertEquals(1, jobs.size())
+        assertEquals('FINISHED', jobs[0].State) // a single failed replica out of >= 3 stays under the FE failure threshold, so the job still finishes
+
+        checkReplicaBad()
+        connect('root', '', followFeUrl) { // the non-master FE must also see the bad flag carried by the replayed failedTabletBackends
+            checkReplicaBad()
+        }
+    } finally { // always restore the debug point and scheduler config, and drop the table
+        DebugPoint.disableDebugPoint(beHost, beHttpPort, NodeType.BE, 
injectName)
+        setFeConfig('disable_tablet_scheduler', false)
+        sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to