This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7630b62e72f branch-3.0: [opt](meta-service) Implement set_value API 
for meta-servce #49052 (#49359)
7630b62e72f is described below

commit 7630b62e72fb00611bc41014fae9c57e997328e4
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 24 11:32:02 2025 +0800

    branch-3.0: [opt](meta-service) Implement set_value API for meta-servce 
#49052 (#49359)
    
    Cherry-picked from #49052
    
    Co-authored-by: Gavin Chou <ga...@selectdb.com>
---
 cloud/src/common/util.cpp                    |  24 ++++
 cloud/src/common/util.h                      |   6 +
 cloud/src/meta-service/http_encode_key.cpp   | 208 ++++++++++++++++++++++-----
 cloud/src/meta-service/meta_service_http.cpp |  16 ++-
 cloud/src/meta-service/meta_service_http.h   |   3 +
 cloud/test/meta_service_http_test.cpp        |  73 ++++++++++
 6 files changed, 289 insertions(+), 41 deletions(-)

diff --git a/cloud/src/common/util.cpp b/cloud/src/common/util.cpp
index 8d1c8fed983..50f29afb0ba 100644
--- a/cloud/src/common/util.cpp
+++ b/cloud/src/common/util.cpp
@@ -247,6 +247,30 @@ bool ValueBuf::to_pb(google::protobuf::Message* pb) const {
     return pb->ParseFromZeroCopyStream(&merge_stream);
 }
 
+std::string ValueBuf::value() const {
+    butil::IOBuf merge;
+    for (auto&& it : iters) {
+        it->reset();
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+            merge.append_user_data((void*)v.data(), v.size(), +[](void*) {});
+        }
+    }
+    return merge.to_string();
+}
+
+std::vector<std::string> ValueBuf::keys() const {
+    std::vector<std::string> ret;
+    for (auto&& it : iters) {
+        it->reset();
+        while (it->has_next()) {
+            auto [k, _] = it->next();
+            ret.push_back({k.data(), k.size()});
+        }
+    }
+    return ret;
+}
+
 void ValueBuf::remove(Transaction* txn) const {
     for (auto&& it : iters) {
         it->reset();
diff --git a/cloud/src/common/util.h b/cloud/src/common/util.h
index de37c2f4d9b..8f2e8aa077e 100644
--- a/cloud/src/common/util.h
+++ b/cloud/src/common/util.h
@@ -89,6 +89,12 @@ struct ValueBuf {
     // Return TXN_OK for success get a key, TXN_KEY_NOT_FOUND for key not 
found, otherwise for error.
     TxnErrorCode get(Transaction* txn, std::string_view key, bool snapshot = 
false);
 
+    // return the merged value in ValueBuf
+    std::string value() const;
+
+    // return all keys in ValueBuf; if the value is not split, the size of keys 
is 1
+    std::vector<std::string> keys() const;
+
     std::vector<std::unique_ptr<RangeGetIterator>> iters;
     int8_t ver {-1};
 };
diff --git a/cloud/src/meta-service/http_encode_key.cpp 
b/cloud/src/meta-service/http_encode_key.cpp
index 4d05f0121b0..728b52df2d8 100644
--- a/cloud/src/meta-service/http_encode_key.cpp
+++ b/cloud/src/meta-service/http_encode_key.cpp
@@ -15,9 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <brpc/controller.h>
 #include <brpc/uri.h>
 #include <fmt/format.h>
 #include <gen_cpp/cloud.pb.h>
+#include <google/protobuf/message.h>
+#include <google/protobuf/util/json_util.h>
 
 #include <bit>
 #include <iomanip>
@@ -29,6 +32,7 @@
 #include <utility>
 #include <vector>
 
+#include "common/logging.h"
 #include "common/util.h"
 #include "cpp/sync_point.h"
 #include "meta-service/codec.h"
@@ -80,6 +84,21 @@ struct KeyInfoSetter {
 };
 // clang-format on
 
+template <typename Message>
+static std::shared_ptr<Message> parse_json(const std::string& json) {
+    static_assert(std::is_base_of_v<google::protobuf::Message, Message>);
+    auto ret = std::make_shared<Message>();
+    auto st = google::protobuf::util::JsonStringToMessage(json, ret.get());
+    if (st.ok()) return ret;
+    std::string err = "failed to strictly parse json message, error: " + 
st.ToString();
+    LOG_WARNING(err).tag("json", json);
+    // ignore unknown fields
+    // google::protobuf::util::JsonParseOptions json_parse_options;
+    // json_parse_options.ignore_unknown_fields = true;
+    // return google::protobuf::util::JsonStringToMessage(body, req, 
json_parse_options);
+    return nullptr;
+}
+
 using param_type = const std::vector<const std::string*>;
 
 template <class ProtoType>
@@ -177,41 +196,45 @@ static std::string parse_tablet_stats(const ValueBuf& 
buf) {
 }
 
 // See keys.h to get all types of key, e.g: MetaRowsetKeyInfo
-// key_type -> {{param1, param2 ...}, key_encoding_func, value_parsing_func}
-// where params are the input for key_encoding_func
+// key_type -> {{param_name1, param_name2 ...}, key_encoding_func, 
serialized_pb_to_json_parsing_func, json_to_proto_parsing_func}
+// where param names are the input for key_encoding_func
 // clang-format off
