This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1d74a0ea609 [chore](cloud) support large operation log (#58100)
1d74a0ea609 is described below
commit 1d74a0ea60966f15ab26768edf9a95813d421941
Author: walter <[email protected]>
AuthorDate: Wed Nov 19 11:33:21 2025 +0800
[chore](cloud) support large operation log (#58100)
---
cloud/src/meta-service/meta_service.cpp | 14 ++---
cloud/src/meta-service/meta_service_job.cpp | 31 ++---------
cloud/src/meta-service/meta_service_partition.cpp | 51 ++++++------------
cloud/src/meta-service/meta_service_txn.cpp | 35 +++---------
cloud/src/recycler/recycler.h | 8 +--
cloud/src/recycler/recycler_operation_log.cpp | 66 ++++++++++++++++-------
cloud/test/meta_service_operation_log_test.cpp | 66 ++++++++++++++---------
cloud/test/recycler_operation_log_test.cpp | 56 +++++++++++++------
8 files changed, 163 insertions(+), 164 deletions(-)
diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index 9018c9b7837..03ce61ecfb0 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1186,16 +1186,10 @@ void
MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle
if (is_versioned_write && update_tablet_log.tablet_ids_size() > 0) {
OperationLogPB log;
log.mutable_update_tablet()->Swap(&update_tablet_log);
- std::string update_log_key = versioned::log_key(instance_id);
- std::string operation_log_value;
- if (!log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- msg = "failed to serialize update tablet log";
- return;
- }
- versioned_put(txn.get(), update_log_key, operation_log_value);
- LOG(INFO) << "put versioned update tablet log, key=" <<
hex(update_log_key)
- << " instance_id=" << instance_id << " log_size=" <<
operation_log_value.size();
+ std::string log_key = versioned::log_key(instance_id);
+ versioned::blob_put(txn.get(), log_key, log);
+ LOG(INFO) << "put update tablet operation log, key=" << hex(log_key)
+ << " instance_id=" << instance_id;
}
err = txn->commit();
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index 46cd7f44278..7a7df579fa1 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -37,6 +37,7 @@
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_tablet_stats.h"
+#include "meta-store/blob_message.h"
#include "meta-store/clone_chain_reader.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
@@ -1274,28 +1275,17 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
if (!compaction_log.recycle_rowsets().empty() && is_versioned_write) {
size_t num_recycled_rowsets = compaction_log.recycle_rowsets().size();
std::string operation_log_key = versioned::log_key({instance_id});
- std::string operation_log_value;
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(meta_reader.min_read_version());
}
operation_log.mutable_compaction()->Swap(&compaction_log);
- if (!operation_log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- msg = fmt::format("failed to serialize OperationLogPB: {}",
hex(operation_log_key));
- LOG_WARNING(msg)
- .tag("instance_id", instance_id)
- .tag("table_id", request->job().idx().table_id());
- return;
- }
- // Put versioned operation log for compaction to track recycling
- LOG_INFO("put versioned operation log key")
+ versioned::blob_put(txn.get(), operation_log_key, operation_log);
+ LOG_INFO("put compaction operation log key")
.tag("instance_id", instance_id)
.tag("operation_log_key", hex(operation_log_key))
.tag("tablet_id", tablet_id)
- .tag("value_size", operation_log_value.size())
.tag("recycle_rowsets_count", num_recycled_rowsets);
- versioned_put(txn.get(), operation_log_key, operation_log_value);
}
}
@@ -1882,29 +1872,18 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
if (is_versioned_write) {
std::string operation_log_key = versioned::log_key({instance_id});
- std::string operation_log_value;
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_schema_change()->Swap(&schema_change_log);
- if (!operation_log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- msg = fmt::format("failed to serialize OperationLogPB: {}",
hex(operation_log_key));
- LOG_WARNING(msg)
- .tag("instance_id", instance_id)
- .tag("table_id", request->job().idx().table_id());
- return;
- }
- // Put versioned operation log for compaction to track recycling
- LOG_INFO("put versioned operation log key")
+ versioned::blob_put(txn.get(), operation_log_key, operation_log);
+ LOG_INFO("put schema change operation log key")
.tag("instance_id", instance_id)
.tag("operation_log_key", hex(operation_log_key))
.tag("tablet_id", tablet_id)
.tag("new_tablet_id", new_tablet_id)
- .tag("value_size", operation_log_value.size())
.tag("recycle_rowsets_count",
schema_change_log.recycle_rowsets().size());
- versioned_put(txn.get(), operation_log_key, operation_log_value);
}
}
diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp
index 1321b6aee9f..f347272be27 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -24,6 +24,7 @@
#include "common/logging.h"
#include "common/stats.h"
#include "meta-service/meta_service_helper.h"
+#include "meta-store/blob_message.h"
#include "meta-store/clone_chain_reader.h"
#include "meta-store/keys.h"
#include "meta-store/meta_reader.h"
@@ -295,19 +296,16 @@ void
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
if (commit_index_log.index_ids_size() > 0 &&
is_version_write_enabled(instance_id)) {
std::string operation_log_key = versioned::log_key({instance_id});
- std::string operation_log_value;
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_commit_index()->Swap(&commit_index_log);
- if (!operation_log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- msg = fmt::format("failed to serialize OperationLogPB: {}",
hex(operation_log_key));
- LOG_WARNING(msg).tag("instance_id", instance_id).tag("table_id",
request->table_id());
- return;
- }
- versioned_put(txn.get(), operation_log_key, operation_log_value);
+ versioned::blob_put(txn.get(), operation_log_key, operation_log);
+ LOG_INFO("put commit index operation log key")
+ .tag("instance_id", instance_id)
+ .tag("table_id", request->table_id())
+ .tag("operation_log_key", hex(operation_log_key));
}
err = txn->commit();
@@ -419,23 +417,15 @@ void
MetaServiceImpl::drop_index(::google::protobuf::RpcController* controller,
if (drop_index_log.index_ids_size() > 0 && is_versioned_write) {
std::string operation_log_key = versioned::log_key({instance_id});
- std::string operation_log_value;
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_drop_index()->Swap(&drop_index_log);
- if (!operation_log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- msg = fmt::format("failed to serialize OperationLogPB: {}",
hex(operation_log_key));
- LOG_WARNING(msg).tag("instance_id", instance_id).tag("table_id",
request->table_id());
- return;
- }
- versioned_put(txn.get(), operation_log_key, operation_log_value);
+ versioned::blob_put(txn.get(), operation_log_key, operation_log);
LOG(INFO) << "put drop index operation log"
<< " instance_id=" << instance_id << " table_id=" <<
request->table_id()
- << " index_ids=" << drop_index_log.index_ids_size()
- << " log_size=" << operation_log_value.size();
+ << " index_ids=" << drop_index_log.index_ids_size();
}
err = txn->commit();
@@ -719,19 +709,16 @@ void
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
if (commit_partition_log.partition_ids_size() > 0 &&
is_version_write_enabled(instance_id)) {
std::string operation_log_key = versioned::log_key({instance_id});
- std::string operation_log_value;
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_commit_partition()->Swap(&commit_partition_log);
- if (!operation_log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- msg = fmt::format("failed to serialize OperationLogPB: {}",
hex(operation_log_key));
- LOG_WARNING(msg).tag("instance_id", instance_id).tag("table_id",
request->table_id());
- return;
- }
- versioned_put(txn.get(), operation_log_key, operation_log_value);
+ versioned::blob_put(txn.get(), operation_log_key, operation_log);
+ LOG_INFO("put commit partition operation log key")
+ .tag("instance_id", instance_id)
+ .tag("table_id", request->table_id())
+ .tag("operation_log_key", hex(operation_log_key));
}
err = txn->commit();
@@ -864,23 +851,15 @@ void
MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
drop_partition_log.partition_ids_size() > 0) &&
is_versioned_write) {
std::string operation_log_key = versioned::log_key({instance_id});
- std::string operation_log_value;
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_drop_partition()->Swap(&drop_partition_log);
- if (!operation_log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- msg = fmt::format("failed to serialize OperationLogPB: {}",
hex(operation_log_key));
- LOG_WARNING(msg).tag("instance_id", instance_id).tag("table_id",
request->table_id());
- return;
- }
- versioned_put(txn.get(), operation_log_key, operation_log_value);
+ versioned::blob_put(txn.get(), operation_log_key, operation_log);
LOG(INFO) << "put drop partition operation log"
<< " instance_id=" << instance_id << " table_id=" <<
request->table_id()
- << " partition_ids=" <<
drop_partition_log.partition_ids_size()
- << " log_size=" << operation_log_value.size();
+ << " partition_ids=" <<
drop_partition_log.partition_ids_size();
}
err = txn->commit();
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index dbb73383163..afd51268e56 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -32,6 +32,7 @@
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_tablet_stats.h"
+#include "meta-store/blob_message.h"
#include "meta-store/clone_chain_reader.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
@@ -1829,17 +1830,9 @@ void MetaServiceImpl::commit_txn_immediately(
operation_log.set_min_timestamp(meta_reader.min_read_version());
}
operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
- std::string operation_log_value;
- if (!operation_log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize operation_log, txn_id=" << txn_id;
- msg = ss.str();
- return;
- }
+ versioned::blob_put(txn.get(), log_key, operation_log);
LOG(INFO) << "put commit txn operation log, key=" << hex(log_key)
- << " txn_id=" << txn_id
- << " operation_log_size=" << operation_log_value.size();
- versioned_put(txn.get(), log_key, operation_log_value);
+ << " txn_id=" << txn_id;
} else {
std::string recycle_val;
if (!recycle_pb.SerializeToString(&recycle_val)) {
@@ -2358,16 +2351,9 @@ void MetaServiceImpl::commit_txn_eventually(
operation_log.set_min_timestamp(meta_reader.min_read_version());
}
operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
- std::string operation_log_value;
- if (!operation_log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize operation_log, txn_id=" << txn_id;
- msg = ss.str();
- return;
- }
- versioned_put(txn.get(), log_key, operation_log_value);
+ versioned::blob_put(txn.get(), log_key, operation_log);
LOG(INFO) << "put commit txn operation log, key=" << hex(log_key)
- << " txn_id=" << txn_id << " log_size=" <<
operation_log_value.size();
+ << " txn_id=" << txn_id;
}
VLOG_DEBUG << "put_size=" << txn->put_bytes() << " del_size=" <<
txn->delete_bytes()
@@ -2880,21 +2866,14 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
if (is_versioned_write) {
commit_txn_log.mutable_recycle_txn()->Swap(&recycle_pb);
std::string log_key = versioned::log_key({instance_id});
- std::string operation_log_value;
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(meta_reader.min_read_version());
}
operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
- if (!operation_log.SerializeToString(&operation_log_value)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize operation_log, txn_id=" << txn_id;
- msg = ss.str();
- return;
- }
- versioned_put(txn.get(), log_key, operation_log_value);
+ versioned::blob_put(txn.get(), log_key, operation_log);
LOG(INFO) << "put commit txn operation log key=" <<
hex(recycle_key)
- << " txn_id=" << txn_id << " log_size=" <<
operation_log_value.size();
+ << " txn_id=" << txn_id;
} else {
if (!recycle_pb.SerializeToString(&recycle_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 463f5e9cb6b..12365c8668c 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -422,11 +422,11 @@ private:
// for scan all rs of tablet and statistics metrics
int scan_tablet_and_statistics(int64_t tablet_id, RecyclerMetricsContext&
metrics_context);
- // Recycle operation log and the log key.
+ // Recycle operation log and the log keys. The log keys are specified by
`raw_keys`.
//
- // The log_key is constructed from the log_version and instance_id.
- // Both `operation_log` and `log_key` will be removed in the same
transaction, to ensure atomicity.
- int recycle_operation_log(Versionstamp log_version, OperationLogPB
operation_log);
+ // Both `operation_log` and `raw_keys` will be removed in the same
transaction, to ensure atomicity.
+ int recycle_operation_log(Versionstamp log_version, const
std::vector<std::string>& raw_keys,
+ OperationLogPB operation_log);
// Recycle rowset meta and data, return 0 for success otherwise error
//
diff --git a/cloud/src/recycler/recycler_operation_log.cpp b/cloud/src/recycler/recycler_operation_log.cpp
index c5a95901afa..c9ea3f0ba99 100644
--- a/cloud/src/recycler/recycler_operation_log.cpp
+++ b/cloud/src/recycler/recycler_operation_log.cpp
@@ -38,6 +38,7 @@
#include "common/util.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_schema.h"
+#include "meta-store/blob_message.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/meta_reader.h"
@@ -128,8 +129,12 @@ bool OperationLogRecycleChecker::can_recycle(const
Versionstamp& log_versionstam
// A recycler for operation logs.
class OperationLogRecycler {
public:
- OperationLogRecycler(std::string_view instance_id, TxnKv* txn_kv,
Versionstamp log_version)
- : instance_id_(instance_id), txn_kv_(txn_kv),
log_version_(log_version) {}
+ OperationLogRecycler(std::string_view instance_id, TxnKv* txn_kv,
Versionstamp log_version,
+ const std::vector<std::string>& raw_keys)
+ : instance_id_(instance_id),
+ txn_kv_(txn_kv),
+ log_version_(log_version),
+ raw_keys_(raw_keys) {}
OperationLogRecycler(const OperationLogRecycler&) = delete;
OperationLogRecycler& operator=(const OperationLogRecycler&) = delete;
@@ -164,6 +169,7 @@ private:
std::string_view instance_id_;
TxnKv* txn_kv_;
Versionstamp log_version_;
+ const std::vector<std::string>& raw_keys_;
std::unique_ptr<Transaction> txn_;
};
@@ -372,9 +378,10 @@ int OperationLogRecycler::begin() {
int OperationLogRecycler::commit() {
// Remove the operation log entry itself after recycling its contents
- std::string log_key =
encode_versioned_key(versioned::log_key(instance_id_), log_version_);
LOG_INFO("remove operation log key").tag("log_version",
log_version_.to_string());
- txn_->remove(log_key);
+ for (const auto& raw_key : raw_keys_) {
+ txn_->remove(raw_key);
+ }
TxnErrorCode err = txn_->commit();
if (err != TxnErrorCode::TXN_OK) {
@@ -629,7 +636,8 @@ int InstanceRecycler::recycle_operation_logs() {
}
auto scan_and_recycle_operation_log = [&](const std::string_view& key,
- const std::string_view& value) {
+ const std::vector<std::string>&
raw_keys,
+ OperationLogPB operation_log) {
std::string_view log_key(key);
Versionstamp log_versionstamp;
if (!decode_versioned_key(&log_key, &log_versionstamp)) {
@@ -638,28 +646,22 @@ int InstanceRecycler::recycle_operation_logs() {
return -1;
}
- OperationLogPB operation_log;
- if (!operation_log.ParseFromArray(value.data(), value.size())) {
- LOG_WARNING("failed to parse OperationLogPB from operation log
key")
- .tag("key", hex(key));
- return -1;
- }
-
+ size_t value_size = operation_log.ByteSizeLong();
if (recycle_checker.can_recycle(log_versionstamp,
operation_log.min_timestamp())) {
AnnotateTag tag("log_key", hex(key));
- int res = recycle_operation_log(log_versionstamp,
std::move(operation_log));
+ int res = recycle_operation_log(log_versionstamp, raw_keys,
std::move(operation_log));
if (res != 0) {
LOG_WARNING("failed to recycle operation
log").tag("error_code", res);
return res;
}
recycled_operation_logs++;
- recycled_operation_log_data_size += value.size();
+ recycled_operation_log_data_size += value_size;
}
total_operation_logs++;
- operation_log_data_size += value.size();
- max_operation_log_data_size = std::max(max_operation_log_data_size,
value.size());
+ operation_log_data_size += value_size;
+ max_operation_log_data_size = std::max(max_operation_log_data_size,
value_size);
return 0;
};
@@ -699,15 +701,39 @@ int InstanceRecycler::recycle_operation_logs() {
std::string log_key_prefix = versioned::log_key(instance_id_);
std::string begin_key = encode_versioned_key(log_key_prefix,
Versionstamp::min());
std::string end_key = encode_versioned_key(log_key_prefix,
Versionstamp::max());
- return scan_and_recycle(std::move(begin_key), end_key,
- std::move(scan_and_recycle_operation_log),
- std::move(is_multi_version_status_changed));
+
+ std::unique_ptr<BlobIterator> iter = blob_get_range(txn_kv_, begin_key,
end_key);
+ for (size_t i = 0; iter->valid(); iter->next(), i++) {
+ std::string_view key = iter->key();
+ OperationLogPB operation_log;
+ if (!iter->parse_value(&operation_log)) {
+ LOG_WARNING("failed to parse OperationLogPB from operation log
key")
+ .tag("key", hex(key));
+ return -1;
+ }
+
+ int res = scan_and_recycle_operation_log(key, iter->raw_keys(),
std::move(operation_log));
+ if (res != 0) {
+ return res;
+ }
+
+ if (i % 1000 == 0 && is_multi_version_status_changed() != 0) {
+ return -1;
+ }
+ }
+ if (iter->error_code() != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("error occurred during scanning operation logs")
+ .tag("error_code", iter->error_code());
+ return -1;
+ }
+ return 0;
}
int InstanceRecycler::recycle_operation_log(Versionstamp log_version,
+ const std::vector<std::string>&
raw_keys,
OperationLogPB operation_log) {
int recycle_log_count = 0;
- OperationLogRecycler log_recycler(instance_id_, txn_kv_.get(),
log_version);
+ OperationLogRecycler log_recycler(instance_id_, txn_kv_.get(),
log_version, raw_keys);
RETURN_ON_FAILURE(log_recycler.begin());
#define RECYCLE_OPERATION_LOG(log_type, method_name) \
diff --git a/cloud/test/meta_service_operation_log_test.cpp b/cloud/test/meta_service_operation_log_test.cpp
index 66dac8bef54..af2da9b23f9 100644
--- a/cloud/test/meta_service_operation_log_test.cpp
+++ b/cloud/test/meta_service_operation_log_test.cpp
@@ -30,12 +30,14 @@
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service.h"
+#include "meta-store/blob_message.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
+#include "meta-store/versionstamp.h"
namespace doris::cloud {
// External functions from meta_service_test.cpp
@@ -103,6 +105,31 @@ static std::string dump_range(TxnKv* txn_kv,
std::string_view begin = "",
return buffer;
}
+// It will get the latest versioned values.
+TxnErrorCode read_operation_log(Transaction* txn, std::string_view log_key,
+ Versionstamp* log_version, OperationLogPB*
operation_log) {
+ std::string begin_key = encode_versioned_key(log_key, Versionstamp::min());
+ std::string end_key = encode_versioned_key(log_key, Versionstamp::max());
+ auto iter = blob_get_range(txn, begin_key, end_key);
+ if (!iter->valid()) {
+ TxnErrorCode err = iter->error_code();
+ if (err != TxnErrorCode::TXN_OK) {
+ return err;
+ }
+ return TxnErrorCode::TXN_KEY_NOT_FOUND;
+ }
+ for (; iter->valid(); iter->next()) {
+ std::string_view key = iter->key();
+ if (!decode_versioned_key(&key, log_version)) {
+ return TxnErrorCode::TXN_INVALID_DATA;
+ }
+ if (!iter->parse_value(operation_log)) {
+ return TxnErrorCode::TXN_INVALID_DATA;
+ }
+ }
+ return iter->error_code();
+}
+
TEST(MetaServiceOperationLogTest, CommitPartitionLog) {
auto meta_service = get_meta_service(false);
std::string instance_id = "commit_partition_log";
@@ -200,10 +227,9 @@ TEST(MetaServiceOperationLogTest, CommitPartitionLog) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string log_key = versioned::log_key({instance_id});
- std::string value;
- ASSERT_EQ(versioned_get(txn.get(), log_key, &version2, &value),
TxnErrorCode::TXN_OK);
OperationLogPB operation_log;
- ASSERT_TRUE(operation_log.ParseFromString(value));
+ ASSERT_EQ(read_operation_log(txn.get(), log_key, &version2,
&operation_log),
+ TxnErrorCode::TXN_OK);
ASSERT_TRUE(operation_log.has_commit_partition());
}
@@ -371,10 +397,9 @@ TEST(MetaServiceOperationLogTest, DropPartitionLog) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string log_key = versioned::log_key({instance_id});
- std::string value;
- ASSERT_EQ(versioned_get(txn.get(), log_key, &version2, &value),
TxnErrorCode::TXN_OK);
OperationLogPB operation_log;
- ASSERT_TRUE(operation_log.ParseFromString(value));
+ ASSERT_EQ(read_operation_log(txn.get(), log_key, &version2,
&operation_log),
+ TxnErrorCode::TXN_OK);
ASSERT_TRUE(operation_log.has_drop_partition());
ASSERT_EQ(operation_log.drop_partition().partition_ids_size(), 1);
ASSERT_EQ(operation_log.drop_partition().partition_ids(0),
partition_id + 3);
@@ -477,10 +502,9 @@ TEST(MetaServiceOperationLogTest, CommitIndexLog) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string log_key = versioned::log_key({instance_id});
- std::string value;
- ASSERT_EQ(versioned_get(txn.get(), log_key, &version2, &value),
TxnErrorCode::TXN_OK);
OperationLogPB operation_log;
- ASSERT_TRUE(operation_log.ParseFromString(value));
+ ASSERT_EQ(read_operation_log(txn.get(), log_key, &version2,
&operation_log),
+ TxnErrorCode::TXN_OK);
ASSERT_TRUE(operation_log.has_commit_index());
}
@@ -712,10 +736,9 @@ TEST(MetaServiceOperationLogTest, DropIndexLog) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string log_key = versioned::log_key({instance_id});
- std::string value;
- ASSERT_EQ(versioned_get(txn.get(), log_key, &version, &value),
TxnErrorCode::TXN_OK);
OperationLogPB operation_log;
- ASSERT_TRUE(operation_log.ParseFromString(value));
+ ASSERT_EQ(read_operation_log(txn.get(), log_key, &version,
&operation_log),
+ TxnErrorCode::TXN_OK);
ASSERT_TRUE(operation_log.has_drop_index());
ASSERT_EQ(operation_log.drop_index().index_ids_size(), 1);
ASSERT_EQ(operation_log.drop_index().index_ids(0), index_id + 3);
@@ -906,10 +929,9 @@ TEST(MetaServiceOperationLogTest, CommitTxn) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string log_key = versioned::log_key({instance_id});
- std::string value;
- ASSERT_EQ(versioned_get(txn.get(), log_key, &version, &value),
TxnErrorCode::TXN_OK);
OperationLogPB operation_log;
- ASSERT_TRUE(operation_log.ParseFromString(value));
+ ASSERT_EQ(read_operation_log(txn.get(), log_key, &version,
&operation_log),
+ TxnErrorCode::TXN_OK);
ASSERT_TRUE(operation_log.has_commit_txn());
const auto& commit_log = operation_log.commit_txn();
@@ -1055,11 +1077,9 @@ TEST(MetaServiceOperationLogTest, CommitTxnEventually) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string log_key = versioned::log_key({instance_id});
- std::string value;
- ASSERT_EQ(versioned_get(txn.get(), log_key, &commit_versionstamp,
&value),
- TxnErrorCode::TXN_OK);
OperationLogPB operation_log;
- ASSERT_TRUE(operation_log.ParseFromString(value));
+ ASSERT_EQ(read_operation_log(txn.get(), log_key, &commit_versionstamp,
&operation_log),
+ TxnErrorCode::TXN_OK);
ASSERT_TRUE(operation_log.has_commit_txn());
const auto& commit_log = operation_log.commit_txn();
@@ -1349,11 +1369,9 @@ TEST(MetaServiceOperationLogTest, CommitTxnWithSubTxn) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string log_key = versioned::log_key({instance_id});
- std::string value;
- ASSERT_EQ(versioned_get(txn.get(), log_key, &commit_versionstamp,
&value),
- TxnErrorCode::TXN_OK);
OperationLogPB operation_log;
- ASSERT_TRUE(operation_log.ParseFromString(value));
+ ASSERT_EQ(read_operation_log(txn.get(), log_key, &commit_versionstamp,
&operation_log),
+ TxnErrorCode::TXN_OK);
ASSERT_TRUE(operation_log.has_commit_txn());
const auto& commit_log = operation_log.commit_txn();
@@ -1473,7 +1491,7 @@ TEST(MetaServiceOperationLogTest,
UpdateVersionedTabletMeta) {
std::string log_key = versioned::log_key(instance_id);
OperationLogPB operation_log;
TxnErrorCode err =
- versioned::document_get(txn.get(), log_key, &operation_log,
&log_versionstamp);
+ read_operation_log(txn.get(), log_key, &log_versionstamp,
&operation_log);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
ASSERT_TRUE(operation_log.has_update_tablet());
EXPECT_EQ(operation_log.update_tablet().tablet_ids_size(), 2);
diff --git a/cloud/test/recycler_operation_log_test.cpp b/cloud/test/recycler_operation_log_test.cpp
index 8b59321135e..90e49b9ac4e 100644
--- a/cloud/test/recycler_operation_log_test.cpp
+++ b/cloud/test/recycler_operation_log_test.cpp
@@ -29,6 +29,7 @@
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service.h"
+#include "meta-store/blob_message.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/mem_txn_kv.h"
@@ -125,6 +126,31 @@ static void remove_instance_info(TxnKv* txn_kv) {
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit
transaction";
}
+// It will get the latest versioned values.
+TxnErrorCode read_operation_log(Transaction* txn, std::string_view log_key,
+ Versionstamp* log_version, OperationLogPB*
operation_log) {
+ std::string begin_key = encode_versioned_key(log_key, Versionstamp::min());
+ std::string end_key = encode_versioned_key(log_key, Versionstamp::max());
+ auto iter = blob_get_range(txn, begin_key, end_key);
+ if (!iter->valid()) {
+ TxnErrorCode err = iter->error_code();
+ if (err != TxnErrorCode::TXN_OK) {
+ return err;
+ }
+ return TxnErrorCode::TXN_KEY_NOT_FOUND;
+ }
+ for (; iter->valid(); iter->next()) {
+ std::string_view key = iter->key();
+ if (!decode_versioned_key(&key, log_version)) {
+ return TxnErrorCode::TXN_INVALID_DATA;
+ }
+ if (!iter->parse_value(operation_log)) {
+ return TxnErrorCode::TXN_INVALID_DATA;
+ }
+ }
+ return iter->error_code();
+}
+
TEST(RecycleOperationLogTest, RecycleOneOperationLog) {
auto txn_kv = std::make_shared<MemTxnKv>();
txn_kv->update_commit_version(1000);
@@ -152,14 +178,13 @@ TEST(RecycleOperationLogTest, RecycleOneOperationLog) {
// Put a empty operation log
std::string log_key = versioned::log_key(instance_id);
Versionstamp versionstamp(123, 0);
- std::string log_key_with_versionstamp = encode_versioned_key(log_key,
versionstamp);
OperationLogPB operation_log;
operation_log.set_min_timestamp(versionstamp.version());
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- txn->put(log_key_with_versionstamp, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, versionstamp, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -218,7 +243,7 @@ TEST(RecycleOperationLogTest, RecycleCommitPartitionLog) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -248,7 +273,7 @@ TEST(RecycleOperationLogTest, RecycleCommitPartitionLog) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -312,7 +337,7 @@ TEST(RecycleOperationLogTest, RecycleDropPartitionLog) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -363,7 +388,7 @@ TEST(RecycleOperationLogTest, RecycleDropPartitionLog) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -481,7 +506,7 @@ TEST(RecycleOperationLogTest, RecycleCommitIndexLog) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -510,7 +535,7 @@ TEST(RecycleOperationLogTest, RecycleCommitIndexLog) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -556,7 +581,7 @@ TEST(RecycleOperationLogTest, RecycleDropIndexLog) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -723,7 +748,7 @@ TEST(RecycleOperationLogTest, RecycleCommitTxnLog) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -848,7 +873,7 @@ TEST(RecycleOperationLogTest,
RecycleCommitTxnLogWhenTxnIsNotVisible) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -929,7 +954,7 @@ TEST(RecycleOperationLogTest, RecycleUpdateTabletLog) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
@@ -1235,10 +1260,9 @@ TEST(RecycleOperationLogTest, RecycleCompactionLog) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string log_key = versioned::log_key({test_instance_id});
- std::string value;
- ASSERT_EQ(versioned_get(txn.get(), log_key, &log_version, &value),
TxnErrorCode::TXN_OK);
+ ASSERT_EQ(read_operation_log(txn.get(), log_key, &log_version,
&operation_log),
+ TxnErrorCode::TXN_OK);
- ASSERT_TRUE(operation_log.ParseFromString(value));
ASSERT_TRUE(operation_log.has_compaction());
const auto& compaction_log = operation_log.compaction();
@@ -2202,7 +2226,7 @@ TEST(RecycleOperationLogTest, RecycleDeletedInstance) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+ versioned::blob_put(txn.get(), log_key, operation_log);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]