This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 8528a1bcb2d [bugfix](backup)(cooldown) cancel backup properly when be backup failed #38724 (#38994)
8528a1bcb2d is described below

commit 8528a1bcb2d368b9db12e4d503395248146ad703
Author: walter <w41te...@gmail.com>
AuthorDate: Wed Aug 7 19:41:16 2024 +0800

    [bugfix](backup)(cooldown) cancel backup properly when be backup failed #38724 (#38994)
    
    cherry pick from #38724
    
    Co-authored-by: zhangyuan <ayuanzh...@tencent.com>
---
 be/src/common/status.cpp                           |   7 +
 be/src/olap/snapshot_manager.cpp                   |   3 +
 be/src/olap/tablet.cpp                             |   8 +
 .../java/org/apache/doris/backup/BackupJob.java    |  69 +++++++
 .../backup_restore/test_backup_cancelled.groovy    | 207 +++++++++++++++++++++
 5 files changed, 294 insertions(+)

diff --git a/be/src/common/status.cpp b/be/src/common/status.cpp
index bc8ed22a235..910ad3d2d18 100644
--- a/be/src/common/status.cpp
+++ b/be/src/common/status.cpp
@@ -25,6 +25,13 @@ void Status::to_thrift(TStatus* s) const {
         return;
     }
     s->status_code = (int16_t)_code > 0 ? (TStatusCode::type)_code : TStatusCode::INTERNAL_ERROR;
+
+    if (_code == ErrorCode::VERSION_ALREADY_MERGED) {
+        s->status_code = TStatusCode::OLAP_ERR_VERSION_ALREADY_MERGED;
+    } else if (_code == ErrorCode::TABLE_NOT_FOUND) {
+        s->status_code = TStatusCode::TABLET_MISSING;
+    }
+
     s->error_msgs.push_back(fmt::format("({})[{}]{}", BackendOptions::get_localhost(),
                                         code_as_string(), _err_msg ? _err_msg->_msg : ""));
     s->__isset.error_msgs = true;
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index df7c442ff2c..0fcc09cec49 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -88,6 +88,9 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s
 
     TabletSharedPtr ref_tablet =
             StorageEngine::instance()->tablet_manager()->get_tablet(request.tablet_id);
+
+    DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", { ref_tablet = nullptr; })
+
     if (ref_tablet == nullptr) {
         return Status::Error<TABLE_NOT_FOUND>("failed to get tablet. tablet={}", request.tablet_id);
     }
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 137bb26c1fb..8d8b47580ad 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -982,6 +982,14 @@ Status Tablet::capture_consistent_versions(const Version& spec_version,
             }
         }
     }
+
+    DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", {
+        auto tablet_id = dp->param<int64>("tablet_id", -1);
+        if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) {
+            status = Status::Error<VERSION_ALREADY_MERGED>("version already merged");
+        }
+    });
+
     return status;
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 37581e848cb..937c97f473d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -159,6 +159,61 @@ public class BackupJob extends AbstractJob {
         return BackupContent.ALL;
     }
 
