This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c2f621beabe [improve](cloud) get versions in batch_get (#34269)
c2f621beabe is described below

commit c2f621beabe4b3ed8d7ab54b0aa58db4fab236cb
Author: walter <w41te...@gmail.com>
AuthorDate: Tue Apr 30 14:14:49 2024 +0800

    [improve](cloud) get versions in batch_get (#34269)
---
 cloud/src/meta-service/meta_service.cpp | 78 ++++++++++++++++++++-------------
 1 file changed, 47 insertions(+), 31 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index 6cc37f78857..1c115f4e795 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -318,7 +318,12 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
     response->mutable_table_ids()->CopyFrom(request->table_ids());
     response->mutable_partition_ids()->CopyFrom(request->partition_ids());
 
-    while (code == MetaServiceCode::OK &&
+    constexpr size_t BATCH_SIZE = 500;
+    std::vector<std::string> version_keys;
+    std::vector<std::optional<std::string>> version_values;
+    version_keys.reserve(BATCH_SIZE);
+    version_values.reserve(BATCH_SIZE);
+    while ((code == MetaServiceCode::OK || code == 
MetaServiceCode::KV_TXN_TOO_OLD) &&
            response->versions_size() < response->partition_ids_size()) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv_->create_txn(&txn);
@@ -327,48 +332,59 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
             code = cast_as<ErrCategory::CREATE>(err);
             break;
         }
-        for (size_t i = response->versions_size(); i < num_acquired; ++i) {
-            int64_t db_id = request->db_ids(i);
-            int64_t table_id = request->table_ids(i);
-            int64_t partition_id = request->partition_ids(i);
-            std::string ver_key;
-            if (is_table_version) {
-                table_version_key({instance_id, db_id, table_id}, &ver_key);
-            } else {
-                partition_version_key({instance_id, db_id, table_id, 
partition_id}, &ver_key);
+
+        for (size_t i = response->versions_size(); i < num_acquired; i += 
BATCH_SIZE) {
+            size_t limit = (i + BATCH_SIZE < num_acquired) ? i + BATCH_SIZE : 
num_acquired;
+            version_keys.clear();
+            version_values.clear();
+            for (size_t j = i; j < limit; j++) {
+                int64_t db_id = request->db_ids(j);
+                int64_t table_id = request->table_ids(j);
+                int64_t partition_id = request->partition_ids(j);
+                std::string ver_key;
+                if (is_table_version) {
+                    table_version_key({instance_id, db_id, table_id}, 
&ver_key);
+                } else {
+                    partition_version_key({instance_id, db_id, table_id, 
partition_id}, &ver_key);
+                }
+                version_keys.push_back(std::move(ver_key));
             }
 
-            // TODO(walter) support batch get.
-            std::string ver_val;
-            err = txn->get(ver_key, &ver_val, true);
+            err = txn->batch_get(&version_values, version_keys);
             TEST_SYNC_POINT_CALLBACK("batch_get_version_err", &err);
-            VLOG_DEBUG << "xxx get version_key=" << hex(ver_key);
-            if (err == TxnErrorCode::TXN_OK) {
-                if (is_table_version) {
-                    int64_t version = *reinterpret_cast<const 
int64_t*>(ver_val.data());
+            if (err == TxnErrorCode::TXN_TOO_OLD) {
+                // txn too old, fallback to non-snapshot versions.
+                LOG(WARNING) << "batch_get_version execution time exceeds the 
txn mvcc window, "
+                                "fallback to acquire non-snapshot versions, 
partition_ids_size="
+                             << request->partition_ids_size() << ", index=" << 
i;
+                break;
+            } else if (err != TxnErrorCode::TXN_OK) {
+                msg = fmt::format("failed to batch get versions, index={}, 
err={}", i, err);
+                code = cast_as<ErrCategory::READ>(err);
+                break;
+            }
+
+            for (auto&& value : version_values) {
+                if (!value.has_value()) {
+                    // return -1 if the target version is not exists.
+                    response->add_versions(-1);
+                } else if (is_table_version) {
+                    if (value->size() != sizeof(int64_t)) {
+                        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                        msg = "malformed table version value";
+                        break;
+                    }
+                    int64_t version = *reinterpret_cast<const 
int64_t*>(value->data());
                     response->add_versions(version);
                 } else {
                     VersionPB version_pb;
-                    if (!version_pb.ParseFromString(ver_val)) {
+                    if (!version_pb.ParseFromString(*value)) {
                         code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                         msg = "malformed version value";
                         break;
                     }
                     response->add_versions(version_pb.version());
                 }
-            } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-                // return -1 if the target version is not exists.
-                response->add_versions(-1);
-            } else if (err == TxnErrorCode::TXN_TOO_OLD) {
-                // txn too old, fallback to non-snapshot versions.
-                LOG(WARNING) << "batch_get_version execution time exceeds the 
txn mvcc window, "
-                                "fallback to acquire non-snapshot versions, 
partition_ids_size="
-                             << request->partition_ids_size() << ", index=" << 
i;
-                break;
-            } else {
-                msg = fmt::format("failed to get txn, err={}", err);
-                code = cast_as<ErrCategory::READ>(err);
-                break;
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to