+//                        key_type
 static std::unordered_map<std::string_view,
-                          std::tuple<std::vector<std::string_view>, 
std::function<std::string(param_type&)>, std::function<std::string(const 
ValueBuf&)>>> param_set {
-    {"InstanceKey",                {{"instance_id"},                           
                      [](param_type& p) { return 
instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get()); }                        
              , parse<InstanceInfoPB>}}          ,
-    {"TxnLabelKey",                {{"instance_id", "db_id", "label"},         
                      [](param_type& p) { return 
txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get()); }                       
              , parse_txn_label}}                ,
-    {"TxnInfoKey",                 {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get()); }                         
              , parse<TxnInfoPB>}}               ,
-    {"TxnIndexKey",                {{"instance_id", "txn_id"},                 
                      [](param_type& p) { return 
txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get()); }                       
              , parse<TxnIndexPB>}}              ,
-    {"TxnRunningKey",              {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get()); }                   
              , parse<TxnRunningPB>}}            ,
-    {"PartitionVersionKey",        {{"instance_id", "db_id", "tbl_id", 
"partition_id"},              [](param_type& p) { return 
partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get()); }       
              , parse<VersionPB>}}               ,
-    {"TableVersionKey",            {{"instance_id", "db_id", "tbl_id"},        
                      [](param_type& p) { return 
table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get()); }               
              , parse<VersionPB>}}               ,
-    {"MetaRowsetKey",              {{"instance_id", "tablet_id", "version"},   
                      [](param_type& p) { return 
meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get()); }                   
              , parse<doris::RowsetMetaCloudPB>}}     ,
-    {"MetaRowsetTmpKey",           {{"instance_id", "txn_id", "tablet_id"},    
                      [](param_type& p) { return 
meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get()); }            
              , parse<doris::RowsetMetaCloudPB>}}     ,
-    {"MetaTabletKey",              {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get()); }                   
              , parse<doris::TabletMetaCloudPB>}}     ,
-    {"MetaTabletIdxKey",           {{"instance_id", "tablet_id"},              
                      [](param_type& p) { return 
meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get()); }            
              , parse<TabletIndexPB>}}           ,
-    {"RecycleIndexKey",            {{"instance_id", "index_id"},               
                      [](param_type& p) { return 
recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get()); }               
              , parse<RecycleIndexPB>}}          ,
-    {"RecyclePartKey",             {{"instance_id", "part_id"},                
                      [](param_type& p) { return 
recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get()); }            
              , parse<RecyclePartitionPB>}}      ,
-    {"RecycleRowsetKey",           {{"instance_id", "tablet_id", "rowset_id"}, 
                      [](param_type& p) { return 
recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get()); }             
              , parse<RecycleRowsetPB>}}         ,
-    {"RecycleTxnKey",              {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get()); }                   
              , parse<RecycleTxnPB>}}            ,
