This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a8d2cc1e18 [fix](cloud-schema-change) Make SC tablet job abort logic 
really work (#50908)
4a8d2cc1e18 is described below

commit 4a8d2cc1e18e1992079c0996554e043bad589811
Author: Siyang Tang <tangsiy...@selectdb.com>
AuthorDate: Fri May 23 19:23:44 2025 +0800

    [fix](cloud-schema-change) Make SC tablet job abort logic really work 
(#50908)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    In cloud mode, the schema change abort action never actually worked,
    because pruneMeta clears some data structures that the schema change
    abort logic depends on.
    
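    Moreover, pruneMeta was called before the schema change edit log was
    written, which could make job replay behave incorrectly. This change
    calls pruneMeta only after the edit log is written (and in the replay
    paths), passes the FE job id when removing the tablet schema change job,
    and lets the meta service skip the initiator check for FE-issued ABORT
    requests, whose initiator differs from the BE that started the job.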
---
 be/src/cloud/cloud_schema_change_job.cpp           |   7 ++
 cloud/src/meta-service/meta_service_job.cpp        |   4 +-
 .../org/apache/doris/alter/CloudRollupJobV2.java   |   2 +-
 .../apache/doris/alter/CloudSchemaChangeJobV2.java |   4 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |   6 +-
 .../cloud/datasource/CloudInternalCatalog.java     |   3 +-
 .../test_base_compaction_after_sc_fail.groovy      | 100 +++++++++++++++++++++
 7 files changed, 119 insertions(+), 7 deletions(-)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp 
b/be/src/cloud/cloud_schema_change_job.cpp
index 8eca2bf4662..0fa47d76cbf 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -130,6 +130,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const 
TAlterTabletReqV2& reque
         }
         return st;
     }
+    DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.alter_fail", {
+        auto res =
+                Status::InternalError("inject alter tablet failed. 
base_tablet={}, new_tablet={}",
+                                      request.base_tablet_id, 
request.new_tablet_id);
+        LOG(WARNING) << "inject error. res=" << res;
+        return res;
+    });
     if (request.alter_version > 1) {
         // [0-1] is a placeholder rowset, no need to convert
         RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, 
start_resp.alter_version()},
diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index 9f4ba3c3072..8ebaf0d9baa 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1165,7 +1165,9 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
 
     // MUST check initiator to let the retried BE commit this schema_change 
job.
     if (schema_change.id() != recorded_schema_change.id() ||
-        schema_change.initiator() != recorded_schema_change.initiator()) {
+        (schema_change.initiator() != recorded_schema_change.initiator() &&
+         request->action() != FinishTabletJobRequest::ABORT)) {
+        // abort is from FE, so initiator differ from the original one, just 
skip this check
         SS << "unmatched job id or initiator, recorded_id=" << 
recorded_schema_change.id()
            << " given_id=" << schema_change.id()
            << " recorded_job=" << proto_to_json(recorded_schema_change)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 7d4338322c6..894528177e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -137,7 +137,7 @@ public class CloudRollupJobV2 extends RollupJobV2 {
                         Long rollupTabletId = tabletEntry.getKey();
                         Long baseTabletId = tabletEntry.getValue();
                         ((CloudInternalCatalog) 
Env.getCurrentInternalCatalog())
-                                .removeSchemaChangeJob(dbId, tableId, 
baseIndexId, rollupIndexId,
+                                .removeSchemaChangeJob(jobId, dbId, tableId, 
baseIndexId, rollupIndexId,
                                     partitionId, baseTabletId, rollupTabletId);
                     }
                     LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms."
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index c481914d78c..b57fe1609d6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -117,8 +117,8 @@ public class CloudSchemaChangeJobV2 extends 
SchemaChangeJobV2 {
                         Long shadowTabletId = entry.getKey();
                         Long originTabletId = entry.getValue();
                         ((CloudInternalCatalog) 
Env.getCurrentInternalCatalog())
-                                .removeSchemaChangeJob(dbId, tableId, 
originIndexId, shadowIndexId,
-                                    partitionId, originTabletId, 
shadowTabletId);
+                                .removeSchemaChangeJob(jobId, dbId, tableId, 
originIndexId, shadowIndexId,
+                                        partitionId, originTabletId, 
shadowTabletId);
                     }
                     LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in 
ms."
                             + "dbId:{}, tableId:{}, originIndexId:{}, 
partitionId:{}. tabletSize:{}",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index f334d485e98..e269855a916 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -664,7 +664,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             commitShadowIndex();
             // all partitions are good
             onFinished(tbl);
-            pruneMeta();
 
             LOG.info("schema change job finished: {}", jobId);
 
@@ -676,6 +675,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             // Write edit log with table's write lock held, to avoid adding 
partitions before writing edit log,
             // else it will try to transform index in newly added partition 
while replaying and result in failure.
             Env.getCurrentEnv().getEditLog().logAlterJob(this);
+            pruneMeta();
         } finally {
             tbl.writeUnlock();
         }
@@ -790,7 +790,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
         cancelInternal();
 
-        pruneMeta();
         this.errMsg = errMsg;
         this.finishedTimeMs = System.currentTimeMillis();
         changeTableState(dbId, tableId, OlapTableState.NORMAL);
@@ -799,6 +798,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         Env.getCurrentEnv().getEditLog().logAlterJob(this);
         LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
         onCancel();
+        pruneMeta();
 
         return true;
     }
@@ -936,6 +936,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("replay finished schema change job: {} table id: {}", jobId, 
tableId);
         changeTableState(dbId, tableId, OlapTableState.NORMAL);
         LOG.info("set table's state to NORMAL when replay finished, table id: 
{}, job id: {}", tableId, jobId);
+        pruneMeta();
     }
 
     /**
@@ -951,6 +952,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("replay cancelled schema change job: {}", jobId);
         changeTableState(dbId, tableId, OlapTableState.NORMAL);
         LOG.info("set table's state to NORMAL when replay cancelled, table id: 
{}, job id: {}", tableId, jobId);
+        pruneMeta();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 2c9b968e265..15de6f132da 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -850,7 +850,7 @@ public class CloudInternalCatalog extends InternalCatalog {
         }
     }
 
-    public void removeSchemaChangeJob(long dbId, long tableId, long indexId, 
long newIndexId,
+    public void removeSchemaChangeJob(long jobId, long dbId, long tableId, 
long indexId, long newIndexId,
             long partitionId, long tabletId, long newTabletId)
             throws DdlException {
         Cloud.FinishTabletJobRequest.Builder finishTabletJobRequestBuilder = 
Cloud.FinishTabletJobRequest.newBuilder();
@@ -879,6 +879,7 @@ public class CloudInternalCatalog extends InternalCatalog {
         newtabletIndexPBBuilder.setTabletId(newTabletId);
         final Cloud.TabletIndexPB newtabletIndex = 
newtabletIndexPBBuilder.build();
         schemaChangeJobPBBuilder.setNewTabletIdx(newtabletIndex);
+        schemaChangeJobPBBuilder.setId(String.valueOf(jobId));
         final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb =
                 schemaChangeJobPBBuilder.build();
 
diff --git 
a/regression-test/suites/compaction/test_base_compaction_after_sc_fail.groovy 
b/regression-test/suites/compaction/test_base_compaction_after_sc_fail.groovy
new file mode 100644
index 00000000000..815bfc6281e
--- /dev/null
+++ 
b/regression-test/suites/compaction/test_base_compaction_after_sc_fail.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.NodeType
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+
+suite("test_base_compaction_after_sc_fail", "nonConcurrent") {  // Regression test: after an injected schema change failure is cancelled, cumulative and base compaction on the table are expected to succeed
+    if (!isCloudMode()) {  // the injected debug point lives in CloudSchemaChangeJob (cloud BE code), so this suite only runs in cloud mode
+        return
+    }
+    
+    def tableName = "test_base_compaction_after_sc_fail"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int
+                ) DUPLICATE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1");
+    """
+
+    def injectBe = null
+    def backends = sql_return_maparray('show backends')
+    def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")  // BUCKETS 1 + replication_num 1, so a single tablet is expected
+    def injectBeId = array[0].BackendId
+    def originTabletId = array[0].TabletId  // NOTE(review): not referenced later in this suite
+    injectBe = backends.stream().filter(be -> be.BackendId == 
injectBeId).findFirst().orElse(null)
+    assertNotNull(injectBe)  // NOTE(review): injectBe is only checked here; the debug point below is enabled on all BEs
+
+    def injectName = "CloudSchemaChangeJob::process_alter_tablet.alter_fail"  // makes BE process_alter_tablet return InternalError before the rowset conversion step
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs(injectName)
+
+        sql """ ALTER TABLE ${tableName} MODIFY COLUMN c1 VARCHAR(44) """  // with the injection active, this schema change is expected to end up CANCELLED
+
+        def wait_for_schema_change = {  // polls the newest column-alter job until it reports CANCELLED; the assert fails after roughly 1000 polls
+                    def try_times=1000
+                    while(true){
+                        def res = sql " SHOW ALTER TABLE COLUMN WHERE 
TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+                        Thread.sleep(10)
+                        if(res[0][9].toString() == "CANCELLED") {  // NOTE(review): assumes column 9 of SHOW ALTER TABLE COLUMN is the job State — confirm
+                            break;
+                        }
+                        assert(try_times>0)
+                        try_times--
+                    }
+                }
+        wait_for_schema_change()
+
+        def insert_data = {  // 0..100 is inclusive: 101 INSERT/DELETE pairs on key k1=1
+            for (i in 0..100) {
+                sql """ INSERT INTO ${tableName} VALUES(1, "2", 3, 4) """
+                sql """ DELETE FROM ${tableName} WHERE k1=1 """
+            }
+        }
+
+        insert_data()
+
+        trigger_and_wait_compaction(tableName, "cumulative")
+
+        insert_data()
+
+        trigger_and_wait_compaction(tableName, "cumulative")
+
+        insert_data()
+
+        trigger_and_wait_compaction(tableName, "cumulative")
+
+        trigger_and_wait_compaction(tableName, "base")  // the base compaction this suite checks still works after the cancelled schema change
+
+    } finally {  // always disable the injection so it does not leak into later suites
+        GetDebugPoint().disableDebugPointForAllBEs(injectName)
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to