+    /** Re-sends the snapshot task; returns false if its table, partition, index, tablet or replica is gone. */
+    private synchronized boolean tryNewTabletSnapshotTask(SnapshotTask task) {
+        Table table = env.getInternalCatalog().getTableByTableId(task.getTableId());
+        if (!(table instanceof OlapTable)) {
+            return false;
+        }
+        OlapTable tbl = (OlapTable) table;
+        tbl.readLock();
+        try {
+            if (tbl.getId() != task.getTableId()) {
+                return false;
+            }
+            Partition partition = tbl.getPartition(task.getPartitionId());
+            if (partition == null) {
+                return false;
+            }
+            MaterializedIndex index = partition.getIndex(task.getIndexId());
+            if (index == null) {
+                return false;
+            }
+            Tablet tablet = index.getTablet(task.getTabletId());
+            if (tablet == null) {
+                return false;
+            }
+            Replica replica = chooseReplica(tablet, task.getVersion());
+            if (replica == null) {
+                return false;
+            }
+
+            // clear old task
+            AgentTaskQueue.removeTaskOfType(TTaskType.MAKE_SNAPSHOT, task.getTabletId());
+            unfinishedTaskIds.remove(task.getTabletId());
+            taskProgress.remove(task.getTabletId());
+            taskErrMsg.remove(task.getTabletId());
+
+            SnapshotTask newTask = new SnapshotTask(null, replica.getBackendId(), task.getTabletId(),
+                    task.getJobId(), task.getDbId(), tbl.getId(), task.getPartitionId(),
+                    task.getIndexId(), task.getTabletId(),
+                    task.getVersion(),
+                    task.getSchemaHash(), timeoutMs, false /* not restore task */);
+            AgentBatchTask batchTask = new AgentBatchTask();
+            batchTask.addTask(newTask);
+            unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());
+
+            // send task
+            AgentTaskQueue.addTask(newTask);
+            AgentTaskExecutor.submit(batchTask);
+
+        } finally {
+            tbl.readUnlock();
+        }
+
+        return true;
+    }
+
     public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
         Preconditions.checkState(task.getJobId() == jobId);
 
@@ -171,6 +226,20 @@ public class BackupJob extends AbstractJob {
                         "make snapshot failed, version already merged");
                 cancelInternal();
             }
+
+            if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING
+                    && !tryNewTabletSnapshotTask(task)) {
+                status = new Status(ErrCode.NOT_FOUND,
+                        "make snapshot failed, failed to get tablet, table may have been dropped or truncated");
+                cancelInternal();
+            }
+
+            if (request.getTaskStatus().getStatusCode() == TStatusCode.NOT_IMPLEMENTED_ERROR) {
+                status = new Status(ErrCode.COMMON_ERROR,
+                        "make snapshot failed, currently not support backup tablet with cooldowned remote data");
+                cancelInternal();
+            }
+
             return false;
         }
 
