This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1e5b338007b [chore](cloud) ms http util supports versioned keys --
encode decode (#58953)
1e5b338007b is described below
commit 1e5b338007b9fd39d30e33d01ed923386677df92
Author: walter <[email protected]>
AuthorDate: Mon Dec 15 19:06:06 2025 +0800
[chore](cloud) ms http util supports versioned keys -- encode decode
(#58953)
Encode a versioned key (the versionstamp parameter is optional):
```
$ curl 'localhost:5000/MetaService/http/encode_key?token=greedisgood9999&key_type=VersionedPartitionVersionKey&instance_id=gavin-instance&partition_id=10086'
┌───────────────────────────────────────────────────────────────────────────────── 0. key space: 3
│ ┌─────────────────────────────────────────────────────────────────────────────── 1. version
│ │                   ┌─────────────────────────────────────────────────────────── 2. gavin-instance
│ │                   │                                 ┌───────────────────────── 3. partition
│ │                   │                                 │                       ┌─ 4. 10086
│ │                   │                                 │                       │
▼ ▼                   ▼                                 ▼                       ▼
031076657273696f6e000110676176696e2d696e7374616e6365000110706172746974696f6e0001120000000000002766
\x03\x10\x76\x65\x72\x73\x69\x6f\x6e\x00\x01\x10\x67\x61\x76\x69\x6e\x2d\x69\x6e\x73\x74\x61\x6e\x63\x65\x00\x01\x10\x70\x61\x72\x74\x69\x74\x69\x6f\x6e\x00\x01\x12\x00\x00\x00\x00\x00\x00\x27\x66
```
Encode a versioned key with a versionstamp:
```
$ curl 'localhost:5000/MetaService/http/encode_key?token=greedisgood9999&key_type=VersionedPartitionVersionKey&instance_id=gavin-instance&partition_id=10086&versionstamp=00000000000000001010'
┌─────────────────────────────────────────────────────────────────────────────────────────────────── 0. key space: 3
│ ┌───────────────────────────────────────────────────────────────────────────────────────────────── 1. version
│ │                   ┌───────────────────────────────────────────────────────────────────────────── 2. gavin-instance
│ │                   │                                 ┌─────────────────────────────────────────── 3. partition
│ │                   │                                 │                       ┌─────────────────── 4. 10086
│ │                   │                                 │                       │                 ┌─ 5. versionstamp: 00000000000000001010
│ │                   │                                 │                       │                 │
▼ ▼                   ▼                                 ▼                       ▼                 ▼
031076657273696f6e000110676176696e2d696e7374616e6365000110706172746974696f6e00011200000000000027661300000000000000001010ff
\x03\x10\x76\x65\x72\x73\x69\x6f\x6e\x00\x01\x10\x67\x61\x76\x69\x6e\x2d\x69\x6e\x73\x74\x61\x6e\x63\x65\x00\x01\x10\x70\x61\x72\x74\x69\x74\x69\x6f\x6e\x00\x01\x12\x00\x00\x00\x00\x00\x00\x27\x66\x13\x00\x00\x00\x00\x00\x00\x00\x00\x10\x10\xff
```
Decode a key with a versionstamp:
```
$ curl 'localhost:5000/MetaService/http/decode_key?token=greedisgood9999&key_type=VersionedPartitionVersionKey&key=031076657273696f6e000110676176696e2d696e7374616e6365000110706172746974696f6e00011200000000000027661300000000000000001010ff'
┌─────────────────────────────────────────────────────────────────────────────────────────────────── 0. key space: 3
│ ┌───────────────────────────────────────────────────────────────────────────────────────────────── 1. version
│ │                   ┌───────────────────────────────────────────────────────────────────────────── 2. gavin-instance
│ │                   │                                 ┌─────────────────────────────────────────── 3. partition
│ │                   │                                 │                       ┌─────────────────── 4. 10086
│ │                   │                                 │                       │                 ┌─ 5. versionstamp: 00000000000000001010
│ │                   │                                 │                       │                 │
▼ ▼                   ▼                                 ▼                       ▼                 ▼
031076657273696f6e000110676176696e2d696e7374616e6365000110706172746974696f6e00011200000000000027661300000000000000001010ff
```
For get_value and set_value, if the versionstamp parameter is not set,
get_value reads the latest (max) version of the key, and set_value writes
the value with an automatically allocated versionstamp.
---
cloud/src/meta-service/http_encode_key.cpp | 288 +++++++++++++++++--
cloud/test/http_encode_key_test.cpp | 438 +++++++++++++++++++++++++++++
2 files changed, 708 insertions(+), 18 deletions(-)
diff --git a/cloud/src/meta-service/http_encode_key.cpp
b/cloud/src/meta-service/http_encode_key.cpp
index 4ee76af5c9f..08a4c779b31 100644
--- a/cloud/src/meta-service/http_encode_key.cpp
+++ b/cloud/src/meta-service/http_encode_key.cpp
@@ -22,7 +22,6 @@
#include <google/protobuf/message.h>
#include <google/protobuf/util/json_util.h>
-#include <bit>
#include <chrono>
#include <fstream>
#include <iomanip>
@@ -34,7 +33,6 @@
#include <utility>
#include <vector>
-#include "common/config.h"
#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
@@ -43,10 +41,11 @@
#include "meta-service/meta_service_schema.h"
#include "meta-service/meta_service_tablet_stats.h"
#include "meta-store/blob_message.h"
-#include "meta-store/codec.h"
+#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
+#include "meta-store/versioned_value.h"
namespace doris::cloud {
@@ -112,6 +111,13 @@ static std::string parse(const ValueBuf& buf) {
return proto_to_json(pb);
}
+template <class ProtoType>
+static std::string parse_to_json(const std::string& buf) {
+ ProtoType pb;
+ if (!pb.ParseFromString(buf)) return "";
+ return proto_to_json(pb);
+}
+
static std::string parse_txn_label(const ValueBuf& buf) {
std::string value;
if (buf.iters.size() != 1 || buf.iters[0]->size() != 1) {
@@ -199,6 +205,38 @@ static std::string parse_tablet_stats(const ValueBuf& buf)
{
return json;
}
+static std::string parse_empty_value(const std::string& buf) {
+ return "";
+}
+
+static std::string parse_int64_value(const std::string& buf) {
+ if (buf.size() != sizeof(int64_t)) {
+ return "";
+ }
+ int64_t value = 0;
+ std::memcpy(&value, buf.data(), sizeof(int64_t));
+ return std::to_string(value);
+}
+
+// Helper function to parse versionstamp from URI query parameter
+static std::optional<Versionstamp> parse_versionstamp_from_uri(const
brpc::URI& uri) {
+ std::string_view vs_str = http_query(uri, "versionstamp");
+ if (vs_str.empty()) {
+ return std::nullopt;
+ }
+
+ // versionstamp can be in hex format or direct bytes
+ std::string vs_bytes = unhex(vs_str);
+ if (vs_bytes.size() != 10) {
+ LOG(WARNING) << "Invalid versionstamp size: " << vs_bytes.size();
+ return std::nullopt;
+ }
+
+ std::array<uint8_t, 10> vs_array;
+ std::memcpy(vs_array.data(), vs_bytes.data(), 10);
+ return Versionstamp(vs_array);
+}
+
// See keys.h to get all types of key, e.g: MetaRowsetKeyInfo
// key_type -> {{param_name1, param_name2 ...}, key_encoding_func,
serialized_pb_to_json_parsing_func, json_to_proto_parsing_func}
// where param names are the input for key_encoding_func
@@ -238,6 +276,35 @@ static std::unordered_map<std::string_view,
{"MetaServiceRegistryKey", {std::vector<std::string_view> {},
[](param_type& p) { return
system_meta_service_registry_key();
}, parse<ServiceRegistryPB> ,
parse_json<ServiceRegistryPB>}},
{"MetaServiceArnInfoKey", {std::vector<std::string_view> {},
[](param_type& p) { return
system_meta_service_arn_info_key();
}, parse<RamUserPB> , parse_json<RamUserPB>}},
{"MetaServiceEncryptionKey", {std::vector<std::string_view> {},
[](param_type& p) { return
system_meta_service_encryption_key_info_key();
}, parse<EncryptionKeyInfoPB> ,
parse_json<EncryptionKeyInfoPB>}},
+ {"PackedFileKey", {{"instance_id", "packed_file_path"},
[](param_type& p) { return
packed_file_key(KeyInfoSetter<PackedFileKeyInfo>{p}.get());
}, parse<PackedFileInfoPB> , parse_json<PackedFileInfoPB>}},
+};
+
+// Versioned key support (0x03 space)
+// key_type -> {{param_name1, param_name2 ...}, key_encoding_func,
is_versioned_key, parse_func, json_to_proto_func}
+static std::unordered_map<std::string_view,
+ std::tuple<std::vector<std::string_view>,
+ std::function<std::string(param_type&)>,
+ bool,
+ std::function<std::string(const
std::string&)>,
+
std::function<std::shared_ptr<google::protobuf::Message>(const std::string&)>>>
versioned_param_set {
+ {"VersionedPartitionVersionKey", {{"instance_id", "partition_id"},
[](param_type& p) { return
versioned::partition_version_key(KeyInfoSetter<versioned::PartitionVersionKeyInfo>{p}.get());
}, true, parse_to_json<VersionPB>, parse_json<VersionPB>}},
+ {"VersionedTableVersionKey", {{"instance_id", "table_id"},
[](param_type& p) { return
versioned::table_version_key(KeyInfoSetter<versioned::TableVersionKeyInfo>{p}.get());
}, true, parse_empty_value, nullptr}},
+ {"VersionedPartitionIndexKey", {{"instance_id", "partition_id"},
[](param_type& p) { return
versioned::partition_index_key(KeyInfoSetter<versioned::PartitionIndexKeyInfo>{p}.get());
}, false, parse_to_json<PartitionIndexPB>,
parse_json<PartitionIndexPB>}},
+ {"VersionedPartitionInvertedIndexKey", {{"instance_id", "db_id",
"table_id", "partition_id"}, [](param_type& p) { return
versioned::partition_inverted_index_key(KeyInfoSetter<versioned::PartitionInvertedIndexKeyInfo>{p}.get());
}, false, parse_empty_value, nullptr}},
+ {"VersionedTabletIndexKey", {{"instance_id", "tablet_id"},
[](param_type& p) { return
versioned::tablet_index_key(KeyInfoSetter<versioned::TabletIndexKeyInfo>{p}.get());
}, false, parse_to_json<TabletIndexPB>,
parse_json<TabletIndexPB>}},
+ {"VersionedTabletInvertedIndexKey", {{"instance_id", "db_id",
"table_id", "index_id", "partition_id", "tablet_id"}, [](param_type& p) {
return
versioned::tablet_inverted_index_key(KeyInfoSetter<versioned::TabletInvertedIndexKeyInfo>{p}.get());
}, false, parse_empty_value, nullptr}},
+ {"VersionedIndexIndexKey", {{"instance_id", "index_id"},
[](param_type& p) { return
versioned::index_index_key(KeyInfoSetter<versioned::IndexIndexKeyInfo>{p}.get());
}, false, parse_to_json<IndexIndexPB>,
parse_json<IndexIndexPB>}},
+ {"VersionedIndexInvertedKey", {{"instance_id", "db_id",
"table_id", "index_id"}, [](param_type& p) { return
versioned::index_inverted_key(KeyInfoSetter<versioned::IndexInvertedKeyInfo>{p}.get());
}, false, parse_empty_value, nullptr}},
+ {"VersionedTabletLoadStatsKey", {{"instance_id", "tablet_id"},
[](param_type& p) { return
versioned::tablet_load_stats_key(KeyInfoSetter<versioned::TabletLoadStatsKeyInfo>{p}.get());
}, true, parse_to_json<TabletStatsPB>, parse_json<TabletStatsPB>}},
+ {"VersionedTabletCompactStatsKey", {{"instance_id", "tablet_id"},
[](param_type& p) { return
versioned::tablet_compact_stats_key(KeyInfoSetter<versioned::TabletCompactStatsKeyInfo>{p}.get());
}, true, parse_to_json<TabletStatsPB>, parse_json<TabletStatsPB>}},
+ {"VersionedMetaPartitionKey", {{"instance_id", "partition_id"},
[](param_type& p) { return
versioned::meta_partition_key(KeyInfoSetter<versioned::MetaPartitionKeyInfo>{p}.get());
}, true, parse_empty_value, nullptr}},
+ {"VersionedMetaIndexKey", {{"instance_id", "index_id"},
[](param_type& p) { return
versioned::meta_index_key(KeyInfoSetter<versioned::MetaIndexKeyInfo>{p}.get());
}, true, parse_empty_value, nullptr}},
+ {"VersionedMetaTabletKey", {{"instance_id", "tablet_id"},
[](param_type& p) { return
versioned::meta_tablet_key(KeyInfoSetter<versioned::MetaTabletKeyInfo>{p}.get());
}, true, parse_to_json<doris::TabletMetaCloudPB>,
parse_json<doris::TabletMetaCloudPB>}},
+ {"VersionedMetaSchemaKey", {{"instance_id", "index_id",
"schema_version"}, [](param_type& p) { return
versioned::meta_schema_key(KeyInfoSetter<versioned::MetaSchemaKeyInfo>{p}.get());
}, false, parse_to_json<doris::TabletSchemaCloudPB>,
parse_json<doris::TabletSchemaCloudPB>}},
+ {"VersionedMetaRowsetLoadKey", {{"instance_id", "tablet_id",
"version"}, [](param_type& p) { return
versioned::meta_rowset_load_key(KeyInfoSetter<versioned::MetaRowsetLoadKeyInfo>{p}.get());
}, true, [](const std::string& s) { return ""; },
parse_json<doris::RowsetMetaCloudPB>}},
+ {"VersionedMetaRowsetCompactKey", {{"instance_id", "tablet_id",
"version"}, [](param_type& p) { return
versioned::meta_rowset_compact_key(KeyInfoSetter<versioned::MetaRowsetCompactKeyInfo>{p}.get());
}, true, [](const std::string& s) { return ""; },
parse_json<doris::RowsetMetaCloudPB>}},
+ {"VersionedMetaDeleteBitmapKey", {{"instance_id", "tablet_id",
"rowset_id"}, [](param_type& p) { return
versioned::meta_delete_bitmap_key(KeyInfoSetter<versioned::MetaDeleteBitmapInfo>{p}.get());
}, false, parse_to_json<DeleteBitmapStoragePB>,
parse_json<DeleteBitmapStoragePB>}},
+ {"VersionedDataRowsetRefCountKey", {{"instance_id", "tablet_id",
"rowset_id"}, [](param_type& p) { return
versioned::data_rowset_ref_count_key(KeyInfoSetter<versioned::DataRowsetRefCountKeyInfo>{p}.get());
}, false, parse_int64_value, nullptr}},
};
// clang-format on
@@ -247,26 +314,49 @@ static MetaServiceResponseStatus encode_key(const
brpc::URI& uri, std::string* k
status.set_code(MetaServiceCode::OK);
std::string_view kt = http_query(uri, "key_type");
if (key_type != nullptr) *key_type = kt;
+
+ // Try single version key first
auto it = param_set.find(kt);
- if (it == param_set.end()) {
- status.set_code(MetaServiceCode::INVALID_ARGUMENT);
- status.set_msg(fmt::format("key_type not supported: {}", (kt.empty() ?
"(empty)" : kt)));
+ if (it != param_set.end()) {
+ auto& key_params = std::get<0>(it->second);
+ std::remove_cv_t<param_type> params;
+ params.reserve(key_params.size());
+ for (auto& i : key_params) {
+ auto p = uri.GetQuery(i.data());
+ if (p == nullptr || p->empty()) {
+ status.set_code(MetaServiceCode::INVALID_ARGUMENT);
+ status.set_msg(fmt::format("{} is not given or empty", i));
+ return status;
+ }
+ params.emplace_back(p);
+ }
+ auto& key_encoding_function = std::get<1>(it->second);
+ *key = key_encoding_function(params);
return status;
}
- auto& key_params = std::get<0>(it->second);
- std::remove_cv_t<param_type> params;
- params.reserve(key_params.size());
- for (auto& i : key_params) {
- auto p = uri.GetQuery(i.data());
- if (p == nullptr || p->empty()) {
- status.set_code(MetaServiceCode::INVALID_ARGUMENT);
- status.set_msg(fmt::format("{} is not given or empty", i));
- return status;
+
+ // Try versioned key
+ auto vit = versioned_param_set.find(kt);
+ if (vit != versioned_param_set.end()) {
+ auto& key_params = std::get<0>(vit->second);
+ std::remove_cv_t<param_type> params;
+ params.reserve(key_params.size());
+ for (auto& i : key_params) {
+ auto p = uri.GetQuery(i.data());
+ if (p == nullptr || p->empty()) {
+ status.set_code(MetaServiceCode::INVALID_ARGUMENT);
+ status.set_msg(fmt::format("{} is not given or empty", i));
+ return status;
+ }
+ params.emplace_back(p);
}
- params.emplace_back(p);
+ auto& key_encoding_function = std::get<1>(vit->second);
+ *key = key_encoding_function(params);
+ return status;
}
- auto& key_encoding_function = std::get<1>(it->second);
- *key = key_encoding_function(params);
+
+ status.set_code(MetaServiceCode::INVALID_ARGUMENT);
+ status.set_msg(fmt::format("key_type not supported: {}", (kt.empty() ?
"(empty)" : kt)));
return status;
}
@@ -282,6 +372,7 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const
brpc::URI& uri) {
return http_json_reply(st);
}
}
+
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) [[unlikely]] {
@@ -289,6 +380,75 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const
brpc::URI& uri) {
fmt::format("failed to create txn, err={}",
err));
}
+ // Check if it's a versioned key
+ auto vit = versioned_param_set.find(key_type);
+ if (vit != versioned_param_set.end()) {
+ bool is_versioned_key = std::get<2>(vit->second);
+ auto& parse_function = std::get<3>(vit->second);
+
+ // Get versionstamp parameter, default to Versionstamp::max() if not
provided
+ Versionstamp snapshot_version = Versionstamp::max();
+ auto vs_opt = parse_versionstamp_from_uri(uri);
+ if (vs_opt.has_value()) {
+ snapshot_version = *vs_opt;
+ }
+
+ std::string value_json;
+ Versionstamp actual_version;
+ if (is_versioned_key) {
+ if (key_type == "VersionedMetaRowsetLoadKey" ||
+ key_type == "VersionedMetaRowsetCompactKey") {
+ doris::RowsetMetaCloudPB pb;
+ err = versioned::document_get(txn.get(), key,
snapshot_version, &pb,
+ &actual_version, false);
+ if (err == TxnErrorCode::TXN_OK) {
+ value_json = proto_to_json(pb);
+ }
+ } else if (key_type == "VersionedMetaSchemaKey") {
+ doris::TabletSchemaCloudPB pb;
+ err = versioned::document_get(txn.get(), key,
snapshot_version, &pb,
+ &actual_version, false);
+ if (err == TxnErrorCode::TXN_OK) {
+ value_json = proto_to_json(pb);
+ }
+ } else {
+ std::string value_str;
+ err = versioned_get(txn.get(), key, snapshot_version,
&actual_version, &value_str,
+ false);
+ if (err == TxnErrorCode::TXN_OK) {
+ value_json = parse_function(value_str);
+ }
+ }
+ if (!value_json.empty()) {
+ value_json += fmt::format("\\nversionstamp={}",
actual_version.to_string());
+ }
+ } else {
+ ValueBuf value_buf;
+ err = cloud::blob_get(txn.get(), key, &value_buf, true);
+ if (err == TxnErrorCode::TXN_OK) {
+ value_json = parse_function(value_buf.value());
+ }
+ }
+
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR,
+ fmt::format("versioned key not found,
key={}", hex(key)));
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ return http_json_reply(
+ MetaServiceCode::KV_TXN_GET_ERR,
+ fmt::format("failed to get versioned key, key={}, err={}",
hex(key), err));
+ }
+ if (value_json.empty()) {
+ return http_json_reply(
+ MetaServiceCode::PROTOBUF_PARSE_ERR,
+ fmt::format("failed to parse versioned value, key={}",
hex(key)));
+ }
+
+ return http_text_reply(MetaServiceCode::OK, "", value_json);
+ }
+
+ // Handle single version key (original logic)
auto it = param_set.find(key_type);
if (it == param_set.end()) {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
@@ -386,6 +546,93 @@ HttpResponse process_http_set_value(TxnKv* txn_kv,
brpc::Controller* cntl) {
fmt::format("failed to create txn, err={}",
err));
}
+ // Check if it's a versioned key
+ auto vit = versioned_param_set.find(key_type);
+ if (vit != versioned_param_set.end()) {
+ bool is_versioned_key = std::get<2>(vit->second);
+ auto& json_to_proto_function = std::get<4>(vit->second);
+
+ std::shared_ptr<google::protobuf::Message> pb_to_save;
+ if (json_to_proto_function) {
+ pb_to_save = json_to_proto_function(body);
+ }
+
+ // Check if versionstamp parameter is provided
+ auto vs_opt = parse_versionstamp_from_uri(uri);
+ if (is_versioned_key) {
+ if (key_type == "VersionedMetaRowsetLoadKey" ||
+ key_type == "VersionedMetaRowsetCompactKey" ||
+ key_type == "VersionedMetaSchemaKey") {
+ bool success = false;
+ if (vs_opt.has_value()) {
+ success = versioned::document_put(txn.get(), key, *vs_opt,
+ std::move(*pb_to_save));
+ } else {
+ success = versioned::document_put(txn.get(), key,
std::move(*pb_to_save));
+ }
+ if (!success) {
+ return http_json_reply(
+ MetaServiceCode::UNDEFINED_ERR,
+ fmt::format("failed to put versioned document,
key={}", hex(key)));
+ }
+ } else {
+ std::string value_to_save;
+ if (json_to_proto_function) {
+ value_to_save = pb_to_save->SerializeAsString();
+ if (value_to_save.empty()) {
+ return http_json_reply(
+ MetaServiceCode::UNDEFINED_ERR,
+ fmt::format("failed to serialize pb, key={}",
hex(key)));
+ }
+ } else {
+ value_to_save = body;
+ }
+
+ // Put the value
+ if (vs_opt.has_value()) {
+ versioned_put(txn.get(), key, *vs_opt, value_to_save);
+ } else {
+ versioned_put(txn.get(), key, value_to_save);
+ }
+ }
+ } else {
+ std::string value_to_save;
+ if (key_type == "VersionedDataRowsetRefCountKey") {
+ // Parse int64 from body
+ try {
+ int64_t count = std::stoll(body);
+ value_to_save.resize(sizeof(int64_t));
+ std::memcpy(value_to_save.data(), &count, sizeof(int64_t));
+ } catch (...) {
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+ "failed to parse int64 value from
body");
+ }
+ } else if (json_to_proto_function) {
+ value_to_save = pb_to_save->SerializeAsString();
+ if (value_to_save.empty()) {
+ return http_json_reply(MetaServiceCode::UNDEFINED_ERR,
+ fmt::format("failed to serialize
pb, key={}", hex(key)));
+ }
+ } else {
+ value_to_save = body;
+ }
+ txn->put(key, value_to_save);
+ }
+
+ LOG(WARNING) << "set_value saved, key=" << hex(key);
+
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ return http_json_reply(
+ MetaServiceCode::UNDEFINED_ERR,
+ fmt::format("failed to commit versioned key, key={},
err={}", hex(key), err));
+ }
+
+ LOG(WARNING) << "set_value saved versioned key, key=" << hex(key);
+ return http_text_reply(MetaServiceCode::OK, "",
+ fmt::format("versioned key saved successfully,
key={}", hex(key)));
+ }
+
auto it = param_set.find(key_type);
if (it == param_set.end()) {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
@@ -479,6 +726,11 @@ HttpResponse process_http_encode_key(const brpc::URI& uri)
{
return http_json_reply(st);
}
+ // Append versionstamp to key if versionstamp parameter is provided
+ if (auto vs_opt = parse_versionstamp_from_uri(uri); vs_opt.has_value()) {
+ key = encode_versioned_key(key, *vs_opt);
+ }
+
// Print to ensure
bool unicode = !(uri.GetQuery("unicode") != nullptr &&
*uri.GetQuery("unicode") == "false");
diff --git a/cloud/test/http_encode_key_test.cpp
b/cloud/test/http_encode_key_test.cpp
index de1ffe42ffe..7b64a67dc8c 100644
--- a/cloud/test/http_encode_key_test.cpp
+++ b/cloud/test/http_encode_key_test.cpp
@@ -568,6 +568,18 @@ merged_stats:
{"idx":{"table_id":"10086","index_id":"100010","partition_id":"100
},
R"({"items":[{"key_id":"23456","key":"key_1"}]})",
},
+ Input {
+ "PackedFileKey",
+ "instance_id=gavin-instance&packed_file_path=/path/to/file",
+
{"01106d657461000110676176696e2d696e7374616e63650001107061636b65645f66696c650001102f706174682f746f2f66696c650001"},
+ []() -> std::vector<std::string> {
+ PackedFileInfoPB pb;
+ pb.set_resource_id("resource_id");
+ pb.set_ref_cnt(5);
+ return {pb.SerializeAsString()};
+ },
+ R"({"ref_cnt":"5","resource_id":"resource_id"})",
+ },
};
// clang-format on
@@ -638,3 +650,429 @@ TEST(HttpGetValueTest,
process_http_get_value_test_cover_all_template) {
EXPECT_EQ(http_res.body, input.value);
}
}
+
+// Test data for versioned keys
+// clang-format off
+static auto versioned_test_inputs = std::array {
+ Input {
+ "VersionedPartitionVersionKey",
+ "instance_id=gavin-instance&partition_id=10086",
+
{"031076657273696f6e000110676176696e2d696e7374616e6365000110706172746974696f6e0001120000000000002766"},
+ []() -> std::vector<std::string> {
+ VersionPB pb;
+ pb.set_version(100);
+ return {pb.SerializeAsString()};
+ },
+ R"({"version":"100"})",
+ },
+ Input {
+ "VersionedTableVersionKey",
+ "instance_id=gavin-instance&table_id=10086",
+
{"031076657273696f6e000110676176696e2d696e7374616e63650001107461626c650001120000000000002766"},
+ []() -> std::vector<std::string> {
+ return {""};
+ },
+ "",
+ },
+ Input {
+ "VersionedPartitionIndexKey",
+ "instance_id=gavin-instance&partition_id=10086",
+
{"0310696e646578000110676176696e2d696e7374616e6365000110706172746974696f6e0001120000000000002766"},
+ []() -> std::vector<std::string> {
+ PartitionIndexPB pb;
+ pb.set_db_id(10000);
+ pb.set_table_id(20000);
+ return {pb.SerializeAsString()};
+ },
+ R"({"db_id":"10000","table_id":"20000"})",
+ },
+ Input {
+ "VersionedPartitionInvertedIndexKey",
+
"instance_id=gavin-instance&db_id=10000&table_id=20000&partition_id=10086",
+
{"0310696e646578000110676176696e2d696e7374616e6365000110706172746974696f6e5f696e7665727465640001120000000000002710120000000000004e20120000000000002766"},
+ []() -> std::vector<std::string> {
+ return {""};
+ },
+ "",
+ },
+ Input {
+ "VersionedTabletIndexKey",
+ "instance_id=gavin-instance&tablet_id=10086",
+
{"0310696e646578000110676176696e2d696e7374616e63650001107461626c65740001120000000000002766"},
+ []() -> std::vector<std::string> {
+ TabletIndexPB pb;
+ pb.set_table_id(20000);
+ pb.set_index_id(30000);
+ pb.set_partition_id(10086);
+ pb.set_tablet_id(10086);
+ return {pb.SerializeAsString()};
+ },
+
R"({"table_id":"20000","index_id":"30000","partition_id":"10086","tablet_id":"10086"})",
+ },
+ Input {
+ "VersionedTabletInvertedIndexKey",
+
"instance_id=gavin-instance&db_id=10000&table_id=20000&index_id=30000&partition_id=10086&tablet_id=10087",
+
{"0310696e646578000110676176696e2d696e7374616e63650001107461626c65745f696e7665727465640001120000000000002710120000000000004e20120000000000007530120000000000002766120000000000002767"},
+ []() -> std::vector<std::string> {
+ return {""};
+ },
+ "",
+ },
+ Input {
+ "VersionedIndexIndexKey",
+ "instance_id=gavin-instance&index_id=10086",
+
{"0310696e646578000110676176696e2d696e7374616e6365000110696e6465780001120000000000002766"},
+ []() -> std::vector<std::string> {
+ IndexIndexPB pb;
+ pb.set_db_id(10000);
+ pb.set_table_id(20000);
+ return {pb.SerializeAsString()};
+ },
+ R"({"db_id":"10000","table_id":"20000"})",
+ },
+ Input {
+ "VersionedIndexInvertedKey",
+ "instance_id=gavin-instance&db_id=10000&table_id=20000&index_id=10086",
+
{"0310696e646578000110676176696e2d696e7374616e6365000110696e6465785f696e7665727465640001120000000000002710120000000000004e20120000000000002766"},
+ []() -> std::vector<std::string> {
+ return {""};
+ },
+ "",
+ },
+ Input {
+ "VersionedTabletLoadStatsKey",
+ "instance_id=gavin-instance&tablet_id=10086",
+
{"03107374617473000110676176696e2d696e7374616e63650001107461626c65745f6c6f61640001120000000000002766"},
+ []() -> std::vector<std::string> {
+ TabletStatsPB pb;
+ pb.set_data_size(1024);
+ pb.set_num_rows(100);
+ pb.set_num_rowsets(10);
+ pb.set_num_segments(20);
+ return {pb.SerializeAsString()};
+ },
+
R"({"data_size":"1024","num_rows":"100","num_rowsets":"10","num_segments":"20"})",
+ },
+ Input {
+ "VersionedTabletCompactStatsKey",
+ "instance_id=gavin-instance&tablet_id=10086",
+
{"03107374617473000110676176696e2d696e7374616e63650001107461626c65745f636f6d706163740001120000000000002766"},
+ []() -> std::vector<std::string> {
+ TabletStatsPB pb;
+ pb.set_data_size(2048);
+ pb.set_num_rows(200);
+ pb.set_num_rowsets(20);
+ pb.set_num_segments(40);
+ return {pb.SerializeAsString()};
+ },
+
R"({"data_size":"2048","num_rows":"200","num_rowsets":"20","num_segments":"40"})",
+ },
+ Input {
+ "VersionedMetaPartitionKey",
+ "instance_id=gavin-instance&partition_id=10086",
+
{"03106d657461000110676176696e2d696e7374616e6365000110706172746974696f6e0001120000000000002766"},
+ []() -> std::vector<std::string> {
+ return {""};
+ },
+ "",
+ },
+ Input {
+ "VersionedMetaIndexKey",
+ "instance_id=gavin-instance&index_id=10086",
+
{"03106d657461000110676176696e2d696e7374616e6365000110696e6465780001120000000000002766"},
+ []() -> std::vector<std::string> {
+ return {""};
+ },
+ "",
+ },
+ Input {
+ "VersionedMetaTabletKey",
+ "instance_id=gavin-instance&tablet_id=10086",
+
{"03106d657461000110676176696e2d696e7374616e63650001107461626c65740001120000000000002766"},
+ []() -> std::vector<std::string> {
+ doris::TabletMetaCloudPB pb;
+ pb.set_table_id(20000);
+ pb.set_index_id(30000);
+ pb.set_partition_id(10086);
+ pb.set_tablet_id(10086);
+ return {pb.SerializeAsString()};
+ },
+
R"({"table_id":"20000","partition_id":"10086","tablet_id":"10086","index_id":"30000"})",
+ },
+ Input {
+ "VersionedMetaSchemaKey",
+ "instance_id=gavin-instance&index_id=10086&schema_version=100",
+
{"03106d657461000110676176696e2d696e7374616e6365000110736368656d610001120000000000002766120000000000000064"},
+ []() -> std::vector<std::string> {
+ doris::TabletSchemaCloudPB pb;
+ pb.set_schema_version(100);
+ auto col = pb.add_column();
+ col->set_unique_id(1);
+ col->set_name("col_1");
+ col->set_type("INT");
+ return {pb.SerializeAsString()};
+ },
+
R"({"column":[{"unique_id":1,"name":"col_1","type":"INT"}],"schema_version":100})",
+ },
+ Input {
+ "VersionedMetaRowsetLoadKey",
+ "instance_id=gavin-instance&tablet_id=10086&version=100",
+
{"03106d657461000110676176696e2d696e7374616e6365000110726f777365745f6c6f61640001120000000000002766120000000000000064"},
+ []() -> std::vector<std::string> {
+ doris::RowsetMetaCloudPB pb;
+ pb.set_rowset_id(0);
+ pb.set_rowset_id_v2("rowset_1");
+ pb.set_tablet_id(10086);
+ pb.set_start_version(100);
+ pb.set_end_version(100);
+ return {pb.SerializeAsString()};
+ },
+
R"({"rowset_id":"0","tablet_id":"10086","start_version":"100","end_version":"100","rowset_id_v2":"rowset_1"})",
+ },
+ Input {
+ "VersionedMetaRowsetCompactKey",
+ "instance_id=gavin-instance&tablet_id=10086&version=100",
+
{"03106d657461000110676176696e2d696e7374616e6365000110726f777365745f636f6d706163740001120000000000002766120000000000000064"},
+ []() -> std::vector<std::string> {
+ doris::RowsetMetaCloudPB pb;
+ pb.set_rowset_id(0);
+ pb.set_rowset_id_v2("rowset_2");
+ pb.set_tablet_id(10086);
+ pb.set_start_version(100);
+ pb.set_end_version(100);
+ return {pb.SerializeAsString()};
+ },
+
R"({"rowset_id":"0","tablet_id":"10086","start_version":"100","end_version":"100","rowset_id_v2":"rowset_2"})",
+ },
+ Input {
+ "VersionedMetaDeleteBitmapKey",
+ "instance_id=gavin-instance&tablet_id=10086&rowset_id=rowset_1",
+
{"03106d657461000110676176696e2d696e7374616e636500011064656c6574655f6269746d6170000112000000000000276610726f777365745f310001"},
+ []() -> std::vector<std::string> {
+ DeleteBitmapStoragePB pb;
+ pb.set_store_in_fdb(false);
+ return {pb.SerializeAsString()};
+ },
+ R"({"store_in_fdb":false})",
+ },
+ Input {
+ "VersionedDataRowsetRefCountKey",
+ "instance_id=gavin-instance&tablet_id=10086&rowset_id=rowset_1",
+
{"031064617461000110676176696e2d696e7374616e6365000110726f777365745f7265665f636f756e74000112000000000000276610726f777365745f310001"},
+ []() -> std::vector<std::string> {
+ int64_t count = 42;
+ std::string value;
+ value.resize(sizeof(int64_t));
+ std::memcpy(value.data(), &count, sizeof(int64_t));
+ return {value};
+ },
+ "42",
+ },
+};
+// clang-format on
+
+TEST(HttpEncodeKeyTest, process_http_encode_versioned_key_test) {
+ static auto format_fdb_key = [](const std::string& s) {
+ std::stringstream r;
+ for (size_t i = 0; i < s.size(); ++i) {
+ if (!(i % 2)) r << "\\x";
+ r << s[i];
+ }
+ return r.str();
+ };
+ brpc::URI uri;
+ (void)uri.get_query_map(); // initialize query map
+ for (auto&& input : versioned_test_inputs) {
+ std::stringstream url;
+ url << "localhost:5000/MetaService/http?key_type=" << input.key_type;
+ if (!input.param.empty()) {
+ url << "&" << input.param;
+ }
+ EXPECT_EQ(uri.SetHttpURL(url.str()), 0); // clear and set query string
+ auto http_res = process_http_encode_key(uri);
+ EXPECT_EQ(http_res.status_code, 200) << "Failed for key_type: " <<
input.key_type;
+ EXPECT_NE(http_res.body.find(input.key[0]), std::string::npos)
+ << "real full text: " << http_res.body << "\nexpect contains:
" << input.key[0]
+ << "\n"
+ << format_fdb_key(input.key[0]) << "\nkey_type: " <<
input.key_type
+ << "\nurl: " << url.str();
+ }
+}
+
+TEST(HttpEncodeKeyTest, process_http_encode_versioned_key_with_versionstamp) {
+ brpc::URI uri;
+ (void)uri.get_query_map();
+
+ // Test encoding a versioned key with versionstamp parameter
+ uri.SetQuery("key_type", "VersionedPartitionVersionKey");
+ uri.SetQuery("instance_id", "gavin-instance");
+ uri.SetQuery("partition_id", "10086");
+ // Versionstamp in hex format: 10 bytes
+ uri.SetQuery("versionstamp", "00000000000000010001");
+
+ auto http_res = process_http_encode_key(uri);
+ EXPECT_EQ(http_res.status_code, 200);
+ // The encoded key should contain both the base key and the versionstamp
+
EXPECT_NE(http_res.body.find("031076657273696f6e000110676176696e2d696e7374616e63650001107061727"
+ "46974696f6e0001120000000000002766"),
+ std::string::npos);
+}
+
+// Seeds an in-memory TxnKv with values stored under hand-encoded versioned
+// keys, then fetches each of them through process_http_get_value and checks
+// the status code and the rendered response body.
+TEST(HttpGetValueTest, process_http_get_value_versioned_key_test) {
+    auto kv_store = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(kv_store->init(), 0);
+
+    // ---- Seed the two index records in a single transaction ----
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(kv_store->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    // VersionedPartitionIndexKey (non-versioned value)
+    const std::string partition_index_key = unhex(
+            "0310696e646578000110676176696e2d696e7374616e6365000110706172746974696f6e00011200"
+            "00000000002766");
+    PartitionIndexPB seeded_partition_index;
+    seeded_partition_index.set_db_id(10000);
+    seeded_partition_index.set_table_id(20000);
+    txn->put(partition_index_key, seeded_partition_index.SerializeAsString());
+
+    // VersionedTabletIndexKey (non-versioned value)
+    const std::string tablet_index_key = unhex(
+            "0310696e646578000110676176696e2d696e7374616e63650001107461626c657400011200000000"
+            "00002766");
+    TabletIndexPB seeded_tablet_index;
+    seeded_tablet_index.set_table_id(20000);
+    seeded_tablet_index.set_index_id(30000);
+    seeded_tablet_index.set_partition_id(10086);
+    seeded_tablet_index.set_tablet_id(10086);
+    txn->put(tablet_index_key, seeded_tablet_index.SerializeAsString());
+
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    // ---- Read the records back through the HTTP helper ----
+    // The same URI object is reused for every lookup below, so parameters set
+    // for an earlier lookup remain present for the later ones.
+    brpc::URI query;
+    (void)query.get_query_map();
+
+    // Lookup 1: VersionedPartitionIndexKey
+    query.SetQuery("key_type", "VersionedPartitionIndexKey");
+    query.SetQuery("instance_id", "gavin-instance");
+    query.SetQuery("partition_id", "10086");
+    const auto partition_resp = process_http_get_value(kv_store.get(), query);
+    EXPECT_EQ(partition_resp.status_code, 200);
+    EXPECT_NE(partition_resp.body.find("\"db_id\":\"10000\""), std::string::npos);
+    EXPECT_NE(partition_resp.body.find("\"table_id\":\"20000\""), std::string::npos);
+
+    // Lookup 2: VersionedTabletIndexKey
+    query.SetQuery("key_type", "VersionedTabletIndexKey");
+    query.SetQuery("tablet_id", "10086");
+    const auto tablet_resp = process_http_get_value(kv_store.get(), query);
+    EXPECT_EQ(tablet_resp.status_code, 200);
+    EXPECT_NE(tablet_resp.body.find("\"table_id\":\"20000\""), std::string::npos)
+            << tablet_resp.body;
+    EXPECT_NE(tablet_resp.body.find("\"index_id\":\"30000\""), std::string::npos);
+
+    // Lookup 3: VersionedDataRowsetRefCountKey.
+    // The stored value is the raw in-memory bytes of an int64_t counter.
+    ASSERT_EQ(kv_store->create_txn(&txn), TxnErrorCode::TXN_OK);
+    const std::string ref_count_key = unhex(
+            "031064617461000110676176696e2d696e7374616e6365000110726f777365745f7265665f636f75"
+            "6e74000112000000000000276610726f777365745f310001");
+    const int64_t seeded_count = 42;
+    std::string raw_count(sizeof(int64_t), '\0');
+    std::memcpy(raw_count.data(), &seeded_count, sizeof(int64_t));
+    txn->put(ref_count_key, raw_count);
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    query.SetQuery("key_type", "VersionedDataRowsetRefCountKey");
+    query.SetQuery("tablet_id", "10086");
+    query.SetQuery("rowset_id", "rowset_1");
+    const auto ref_count_resp = process_http_get_value(kv_store.get(), query);
+    EXPECT_EQ(ref_count_resp.status_code, 200);
+    // The body is expected to be the counter rendered as decimal text.
+    EXPECT_EQ(ref_count_resp.body, "42");
+}
+
+// Writes values through process_http_set_value for two versioned key types
+// and then reads the raw bytes back from the in-memory TxnKv, using
+// hand-encoded keys, to confirm what was stored.
+TEST(HttpSetValueTest, process_http_set_value_versioned_key_test) {
+    auto kv_store = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(kv_store->init(), 0);
+
+    // ---- Case 1: VersionedPartitionIndexKey (non-versioned value), JSON body ----
+    brpc::Controller partition_cntl;
+    brpc::URI partition_uri;
+    (void)partition_uri.get_query_map();
+    partition_uri.SetQuery("key_type", "VersionedPartitionIndexKey");
+    partition_uri.SetQuery("instance_id", "gavin-instance");
+    partition_uri.SetQuery("partition_id", "10086");
+    partition_cntl.http_request().uri() = partition_uri;
+
+    PartitionIndexPB request_pb;
+    request_pb.set_db_id(10000);
+    request_pb.set_table_id(20000);
+    const std::string payload = proto_to_json(request_pb);
+    partition_cntl.request_attachment().append(payload);
+
+    const auto partition_resp = process_http_set_value(kv_store.get(), &partition_cntl);
+    EXPECT_EQ(partition_resp.status_code, 200) << partition_resp.body;
+
+    // Read the stored bytes back and parse them as a PartitionIndexPB.
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(kv_store->create_txn(&txn), TxnErrorCode::TXN_OK);
+    const std::string partition_index_key = unhex(
+            "0310696e646578000110676176696e2d696e7374616e6365000110706172746974696f6e00011200"
+            "00000000002766");
+    std::string stored_partition_bytes;
+    ASSERT_EQ(txn->get(partition_index_key, &stored_partition_bytes), TxnErrorCode::TXN_OK);
+    PartitionIndexPB stored_pb;
+    ASSERT_TRUE(stored_pb.ParseFromString(stored_partition_bytes));
+    EXPECT_EQ(stored_pb.db_id(), 10000);
+    EXPECT_EQ(stored_pb.table_id(), 20000);
+
+    // ---- Case 2: VersionedDataRowsetRefCountKey (int64 value), decimal text body ----
+    brpc::Controller ref_count_cntl;
+    brpc::URI ref_count_uri;
+    (void)ref_count_uri.get_query_map();
+    ref_count_uri.SetQuery("key_type", "VersionedDataRowsetRefCountKey");
+    ref_count_uri.SetQuery("instance_id", "gavin-instance");
+    ref_count_uri.SetQuery("tablet_id", "10086");
+    ref_count_uri.SetQuery("rowset_id", "rowset_1");
+    ref_count_cntl.http_request().uri() = ref_count_uri;
+    ref_count_cntl.request_attachment().append("42");
+
+    const auto ref_count_resp = process_http_set_value(kv_store.get(), &ref_count_cntl);
+    EXPECT_EQ(ref_count_resp.status_code, 200) << ref_count_resp.body;
+
+    // The stored value must be exactly sizeof(int64_t) raw bytes that decode to 42.
+    ASSERT_EQ(kv_store->create_txn(&txn), TxnErrorCode::TXN_OK);
+    const std::string ref_count_key = unhex(
+            "031064617461000110676176696e2d696e7374616e6365000110726f777365745f7265665f636f75"
+            "6e74000112000000000000276610726f777365745f310001");
+    std::string stored_count_bytes;
+    ASSERT_EQ(txn->get(ref_count_key, &stored_count_bytes), TxnErrorCode::TXN_OK);
+    ASSERT_EQ(stored_count_bytes.size(), sizeof(int64_t));
+    int64_t stored_count = 0;
+    std::memcpy(&stored_count, stored_count_bytes.data(), sizeof(int64_t));
+    EXPECT_EQ(stored_count, 42);
+}
+
+// Checks how process_http_encode_key reacts to the `versionstamp` query
+// parameter: a well-formed 10-byte stamp and a too-short stamp are both
+// expected to yield HTTP 200.
+TEST(HttpEncodeKeyTest, parse_versionstamp_from_uri_test) {
+    // Fills `target` with the parameters shared by both cases; only the
+    // versionstamp text differs between them.
+    auto fill_query = [](brpc::URI& target, const char* versionstamp_hex) {
+        (void)target.get_query_map();
+        target.SetQuery("versionstamp", versionstamp_hex);
+        target.SetQuery("key_type", "VersionedPartitionVersionKey");
+        target.SetQuery("instance_id", "test");
+        target.SetQuery("partition_id", "1");
+    };
+
+    // Valid versionstamp: 20 hex digits (10 bytes).
+    brpc::URI valid_uri;
+    fill_query(valid_uri, "00000000000000010001");
+    const auto valid_resp = process_http_encode_key(valid_uri);
+    EXPECT_EQ(valid_resp.status_code, 200);
+
+    // Invalid versionstamp (wrong size): only 5 bytes, should be 10.
+    brpc::URI short_uri;
+    fill_query(short_uri, "0000000000");
+    const auto short_resp = process_http_encode_key(short_uri);
+    // Should still succeed in encoding the key; the versionstamp will be
+    // ignored if invalid.
+    EXPECT_EQ(short_resp.status_code, 200);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]