This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7630b62e72f branch-3.0: [opt](meta-service) Implement set_value API for meta-servce #49052 (#49359) 7630b62e72f is described below commit 7630b62e72fb00611bc41014fae9c57e997328e4 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Mon Mar 24 11:32:02 2025 +0800 branch-3.0: [opt](meta-service) Implement set_value API for meta-servce #49052 (#49359) Cherry-picked from #49052 Co-authored-by: Gavin Chou <ga...@selectdb.com> --- cloud/src/common/util.cpp | 24 ++++ cloud/src/common/util.h | 6 + cloud/src/meta-service/http_encode_key.cpp | 208 ++++++++++++++++++++++----- cloud/src/meta-service/meta_service_http.cpp | 16 ++- cloud/src/meta-service/meta_service_http.h | 3 + cloud/test/meta_service_http_test.cpp | 73 ++++++++++ 6 files changed, 289 insertions(+), 41 deletions(-) diff --git a/cloud/src/common/util.cpp b/cloud/src/common/util.cpp index 8d1c8fed983..50f29afb0ba 100644 --- a/cloud/src/common/util.cpp +++ b/cloud/src/common/util.cpp @@ -247,6 +247,30 @@ bool ValueBuf::to_pb(google::protobuf::Message* pb) const { return pb->ParseFromZeroCopyStream(&merge_stream); } +std::string ValueBuf::value() const { + butil::IOBuf merge; + for (auto&& it : iters) { + it->reset(); + while (it->has_next()) { + auto [k, v] = it->next(); + merge.append_user_data((void*)v.data(), v.size(), +[](void*) {}); + } + } + return merge.to_string(); +} + +std::vector<std::string> ValueBuf::keys() const { + std::vector<std::string> ret; + for (auto&& it : iters) { + it->reset(); + while (it->has_next()) { + auto [k, _] = it->next(); + ret.push_back({k.data(), k.size()}); + } + } + return ret; +} + void ValueBuf::remove(Transaction* txn) const { for (auto&& it : iters) { it->reset(); diff --git a/cloud/src/common/util.h b/cloud/src/common/util.h index de37c2f4d9b..8f2e8aa077e 100644 --- a/cloud/src/common/util.h +++ b/cloud/src/common/util.h @@ -89,6 +89,12 @@ struct 
ValueBuf { // Return TXN_OK for success get a key, TXN_KEY_NOT_FOUND for key not found, otherwise for error. TxnErrorCode get(Transaction* txn, std::string_view key, bool snapshot = false); + // return the merged value in ValueBuf + std::string value() const; + + // return all keys in ValueBuf; if the value is not split, the size of keys is 1 + std::vector<std::string> keys() const; + std::vector<std::unique_ptr<RangeGetIterator>> iters; int8_t ver {-1}; }; diff --git a/cloud/src/meta-service/http_encode_key.cpp b/cloud/src/meta-service/http_encode_key.cpp index 4d05f0121b0..728b52df2d8 100644 --- a/cloud/src/meta-service/http_encode_key.cpp +++ b/cloud/src/meta-service/http_encode_key.cpp @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. +#include <brpc/controller.h> #include <brpc/uri.h> #include <fmt/format.h> #include <gen_cpp/cloud.pb.h> +#include <google/protobuf/message.h> +#include <google/protobuf/util/json_util.h> #include <bit> #include <iomanip> @@ -29,6 +32,7 @@ #include <utility> #include <vector> +#include "common/logging.h" #include "common/util.h" #include "cpp/sync_point.h" #include "meta-service/codec.h" @@ -80,6 +84,21 @@ struct KeyInfoSetter { }; // clang-format on +template <typename Message> +static std::shared_ptr<Message> parse_json(const std::string& json) { + static_assert(std::is_base_of_v<google::protobuf::Message, Message>); + auto ret = std::make_shared<Message>(); + auto st = google::protobuf::util::JsonStringToMessage(json, ret.get()); + if (st.ok()) return ret; + std::string err = "failed to strictly parse json message, error: " + st.ToString(); + LOG_WARNING(err).tag("json", json); + // ignore unknown fields + // google::protobuf::util::JsonParseOptions json_parse_options; + // json_parse_options.ignore_unknown_fields = true; + // return google::protobuf::util::JsonStringToMessage(body, req, json_parse_options); + return nullptr; +} + using param_type = const std::vector<const 
std::string*>; template <class ProtoType> @@ -177,41 +196,45 @@ static std::string parse_tablet_stats(const ValueBuf& buf) { } // See keys.h to get all types of key, e.g: MetaRowsetKeyInfo -// key_type -> {{param1, param2 ...}, key_encoding_func, value_parsing_func} -// where params are the input for key_encoding_func +// key_type -> {{param_name1, param_name2 ...}, key_encoding_func, serialized_pb_to_json_parsing_func, json_to_proto_parsing_func} +// where param names are the input for key_encoding_func // clang-format off +// key_type static std::unordered_map<std::string_view, - std::tuple<std::vector<std::string_view>, std::function<std::string(param_type&)>, std::function<std::string(const ValueBuf&)>>> param_set { - {"InstanceKey", {{"instance_id"}, [](param_type& p) { return instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get()); } , parse<InstanceInfoPB>}} , - {"TxnLabelKey", {{"instance_id", "db_id", "label"}, [](param_type& p) { return txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get()); } , parse_txn_label}} , - {"TxnInfoKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get()); } , parse<TxnInfoPB>}} , - {"TxnIndexKey", {{"instance_id", "txn_id"}, [](param_type& p) { return txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get()); } , parse<TxnIndexPB>}} , - {"TxnRunningKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get()); } , parse<TxnRunningPB>}} , - {"PartitionVersionKey", {{"instance_id", "db_id", "tbl_id", "partition_id"}, [](param_type& p) { return partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get()); } , parse<VersionPB>}} , - {"TableVersionKey", {{"instance_id", "db_id", "tbl_id"}, [](param_type& p) { return table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get()); } , parse<VersionPB>}} , - {"MetaRowsetKey", {{"instance_id", "tablet_id", "version"}, [](param_type& p) { return 
meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get()); } , parse<doris::RowsetMetaCloudPB>}} , - {"MetaRowsetTmpKey", {{"instance_id", "txn_id", "tablet_id"}, [](param_type& p) { return meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get()); } , parse<doris::RowsetMetaCloudPB>}} , - {"MetaTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get()); } , parse<doris::TabletMetaCloudPB>}} , - {"MetaTabletIdxKey", {{"instance_id", "tablet_id"}, [](param_type& p) { return meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get()); } , parse<TabletIndexPB>}} , - {"RecycleIndexKey", {{"instance_id", "index_id"}, [](param_type& p) { return recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get()); } , parse<RecycleIndexPB>}} , - {"RecyclePartKey", {{"instance_id", "part_id"}, [](param_type& p) { return recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get()); } , parse<RecyclePartitionPB>}} , - {"RecycleRowsetKey", {{"instance_id", "tablet_id", "rowset_id"}, [](param_type& p) { return recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get()); } , parse<RecycleRowsetPB>}} , - {"RecycleTxnKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get()); } , parse<RecycleTxnPB>}} , - {"StatsTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get()); } , parse_tablet_stats}} , - {"JobTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get()); } , parse<TabletJobInfoPB>}} , - {"CopyJobKey", {{"instance_id", "stage_id", "table_id", "copy_id", "group_id"}, [](param_type& p) { return copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get()); } , parse<CopyJobPB>}} , - 
{"CopyFileKey", {{"instance_id", "stage_id", "table_id", "obj_key", "obj_etag"}, [](param_type& p) { return copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get()); } , parse<CopyFilePB>}} , - {"RecycleStageKey", {{"instance_id", "stage_id"}, [](param_type& p) { return recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get()); } , parse<RecycleStagePB>}} , - {"JobRecycleKey", {{"instance_id"}, [](param_type& p) { return job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get()); } , parse<JobRecyclePB>}} , - {"MetaSchemaKey", {{"instance_id", "index_id", "schema_version"}, [](param_type& p) { return meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get()); } , parse_tablet_schema}} , - {"MetaDeleteBitmap", {{"instance_id", "tablet_id", "rowest_id", "version", "seg_id"}, [](param_type& p) { return meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get()); } , parse_delete_bitmap}} , - {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id", "partition_id"}, [](param_type& p) { return meta_delete_bitmap_update_lock_key( KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get()); }, parse<DeleteBitmapUpdateLockPB>}}, - {"MetaPendingDeleteBitmap", {{"instance_id", "tablet_id"}, [](param_type& p) { return meta_pending_delete_bitmap_key( KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get()); } , parse<PendingDeleteBitmapPB>}} , - {"RLJobProgressKey", {{"instance_id", "db_id", "job_id"}, [](param_type& p) { return rl_job_progress_key_info( KeyInfoSetter<RLJobProgressKeyInfo>{p}.get()); } , parse<RoutineLoadProgressPB>}} , - {"MetaServiceRegistryKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_registry_key(); } , parse<ServiceRegistryPB>}} , - {"MetaServiceArnInfoKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_arn_info_key(); } , parse<RamUserPB>}} , - {"MetaServiceEncryptionKey", {std::vector<std::string_view> {}, [](param_type& p) { return 
system_meta_service_encryption_key_info_key(); } , parse<EncryptionKeyInfoPB>}} , - {"StorageVaultKey", {{"instance_id", "vault_id"}, [](param_type& p) { return storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get()); } , parse<StorageVaultPB>}} ,}; + // params key_encoding_func serialized_pb_to_json_parsing_func json_to_proto_parsing_func + std::tuple<std::vector<std::string_view>, std::function<std::string(param_type&)>, std::function<std::string(const ValueBuf&)>, std::function<std::shared_ptr<google::protobuf::Message>(const std::string&)>>> param_set { + {"InstanceKey", {{"instance_id"}, [](param_type& p) { return instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get()); }, parse<InstanceInfoPB> , parse_json<InstanceInfoPB>}}, + {"TxnLabelKey", {{"instance_id", "db_id", "label"}, [](param_type& p) { return txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get()); }, parse_txn_label , parse_json<TxnLabelPB>}}, + {"TxnInfoKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get()); }, parse<TxnInfoPB> , parse_json<TxnInfoPB>}}, + {"TxnIndexKey", {{"instance_id", "txn_id"}, [](param_type& p) { return txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get()); }, parse<TxnIndexPB> , parse_json<TxnIndexPB>}}, + {"TxnRunningKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get()); }, parse<TxnRunningPB> , parse_json<TxnRunningPB>}}, + {"PartitionVersionKey", {{"instance_id", "db_id", "tbl_id", "partition_id"}, [](param_type& p) { return partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get()); }, parse<VersionPB> , parse_json<VersionPB>}}, + {"TableVersionKey", {{"instance_id", "db_id", "tbl_id"}, [](param_type& p) { return table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get()); }, parse<VersionPB> , parse_json<VersionPB>}}, + {"MetaRowsetKey", {{"instance_id", "tablet_id", "version"}, [](param_type& p) { return 
meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get()); }, parse<doris::RowsetMetaCloudPB> , parse_json<doris::RowsetMetaCloudPB>}}, + {"MetaRowsetTmpKey", {{"instance_id", "txn_id", "tablet_id"}, [](param_type& p) { return meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get()); }, parse<doris::RowsetMetaCloudPB> , parse_json<doris::RowsetMetaCloudPB>}}, + {"MetaTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get()); }, parse<doris::TabletMetaCloudPB> , parse_json<doris::TabletMetaCloudPB>}}, + {"MetaTabletIdxKey", {{"instance_id", "tablet_id"}, [](param_type& p) { return meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get()); }, parse<TabletIndexPB> , parse_json<TabletIndexPB>}}, + {"RecycleIndexKey", {{"instance_id", "index_id"}, [](param_type& p) { return recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get()); }, parse<RecycleIndexPB> , parse_json<RecycleIndexPB>}}, + {"RecyclePartKey", {{"instance_id", "part_id"}, [](param_type& p) { return recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get()); }, parse<RecyclePartitionPB> , parse_json<RecyclePartitionPB>}}, + {"RecycleRowsetKey", {{"instance_id", "tablet_id", "rowset_id"}, [](param_type& p) { return recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get()); }, parse<RecycleRowsetPB> , parse_json<RecycleRowsetPB>}}, + {"RecycleTxnKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get()); }, parse<RecycleTxnPB> , parse_json<RecycleTxnPB>}}, + {"StatsTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get()); }, parse_tablet_stats , parse_json<TabletStatsPB>}}, + {"JobTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return 
job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get()); }, parse<TabletJobInfoPB> , parse_json<TabletJobInfoPB>}}, + {"CopyJobKey", {{"instance_id", "stage_id", "table_id", "copy_id", "group_id"}, [](param_type& p) { return copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get()); }, parse<CopyJobPB> , parse_json<CopyJobPB>}}, + {"CopyFileKey", {{"instance_id", "stage_id", "table_id", "obj_key", "obj_etag"}, [](param_type& p) { return copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get()); }, parse<CopyFilePB> , parse_json<CopyFilePB>}}, + {"RecycleStageKey", {{"instance_id", "stage_id"}, [](param_type& p) { return recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get()); }, parse<RecycleStagePB> , parse_json<RecycleStagePB>}}, + {"JobRecycleKey", {{"instance_id"}, [](param_type& p) { return job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get()); }, parse<JobRecyclePB> , parse_json<JobRecyclePB>}}, + {"MetaSchemaKey", {{"instance_id", "index_id", "schema_version"}, [](param_type& p) { return meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get()); }, parse_tablet_schema , parse_json<doris::TabletSchemaCloudPB>}}, + {"MetaDeleteBitmap", {{"instance_id", "tablet_id", "rowest_id", "version", "seg_id"}, [](param_type& p) { return meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get()); }, parse_delete_bitmap , parse_json<DeleteBitmapPB>}}, + {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id", "partition_id"}, [](param_type& p) { return meta_delete_bitmap_update_lock_key(KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get()); }, parse<DeleteBitmapUpdateLockPB> , parse_json<DeleteBitmapUpdateLockPB>}}, + {"MetaPendingDeleteBitmap", {{"instance_id", "tablet_id"}, [](param_type& p) { return meta_pending_delete_bitmap_key(KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get()); }, parse<PendingDeleteBitmapPB> , parse_json<PendingDeleteBitmapPB>}}, + {"RLJobProgressKey", {{"instance_id", "db_id", "job_id"}, [](param_type& p) { return 
rl_job_progress_key_info(KeyInfoSetter<RLJobProgressKeyInfo>{p}.get()); }, parse<RoutineLoadProgressPB> , parse_json<RoutineLoadProgressPB>}}, + {"StorageVaultKey", {{"instance_id", "vault_id"}, [](param_type& p) { return storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get()); }, parse<StorageVaultPB> , parse_json<StorageVaultPB>}}, + {"MetaSchemaPBDictionaryKey", {{"instance_id", "index_id"}, [](param_type& p) { return meta_schema_pb_dictionary_key(KeyInfoSetter<MetaSchemaPBDictionaryInfo>{p}.get()); }, parse<SchemaCloudDictionary> , parse_json<SchemaCloudDictionary>}}, + {"MetaServiceRegistryKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_registry_key(); }, parse<ServiceRegistryPB> , parse_json<ServiceRegistryPB>}}, + {"MetaServiceArnInfoKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_arn_info_key(); }, parse<RamUserPB> , parse_json<RamUserPB>}}, + {"MetaServiceEncryptionKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_encryption_key_info_key(); }, parse<EncryptionKeyInfoPB> , parse_json<EncryptionKeyInfoPB>}}, +}; // clang-format on static MetaServiceResponseStatus encode_key(const brpc::URI& uri, std::string& key) { @@ -225,9 +248,10 @@ static MetaServiceResponseStatus encode_key(const brpc::URI& uri, std::string& k (key_type.empty() ? 
"(empty)" : key_type))); return status; } + auto& key_params = std::get<0>(it->second); std::remove_cv_t<param_type> params; - params.reserve(std::get<0>(it->second).size()); - for (auto& i : std::get<0>(it->second)) { + params.reserve(key_params.size()); + for (auto& i : key_params) { auto p = uri.GetQuery(i.data()); if (p == nullptr || p->empty()) { status.set_code(MetaServiceCode::INVALID_ARGUMENT); @@ -236,7 +260,8 @@ static MetaServiceResponseStatus encode_key(const brpc::URI& uri, std::string& k } params.emplace_back(p); } - key = std::get<1>(it->second)(params); + auto& key_encoding_function = std::get<1>(it->second); + key = key_encoding_function(params); return status; } @@ -292,7 +317,8 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri) { return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR, fmt::format("failed to get kv, key={}", hex(key))); } - auto readable_value = std::get<2>(it->second)(value); + auto& value_parsing_function = std::get<2>(it->second); + auto readable_value = value_parsing_function(value); if (readable_value.empty()) [[unlikely]] { return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR, fmt::format("failed to parse value, key={}", hex(key))); @@ -300,6 +326,114 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri) { return http_text_reply(MetaServiceCode::OK, "", readable_value); } +HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* cntl) { + const brpc::URI& uri = cntl->http_request().uri(); + std::string body = cntl->request_attachment().to_string(); + LOG(INFO) << "set value, body=" << body; + + std::string key; + if (auto hex_key = http_query(uri, "key"); !hex_key.empty()) { + key = unhex(hex_key); + } else { // Encode key from params + auto st = encode_key(uri, key); + if (st.code() != MetaServiceCode::OK) { + return http_json_reply(st); + } + } + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != 
TxnErrorCode::TXN_OK) [[unlikely]] { + return http_json_reply(MetaServiceCode::KV_TXN_CREATE_ERR, + fmt::format("failed to create txn, err={}", err)); + } + + std::string_view key_type = http_query(uri, "key_type"); + auto it = param_set.find(key_type); + if (it == param_set.end()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("key_type not supported: {}", + (key_type.empty() ? "(empty)" : key_type))); + } + auto& json_parsing_function = std::get<3>(it->second); + std::shared_ptr<google::protobuf::Message> pb_to_save = json_parsing_function(body); + if (pb_to_save == nullptr) { + LOG(WARNING) << "invalid input json value for key_type=" << key_type; + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + fmt::format("invalid input json value, cannot parse json to pb, key_type={}", + key_type)); + } + + LOG(INFO) << "parsed pb to save key_type=" << key_type << " key=" << hex(key) + << " pb_to_save=" << proto_to_json(*pb_to_save); + + // ATTN: + // StatsTabletKey is special, it has a series of keys, we only handle the base stat key + // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are split into multiple KVs + ValueBuf value; + err = cloud::get(txn.get(), key, &value, true); + + bool kv_found = true; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + // the key-value to set may not exist yet + kv_found = false; + } else if (err != TxnErrorCode::TXN_OK) { + return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR, + fmt::format("failed to get kv, key={}", hex(key))); + } + + auto concat = [](const std::vector<std::string>& keys) { + std::stringstream s; + for (auto& i : keys) s << hex(i) << ", "; + return s.str(); + }; + LOG(INFO) << "set value, key_type=" << key_type << " " << value.keys().size() << " keys=[" + << concat(value.keys()) << "]"; + + std::string original_value_json; + if (kv_found) { + auto& serialized_value_to_json_parsing_function = std::get<2>(it->second); + original_value_json = 
serialized_value_to_json_parsing_function(value); + if (original_value_json.empty()) [[unlikely]] { + LOG(WARNING) << "failed to parse value, key=" << hex(key) + << " val=" << hex(value.value()); + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, + fmt::format("failed to parse value, key={}", hex(key))); + } else { + LOG(INFO) << "original_value_json=" << original_value_json; + } + } + std::string serialized_value_to_save = pb_to_save->SerializeAsString(); + if (serialized_value_to_save.empty()) { + LOG(WARNING) << "failed to serialize, key=" << hex(key); + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, + fmt::format("failed to serialize, key={}", hex(key))); + } + // we need to remove the original KVs, it may be a range of keys + // and the number of final keys may be less than the initial number of keys + if (kv_found) value.remove(txn.get()); + + // TODO(gavin): use cloud::put() to deal with split-multi-kv and special keys + // StatsTabletKey is special, it has a series of keys, we only handle the base stat key + // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are split into multiple KVs + txn->put(key, serialized_value_to_save); + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + std::stringstream ss; + ss << "failed to commit txn when set value, err=" << err << " key=" << hex(key); + LOG(WARNING) << ss.str(); + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, ss.str()); + } + LOG(WARNING) << "set_value saved, key=" << hex(key); + + std::stringstream final_json; + final_json << "original_value_hex=" << hex(value.value()) << "\n" + << "key_hex=" << hex(key) << "\n" + << "original_value_json=" << original_value_json << "\n"; + + return http_text_reply(MetaServiceCode::OK, "", final_json.str()); +} + HttpResponse process_http_encode_key(const brpc::URI& uri) { std::string key; auto st = encode_key(uri, key); diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp 
index c6eb07010c4..53d9dbbef83 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -158,7 +158,8 @@ HttpResponse http_json_reply(MetaServiceCode code, const std::string& msg, return {status_code, msg, sb.GetString()}; } -static std::string format_http_request(const brpc::HttpHeader& request) { +static std::string format_http_request(brpc::Controller* cntl) { + const brpc::HttpHeader& request = cntl->http_request(); auto& unresolved_path = request.unresolved_path(); auto& uri = request.uri(); std::stringstream ss; @@ -173,6 +174,8 @@ static std::string format_http_request(const brpc::HttpHeader& request) { for (auto it = request.HeaderBegin(); it != request.HeaderEnd(); ++it) { ss << "\n" << it->first << ":" << it->second; } + std::string body = cntl->request_attachment().to_string(); + ss << "\nbody=" << (body.empty() ? "(empty)" : body); return ss.str(); } @@ -509,6 +512,10 @@ static HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller return process_http_get_value(service->txn_kv().get(), ctrl->http_request().uri()); } +static HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl) { + return process_http_set_value(service->txn_kv().get(), ctrl); +} + // show all key ranges and their count. 
static HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Controller* ctrl) { auto txn_kv = std::dynamic_pointer_cast<FdbTxnKv>(service->txn_kv()); @@ -737,6 +744,7 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"decode_key", process_decode_key}, {"encode_key", process_encode_key}, {"get_value", process_get_value}, + {"set_value", process_set_value}, {"show_meta_ranges", process_show_meta_ranges}, {"txn_lazy_commit", process_txn_lazy_commit}, {"fix_tablet_stats", process_fix_tablet_stats}, @@ -744,11 +752,11 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"v1/decode_key", process_decode_key}, {"v1/encode_key", process_encode_key}, {"v1/get_value", process_get_value}, + {"v1/set_value", process_set_value}, {"v1/show_meta_ranges", process_show_meta_ranges}, {"v1/txn_lazy_commit", process_txn_lazy_commit}, {"v1/injection_point", process_injection_point}, - // for get - {"get_instance", process_get_instance_info}, + {"v1/fix_tablet_stats", process_fix_tablet_stats}, // for get {"get_instance", process_get_instance_info}, {"get_obj_store_info", process_get_obj_store_info}, @@ -785,7 +793,7 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, // Prepare input request info LOG(INFO) << "rpc from " << cntl->remote_side() << " request: " << cntl->http_request().uri().path(); - std::string http_request = format_http_request(cntl->http_request()); + std::string http_request = format_http_request(cntl); // Auth auto token = http_query(cntl->http_request().uri(), "token"); diff --git a/cloud/src/meta-service/meta_service_http.h b/cloud/src/meta-service/meta_service_http.h index ead53f0630f..1dca1d3d64d 100644 --- a/cloud/src/meta-service/meta_service_http.h +++ b/cloud/src/meta-service/meta_service_http.h @@ -17,6 +17,7 @@ #pragma once +#include <brpc/controller.h> #include <brpc/uri.h> #include <gen_cpp/cloud.pb.h> @@ -41,6 +42,8 @@ HttpResponse 
http_json_reply(MetaServiceCode code, const std::string& msg, HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri); +HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* ctrl); + HttpResponse process_http_encode_key(const brpc::URI& uri); /// Return the query value or an empty string if not exists. diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index 3afb2f66bf6..e51e6fa819b 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "meta-service/meta_service_http.h" + #include <brpc/channel.h> #include <brpc/controller.h> #include <brpc/server.h> @@ -1761,4 +1763,75 @@ TEST(MetaServiceHttpTest, UpdateConfig) { } } +TEST(HttpEncodeKeyTest, ProcessHttpSetValue) { + auto txn_kv = std::make_shared<MemTxnKv>(); + std::unique_ptr<Transaction> txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + + // Create and serialize initial RowsetMeta + RowsetMetaCloudPB initial_rowset_meta; + initial_rowset_meta.set_rowset_id_v2("12345"); + initial_rowset_meta.set_rowset_id(0); + initial_rowset_meta.set_tablet_id(67890); + initial_rowset_meta.set_num_rows(100); + initial_rowset_meta.set_data_disk_size(1024); + std::string serialized_initial = initial_rowset_meta.SerializeAsString(); + + // Generate proper rowset meta key + std::string instance_id = "test_instance"; + int64_t tablet_id = 67890; + int64_t version = 10086; + + // Generate proper rowset meta key + MetaRowsetKeyInfo key_info {instance_id, tablet_id, version}; + std::string initial_key = meta_rowset_key(key_info); + + // Store initial RowsetMeta in TxnKv + txn->put(initial_key, serialized_initial); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + // Create new RowsetMeta to update + RowsetMetaCloudPB new_rowset_meta; + new_rowset_meta.set_rowset_id_v2("12345"); + 
new_rowset_meta.set_rowset_id(0); + new_rowset_meta.set_tablet_id(67890); + new_rowset_meta.set_num_rows(200); // Updated row count + new_rowset_meta.set_data_disk_size(2048); // Updated size + std::string json_value = proto_to_json(new_rowset_meta); + + // Initialize cntl URI with required parameters + brpc::URI cntl_uri; + cntl_uri._path = "/meta-service/http/set_value"; + cntl_uri.SetQuery("key_type", "MetaRowsetKey"); + cntl_uri.SetQuery("instance_id", instance_id); + cntl_uri.SetQuery("tablet_id", std::to_string(tablet_id)); + cntl_uri.SetQuery("version", std::to_string(version)); + + brpc::Controller cntl; + cntl.request_attachment().append(json_value); + cntl.http_request().uri() = cntl_uri; + + // Test update + auto response = process_http_set_value(txn_kv.get(), &cntl); + EXPECT_EQ(response.status_code, 200) << response.msg; + std::stringstream final_json; + final_json << "original_value_hex=" << hex(initial_rowset_meta.SerializeAsString()) << "\n" + << "key_hex=" << hex(initial_key) << "\n" + << "original_value_json=" << proto_to_json(initial_rowset_meta) << "\n"; + // std::cout << "xxx " << final_json.str() << std::endl; + EXPECT_EQ(response.body, final_json.str()); + + // Verify update + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string updated_value; + ASSERT_EQ(txn->get(initial_key, &updated_value), TxnErrorCode::TXN_OK); + + RowsetMetaCloudPB updated_rowset_meta; + ASSERT_TRUE(updated_rowset_meta.ParseFromString(updated_value)); + EXPECT_EQ(updated_rowset_meta.rowset_id_v2(), "12345"); + EXPECT_EQ(updated_rowset_meta.tablet_id(), 67890); + EXPECT_EQ(updated_rowset_meta.num_rows(), 200); + EXPECT_EQ(updated_rowset_meta.data_disk_size(), 2048); +} + } // namespace doris::cloud --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org