-    {"StatsTabletKey",             {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get()); }                 
              , parse_tablet_stats}}           ,
-    {"JobTabletKey",               {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get()); }                     
              , parse<TabletJobInfoPB>}}         ,
-    {"CopyJobKey",                 {{"instance_id", "stage_id", "table_id", 
"copy_id", "group_id"},  [](param_type& p) { return 
copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get()); }                         
              , parse<CopyJobPB>}}               ,
-    {"CopyFileKey",                {{"instance_id", "stage_id", "table_id", 
"obj_key", "obj_etag"},  [](param_type& p) { return 
copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get()); }                       
              , parse<CopyFilePB>}}              ,
-    {"RecycleStageKey",            {{"instance_id", "stage_id"},               
                      [](param_type& p) { return 
recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get()); }               
              , parse<RecycleStagePB>}}          ,
-    {"JobRecycleKey",              {{"instance_id"},                           
                      [](param_type& p) { return 
job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get()); }                     
              , parse<JobRecyclePB>}}            ,
-    {"MetaSchemaKey",              {{"instance_id", "index_id", 
"schema_version"},                   [](param_type& p) { return 
meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get()); }                   
              , parse_tablet_schema}}            ,
-    {"MetaDeleteBitmap",           {{"instance_id", "tablet_id", "rowest_id", 
"version", "seg_id"},  [](param_type& p) { return 
meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get()); }         
              , parse_delete_bitmap}}            ,
-    {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id", 
"partition_id"},                     [](param_type& p) { return 
meta_delete_bitmap_update_lock_key( 
KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get()); }, 
parse<DeleteBitmapUpdateLockPB>}},
-    {"MetaPendingDeleteBitmap",    {{"instance_id", "tablet_id"},              
                      [](param_type& p) { return 
meta_pending_delete_bitmap_key( 
KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get()); }       , 
parse<PendingDeleteBitmapPB>}}   ,
-    {"RLJobProgressKey",           {{"instance_id", "db_id", "job_id"},        
                      [](param_type& p) { return rl_job_progress_key_info( 
KeyInfoSetter<RLJobProgressKeyInfo>{p}.get()); }                    , 
parse<RoutineLoadProgressPB>}}   ,
-    {"MetaServiceRegistryKey",     {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_registry_key(); }                                           
              , parse<ServiceRegistryPB>}}       ,
-    {"MetaServiceArnInfoKey",      {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_arn_info_key(); }                                           
              , parse<RamUserPB>}}               ,
-    {"MetaServiceEncryptionKey",   {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_encryption_key_info_key(); }                                
              , parse<EncryptionKeyInfoPB>}}     ,
-    {"StorageVaultKey",           {{"instance_id", "vault_id"},                
              [](param_type& p) { return 
storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get()); }               
     , parse<StorageVaultPB>}}   ,};
+                                     // params                      
key_encoding_func                        serialized_pb_to_json_parsing_func     
      json_to_proto_parsing_func
+                          std::tuple<std::vector<std::string_view>, 
std::function<std::string(param_type&)>, std::function<std::string(const 
ValueBuf&)>, std::function<std::shared_ptr<google::protobuf::Message>(const 
std::string&)>>> param_set {
+    {"InstanceKey",                {{"instance_id"},                           
                      [](param_type& p) { return 
instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get());                          
            }, parse<InstanceInfoPB>           , parse_json<InstanceInfoPB>}},
+    {"TxnLabelKey",                {{"instance_id", "db_id", "label"},         
                      [](param_type& p) { return 
txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get());                         
            }, parse_txn_label                 , parse_json<TxnLabelPB>}},
+    {"TxnInfoKey",                 {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get());                           
            }, parse<TxnInfoPB>                , parse_json<TxnInfoPB>}},
+    {"TxnIndexKey",                {{"instance_id", "txn_id"},                 
                      [](param_type& p) { return 
txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get());                         
            }, parse<TxnIndexPB>               , parse_json<TxnIndexPB>}},
