This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new f985b998149 branch-4.1: [feat](cloud) drop instance should recycle the
ancestors #61882 (#61933)
f985b998149 is described below
commit f985b998149dbf45e0e1b3fccb49e7082aa623d5
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 31 19:00:13 2026 +0800
branch-4.1: [feat](cloud) drop instance should recycle the ancestors #61882
(#61933)
Cherry-picked from #61882
Co-authored-by: walter <[email protected]>
---
cloud/src/meta-service/meta_service_resource.cpp | 247 +++++++++++++++++------
cloud/src/recycler/recycler.h | 3 +-
cloud/src/recycler/recycler_snapshot.cpp | 8 +-
cloud/src/snapshot/snapshot_manager.cpp | 96 +++++++++
cloud/src/snapshot/snapshot_manager.h | 8 +
5 files changed, 300 insertions(+), 62 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_resource.cpp
b/cloud/src/meta-service/meta_service_resource.cpp
index ada3f12b543..57127e642d0 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -2148,6 +2148,175 @@ std::pair<MetaServiceCode, std::string>
handle_snapshot_intervals(const std::str
return std::make_pair(MetaServiceCode::OK, "");
}
+static std::pair<MetaServiceCode, std::string> get_instance_info(Transaction*
txn,
+ const
std::string& instance_id,
+
InstanceInfoPB* instance) {
+ std::string instance_val;
+ TxnErrorCode err = txn->get(instance_key({instance_id}), &instance_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ return {cast_as<ErrCategory::READ>(err),
+ fmt::format("failed to get instance, instance_id={}, err={}",
instance_id, err)};
+ }
+ if (!instance->ParseFromString(instance_val)) {
+ return {MetaServiceCode::PROTOBUF_PARSE_ERR,
+ fmt::format("failed to parse instance proto, instance_id={}",
instance_id)};
+ }
+ return {MetaServiceCode::OK, ""};
+}
+
+// Traverses the predecessor chain of the given instance, populating
`predecessors` with the
+// instance info of each predecessor.
+static std::pair<MetaServiceCode, std::string> get_predecessor_instances(
+ Transaction* txn, const InstanceInfoPB& instance,
+ std::vector<InstanceInfoPB>* predecessors) {
+ std::string instance_id = instance.original_instance_id();
+ while (!instance_id.empty() && instance_id != instance.instance_id()) {
+ InstanceInfoPB current_instance;
+ auto [code, msg] = get_instance_info(txn, instance_id,
&current_instance);
+ if (code != MetaServiceCode::OK) {
+ return {code, std::move(msg)};
+ }
+ instance_id = current_instance.successor_instance_id();
+ predecessors->emplace_back(std::move(current_instance));
+ }
+ return {MetaServiceCode::OK, ""};
+}
+
+static std::pair<MetaServiceCode, std::string> ensure_all_snapshot_recycled(
+ Transaction* txn, const std::string& instance_id,
+ std::optional<std::reference_wrapper<std::unordered_set<std::string>>>
+ exclude_snapshot_set) {
+ MetaReader meta_reader(instance_id);
+ std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
+ TxnErrorCode err = meta_reader.get_snapshots(txn, &snapshots);
+ if (err != TxnErrorCode::TXN_OK) {
+ std::string msg = fmt::format("failed to get snapshots, err={}", err);
+ LOG(WARNING) << msg << " instance_id=" << instance_id;
+ return {cast_as<ErrCategory::READ>(err), std::move(msg)};
+ }
+ for (auto& [snapshot, versionstamp] : snapshots) {
+ std::string snapshot_id =
SnapshotManager::serialize_snapshot_id(versionstamp);
+ if (exclude_snapshot_set.has_value() &&
+ exclude_snapshot_set.value().get().count(snapshot_id) > 0) {
+ continue;
+ }
+ if (snapshot.status() != SnapshotStatus::SNAPSHOT_RECYCLED) {
+ std::string msg = fmt::format(
+ "failed to drop instance, instance has snapshots,
snapshot_id={}", snapshot_id);
+ LOG(WARNING) << msg << " instance_id=" << instance_id
+ << " snapshot=" << proto_to_json(snapshot);
+ return {MetaServiceCode::INVALID_ARGUMENT, std::move(msg)};
+ }
+ }
+ return {MetaServiceCode::OK, ""};
+}
+
+static std::pair<MetaServiceCode, std::string> drop_single_instance(const
std::string& instance_id,
+
Transaction* txn,
+
InstanceInfoPB* instance) {
+ auto [code, msg] = ensure_all_snapshot_recycled(txn, instance_id,
std::nullopt);
+ if (code != MetaServiceCode::OK) {
+ return {code, std::move(msg)};
+ }
+
+ instance->set_status(InstanceInfoPB::DELETED);
+
instance->set_mtime(duration_cast<seconds>(system_clock::now().time_since_epoch()).count());
+
+ std::string serialized = instance->SerializeAsString();
+ if (serialized.empty()) {
+ std::string msg = "failed to serialize";
+ LOG(ERROR) << msg;
+ return {MetaServiceCode::PROTOBUF_SERIALIZE_ERR, std::move(msg)};
+ }
+ LOG(INFO) << "drop instance_id=" << instance_id << " json=" <<
proto_to_json(*instance);
+
+ if (instance->has_source_instance_id() &&
instance->has_source_snapshot_id() &&
+ !instance->source_instance_id().empty() &&
!instance->source_snapshot_id().empty()) {
+ Versionstamp snapshot_versionstamp;
+ if
(!SnapshotManager::parse_snapshot_versionstamp(instance->source_snapshot_id(),
+
&snapshot_versionstamp)) {
+ std::string msg = "failed to parse snapshot_id to versionstamp,
snapshot_id=" +
+ instance->source_snapshot_id();
+ LOG(WARNING) << msg;
+ return {MetaServiceCode::INVALID_ARGUMENT, std::move(msg)};
+ }
+ std::string snapshot_ref_key = versioned::snapshot_reference_key(
+ {instance->source_instance_id(), snapshot_versionstamp,
instance_id});
+ txn->remove(snapshot_ref_key);
+ }
+ return {MetaServiceCode::OK, std::move(serialized)};
+}
+
+// Handles the instance-chain drop: all instances from the original up to and
including the tail
+// are marked DELETED atomically within the same transaction.
+static std::pair<MetaServiceCode, std::string> drop_instance_chain(
+ const std::string& tail_instance_id, Transaction* txn, InstanceInfoPB*
tail_instance) {
+ std::vector<InstanceInfoPB> predecessors;
+ if (auto [code, err] = get_predecessor_instances(txn, *tail_instance,
&predecessors);
+ code != MetaServiceCode::OK) {
+ LOG(WARNING) << "drop instance chain: " << err;
+ return {code, std::move(err)};
+ }
+
+ std::unordered_set<std::string> exclude_snapshot_set;
+ if (auto [code, err] = ensure_all_snapshot_recycled(txn, tail_instance_id,
std::nullopt);
+ code != MetaServiceCode::OK) {
+ return {code, std::move(err)};
+ }
+ exclude_snapshot_set.insert(tail_instance->source_snapshot_id());
+
+ auto remove_snapshot_reference_key = [&](const InstanceInfoPB& instance) {
+ Versionstamp vs;
+ if
(SnapshotManager::parse_snapshot_versionstamp(instance.source_snapshot_id(),
&vs)) {
+ txn->remove(versioned::snapshot_reference_key(
+ {instance.source_instance_id(), vs,
instance.instance_id()}));
+ }
+ };
+ remove_snapshot_reference_key(*tail_instance);
+ for (auto& instance : predecessors) {
+ if (instance.has_source_snapshot_id() &&
!instance.source_snapshot_id().empty()) {
+ exclude_snapshot_set.insert(instance.source_snapshot_id());
+ remove_snapshot_reference_key(instance);
+ }
+ }
+
+ for (auto& instance : predecessors) {
+ if (auto [code, err] =
+ ensure_all_snapshot_recycled(txn, instance.instance_id(),
exclude_snapshot_set);
+ code != MetaServiceCode::OK) {
+ return {code, std::move(err)};
+ }
+ }
+
+ int64_t now =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+
+ for (auto& instance : predecessors) {
+ instance.set_status(InstanceInfoPB::DELETED);
+ instance.set_mtime(now);
+ std::string serialized;
+ if (!instance.SerializeToString(&serialized)) {
+ std::string msg =
+ fmt::format("failed to serialize instance_id={}",
instance.instance_id());
+ LOG(ERROR) << msg;
+ return {MetaServiceCode::PROTOBUF_SERIALIZE_ERR, std::move(msg)};
+ }
+ txn->put(instance_key({instance.instance_id()}), serialized);
+ LOG(INFO) << "marking chain predecessor as DELETED, instance_id=" <<
instance.instance_id();
+ }
+
+ tail_instance->set_status(InstanceInfoPB::DELETED);
+ tail_instance->set_mtime(now);
+ std::string serialized = tail_instance->SerializeAsString();
+ if (serialized.empty()) {
+ std::string msg = "failed to serialize";
+ LOG(ERROR) << msg;
+ return {MetaServiceCode::PROTOBUF_SERIALIZE_ERR, std::move(msg)};
+ }
+ LOG(INFO) << "drop instance_id=" << tail_instance_id << " and " <<
predecessors.size()
+ << " predecessor instances, json=" <<
proto_to_json(*tail_instance);
+ return {MetaServiceCode::OK, std::move(serialized)};
+}
+
void MetaServiceImpl::alter_instance(google::protobuf::RpcController*
controller,
const AlterInstanceRequest* request,
AlterInstanceResponse* response,
@@ -2175,65 +2344,29 @@ void
MetaServiceImpl::alter_instance(google::protobuf::RpcController* controller
std::pair<MetaServiceCode, std::string> ret;
switch (request->op()) {
case AlterInstanceRequest::DROP: {
- ret = alter_instance(
- request, [&request, &instance_id](Transaction* txn,
InstanceInfoPB* instance) {
- std::string msg;
- // check instance doesn't have any cluster.
- if (instance->clusters_size() != 0) {
- msg = "failed to drop instance, instance has clusters";
- LOG(WARNING) << msg;
- return
std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
- }
-
- // check instance doesn't have any snapshot.
- MetaReader meta_reader(instance_id);
- std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
- TxnErrorCode err = meta_reader.get_snapshots(txn,
&snapshots);
- if (err != TxnErrorCode::TXN_OK) {
- msg = "failed to get snapshots";
- LOG(WARNING) << msg << " err=" << err;
- return std::make_pair(cast_as<ErrCategory::READ>(err),
msg);
- }
- for (auto& [snapshot, _] : snapshots) {
- if (snapshot.status() !=
SnapshotStatus::SNAPSHOT_RECYCLED) {
- // still has snapshots, cannot drop
- msg = "failed to drop instance, instance has
snapshots";
- LOG(WARNING) << msg;
- return
std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
- }
- }
+ ret = alter_instance(request, [&instance_id](Transaction* txn,
InstanceInfoPB* instance) {
+ // check instance doesn't have any cluster.
+ if (instance->clusters_size() != 0) {
+ std::string msg = "failed to drop instance, instance has
clusters";
+ LOG(WARNING) << msg;
+ return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
+ }
- instance->set_status(InstanceInfoPB::DELETED);
- instance->set_mtime(
-
duration_cast<seconds>(system_clock::now().time_since_epoch()).count());
+ if (instance->has_successor_instance_id() &&
+ !instance->successor_instance_id().empty()) {
+ std::string msg = fmt::format(
+ "failed to drop instance, instance has a successor;
please drop "
+ "the chain tail instead, successor_instance_id={}",
+ instance->successor_instance_id());
+ LOG(WARNING) << msg;
+ return std::make_pair(MetaServiceCode::INVALID_ARGUMENT,
std::move(msg));
+ }
- std::string ret = instance->SerializeAsString();
- if (ret.empty()) {
- msg = "failed to serialize";
- LOG(ERROR) << msg;
- return
std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
- }
- LOG(INFO) << "put instance_id=" << request->instance_id()
- << "drop instance json=" <<
proto_to_json(*instance);
-
- if (instance->has_source_instance_id() &&
instance->has_source_snapshot_id() &&
- !instance->source_instance_id().empty() &&
- !instance->source_snapshot_id().empty()) {
- Versionstamp snapshot_versionstamp;
- if (!SnapshotManager::parse_snapshot_versionstamp(
- instance->source_snapshot_id(),
&snapshot_versionstamp)) {
- msg = "failed to parse snapshot_id to
versionstamp, snapshot_id=" +
- instance->source_snapshot_id();
- LOG(WARNING) << msg;
- return
std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
- }
- versioned::SnapshotReferenceKeyInfo ref_key_info {
- instance->source_instance_id(),
snapshot_versionstamp, instance_id};
- std::string reference_key =
versioned::snapshot_reference_key(ref_key_info);
- txn->remove(reference_key);
- }
- return std::make_pair(MetaServiceCode::OK, ret);
- });
+ bool is_chain = instance->has_original_instance_id() &&
+ !instance->original_instance_id().empty();
+ return is_chain ? drop_instance_chain(instance_id, txn, instance)
+ : drop_single_instance(instance_id, txn, instance);
+ });
} break;
case AlterInstanceRequest::RENAME: {
ret = alter_instance(request, [&request](Transaction* txn,
InstanceInfoPB* instance) {
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 6598a44aff7..cf5cd4ea034 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -415,7 +415,8 @@ public:
}
// Recycle snapshot meta and data, return 0 for success otherwise error.
- int recycle_snapshot_meta_and_data(const std::string& resource_id,
+ int recycle_snapshot_meta_and_data(const std::string& instance_id,
+ const std::string& resource_id,
Versionstamp snapshot_version,
const SnapshotPB& snapshot_pb);
diff --git a/cloud/src/recycler/recycler_snapshot.cpp
b/cloud/src/recycler/recycler_snapshot.cpp
index 17bc4a19338..6b8203dbc10 100644
--- a/cloud/src/recycler/recycler_snapshot.cpp
+++ b/cloud/src/recycler/recycler_snapshot.cpp
@@ -43,19 +43,19 @@ int InstanceRecycler::recycle_cluster_snapshots() {
return snapshot_manager_->recycle_snapshots(this);
}
-int InstanceRecycler::recycle_snapshot_meta_and_data(const std::string&
resource_id,
+int InstanceRecycler::recycle_snapshot_meta_and_data(const std::string&
instance_id,
+ const std::string&
resource_id,
Versionstamp
snapshot_version,
const SnapshotPB&
snapshot_pb) {
auto it = accessor_map_.find(resource_id);
if (it == accessor_map_.end()) {
LOG(WARNING) << "no accessor for resource, cannot recycle snapshot
data"
- << ", instance_id=" << instance_id_
- << ", resource_id=" << instance_info_.resource_ids(0);
+ << ", instance_id=" << instance_id << ", resource_id=" <<
resource_id;
return -1;
}
return snapshot_manager_->recycle_snapshot_meta_and_data(
- instance_id_, resource_id, it->second.get(), snapshot_version,
snapshot_pb);
+ instance_id, resource_id, it->second.get(), snapshot_version,
snapshot_pb);
}
int InstanceRecycler::has_cluster_snapshots(bool* any) {
diff --git a/cloud/src/snapshot/snapshot_manager.cpp
b/cloud/src/snapshot/snapshot_manager.cpp
index ac2b9e22d44..2ba49d2adde 100644
--- a/cloud/src/snapshot/snapshot_manager.cpp
+++ b/cloud/src/snapshot/snapshot_manager.cpp
@@ -20,7 +20,9 @@
#include <fmt/format.h>
#include "common/logging.h"
+#include "meta-service/meta_service_helper.h"
#include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
#include "meta-store/versionstamp.h"
#include "recycler/checker.h"
#include "recycler/recycler.h"
@@ -262,4 +264,98 @@ int
SnapshotManager::compact_snapshot_chains(InstanceChainCompactor* compactor)
return -1;
}
+static std::pair<MetaServiceCode, std::string> get_instance(Transaction* txn,
+ std::string_view
instance_id,
+ InstanceInfoPB*
instance_info) {
+ InstanceKeyInfo instance_key_info {instance_id};
+ std::string key = instance_key(instance_key_info);
+ std::string val;
+ TxnErrorCode err = txn->get(key, &val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ return {MetaServiceCode::INVALID_ARGUMENT,
+ fmt::format("instance not found, instance_id={}",
instance_id)};
+ } else if (err != TxnErrorCode::TXN_OK) {
+ return {cast_as<ErrCategory::READ>(err),
+ fmt::format("failed to get instance, instance_id={}, err={}",
instance_id, err)};
+ }
+
+ if (!instance_info->ParseFromString(val)) {
+ return {MetaServiceCode::INVALID_ARGUMENT, "failed to parse instance
info"};
+ }
+ return {MetaServiceCode::OK, ""};
+}
+
+std::pair<MetaServiceCode, std::string> SnapshotManager::get_all_snapshots(
+ Transaction* txn, std::string_view instance_id, std::string_view
required_snapshot_id,
+ std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots) {
+ Versionstamp required_snapshot_versionstamp;
+ if (!required_snapshot_id.empty()) {
+ if (!parse_snapshot_versionstamp(required_snapshot_id,
&required_snapshot_versionstamp)) {
+ return {MetaServiceCode::INVALID_ARGUMENT, "invalid snapshot_id
format"};
+ }
+ }
+
+ InstanceInfoPB instance_info;
+ auto [code, error_msg] = get_instance(txn, instance_id, &instance_info);
+ if (code != MetaServiceCode::OK) {
+ return {code, error_msg};
+ }
+ std::string current_instance_id(instance_id);
+ if (instance_info.has_original_instance_id() &&
!instance_info.original_instance_id().empty()) {
+ // the earliest instance_id for rollback
+ current_instance_id = instance_info.original_instance_id();
+ }
+
+ std::unordered_set<std::string> visited;
+ do {
+ MetaReader meta_reader(current_instance_id);
+ if (required_snapshot_id.empty()) {
+ TxnErrorCode err = meta_reader.get_snapshots(txn, snapshots);
+ if (err != TxnErrorCode::TXN_OK) {
+ return {cast_as<ErrCategory::READ>(err), "failed to get
snapshots"};
+ }
+ } else {
+ SnapshotPB snapshot_pb;
+ TxnErrorCode err =
+ meta_reader.get_snapshot(txn,
required_snapshot_versionstamp, &snapshot_pb);
+ if (err == TxnErrorCode::TXN_OK) {
+ snapshots->emplace_back(snapshot_pb,
required_snapshot_versionstamp);
+ return {MetaServiceCode::OK, ""};
+ } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ return {cast_as<ErrCategory::READ>(err), "failed to get
snapshot"};
+ }
+ }
+ if (current_instance_id == instance_id) {
+ break;
+ }
+ auto [code, error_msg] = get_instance(txn, current_instance_id,
&instance_info);
+ if (code != MetaServiceCode::OK) {
+ std::string msg = fmt::format("failed to get ancestor instance {}:
{}",
+ current_instance_id, error_msg);
+ LOG_WARNING(msg);
+ return {code, msg};
+ }
+ if (!instance_info.has_successor_instance_id() ||
+ instance_info.successor_instance_id().empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ error_msg = fmt::format(
+ "successor_instance_id is empty for current
instance_id={}, instance_id={}",
+ current_instance_id, instance_id);
+ LOG_WARNING(error_msg);
+ return {code, error_msg};
+ }
+ if (visited.count(current_instance_id) > 0) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ error_msg = fmt::format(
+ "cycle detected in instance chain, current instance_id={},
instance_id={}",
+ current_instance_id, instance_id);
+ LOG_WARNING(error_msg);
+ return {code, error_msg};
+ }
+ visited.insert(current_instance_id);
+ current_instance_id = instance_info.successor_instance_id();
+ } while (true);
+ return {MetaServiceCode::OK, ""};
+}
+
} // namespace doris::cloud
diff --git a/cloud/src/snapshot/snapshot_manager.h
b/cloud/src/snapshot/snapshot_manager.h
index 29e8771c8c4..996558b4e5f 100644
--- a/cloud/src/snapshot/snapshot_manager.h
+++ b/cloud/src/snapshot/snapshot_manager.h
@@ -94,6 +94,14 @@ public:
static bool parse_snapshot_versionstamp(std::string_view snapshot_id,
Versionstamp* versionstamp);
+ // Get all snapshots of the specific instance.
+ //
+ // If the instance is created by rollback, also get the snapshots of all
its predecessor instances.
+ // If the required_snapshot_id is not empty, only get the snapshot with
the specific snapshot_id.
+ static std::pair<MetaServiceCode, std::string> get_all_snapshots(
+ Transaction* txn, std::string_view instance_id, std::string_view
required_snapshot_id,
+ std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots);
+
// Migrate the single version keys to multi-version keys for the instance.
// Return 0 for success otherwise error.
virtual int migrate_to_versioned_keys(InstanceDataMigrator* migrator);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]