This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 8528a1bcb2d [bugfix](backup)(cooldown) cancel backup properly when be backup failed #38724 (#38994) 8528a1bcb2d is described below commit 8528a1bcb2d368b9db12e4d503395248146ad703 Author: walter <w41te...@gmail.com> AuthorDate: Wed Aug 7 19:41:16 2024 +0800 [bugfix](backup)(cooldown) cancel backup properly when be backup failed #38724 (#38994) cherry pick from #38724 Co-authored-by: zhangyuan <ayuanzh...@tencent.com> --- be/src/common/status.cpp | 7 + be/src/olap/snapshot_manager.cpp | 3 + be/src/olap/tablet.cpp | 8 + .../java/org/apache/doris/backup/BackupJob.java | 69 +++++++ .../backup_restore/test_backup_cancelled.groovy | 207 +++++++++++++++++++++ 5 files changed, 294 insertions(+) diff --git a/be/src/common/status.cpp b/be/src/common/status.cpp index bc8ed22a235..910ad3d2d18 100644 --- a/be/src/common/status.cpp +++ b/be/src/common/status.cpp @@ -25,6 +25,13 @@ void Status::to_thrift(TStatus* s) const { return; } s->status_code = (int16_t)_code > 0 ? (TStatusCode::type)_code : TStatusCode::INTERNAL_ERROR; + + if (_code == ErrorCode::VERSION_ALREADY_MERGED) { + s->status_code = TStatusCode::OLAP_ERR_VERSION_ALREADY_MERGED; + } else if (_code == ErrorCode::TABLE_NOT_FOUND) { + s->status_code = TStatusCode::TABLET_MISSING; + } + s->error_msgs.push_back(fmt::format("({})[{}]{}", BackendOptions::get_localhost(), code_as_string(), _err_msg ? _err_msg->_msg : "")); s->__isset.error_msgs = true; diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index df7c442ff2c..0fcc09cec49 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -88,6 +88,9 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s TabletSharedPtr ref_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request.tablet_id); + + DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", { ref_tablet = nullptr; }) + if (ref_tablet == nullptr) { return Status::Error<TABLE_NOT_FOUND>("failed to get tablet. tablet={}", request.tablet_id); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 137bb26c1fb..8d8b47580ad 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -982,6 +982,14 @@ Status Tablet::capture_consistent_versions(const Version& spec_version, } } } + + DBUG_EXECUTE_IF("TTablet::capture_consistent_versions.inject_failure", { + auto tablet_id = dp->param<int64>("tablet_id", -1); + if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) { + status = Status::Error<VERSION_ALREADY_MERGED>("version already merged"); + } + }); + return status; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 37581e848cb..937c97f473d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -159,6 +159,61 @@ public class BackupJob extends AbstractJob { return BackupContent.ALL; } + private synchronized boolean tryNewTabletSnapshotTask(SnapshotTask task) { + Table table = env.getInternalCatalog().getTableByTableId(task.getTableId()); + if (table == null) { + return false; + } + OlapTable tbl = (OlapTable) table; + tbl.readLock(); + try { + if (tbl.getId() != task.getTableId()) { + return false; + } + Partition partition = tbl.getPartition(task.getPartitionId()); + if (partition == null) { + return false; + } + MaterializedIndex index = partition.getIndex(task.getIndexId()); + if (index == null) { + return false; + } + Tablet tablet = index.getTablet(task.getTabletId()); + if (tablet == null) { + return false; + } + Replica replica = chooseReplica(tablet, task.getVersion()); + if (replica == null) { + return false; + } + + //clear old task + AgentTaskQueue.removeTaskOfType(TTaskType.MAKE_SNAPSHOT, task.getTabletId()); + unfinishedTaskIds.remove(task.getTabletId()); + taskProgress.remove(task.getTabletId()); + taskErrMsg.remove(task.getTabletId()); + + SnapshotTask newTask = new SnapshotTask(null, replica.getBackendId(), task.getTabletId(), + task.getJobId(), task.getDbId(), tbl.getId(), task.getPartitionId(), + task.getIndexId(), task.getTabletId(), + task.getVersion(), + task.getSchemaHash(), timeoutMs, false /* not restore task */); + AgentBatchTask batchTask = new AgentBatchTask(); + batchTask.addTask(newTask); + unfinishedTaskIds.put(tablet.getId(), replica.getBackendId()); + + //send task + AgentTaskQueue.addTask(newTask); + AgentTaskExecutor.submit(batchTask); + + } finally { + tbl.readUnlock(); + } + + return true; + } + + public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishTaskRequest request) { Preconditions.checkState(task.getJobId() == jobId); @@ -171,6 +226,20 @@ public class BackupJob extends AbstractJob { "make snapshot failed, version already merged"); cancelInternal(); } + + if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING + && !tryNewTabletSnapshotTask(task)) { + status = new Status(ErrCode.NOT_FOUND, + "make snapshot failed, failed to ge tablet, table will be droped or truncated"); + cancelInternal(); + } + + if (request.getTaskStatus().getStatusCode() == TStatusCode.NOT_IMPLEMENTED_ERROR) { + status = new Status(ErrCode.COMMON_ERROR, + "make snapshot failed, currently not support backup tablet with cooldowned remote data"); + cancelInternal(); + } + return false; } diff --git a/regression-test/suites/backup_restore/test_backup_cancelled.groovy b/regression-test/suites/backup_restore/test_backup_cancelled.groovy new file mode 100644 index 00000000000..d9753af75e2 --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_cancelled.groovy @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backup_cancelled", "backup_cancelled") { + String suiteName = "test_backup_cancelled" + String repoName = "${suiteName}_repo" + String dbName = "${suiteName}_db" + String tableName = "${suiteName}_table" + String snapshotName = "${suiteName}_snapshot" + String snapshotName_1 = "${suiteName}_snapshot1" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + CREATE TABLE ${dbName}.${tableName} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0") + AGGREGATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + + List<String> values = [] + for (int i = 1; i <= 10; ++i) { + values.add("(${i}, ${i})") + } + sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}" + def result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + result = sql_return_maparray """show tablets from ${dbName}.${tableName}""" + assertNotNull(result) + def tabletId = null + for (def res : result) { + tabletId = res.TabletId + break + } + + // test failed to get tablet when truncate or drop table + + GetDebugPoint().enableDebugPointForAllBEs("SnapshotManager::make_snapshot.inject_failure", [tablet_id:"${tabletId}", execute:3]); + + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableName}) + """ + + while (syncer.checkSnapshotFinish(dbName) == false) { + Thread.sleep(3000) + } + + + GetDebugPoint().disableDebugPointForAllBEs("SnapshotManager::make_snapshot.inject_failure") + + + + + // test missing versions when compaction or balance + + GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure", [tablet_id:"${tabletId}", execute:1]); + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName_1} + TO `${repoName}` + ON (${tableName}) + """ + + while (syncer.checkSnapshotFinish(dbName) == false) { + Thread.sleep(3000) + } + + GetDebugPoint().disableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure"); + + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + sql "TRUNCATE TABLE ${dbName}.${tableName}" + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true" + ) + """ + + while (syncer.checkSnapshotFinish(dbName) == false) { + Thread.sleep(3000) + } + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + sql "DROP TABLE ${dbName}.${tableName} FORCE" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" +} + + +suite("test_backup_cooldown_cancelled", "backup_cooldown_cancelled") { + + String suiteName = "test_backup_cooldown_cancelled" + String resource_name = "resource_${suiteName}" + String policy_name= "policy_${suiteName}" + String dbName = "${suiteName}_db" + String tableName = "${suiteName}_table" + String snapshotName = "${suiteName}_snapshot" + String repoName = "${suiteName}_repo" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + + + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_name}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name} + PROPERTIES( + "storage_resource" = "${resource_name}", + "cooldown_ttl" = "300" + ) + """ + + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + + sql """ + CREATE TABLE ${dbName}.${tableName} + ( + k1 BIGINT, + v1 VARCHAR(48) + ) + DUPLICATE KEY(k1) + DISTRIBUTED BY HASH (k1) BUCKETS 3 + PROPERTIES( + "storage_policy" = "${policy_name}", + "replication_allocation" = "tag.location.default: 1" + ); + """ + + + // test backup cooldown table and should be cancelled + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableName}) + """ + + while (syncer.checkSnapshotFinish(dbName) == false) { + Thread.sleep(3000) + } + + //cleanup + sql "DROP TABLE ${dbName}.${tableName} FORCE" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" + + sql """ + drop storage policy ${policy_name}; + """ + + sql """ + drop resource ${resource_name}; + """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org