+    {"TxnRunningKey",              {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get());                     
            }, parse<TxnRunningPB>             , parse_json<TxnRunningPB>}},
+    {"PartitionVersionKey",        {{"instance_id", "db_id", "tbl_id", 
"partition_id"},              [](param_type& p) { return 
partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get());         
            }, parse<VersionPB>                , parse_json<VersionPB>}},
+    {"TableVersionKey",            {{"instance_id", "db_id", "tbl_id"},        
                      [](param_type& p) { return 
table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get());                 
            }, parse<VersionPB>                , parse_json<VersionPB>}},
+    {"MetaRowsetKey",              {{"instance_id", "tablet_id", "version"},   
                      [](param_type& p) { return 
meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get());                     
            }, parse<doris::RowsetMetaCloudPB> , 
parse_json<doris::RowsetMetaCloudPB>}},
+    {"MetaRowsetTmpKey",           {{"instance_id", "txn_id", "tablet_id"},    
                      [](param_type& p) { return 
meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get());              
            }, parse<doris::RowsetMetaCloudPB> , 
parse_json<doris::RowsetMetaCloudPB>}},
+    {"MetaTabletKey",              {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get());                     
            }, parse<doris::TabletMetaCloudPB> , 
parse_json<doris::TabletMetaCloudPB>}},
+    {"MetaTabletIdxKey",           {{"instance_id", "tablet_id"},              
                      [](param_type& p) { return 
meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get());              
            }, parse<TabletIndexPB>            , parse_json<TabletIndexPB>}},
+    {"RecycleIndexKey",            {{"instance_id", "index_id"},               
                      [](param_type& p) { return 
recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get());                 
            }, parse<RecycleIndexPB>           , parse_json<RecycleIndexPB>}},
+    {"RecyclePartKey",             {{"instance_id", "part_id"},                
                      [](param_type& p) { return 
recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get());              
            }, parse<RecyclePartitionPB>       , 
parse_json<RecyclePartitionPB>}},
+    {"RecycleRowsetKey",           {{"instance_id", "tablet_id", "rowset_id"}, 
                      [](param_type& p) { return 
recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get());               
            }, parse<RecycleRowsetPB>          , parse_json<RecycleRowsetPB>}},
+    {"RecycleTxnKey",              {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get());                     
            }, parse<RecycleTxnPB>             , parse_json<RecycleTxnPB>}},
+    {"StatsTabletKey",             {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get());                   
            }, parse_tablet_stats              , parse_json<TabletStatsPB>}},
+    {"JobTabletKey",               {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get());                       
            }, parse<TabletJobInfoPB>          , parse_json<TabletJobInfoPB>}},
+    {"CopyJobKey",                 {{"instance_id", "stage_id", "table_id", 
"copy_id", "group_id"},  [](param_type& p) { return 
copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get());                           
            }, parse<CopyJobPB>                , parse_json<CopyJobPB>}},
+    {"CopyFileKey",                {{"instance_id", "stage_id", "table_id", 
"obj_key", "obj_etag"},  [](param_type& p) { return 
copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get());                         
            }, parse<CopyFilePB>               , parse_json<CopyFilePB>}},
+    {"RecycleStageKey",            {{"instance_id", "stage_id"},               
                      [](param_type& p) { return 
recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get());                 
            }, parse<RecycleStagePB>           , parse_json<RecycleStagePB>}},
+    {"JobRecycleKey",              {{"instance_id"},                           
                      [](param_type& p) { return 
job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get());                       
            }, parse<JobRecyclePB>             , parse_json<JobRecyclePB>}},
+    {"MetaSchemaKey",              {{"instance_id", "index_id", 
"schema_version"},                   [](param_type& p) { return 
meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get());                     
            }, parse_tablet_schema             , 
parse_json<doris::TabletSchemaCloudPB>}},
+    {"MetaDeleteBitmap",           {{"instance_id", "tablet_id", "rowest_id", 
"version", "seg_id"},  [](param_type& p) { return 
meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get());           
            }, parse_delete_bitmap             , parse_json<DeleteBitmapPB>}},