diff --git a/regression-test/suites/backup_restore/test_backup_cancelled.groovy b/regression-test/suites/backup_restore/test_backup_cancelled.groovy
new file mode 100644
index 00000000000..d9753af75e2
--- /dev/null
+++ b/regression-test/suites/backup_restore/test_backup_cancelled.groovy
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_cancelled", "backup_cancelled") {
+    String suiteName = "test_backup_cancelled"
+    String repoName = "${suiteName}_repo"
+    String dbName = "${suiteName}_db"
+    String tableName = "${suiteName}_table"
+    String snapshotName = "${suiteName}_snapshot"
+    String snapshotName_1 = "${suiteName}_snapshot1"
+
+    def syncer = getSyncer()
+    syncer.createS3Repository(repoName)
+
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    sql """
+        CREATE TABLE ${dbName}.${tableName} (
+            `id` LARGEINT NOT NULL,
+            `count` LARGEINT SUM DEFAULT "0")
+        AGGREGATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 2
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+        """
+
+    List<String> values = []
+    for (int i = 1; i <= 10; ++i) {
+        values.add("(${i}, ${i})")
+    }
+    sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
+    def result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(values.size(), result.size());
+
+    result = sql_return_maparray """show tablets from ${dbName}.${tableName}"""
+    assertNotNull(result)
+    def tabletId = null
+    for (def res : result) {
+        tabletId = res.TabletId
+        break
+    }
+
+    // BE cannot find the tablet (as after TRUNCATE/DROP); the FE retries the snapshot task
+
+    GetDebugPoint().enableDebugPointForAllBEs("SnapshotManager::make_snapshot.inject_failure", [tablet_id:"${tabletId}", execute:3]);
+
+
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName}
+        TO `${repoName}`
+        ON (${tableName})
+    """
+
+    while (syncer.checkSnapshotFinish(dbName) == false) {
+        Thread.sleep(3000)
+    }
+
+
+    GetDebugPoint().disableDebugPointForAllBEs("SnapshotManager::make_snapshot.inject_failure")
+
+
+
+
+    // BE reports the version as already merged (as after compaction/balance); the FE cancels the backup
+
+    GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure", [tablet_id:"${tabletId}", execute:1]);
+
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName_1}
+        TO `${repoName}`
+        ON (${tableName})
+    """
+
+    while (syncer.checkSnapshotFinish(dbName) == false) {
+        Thread.sleep(3000)
+    }
+
+    GetDebugPoint().disableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure");
+
+
+    def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+    assertTrue(snapshot != null)
+
+    sql "TRUNCATE TABLE ${dbName}.${tableName}"
+
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON ( `${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true"
+        )
+    """
+
+    while (syncer.checkSnapshotFinish(dbName) == false) {
+        Thread.sleep(3000)
+    }
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(values.size(), result.size());
+
+    sql "DROP TABLE ${dbName}.${tableName} FORCE"
+    sql "DROP DATABASE ${dbName} FORCE"
+    sql "DROP REPOSITORY `${repoName}`"
+}
+
+
+suite("test_backup_cooldown_cancelled", "backup_cooldown_cancelled") {
+
+    String suiteName = "test_backup_cooldown_cancelled"
+    String resource_name = "resource_${suiteName}"
+    String policy_name = "policy_${suiteName}"
+    String dbName = "${suiteName}_db"
+    String tableName = "${suiteName}_table"
+    String snapshotName = "${suiteName}_snapshot"
+    String repoName = "${suiteName}_repo"
+
+    def syncer = getSyncer()
+    syncer.createS3Repository(repoName)
+
+
+
+    sql """
+      CREATE RESOURCE IF NOT EXISTS "${resource_name}"
+      PROPERTIES(
+          "type"="s3",
+          "AWS_ENDPOINT" = "${getS3Endpoint()}",
+          "AWS_REGION" = "${getS3Region()}",
+          "AWS_ROOT_PATH" = "regression/cooldown",
+          "AWS_ACCESS_KEY" = "${getS3AK()}",
+          "AWS_SECRET_KEY" = "${getS3SK()}",
+          "AWS_MAX_CONNECTIONS" = "50",
+          "AWS_REQUEST_TIMEOUT_MS" = "3000",
+          "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+          "AWS_BUCKET" = "${getS3BucketName()}",
+          "s3_validity_check" = "true"
+      );
+    """
+
+    sql """
+      CREATE STORAGE POLICY IF NOT EXISTS ${policy_name}
+      PROPERTIES(
+          "storage_resource" = "${resource_name}",
+          "cooldown_ttl" = "300"
+      )
+    """
+
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+
+    sql """
+     CREATE TABLE ${dbName}.${tableName} 
+     (
+         k1 BIGINT,
+         v1 VARCHAR(48)
+     )
+     DUPLICATE KEY(k1)
+     DISTRIBUTED BY HASH (k1) BUCKETS 3
+     PROPERTIES(
+         "storage_policy" = "${policy_name}",
+         "replication_allocation" = "tag.location.default: 1"
+     );
+     """
+
+    // NOTE(review): no rows are inserted and cooldown_ttl is 300s; confirm data is really cooled down here.
+    // Backing up a table with cooled-down remote data is expected to be cancelled.
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName}
+        TO `${repoName}`
+        ON (${tableName})
+    """
+
+    while (syncer.checkSnapshotFinish(dbName) == false) {
+        Thread.sleep(3000)
+    }
+
+    // cleanup
+    sql "DROP TABLE ${dbName}.${tableName} FORCE"
+    sql "DROP DATABASE ${dbName} FORCE"
+    sql "DROP REPOSITORY `${repoName}`"
+
+    sql """
+    drop storage policy ${policy_name};
+    """
+
+    sql """
+    drop resource ${resource_name};
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to