This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c2f621beabe [improve](cloud) get versions in batch_get (#34269)
c2f621beabe is described below

commit c2f621beabe4b3ed8d7ab54b0aa58db4fab236cb
Author: walter <w41te...@gmail.com>
AuthorDate: Tue Apr 30 14:14:49 2024 +0800

    [improve](cloud) get versions in batch_get (#34269)
---
 cloud/src/meta-service/meta_service.cpp | 78 ++++++++++++++++++++-------------
 1 file changed, 47 insertions(+), 31 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index 6cc37f78857..1c115f4e795 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -318,7 +318,12 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
     response->mutable_table_ids()->CopyFrom(request->table_ids());
     response->mutable_partition_ids()->CopyFrom(request->partition_ids());
 
-    while (code == MetaServiceCode::OK &&
+    constexpr size_t BATCH_SIZE = 500;
+    std::vector<std::string> version_keys;
+    std::vector<std::optional<std::string>> version_values;
+    version_keys.reserve(BATCH_SIZE);
+    version_values.reserve(BATCH_SIZE);
+    while ((code == MetaServiceCode::OK || code == MetaServiceCode::KV_TXN_TOO_OLD) &&
            response->versions_size() < response->partition_ids_size()) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv_->create_txn(&txn);
@@ -327,48 +332,59 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
             code = cast_as<ErrCategory::CREATE>(err);
             break;
         }
-        for (size_t i = response->versions_size(); i < num_acquired; ++i) {
-            int64_t db_id = request->db_ids(i);
-            int64_t table_id = request->table_ids(i);
-            int64_t partition_id = request->partition_ids(i);
-            std::string ver_key;
-            if (is_table_version) {
-                table_version_key({instance_id, db_id, table_id}, &ver_key);
-            } else {
-                partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key);
+
+        for (size_t i = response->versions_size(); i < num_acquired; i += BATCH_SIZE) {
+            size_t limit = (i + BATCH_SIZE < num_acquired) ? i + BATCH_SIZE : num_acquired;
+            version_keys.clear();
+            version_values.clear();
+            for (size_t j = i; j < limit; j++) {
+                int64_t db_id = request->db_ids(j);
+                int64_t table_id = request->table_ids(j);
+                int64_t partition_id = request->partition_ids(j);
+                std::string ver_key;
+                if (is_table_version) {
+                    table_version_key({instance_id, db_id, table_id}, &ver_key);
+                } else {
+                    partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key);
+                }
+                version_keys.push_back(std::move(ver_key));
             }
 
-            // TODO(walter) support batch get.
-            std::string ver_val;
-            err = txn->get(ver_key, &ver_val, true);
+            err = txn->batch_get(&version_values, version_keys);
             TEST_SYNC_POINT_CALLBACK("batch_get_version_err", &err);
-            VLOG_DEBUG << "xxx get version_key=" << hex(ver_key);
-            if (err == TxnErrorCode::TXN_OK) {
-                if (is_table_version) {
-                    int64_t version = *reinterpret_cast<const int64_t*>(ver_val.data());
+            if (err == TxnErrorCode::TXN_TOO_OLD) {
+                // txn too old, fallback to non-snapshot versions.
+                LOG(WARNING) << "batch_get_version execution time exceeds the txn mvcc window, "
+                                "fallback to acquire non-snapshot versions, partition_ids_size="
+                             << request->partition_ids_size() << ", index=" << i;
+                break;
+            } else if (err != TxnErrorCode::TXN_OK) {
+                msg = fmt::format("failed to batch get versions, index={}, err={}", i, err);
+                code = cast_as<ErrCategory::READ>(err);
+                break;
+            }
+
+            for (auto&& value : version_values) {
+                if (!value.has_value()) {
+                    // return -1 if the target version is not exists.
+                    response->add_versions(-1);
+                } else if (is_table_version) {
+                    if (value->size() != sizeof(int64_t)) {
+                        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                        msg = "malformed table version value";
+                        break;
+                    }
+                    int64_t version = *reinterpret_cast<const int64_t*>(value->data());
                     response->add_versions(version);
                 } else {
                     VersionPB version_pb;
-                    if (!version_pb.ParseFromString(ver_val)) {
+                    if (!version_pb.ParseFromString(*value)) {
                         code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                         msg = "malformed version value";
                         break;
                     }
                     response->add_versions(version_pb.version());
                 }
-            } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-                // return -1 if the target version is not exists.
-                response->add_versions(-1);
-            } else if (err == TxnErrorCode::TXN_TOO_OLD) {
-                // txn too old, fallback to non-snapshot versions.
-                LOG(WARNING) << "batch_get_version execution time exceeds the txn mvcc window, "
-                                "fallback to acquire non-snapshot versions, partition_ids_size="
-                             << request->partition_ids_size() << ", index=" << i;
-                break;
-            } else {
-                msg = fmt::format("failed to get txn, err={}", err);
-                code = cast_as<ErrCategory::READ>(err);
-                break;
             }
         }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org