This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0d304f1738c [feat](cloud) Ignore the stale clusters of the rolled-back instance (#57478)
0d304f1738c is described below
commit 0d304f1738ca939ef272c639c1bd93b308c1ee0c
Author: walter <[email protected]>
AuthorDate: Mon Nov 3 14:36:40 2025 +0800
[feat](cloud) Ignore the stale clusters of the rolled-back instance (#57478)
during instance refresh
---
cloud/src/resource-manager/resource_manager.cpp | 17 ++++-
cloud/test/resource_test.cpp | 97 +++++++++++++++++++++++++
2 files changed, 111 insertions(+), 3 deletions(-)
diff --git a/cloud/src/resource-manager/resource_manager.cpp
b/cloud/src/resource-manager/resource_manager.cpp
index 58e7daf02fc..ad412fb57f8 100644
--- a/cloud/src/resource-manager/resource_manager.cpp
+++ b/cloud/src/resource-manager/resource_manager.cpp
@@ -1413,16 +1413,27 @@ std::pair<MetaServiceCode, std::string>
ResourceManager::refresh_instance(
void ResourceManager::refresh_instance(const std::string& instance_id,
const InstanceInfoPB& instance) {
+ bool is_succeed_instance = instance.has_original_instance_id();
+ std::string source_instance_id = is_succeed_instance ?
instance.source_instance_id() : "";
+
std::lock_guard l(mtx_);
for (auto i = node_info_.begin(); i != node_info_.end();) {
- if (i->second.instance_id != instance_id) {
+ // Erase all nodes belonging to this instance_id ...
+ if (i->second.instance_id != instance_id &&
+ // ... or, if this is a successor instance, nodes belonging to source_instance_id.
+ (!is_succeed_instance || i->second.instance_id !=
source_instance_id)) {
++i;
continue;
}
i = node_info_.erase(i);
}
- for (int i = 0; i < instance.clusters_size(); ++i) {
- add_cluster_to_index_no_lock(instance_id, instance.clusters(i));
+
+ // If succeed_instance_id is set, this instance has been superseded by a successor
+ // instance (e.g. after a rollback); its clusters are stale and must not be added
+ // to the index again.
+ if (!instance.has_succeed_instance_id()) {
+ for (int i = 0; i < instance.clusters_size(); ++i) {
+ add_cluster_to_index_no_lock(instance_id, instance.clusters(i));
+ }
}
if (instance.has_multi_version_status()) {
diff --git a/cloud/test/resource_test.cpp b/cloud/test/resource_test.cpp
index 13b0bcb191c..0e198ddb542 100644
--- a/cloud/test/resource_test.cpp
+++ b/cloud/test/resource_test.cpp
@@ -61,6 +61,9 @@ int main(int argc, char** argv) {
namespace doris::cloud {
+// Resolves which instance a cloud_unique_id maps to, via the resource manager.
+// Not defined in this file; presumably provided by another source linked into the
+// test binary (the original declaration was marked `extern`) -- confirm in the build.
+std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
+                            const std::string& cloud_unique_id);
+
static std::shared_ptr<TxnKv> create_txn_kv() {
// MemKv
int ret = 0;
@@ -852,4 +855,98 @@ TEST(AkSkCascadeTest, ChildWithoutObjInfo) {
sp->clear_all_call_backs();
}
+TEST(ResourceTest, RollbackInstance) {
+    // Simulates a rollback: a successor instance inherits the original instance's cluster,
+    // and the original is marked with succeed_instance_id. Refreshing either instance must
+    // leave the node identified by "cloud_unique_id_1" resolving to the successor.
+    auto sp = SyncPoint::get_instance();
+
+    // Tear the sync point down on every exit path. A failing ASSERT_* returns early from
+    // the test body, which would otherwise leave the callbacks below installed (and
+    // processing enabled) for every later test in this binary.
+    struct SyncPointCleanup {
+        ~SyncPointCleanup() {
+            auto sync_point = SyncPoint::get_instance();
+            sync_point->disable_processing();
+            sync_point->clear_all_call_backs();
+        }
+    } sync_point_cleanup;
+
+    // Stub the encryption-key lookups used by AK/SK encrypt/decrypt: report success
+    // (ret = 0) with a fixed key.
+    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
+        auto* ret = try_any_cast<int*>(args[0]);
+        *ret = 0;
+        auto* key = try_any_cast<std::string*>(args[1]);
+        *key = "test";
+        auto* key_id = try_any_cast<int64_t*>(args[2]);
+        *key_id = 1;
+    });
+    sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) {
+        auto* key = try_any_cast<std::string*>(args[0]);
+        *key = "test";
+        auto* ret = try_any_cast<int*>(args[1]);
+        *ret = 0;
+    });
+    sp->enable_processing();
+
+    auto txn_kv = create_txn_kv();
+    auto meta_service = get_meta_service(txn_kv);
+
+    // Create the original instance with a single compute cluster.
+    std::string original_instance_id = "original_instance";
+    create_instance(meta_service.get(), original_instance_id);
+    create_cluster(meta_service.get(), original_instance_id, "cluster_id_1", "cluster_1",
+                   ClusterPB::COMPUTE);
+
+    // Add one node to the original instance's cluster.
+    std::vector<NodeInfo> to_add;
+    std::vector<NodeInfo> to_del;
+    auto ni = NodeInfo {.role = Role::COMPUTE_NODE,
+                        .instance_id = original_instance_id,
+                        .cluster_name = "cluster_1",
+                        .cluster_id = "cluster_id_1",
+                        .node_info = NodeInfoPB {}};
+    ni.node_info.set_ip("127.0.0.1");
+    ni.node_info.set_cloud_unique_id("cloud_unique_id_1");
+    ni.node_info.set_heartbeat_port(9999);
+    to_add.push_back(ni);
+
+    auto r = meta_service->resource_mgr()->modify_nodes(original_instance_id, to_add, to_del);
+    ASSERT_EQ(r, "");
+
+    // Simulate the rollback by creating the successor instance.
+    std::string new_instance_id = "new_instance_after_rollback";
+    create_instance(meta_service.get(), new_instance_id);
+
+    // Link the two instances in a single transaction:
+    //  - the new instance records source_instance_id and original_instance_id and
+    //    inherits the original instance's clusters;
+    //  - the original instance records succeed_instance_id.
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        InstanceInfoPB new_instance;
+        std::string new_key = instance_key(new_instance_id);
+        std::string new_val;
+        ASSERT_EQ(txn->get(new_key, &new_val), TxnErrorCode::TXN_OK);
+        ASSERT_TRUE(new_instance.ParseFromString(new_val));
+
+        new_instance.set_source_instance_id(original_instance_id);
+        new_instance.set_original_instance_id(original_instance_id);
+        // Valid versionstamp format.
+        new_instance.set_source_snapshot_id("00000000000000000000");
+
+        InstanceInfoPB original_instance;
+        std::string original_key = instance_key(original_instance_id);
+        std::string original_val;
+        ASSERT_EQ(txn->get(original_key, &original_val), TxnErrorCode::TXN_OK);
+        ASSERT_TRUE(original_instance.ParseFromString(original_val));
+
+        new_instance.mutable_clusters()->CopyFrom(original_instance.clusters());
+        txn->put(new_key, new_instance.SerializeAsString());
+
+        original_instance.set_succeed_instance_id(new_instance_id);
+        txn->put(original_key, original_instance.SerializeAsString());
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Refreshing the successor should make the node resolve to the new instance.
+    meta_service->resource_mgr()->refresh_instance(new_instance_id);
+    std::string instance_id = get_instance_id(meta_service->resource_mgr(), "cloud_unique_id_1");
+    ASSERT_EQ(instance_id, new_instance_id);
+
+    // Refreshing the superseded original instance must not re-register its stale
+    // clusters, so the node still resolves to the successor.
+    meta_service->resource_mgr()->refresh_instance(original_instance_id);
+    instance_id = get_instance_id(meta_service->resource_mgr(), "cloud_unique_id_1");
+    ASSERT_EQ(instance_id, new_instance_id);
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]