This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new f985b998149 branch-4.1: [feat](cloud) drop instance should recycle the 
ancestors #61882 (#61933)
f985b998149 is described below

commit f985b998149dbf45e0e1b3fccb49e7082aa623d5
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 31 19:00:13 2026 +0800

    branch-4.1: [feat](cloud) drop instance should recycle the ancestors #61882 
(#61933)
    
    Cherry-picked from #61882
    
    Co-authored-by: walter <[email protected]>
---
 cloud/src/meta-service/meta_service_resource.cpp | 247 +++++++++++++++++------
 cloud/src/recycler/recycler.h                    |   3 +-
 cloud/src/recycler/recycler_snapshot.cpp         |   8 +-
 cloud/src/snapshot/snapshot_manager.cpp          |  96 +++++++++
 cloud/src/snapshot/snapshot_manager.h            |   8 +
 5 files changed, 300 insertions(+), 62 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_resource.cpp 
b/cloud/src/meta-service/meta_service_resource.cpp
index ada3f12b543..57127e642d0 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -2148,6 +2148,175 @@ std::pair<MetaServiceCode, std::string> 
handle_snapshot_intervals(const std::str
     return std::make_pair(MetaServiceCode::OK, "");
 }
 
+// Reads the InstanceInfoPB stored under `instance_key({instance_id})` and parses it into
+// `*instance`.
+//
+// Any non-OK read result (an absent key included) is mapped through
+// cast_as<ErrCategory::READ>; a stored value that does not parse yields PROTOBUF_PARSE_ERR.
+static std::pair<MetaServiceCode, std::string> get_instance_info(Transaction* txn,
+                                                                 const std::string& instance_id,
+                                                                 InstanceInfoPB* instance) {
+    std::string value;
+    const TxnErrorCode read_err = txn->get(instance_key({instance_id}), &value);
+    if (read_err != TxnErrorCode::TXN_OK) {
+        return {cast_as<ErrCategory::READ>(read_err),
+                fmt::format("failed to get instance, instance_id={}, err={}", instance_id,
+                            read_err)};
+    }
+    const bool parsed = instance->ParseFromString(value);
+    if (parsed) {
+        return {MetaServiceCode::OK, ""};
+    }
+    return {MetaServiceCode::PROTOBUF_PARSE_ERR,
+            fmt::format("failed to parse instance proto, instance_id={}", instance_id)};
+}
+
+// Walks the rollback chain that ends at `instance`. The walk starts from
+// `instance.original_instance_id()` and follows each `successor_instance_id()` until it reaches
+// `instance.instance_id()` (exclusive) or an instance with no successor. Every instance visited on
+// the way is appended to `predecessors` in traversal order (oldest first).
+//
+// Returns a non-OK code if a predecessor cannot be read or parsed, or if the successor links form a
+// cycle that never reaches `instance` (INVALID_ARGUMENT). A chain that simply stops (empty
+// successor) before reaching `instance` is not treated as an error.
+static std::pair<MetaServiceCode, std::string> get_predecessor_instances(
+        Transaction* txn, const InstanceInfoPB& instance,
+        std::vector<InstanceInfoPB>* predecessors) {
+    std::unordered_set<std::string> visited;
+    std::string instance_id = instance.original_instance_id();
+    while (!instance_id.empty() && instance_id != instance.instance_id()) {
+        // Without this guard a corrupted chain whose successor links loop back without ever
+        // reaching the tail would spin forever and grow `predecessors` without bound.
+        if (!visited.insert(instance_id).second) {
+            return {MetaServiceCode::INVALID_ARGUMENT,
+                    fmt::format("cycle detected in instance chain, current instance_id={}, "
+                                "instance_id={}",
+                                instance_id, instance.instance_id())};
+        }
+        InstanceInfoPB current_instance;
+        auto [code, msg] = get_instance_info(txn, instance_id, &current_instance);
+        if (code != MetaServiceCode::OK) {
+            return {code, std::move(msg)};
+        }
+        instance_id = current_instance.successor_instance_id();
+        predecessors->emplace_back(std::move(current_instance));
+    }
+    return {MetaServiceCode::OK, ""};
+}
+
+// Verifies that every snapshot owned by `instance_id` is already SNAPSHOT_RECYCLED.
+//
+// Snapshots whose serialized id is in `exclude_snapshot_set` (when one is provided) are ignored.
+// Returns a READ-category code if the snapshots cannot be listed, INVALID_ARGUMENT on the first
+// snapshot that is not recycled yet, and OK otherwise.
+static std::pair<MetaServiceCode, std::string> ensure_all_snapshot_recycled(
+        Transaction* txn, const std::string& instance_id,
+        std::optional<std::reference_wrapper<std::unordered_set<std::string>>>
+                exclude_snapshot_set) {
+    MetaReader reader(instance_id);
+    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
+    if (TxnErrorCode err = reader.get_snapshots(txn, &snapshots); err != TxnErrorCode::TXN_OK) {
+        std::string msg = fmt::format("failed to get snapshots, err={}", err);
+        LOG(WARNING) << msg << " instance_id=" << instance_id;
+        return {cast_as<ErrCategory::READ>(err), std::move(msg)};
+    }
+
+    const std::unordered_set<std::string>* excluded =
+            exclude_snapshot_set.has_value() ? &exclude_snapshot_set->get() : nullptr;
+    for (const auto& [snapshot, versionstamp] : snapshots) {
+        const std::string snapshot_id = SnapshotManager::serialize_snapshot_id(versionstamp);
+        const bool is_excluded = excluded != nullptr && excluded->count(snapshot_id) > 0;
+        if (is_excluded || snapshot.status() == SnapshotStatus::SNAPSHOT_RECYCLED) {
+            continue;
+        }
+        std::string msg = fmt::format(
+                "failed to drop instance, instance has snapshots, snapshot_id={}", snapshot_id);
+        LOG(WARNING) << msg << " instance_id=" << instance_id
+                     << " snapshot=" << proto_to_json(snapshot);
+        return {MetaServiceCode::INVALID_ARGUMENT, std::move(msg)};
+    }
+    return {MetaServiceCode::OK, ""};
+}
+
+// Drops an instance that is not part of a rollback chain.
+//
+// Fails while the instance still owns any snapshot that is not SNAPSHOT_RECYCLED. Otherwise it
+// marks `*instance` DELETED with the current time as mtime. If the instance records both a
+// source instance and a source snapshot, it also stages removal of the snapshot reference key
+// (source_instance_id, source snapshot versionstamp, instance_id) in `txn`.
+// On success the second element of the result is the serialized, updated instance.
+static std::pair<MetaServiceCode, std::string> drop_single_instance(const std::string& instance_id,
+                                                                    Transaction* txn,
+                                                                    InstanceInfoPB* instance) {
+    if (auto [code, msg] = ensure_all_snapshot_recycled(txn, instance_id, std::nullopt);
+        code != MetaServiceCode::OK) {
+        return {code, std::move(msg)};
+    }
+
+    instance->set_status(InstanceInfoPB::DELETED);
+    const int64_t now_sec =
+            duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    instance->set_mtime(now_sec);
+
+    std::string serialized = instance->SerializeAsString();
+    if (serialized.empty()) {
+        std::string msg = "failed to serialize";
+        LOG(ERROR) << msg;
+        return {MetaServiceCode::PROTOBUF_SERIALIZE_ERR, std::move(msg)};
+    }
+    LOG(INFO) << "drop instance_id=" << instance_id << " json=" << proto_to_json(*instance);
+
+    const bool created_from_snapshot =
+            instance->has_source_instance_id() && instance->has_source_snapshot_id() &&
+            !instance->source_instance_id().empty() && !instance->source_snapshot_id().empty();
+    if (!created_from_snapshot) {
+        return {MetaServiceCode::OK, std::move(serialized)};
+    }
+
+    const std::string& source_snapshot_id = instance->source_snapshot_id();
+    Versionstamp source_versionstamp;
+    if (!SnapshotManager::parse_snapshot_versionstamp(source_snapshot_id, &source_versionstamp)) {
+        std::string msg =
+                "failed to parse snapshot_id to versionstamp, snapshot_id=" + source_snapshot_id;
+        LOG(WARNING) << msg;
+        return {MetaServiceCode::INVALID_ARGUMENT, std::move(msg)};
+    }
+    txn->remove(versioned::snapshot_reference_key(
+            {instance->source_instance_id(), source_versionstamp, instance_id}));
+    return {MetaServiceCode::OK, std::move(serialized)};
+}
+
+// Drops a rollback chain that ends at `tail_instance`. The tail and every predecessor reachable
+// from its original_instance_id are marked DELETED with one shared mtime.
+//
+// The drop is refused unless:
+//   * the tail owns no snapshot that is not SNAPSHOT_RECYCLED; and
+//   * no predecessor owns such a snapshot, except snapshots that a chain member was created from
+//     (their ids are collected in `exclude_snapshot_set`).
+// For the tail, and for each predecessor with a non-empty source_snapshot_id, the snapshot
+// reference key pointing at its source snapshot is removed. Unlike drop_single_instance, a
+// source_snapshot_id that fails to parse is skipped silently instead of being rejected.
+//
+// Writes are only staged in `txn`; nothing is committed here. Predecessors are written with
+// txn->put. The serialized tail is returned in the second element of the result (presumably
+// persisted by the caller, alter_instance -- confirm).
+static std::pair<MetaServiceCode, std::string> drop_instance_chain(
+        const std::string& tail_instance_id, Transaction* txn, InstanceInfoPB* 
tail_instance) {
+    std::vector<InstanceInfoPB> predecessors;
+    if (auto [code, err] = get_predecessor_instances(txn, *tail_instance, 
&predecessors);
+        code != MetaServiceCode::OK) {
+        LOG(WARNING) << "drop instance chain: " << err;
+        return {code, std::move(err)};
+    }
+
+    // Serialized snapshot ids that predecessors are allowed to keep unrecycled.
+    std::unordered_set<std::string> exclude_snapshot_set;
+    if (auto [code, err] = ensure_all_snapshot_recycled(txn, tail_instance_id, 
std::nullopt);
+        code != MetaServiceCode::OK) {
+        return {code, std::move(err)};
+    }
+    // Exempt the snapshot the tail was created from when predecessors are checked below.
+    exclude_snapshot_set.insert(tail_instance->source_snapshot_id());
+
+    // Stages removal of the reference key from `instance` to its source snapshot; a
+    // source_snapshot_id that does not parse is ignored.
+    auto remove_snapshot_reference_key = [&](const InstanceInfoPB& instance) {
+        Versionstamp vs;
+        if 
(SnapshotManager::parse_snapshot_versionstamp(instance.source_snapshot_id(), 
&vs)) {
+            txn->remove(versioned::snapshot_reference_key(
+                    {instance.source_instance_id(), vs, 
instance.instance_id()}));
+        }
+    };
+    remove_snapshot_reference_key(*tail_instance);
+    for (auto& instance : predecessors) {
+        if (instance.has_source_snapshot_id() && 
!instance.source_snapshot_id().empty()) {
+            exclude_snapshot_set.insert(instance.source_snapshot_id());
+            remove_snapshot_reference_key(instance);
+        }
+    }
+
+    // NOTE(review): if a check below fails, the reference-key removals above are already staged
+    // in `txn`; presumably the caller discards the transaction on a non-OK result -- confirm.
+    for (auto& instance : predecessors) {
+        if (auto [code, err] =
+                    ensure_all_snapshot_recycled(txn, instance.instance_id(), 
exclude_snapshot_set);
+            code != MetaServiceCode::OK) {
+            return {code, std::move(err)};
+        }
+    }
+
+    // A single timestamp is shared by every instance of the chain.
+    int64_t now = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+
+    for (auto& instance : predecessors) {
+        instance.set_status(InstanceInfoPB::DELETED);
+        instance.set_mtime(now);
+        std::string serialized;
+        if (!instance.SerializeToString(&serialized)) {
+            std::string msg =
+                    fmt::format("failed to serialize instance_id={}", 
instance.instance_id());
+            LOG(ERROR) << msg;
+            return {MetaServiceCode::PROTOBUF_SERIALIZE_ERR, std::move(msg)};
+        }
+        txn->put(instance_key({instance.instance_id()}), serialized);
+        LOG(INFO) << "marking chain predecessor as DELETED, instance_id=" << 
instance.instance_id();
+    }
+
+    // The tail is not put here; its serialized form is handed back to the caller.
+    tail_instance->set_status(InstanceInfoPB::DELETED);
+    tail_instance->set_mtime(now);
+    std::string serialized = tail_instance->SerializeAsString();
+    if (serialized.empty()) {
+        std::string msg = "failed to serialize";
+        LOG(ERROR) << msg;
+        return {MetaServiceCode::PROTOBUF_SERIALIZE_ERR, std::move(msg)};
+    }
+    LOG(INFO) << "drop instance_id=" << tail_instance_id << " and " << 
predecessors.size()
+              << " predecessor instances, json=" << 
proto_to_json(*tail_instance);
+    return {MetaServiceCode::OK, std::move(serialized)};
+}
+
 void MetaServiceImpl::alter_instance(google::protobuf::RpcController* 
controller,
                                      const AlterInstanceRequest* request,
                                      AlterInstanceResponse* response,
@@ -2175,65 +2344,29 @@ void 
MetaServiceImpl::alter_instance(google::protobuf::RpcController* controller
     std::pair<MetaServiceCode, std::string> ret;
     switch (request->op()) {
     case AlterInstanceRequest::DROP: {
-        ret = alter_instance(
-                request, [&request, &instance_id](Transaction* txn, 
InstanceInfoPB* instance) {
-                    std::string msg;
-                    // check instance doesn't have any cluster.
-                    if (instance->clusters_size() != 0) {
-                        msg = "failed to drop instance, instance has clusters";
-                        LOG(WARNING) << msg;
-                        return 
std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
-                    }
-
-                    // check instance doesn't have any snapshot.
-                    MetaReader meta_reader(instance_id);
-                    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
-                    TxnErrorCode err = meta_reader.get_snapshots(txn, 
&snapshots);
-                    if (err != TxnErrorCode::TXN_OK) {
-                        msg = "failed to get snapshots";
-                        LOG(WARNING) << msg << " err=" << err;
-                        return std::make_pair(cast_as<ErrCategory::READ>(err), 
msg);
-                    }
-                    for (auto& [snapshot, _] : snapshots) {
-                        if (snapshot.status() != 
SnapshotStatus::SNAPSHOT_RECYCLED) {
-                            // still has snapshots, cannot drop
-                            msg = "failed to drop instance, instance has 
snapshots";
-                            LOG(WARNING) << msg;
-                            return 
std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
-                        }
-                    }
+        ret = alter_instance(request, [&instance_id](Transaction* txn, 
InstanceInfoPB* instance) {
+            // check instance doesn't have any cluster.
+            if (instance->clusters_size() != 0) {
+                std::string msg = "failed to drop instance, instance has 
clusters";
+                LOG(WARNING) << msg;
+                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
+            }
 
-                    instance->set_status(InstanceInfoPB::DELETED);
-                    instance->set_mtime(
-                            
duration_cast<seconds>(system_clock::now().time_since_epoch()).count());
+            if (instance->has_successor_instance_id() &&
+                !instance->successor_instance_id().empty()) {
+                std::string msg = fmt::format(
+                        "failed to drop instance, instance has a successor; 
please drop "
+                        "the chain tail instead, successor_instance_id={}",
+                        instance->successor_instance_id());
+                LOG(WARNING) << msg;
+                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, 
std::move(msg));
+            }
 
-                    std::string ret = instance->SerializeAsString();
-                    if (ret.empty()) {
-                        msg = "failed to serialize";
-                        LOG(ERROR) << msg;
-                        return 
std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
-                    }
-                    LOG(INFO) << "put instance_id=" << request->instance_id()
-                              << "drop instance json=" << 
proto_to_json(*instance);
-
-                    if (instance->has_source_instance_id() && 
instance->has_source_snapshot_id() &&
-                        !instance->source_instance_id().empty() &&
-                        !instance->source_snapshot_id().empty()) {
-                        Versionstamp snapshot_versionstamp;
-                        if (!SnapshotManager::parse_snapshot_versionstamp(
-                                    instance->source_snapshot_id(), 
&snapshot_versionstamp)) {
-                            msg = "failed to parse snapshot_id to 
versionstamp, snapshot_id=" +
-                                  instance->source_snapshot_id();
-                            LOG(WARNING) << msg;
-                            return 
std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
-                        }
-                        versioned::SnapshotReferenceKeyInfo ref_key_info {
-                                instance->source_instance_id(), 
snapshot_versionstamp, instance_id};
-                        std::string reference_key = 
versioned::snapshot_reference_key(ref_key_info);
-                        txn->remove(reference_key);
-                    }
-                    return std::make_pair(MetaServiceCode::OK, ret);
-                });
+            bool is_chain = instance->has_original_instance_id() &&
+                            !instance->original_instance_id().empty();
+            return is_chain ? drop_instance_chain(instance_id, txn, instance)
+                            : drop_single_instance(instance_id, txn, instance);
+        });
     } break;
     case AlterInstanceRequest::RENAME: {
         ret = alter_instance(request, [&request](Transaction* txn, 
InstanceInfoPB* instance) {
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 6598a44aff7..cf5cd4ea034 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -415,7 +415,8 @@ public:
     }
 
     // Recycle snapshot meta and data, return 0 for success otherwise error.
-    int recycle_snapshot_meta_and_data(const std::string& resource_id,
+    // `instance_id` names the instance that owns the snapshot. It is passed explicitly so that a
+    // snapshot of an instance other than this recycler's own can be recycled (presumably a
+    // rollback-chain ancestor -- confirm against callers). `resource_id` selects the accessor.
+    int recycle_snapshot_meta_and_data(const std::string& instance_id,
+                                       const std::string& resource_id,
                                        Versionstamp snapshot_version,
                                        const SnapshotPB& snapshot_pb);
 
diff --git a/cloud/src/recycler/recycler_snapshot.cpp 
b/cloud/src/recycler/recycler_snapshot.cpp
index 17bc4a19338..6b8203dbc10 100644
--- a/cloud/src/recycler/recycler_snapshot.cpp
+++ b/cloud/src/recycler/recycler_snapshot.cpp
@@ -43,19 +43,19 @@ int InstanceRecycler::recycle_cluster_snapshots() {
     return snapshot_manager_->recycle_snapshots(this);
 }
 
-int InstanceRecycler::recycle_snapshot_meta_and_data(const std::string& 
resource_id,
+// Recycles one snapshot (meta and data) identified by `snapshot_version`.
+//
+// `instance_id` is forwarded to SnapshotManager::recycle_snapshot_meta_and_data in place of this
+// recycler's own `instance_id_`, so the snapshot may belong to another instance (presumably an
+// ancestor in a rollback chain -- confirm against callers). The storage accessor is looked up by
+// `resource_id`. Returns -1 when no accessor is registered for it; otherwise returns the
+// SnapshotManager result.
+int InstanceRecycler::recycle_snapshot_meta_and_data(const std::string& 
instance_id,
+                                                     const std::string& 
resource_id,
                                                       Versionstamp 
snapshot_version,
                                                       const SnapshotPB& 
snapshot_pb) {
     auto it = accessor_map_.find(resource_id);
     if (it == accessor_map_.end()) {
         LOG(WARNING) << "no accessor for resource, cannot recycle snapshot 
data"
-                     << ", instance_id=" << instance_id_
-                     << ", resource_id=" << instance_info_.resource_ids(0);
+                     << ", instance_id=" << instance_id << ", resource_id=" << 
resource_id;
         return -1;
     }
 
     return snapshot_manager_->recycle_snapshot_meta_and_data(
-            instance_id_, resource_id, it->second.get(), snapshot_version, 
snapshot_pb);
+            instance_id, resource_id, it->second.get(), snapshot_version, 
snapshot_pb);
 }
 
 int InstanceRecycler::has_cluster_snapshots(bool* any) {
diff --git a/cloud/src/snapshot/snapshot_manager.cpp 
b/cloud/src/snapshot/snapshot_manager.cpp
index ac2b9e22d44..2ba49d2adde 100644
--- a/cloud/src/snapshot/snapshot_manager.cpp
+++ b/cloud/src/snapshot/snapshot_manager.cpp
@@ -20,7 +20,9 @@
 #include <fmt/format.h>
 
 #include "common/logging.h"
+#include "meta-service/meta_service_helper.h"
 #include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
 #include "meta-store/versionstamp.h"
 #include "recycler/checker.h"
 #include "recycler/recycler.h"
@@ -262,4 +264,98 @@ int 
SnapshotManager::compact_snapshot_chains(InstanceChainCompactor* compactor)
     return -1;
 }
 
+// Loads the InstanceInfoPB stored for `instance_id` into `*instance_info`.
+//
+// Returns INVALID_ARGUMENT when no instance with that id exists, a READ-category code (via
+// cast_as) for any other read failure, and PROTOBUF_PARSE_ERR when the stored value cannot be
+// parsed.
+static std::pair<MetaServiceCode, std::string> get_instance(Transaction* txn,
+                                                            std::string_view instance_id,
+                                                            InstanceInfoPB* instance_info) {
+    InstanceKeyInfo instance_key_info {instance_id};
+    std::string key = instance_key(instance_key_info);
+    std::string val;
+    TxnErrorCode err = txn->get(key, &val);
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        return {MetaServiceCode::INVALID_ARGUMENT,
+                fmt::format("instance not found, instance_id={}", instance_id)};
+    }
+    if (err != TxnErrorCode::TXN_OK) {
+        return {cast_as<ErrCategory::READ>(err),
+                fmt::format("failed to get instance, instance_id={}, err={}", instance_id, err)};
+    }
+
+    if (!instance_info->ParseFromString(val)) {
+        // A corrupt stored value is a decoding failure, not a bad request. Report it the way the
+        // meta-service's get_instance_info does, and keep the id so the record can be located.
+        return {MetaServiceCode::PROTOBUF_PARSE_ERR,
+                fmt::format("failed to parse instance info, instance_id={}", instance_id)};
+    }
+    return {MetaServiceCode::OK, ""};
+}
+
+// Collects snapshots of `instance_id` and, for an instance produced by rollback, of every
+// predecessor on its chain.
+//
+// The walk starts at `original_instance_id` (when set) and follows `successor_instance_id` links
+// until it reaches `instance_id`.
+//   * If `required_snapshot_id` is empty, MetaReader::get_snapshots is called for every visited
+//     instance with the same `snapshots` vector. NOTE(review): this relies on get_snapshots
+//     appending rather than clearing -- confirm.
+//   * Otherwise only the snapshot with that id is added, taken from the first chain member that
+//     has it, and the walk stops there. If no member has it, OK is returned and nothing is added.
+//
+// Returns INVALID_ARGUMENT for a malformed `required_snapshot_id`, a missing instance, a broken
+// chain (empty successor) or a cycle, and READ-category codes for storage errors.
+std::pair<MetaServiceCode, std::string> SnapshotManager::get_all_snapshots(
+        Transaction* txn, std::string_view instance_id, std::string_view required_snapshot_id,
+        std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots) {
+    Versionstamp required_snapshot_versionstamp;
+    if (!required_snapshot_id.empty() &&
+        !parse_snapshot_versionstamp(required_snapshot_id, &required_snapshot_versionstamp)) {
+        return {MetaServiceCode::INVALID_ARGUMENT, "invalid snapshot_id format"};
+    }
+
+    InstanceInfoPB instance_info;
+    if (auto [code, error_msg] = get_instance(txn, instance_id, &instance_info);
+        code != MetaServiceCode::OK) {
+        return {code, std::move(error_msg)};
+    }
+    std::string current_instance_id(instance_id);
+    if (instance_info.has_original_instance_id() && !instance_info.original_instance_id().empty()) {
+        // the earliest instance_id for rollback
+        current_instance_id = instance_info.original_instance_id();
+    }
+
+    std::unordered_set<std::string> visited;
+    while (true) {
+        // Detect a cycle before reading the instance again, so a looping chain never appends the
+        // same instance's snapshots a second time.
+        if (!visited.insert(current_instance_id).second) {
+            std::string msg = fmt::format(
+                    "cycle detected in instance chain, current instance_id={}, instance_id={}",
+                    current_instance_id, instance_id);
+            LOG_WARNING(msg);
+            return {MetaServiceCode::INVALID_ARGUMENT, std::move(msg)};
+        }
+
+        MetaReader meta_reader(current_instance_id);
+        if (required_snapshot_id.empty()) {
+            TxnErrorCode err = meta_reader.get_snapshots(txn, snapshots);
+            if (err != TxnErrorCode::TXN_OK) {
+                return {cast_as<ErrCategory::READ>(err), "failed to get snapshots"};
+            }
+        } else {
+            SnapshotPB snapshot_pb;
+            TxnErrorCode err =
+                    meta_reader.get_snapshot(txn, required_snapshot_versionstamp, &snapshot_pb);
+            if (err == TxnErrorCode::TXN_OK) {
+                snapshots->emplace_back(std::move(snapshot_pb), required_snapshot_versionstamp);
+                return {MetaServiceCode::OK, ""};
+            }
+            if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                return {cast_as<ErrCategory::READ>(err), "failed to get snapshot"};
+            }
+        }
+        if (current_instance_id == instance_id) {
+            break;
+        }
+
+        // Advance to the next (newer) instance of the chain.
+        if (auto [code, error_msg] = get_instance(txn, current_instance_id, &instance_info);
+            code != MetaServiceCode::OK) {
+            std::string msg = fmt::format("failed to get ancestor instance {}: {}",
+                                          current_instance_id, error_msg);
+            LOG_WARNING(msg);
+            return {code, std::move(msg)};
+        }
+        if (!instance_info.has_successor_instance_id() ||
+            instance_info.successor_instance_id().empty()) {
+            std::string msg = fmt::format(
+                    "successor_instance_id is empty for current instance_id={}, instance_id={}",
+                    current_instance_id, instance_id);
+            LOG_WARNING(msg);
+            return {MetaServiceCode::INVALID_ARGUMENT, std::move(msg)};
+        }
+        current_instance_id = instance_info.successor_instance_id();
+    }
+    return {MetaServiceCode::OK, ""};
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/snapshot/snapshot_manager.h 
b/cloud/src/snapshot/snapshot_manager.h
index 29e8771c8c4..996558b4e5f 100644
--- a/cloud/src/snapshot/snapshot_manager.h
+++ b/cloud/src/snapshot/snapshot_manager.h
@@ -94,6 +94,14 @@ public:
     static bool parse_snapshot_versionstamp(std::string_view snapshot_id,
                                             Versionstamp* versionstamp);
 
+    // Collects the snapshots of `instance_id` into `*snapshots`.
+    //
+    // If the instance was created by rollback (it has an original_instance_id), the walk starts at
+    // the original instance and follows successor_instance_id links up to `instance_id`. This way
+    // the snapshots of every predecessor instance are collected too.
+    // If `required_snapshot_id` is not empty, only the snapshot with that id is collected, taken
+    // from the first chain member that has it. If no member has it, OK is returned and nothing is
+    // added.
+    // Returns INVALID_ARGUMENT for a malformed snapshot id, a missing instance, a broken chain or a
+    // cycle in the chain, and READ-category codes for storage errors.
+    static std::pair<MetaServiceCode, std::string> get_all_snapshots(
+            Transaction* txn, std::string_view instance_id, std::string_view 
required_snapshot_id,
+            std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots);
+
     // Migrate the single version keys to multi-version keys for the instance.
     // Return 0 for success otherwise error.
     virtual int migrate_to_versioned_keys(InstanceDataMigrator* migrator);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to