+    {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id", 
"partition_id"},                     [](param_type& p) { return 
meta_delete_bitmap_update_lock_key(KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get());
 }, parse<DeleteBitmapUpdateLockPB> , parse_json<DeleteBitmapUpdateLockPB>}},
+    {"MetaPendingDeleteBitmap",    {{"instance_id", "tablet_id"},              
                      [](param_type& p) { return 
meta_pending_delete_bitmap_key(KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get());
        }, parse<PendingDeleteBitmapPB>    , 
parse_json<PendingDeleteBitmapPB>}},
+    {"RLJobProgressKey",           {{"instance_id", "db_id", "job_id"},        
                      [](param_type& p) { return 
rl_job_progress_key_info(KeyInfoSetter<RLJobProgressKeyInfo>{p}.get());         
            }, parse<RoutineLoadProgressPB>    , 
parse_json<RoutineLoadProgressPB>}},
+    {"StorageVaultKey",            {{"instance_id", "vault_id"},               
                      [](param_type& p) { return 
storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get());                 
            }, parse<StorageVaultPB>           , parse_json<StorageVaultPB>}},
+    {"MetaSchemaPBDictionaryKey",  {{"instance_id", "index_id"},               
                      [](param_type& p) { return 
meta_schema_pb_dictionary_key(KeyInfoSetter<MetaSchemaPBDictionaryInfo>{p}.get());
          }, parse<SchemaCloudDictionary>    , 
parse_json<SchemaCloudDictionary>}},
+    {"MetaServiceRegistryKey",     {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_registry_key();                                             
            }, parse<ServiceRegistryPB>        , 
parse_json<ServiceRegistryPB>}},
+    {"MetaServiceArnInfoKey",      {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_arn_info_key();                                             
            }, parse<RamUserPB>                , parse_json<RamUserPB>}},
+    {"MetaServiceEncryptionKey",   {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_encryption_key_info_key();                                  
            }, parse<EncryptionKeyInfoPB>      , 
parse_json<EncryptionKeyInfoPB>}},
+};
 // clang-format on
 
 static MetaServiceResponseStatus encode_key(const brpc::URI& uri, std::string& 
key) {
@@ -225,9 +248,10 @@ static MetaServiceResponseStatus encode_key(const 
brpc::URI& uri, std::string& k
                                    (key_type.empty() ? "(empty)" : key_type)));
         return status;
     }
+    auto& key_params = std::get<0>(it->second);
     std::remove_cv_t<param_type> params;
-    params.reserve(std::get<0>(it->second).size());
-    for (auto& i : std::get<0>(it->second)) {
+    params.reserve(key_params.size());
+    for (auto& i : key_params) {
         auto p = uri.GetQuery(i.data());
         if (p == nullptr || p->empty()) {
             status.set_code(MetaServiceCode::INVALID_ARGUMENT);
@@ -236,7 +260,8 @@ static MetaServiceResponseStatus encode_key(const 
brpc::URI& uri, std::string& k
         }
         params.emplace_back(p);
     }
-    key = std::get<1>(it->second)(params);
+    auto& key_encoding_function = std::get<1>(it->second);
+    key = key_encoding_function(params);
     return status;
 }
 
@@ -292,7 +317,8 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const 
brpc::URI& uri) {
         return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR,
                                fmt::format("failed to get kv, key={}", 
hex(key)));
     }
-    auto readable_value = std::get<2>(it->second)(value);
+    auto& value_parsing_function = std::get<2>(it->second);
+    auto readable_value = value_parsing_function(value);
     if (readable_value.empty()) [[unlikely]] {
         return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR,
                                fmt::format("failed to parse value, key={}", 
hex(key)));
@@ -300,6 +326,114 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const 
brpc::URI& uri) {
     return http_text_reply(MetaServiceCode::OK, "", readable_value);
 }
 
+HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* cntl) {
+    const brpc::URI& uri = cntl->http_request().uri();
+    std::string body = cntl->request_attachment().to_string();
+    LOG(INFO) << "set value, body=" << body;
+
+    std::string key;
+    if (auto hex_key = http_query(uri, "key"); !hex_key.empty()) {
+        key = unhex(hex_key);
+    } else { // Encode key from params
+        auto st = encode_key(uri, key);
+        if (st.code() != MetaServiceCode::OK) {
+            return http_json_reply(st);
+        }
+    }
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) [[unlikely]] {
+        return http_json_reply(MetaServiceCode::KV_TXN_CREATE_ERR,
+                               fmt::format("failed to create txn, err={}", 
err));
+    }
+
+    std::string_view key_type = http_query(uri, "key_type");
+    auto it = param_set.find(key_type);
+    if (it == param_set.end()) {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                               fmt::format("key_type not supported: {}",
+                                           (key_type.empty() ? "(empty)" : 
key_type)));
+    }
+    auto& json_parsing_function = std::get<3>(it->second);
+    std::shared_ptr<google::protobuf::Message> pb_to_save = 
json_parsing_function(body);
+    if (pb_to_save == nullptr) {
+        LOG(WARNING) << "invalid input json value for key_type=" << key_type;
+        return http_json_reply(
+                MetaServiceCode::INVALID_ARGUMENT,
+                fmt::format("invalid input json value, cannot parse json to 
pb, key_type={}",
+                            key_type));
+    }
+
+    LOG(INFO) << "parsed pb to save key_type=" << key_type << " key=" << 
hex(key)
+              << " pb_to_save=" << proto_to_json(*pb_to_save);
+
+    // ATTN:
+    // StatsTabletKey is special, it has a series of keys, we only handle the 
base stat key
+    // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are 
split into multiple KVs
+    ValueBuf value;
+    err = cloud::get(txn.get(), key, &value, true);
+
+    bool kv_found = true;
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        // the key-value to set may not exist yet
+        kv_found = false;
+    } else if (err != TxnErrorCode::TXN_OK) {
+        return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR,
+                               fmt::format("failed to get kv, key={}", 
hex(key)));
+    }
+
+    auto concat = [](const std::vector<std::string>& keys) {
+        std::stringstream s;
+        for (auto& i : keys) s << hex(i) << ", ";
+        return s.str();
+    };
+    LOG(INFO) << "set value, key_type=" << key_type << " " << 
value.keys().size() << " keys=["
+              << concat(value.keys()) << "]";
+
+    std::string original_value_json;
+    if (kv_found) {
+        auto& serialized_value_to_json_parsing_function = 
std::get<2>(it->second);
+        original_value_json = serialized_value_to_json_parsing_function(value);
+        if (original_value_json.empty()) [[unlikely]] {
+            LOG(WARNING) << "failed to parse value, key=" << hex(key)
+                         << " val=" << hex(value.value());
+            return http_json_reply(MetaServiceCode::UNDEFINED_ERR,
+                                   fmt::format("failed to parse value, 
key={}", hex(key)));
+        } else {
+            LOG(INFO) << "original_value_json=" << original_value_json;
+        }
+    }
+    std::string serialized_value_to_save = pb_to_save->SerializeAsString();
+    if (serialized_value_to_save.empty()) {
+        LOG(WARNING) << "failed to serialize, key=" << hex(key);
+        return http_json_reply(MetaServiceCode::UNDEFINED_ERR,
+                               fmt::format("failed to serialize, key={}", 
hex(key)));
+    }
+    // we need to remove the original KVs, it may be a range of keys
+    // and the number of final keys may be less than the initial number of keys
+    if (kv_found) value.remove(txn.get());
+
+    // TODO(gavin): use cloud::put() to deal with split-multi-kv and special 
keys
+    // StatsTabletKey is special, it has a series of keys, we only handle the 
base stat key
+    // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are 
split into multiple KVs
+    txn->put(key, serialized_value_to_save);
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        std::stringstream ss;
+        ss << "failed to commit txn when set value, err=" << err << " key=" << 
hex(key);
+        LOG(WARNING) << ss.str();
+        return http_json_reply(MetaServiceCode::UNDEFINED_ERR, ss.str());
+    }
+    LOG(WARNING) << "set_value saved, key=" << hex(key);
+
+    std::stringstream final_json;
+    final_json << "original_value_hex=" << hex(value.value()) << "\n"
+               << "key_hex=" << hex(key) << "\n"
+               << "original_value_json=" << original_value_json << "\n";
+
+    return http_text_reply(MetaServiceCode::OK, "", final_json.str());
+}
+
 HttpResponse process_http_encode_key(const brpc::URI& uri) {
     std::string key;
     auto st = encode_key(uri, key);
diff --git a/cloud/src/meta-service/meta_service_http.cpp 
b/cloud/src/meta-service/meta_service_http.cpp
index c6eb07010c4..53d9dbbef83 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -158,7 +158,8 @@ HttpResponse http_json_reply(MetaServiceCode code, const 
std::string& msg,
     return {status_code, msg, sb.GetString()};
 }
 
-static std::string format_http_request(const brpc::HttpHeader& request) {
+static std::string format_http_request(brpc::Controller* cntl) {
+    const brpc::HttpHeader& request = cntl->http_request();
     auto& unresolved_path = request.unresolved_path();
     auto& uri = request.uri();
     std::stringstream ss;
@@ -173,6 +174,8 @@ static std::string format_http_request(const 
brpc::HttpHeader& request) {
     for (auto it = request.HeaderBegin(); it != request.HeaderEnd(); ++it) {
         ss << "\n" << it->first << ":" << it->second;
     }
+    std::string body = cntl->request_attachment().to_string();
+    ss << "\nbody=" << (body.empty() ? "(empty)" : body);
     return ss.str();
 }
 
@@ -509,6 +512,10 @@ static HttpResponse process_get_value(MetaServiceImpl* 
service, brpc::Controller
     return process_http_get_value(service->txn_kv().get(), 
ctrl->http_request().uri());
 }
 
+static HttpResponse process_set_value(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
+    return process_http_set_value(service->txn_kv().get(), ctrl);
+}
+
 // show all key ranges and their count.
 static HttpResponse process_show_meta_ranges(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
     auto txn_kv = std::dynamic_pointer_cast<FdbTxnKv>(service->txn_kv());
@@ -737,6 +744,7 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
             {"decode_key", process_decode_key},
             {"encode_key", process_encode_key},
             {"get_value", process_get_value},
+            {"set_value", process_set_value},
             {"show_meta_ranges", process_show_meta_ranges},
             {"txn_lazy_commit", process_txn_lazy_commit},
             {"fix_tablet_stats", process_fix_tablet_stats},
@@ -744,11 +752,11 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
             {"v1/decode_key", process_decode_key},
             {"v1/encode_key", process_encode_key},
             {"v1/get_value", process_get_value},
+            {"v1/set_value", process_set_value},
             {"v1/show_meta_ranges", process_show_meta_ranges},
             {"v1/txn_lazy_commit", process_txn_lazy_commit},
             {"v1/injection_point", process_injection_point},
-            // for get
-            {"get_instance", process_get_instance_info},
+            {"v1/fix_tablet_stats", process_fix_tablet_stats},
             // for get
             {"get_instance", process_get_instance_info},
             {"get_obj_store_info", process_get_obj_store_info},
@@ -785,7 +793,7 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
     // Prepare input request info
     LOG(INFO) << "rpc from " << cntl->remote_side()
               << " request: " << cntl->http_request().uri().path();
-    std::string http_request = format_http_request(cntl->http_request());
+    std::string http_request = format_http_request(cntl);
 
     // Auth
     auto token = http_query(cntl->http_request().uri(), "token");
diff --git a/cloud/src/meta-service/meta_service_http.h 
b/cloud/src/meta-service/meta_service_http.h
index ead53f0630f..1dca1d3d64d 100644
--- a/cloud/src/meta-service/meta_service_http.h
+++ b/cloud/src/meta-service/meta_service_http.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <brpc/controller.h>
 #include <brpc/uri.h>
 #include <gen_cpp/cloud.pb.h>
 
@@ -41,6 +42,8 @@ HttpResponse http_json_reply(MetaServiceCode code, const 
std::string& msg,
 
 HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri);
 
+HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* ctrl);
+
 HttpResponse process_http_encode_key(const brpc::URI& uri);
 
 /// Return the query value or an empty string if not exists.
diff --git a/cloud/test/meta_service_http_test.cpp 
b/cloud/test/meta_service_http_test.cpp
index 3afb2f66bf6..e51e6fa819b 100644
--- a/cloud/test/meta_service_http_test.cpp
+++ b/cloud/test/meta_service_http_test.cpp
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "meta-service/meta_service_http.h"
+
 #include <brpc/channel.h>
 #include <brpc/controller.h>
 #include <brpc/server.h>
@@ -1761,4 +1763,75 @@ TEST(MetaServiceHttpTest, UpdateConfig) {
     }
 }
 
+TEST(HttpEncodeKeyTest, ProcessHttpSetValue) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    // Create and serialize initial RowsetMeta
+    RowsetMetaCloudPB initial_rowset_meta;
+    initial_rowset_meta.set_rowset_id_v2("12345");
+    initial_rowset_meta.set_rowset_id(0);
+    initial_rowset_meta.set_tablet_id(67890);
+    initial_rowset_meta.set_num_rows(100);
+    initial_rowset_meta.set_data_disk_size(1024);
+    std::string serialized_initial = initial_rowset_meta.SerializeAsString();
+
+    // Identifiers that make up the rowset meta key
+    std::string instance_id = "test_instance";
+    int64_t tablet_id = 67890;
+    int64_t version = 10086;
+
+    // Generate proper rowset meta key
+    MetaRowsetKeyInfo key_info {instance_id, tablet_id, version};
+    std::string initial_key = meta_rowset_key(key_info);
+
+    // Store initial RowsetMeta in TxnKv
+    txn->put(initial_key, serialized_initial);
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    // Create new RowsetMeta to update
+    RowsetMetaCloudPB new_rowset_meta;
+    new_rowset_meta.set_rowset_id_v2("12345");
+    new_rowset_meta.set_rowset_id(0);
+    new_rowset_meta.set_tablet_id(67890);
+    new_rowset_meta.set_num_rows(200);        // Updated row count
+    new_rowset_meta.set_data_disk_size(2048); // Updated size
+    std::string json_value = proto_to_json(new_rowset_meta);
+
+    // Initialize cntl URI with required parameters
+    brpc::URI cntl_uri;
+    cntl_uri._path = "/meta-service/http/set_value";
+    cntl_uri.SetQuery("key_type", "MetaRowsetKey");
+    cntl_uri.SetQuery("instance_id", instance_id);
+    cntl_uri.SetQuery("tablet_id", std::to_string(tablet_id));
+    cntl_uri.SetQuery("version", std::to_string(version));
+
+    brpc::Controller cntl;
+    cntl.request_attachment().append(json_value);
+    cntl.http_request().uri() = cntl_uri;
+
+    // Test update
+    auto response = process_http_set_value(txn_kv.get(), &cntl);
+    EXPECT_EQ(response.status_code, 200) << response.msg;
+    std::stringstream final_json;
+    final_json << "original_value_hex=" << 
hex(initial_rowset_meta.SerializeAsString()) << "\n"
+               << "key_hex=" << hex(initial_key) << "\n"
+               << "original_value_json=" << proto_to_json(initial_rowset_meta) 
<< "\n";
+    // std::cout << "xxx " << final_json.str() << std::endl;
+    EXPECT_EQ(response.body, final_json.str());
+
+    // Verify update
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string updated_value;
+    ASSERT_EQ(txn->get(initial_key, &updated_value), TxnErrorCode::TXN_OK);
+
+    RowsetMetaCloudPB updated_rowset_meta;
+    ASSERT_TRUE(updated_rowset_meta.ParseFromString(updated_value));
+    EXPECT_EQ(updated_rowset_meta.rowset_id_v2(), "12345");
+    EXPECT_EQ(updated_rowset_meta.tablet_id(), 67890);
+    EXPECT_EQ(updated_rowset_meta.num_rows(), 200);
+    EXPECT_EQ(updated_rowset_meta.data_disk_size(), 2048);
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to