This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d304f1738c [feat](cloud) Ignore the staled clusters of the rollbacked instance (#57478)
0d304f1738c is described below

commit 0d304f1738ca939ef272c639c1bd93b308c1ee0c
Author: walter <[email protected]>
AuthorDate: Mon Nov 3 14:36:40 2025 +0800

    [feat](cloud) Ignore the staled clusters of the rollbacked instance (#57478)
    
    during the instance refreshing
---
 cloud/src/resource-manager/resource_manager.cpp | 17 ++++-
 cloud/test/resource_test.cpp                    | 97 +++++++++++++++++++++++++
 2 files changed, 111 insertions(+), 3 deletions(-)

diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp
index 58e7daf02fc..ad412fb57f8 100644
--- a/cloud/src/resource-manager/resource_manager.cpp
+++ b/cloud/src/resource-manager/resource_manager.cpp
@@ -1413,16 +1413,27 @@ std::pair<MetaServiceCode, std::string> ResourceManager::refresh_instance(
 
 void ResourceManager::refresh_instance(const std::string& instance_id,
                                        const InstanceInfoPB& instance) {
+    bool is_succeed_instance = instance.has_original_instance_id();
+    std::string source_instance_id = is_succeed_instance ? 
instance.source_instance_id() : "";
+
     std::lock_guard l(mtx_);
     for (auto i = node_info_.begin(); i != node_info_.end();) {
-        if (i->second.instance_id != instance_id) {
+        // erase all nodes belong to this instance_id
+        if (i->second.instance_id != instance_id &&
+            // ... or, if is_succeed_instance, erase nodes belong to 
source_instance_id
+            (!is_succeed_instance || i->second.instance_id != 
source_instance_id)) {
             ++i;
             continue;
         }
         i = node_info_.erase(i);
     }
-    for (int i = 0; i < instance.clusters_size(); ++i) {
-        add_cluster_to_index_no_lock(instance_id, instance.clusters(i));
+
+    // If succeed_instance_id is set, it means this instance has a succeeded 
instance,
+    // so we do not need to add its clusters to the index again.
+    if (!instance.has_succeed_instance_id()) {
+        for (int i = 0; i < instance.clusters_size(); ++i) {
+            add_cluster_to_index_no_lock(instance_id, instance.clusters(i));
+        }
     }
 
     if (instance.has_multi_version_status()) {
diff --git a/cloud/test/resource_test.cpp b/cloud/test/resource_test.cpp
index 13b0bcb191c..0e198ddb542 100644
--- a/cloud/test/resource_test.cpp
+++ b/cloud/test/resource_test.cpp
@@ -61,6 +61,9 @@ int main(int argc, char** argv) {
 
 namespace doris::cloud {
 
+extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& 
rc_mgr,
+                                   const std::string& cloud_unique_id);
+
 static std::shared_ptr<TxnKv> create_txn_kv() {
     // MemKv
     int ret = 0;
@@ -852,4 +855,98 @@ TEST(AkSkCascadeTest, ChildWithoutObjInfo) {
     sp->clear_all_call_backs();
 }
 
+TEST(ResourceTest, RollbackInstance) { // After a rollback, the node must keep resolving to the new instance, even after the superseded original is refreshed.
+    auto sp = SyncPoint::get_instance(); // Stub the encryption-key sync points so ak/sk encrypt/decrypt use key "test".
+    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
+        auto* ret = try_any_cast<int*>(args[0]);
+        *ret = 0;
+        auto* key = try_any_cast<std::string*>(args[1]);
+        *key = "test";
+        auto* key_id = try_any_cast<int64_t*>(args[2]);
+        *key_id = 1;
+    });
+    sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) {
+        auto* key = try_any_cast<std::string*>(args[0]);
+        *key = "test";
+        auto* ret = try_any_cast<int*>(args[1]);
+        *ret = 0;
+    });
+    sp->enable_processing();
+
+    auto txn_kv = create_txn_kv();
+    auto meta_service = get_meta_service(txn_kv);
+
+    // Create the original (pre-rollback) instance with one COMPUTE cluster, cluster_1.
+    std::string original_instance_id = "original_instance";
+    create_instance(meta_service.get(), original_instance_id);
+    create_cluster(meta_service.get(), original_instance_id, "cluster_id_1", 
"cluster_1",
+                   ClusterPB::COMPUTE);
+
+    // Register one compute node (cloud_unique_id_1) in cluster_1 of the original instance.
+    std::vector<NodeInfo> to_add;
+    std::vector<NodeInfo> to_del;
+    auto ni = NodeInfo {.role = Role::COMPUTE_NODE,
+                        .instance_id = original_instance_id,
+                        .cluster_name = "cluster_1",
+                        .cluster_id = "cluster_id_1",
+                        .node_info = NodeInfoPB {}};
+    ni.node_info.set_ip("127.0.0.1");
+    ni.node_info.set_cloud_unique_id("cloud_unique_id_1");
+    ni.node_info.set_heartbeat_port(9999);
+    to_add.push_back(ni);
+
+    auto r = meta_service->resource_mgr()->modify_nodes(original_instance_id, 
to_add, to_del);
+    ASSERT_EQ(r, "");
+
+    // Simulate a rollback: create a second instance that will take over from the original.
+    std::string new_instance_id = "new_instance_after_rollback";
+    create_instance(meta_service.get(), new_instance_id);
+
+    // Link the two instances in the meta store: the new instance records the original as its
+    // source/original instance, and the original records the new one as its succeed_instance_id.
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
+
+        // Load the new instance and point its source/original ids at the original instance.
+        InstanceInfoPB new_instance;
+        std::string new_key = instance_key(new_instance_id);
+        std::string new_val;
+        ASSERT_EQ(txn->get(new_key, &new_val), TxnErrorCode::TXN_OK);
+        ASSERT_TRUE(new_instance.ParseFromString(new_val));
+
+        new_instance.set_source_instance_id(original_instance_id);
+        new_instance.set_original_instance_id(original_instance_id);
+        new_instance.set_source_snapshot_id("00000000000000000000"); // Valid 
versionstamp format
+
+        // The new instance inherits the original instance's clusters (copied verbatim here).
+        InstanceInfoPB original_instance;
+        std::string original_key = instance_key(original_instance_id);
+        std::string original_val;
+        ASSERT_EQ(txn->get(original_key, &original_val), TxnErrorCode::TXN_OK);
+        ASSERT_TRUE(original_instance.ParseFromString(original_val));
+
+        
new_instance.mutable_clusters()->CopyFrom(original_instance.clusters());
+        txn->put(new_key, new_instance.SerializeAsString());
+
+        // Mark the original instance as superseded by setting its succeed_instance_id.
+        original_instance.set_succeed_instance_id(new_instance_id);
+        txn->put(original_key, original_instance.SerializeAsString());
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    meta_service->resource_mgr()->refresh_instance(new_instance_id); // Expected: cloud_unique_id_1 now maps to the new instance.
+
+    std::string instance_id = get_instance_id(meta_service->resource_mgr(), 
"cloud_unique_id_1");
+    ASSERT_EQ(instance_id, new_instance_id);
+
+    meta_service->resource_mgr()->refresh_instance(original_instance_id); // Refreshing the superseded original must not take the node back.
+    instance_id = get_instance_id(meta_service->resource_mgr(), 
"cloud_unique_id_1");
+    ASSERT_EQ(instance_id, new_instance_id);
+
+    sp->disable_processing();
+    sp->clear_all_call_backs();
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to