platoneko commented on code in PR #32738: URL: https://github.com/apache/doris/pull/32738#discussion_r1538715168
########## cloud/src/meta-service/meta_service_partition.cpp: ########## @@ -166,9 +166,9 @@ void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller } RPC_RATE_LIMIT(commit_index) - if (request->index_ids().empty() || !request->has_table_id()) { + if (request->index_ids().empty() || !request->has_db_id() || !request->has_table_id()) { Review Comment: 需要考虑兼容旧版本的 FE,不允许空 db_id 的请求会导致旧 FE 不可用;应该实现成 request 带有 db_id 说明是支持 table version 的 FE,才去写 table version kv。 所以看来,升级到这个版本必须: 1. 升级 MS 集群,可以热更新 2. 等所有 MS 节点升级启动完成,才能去升级 FE 节点;FE 集群必须停服升级才能做到各 FE 请求一致 ########## cloud/src/meta-service/meta_service_partition.cpp: ########## @@ -514,9 +559,9 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll RPC_RATE_LIMIT(drop_partition) if (request->partition_ids().empty() || request->index_ids().empty() || - !request->has_table_id()) { + !request->has_db_id() || !request->has_table_id()) { Review Comment: ditto ########## cloud/src/meta-service/meta_service_partition.cpp: ########## @@ -434,9 +458,9 @@ void MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro } RPC_RATE_LIMIT(commit_partition) - if (request->partition_ids().empty() || !request->has_table_id()) { + if (request->partition_ids().empty() || !request->has_db_id() || !request->has_table_id()) { Review Comment: ditto ########## cloud/src/meta-service/meta_service_partition.cpp: ########## @@ -579,6 +624,27 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll } } if (!need_commit) return; + + // update table versions + // Todo: use int64_t instead of protobuf as a value so that we can use atomic_add + std::string ver_key = table_version_key({instance_id, request->db_id(), request->table_id()}); + std::string ver_val_str; + err = txn->get(ver_key, &ver_val_str); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + msg = fmt::format( "failed to get table version, table_id {}, key {}", 
request->table_id(), hex(ver_key)); + return; + } + VersionPB version_pb; + { + version_pb.ParseFromString(ver_val_str); Review Comment: ditto ########## cloud/src/meta-service/meta_service_partition.cpp: ########## @@ -222,6 +222,30 @@ void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller LOG_INFO("remove recycle index").tag("key", hex(key)); txn->remove(key); } + + // init table version + std::string val; Review Comment: 不建议通过查询 version kv 来确定是否需要初始化 table version,多一次 IO ########## gensrc/proto/cloud.proto: ########## @@ -718,6 +718,9 @@ message GetVersionRequest { repeated int64 db_ids = 6; repeated int64 table_ids = 7; repeated int64 partition_ids = 8; + + // True if get table version + optional bool is_table_version = 9 [default = false]; Review Comment: No need to set default to false, optional bool field will be false if not set ########## cloud/src/meta-service/meta_service_partition.cpp: ########## @@ -222,6 +222,30 @@ void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller LOG_INFO("remove recycle index").tag("key", hex(key)); txn->remove(key); } + + // init table version + std::string val; Review Comment: 必须判断是否是 create table 请求,只有 create table 才能初始化 table version kv ########## cloud/src/meta-service/keys.h: ########## @@ -34,6 +34,7 @@ // 0x01 "txn" ${instance_id} "txn_running" ${db_id} ${txn_id} -> TxnRunningPB // // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id} -> VersionPB +// 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id} -> VersionPB Review Comment: value use int64, rather than VersionPB ########## cloud/src/meta-service/meta_service_txn.cpp: ########## @@ -1009,6 +1009,35 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, response->add_versions(i.second); } + // Save table versions Review Comment: 目前来看 Doris 的写事务不可能会跨 table,所以只需要更新一个 table version kv; version 的更新不用实现成 get v, v += 1 , put v,这样性能很差,实际上只需要调用 atomic_add 即可(因为该 
fdb txn 中不关心当前 version 是多少) ########## cloud/src/meta-service/meta_service_partition.cpp: ########## @@ -493,6 +517,27 @@ void MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro LOG_INFO("remove recycle partition").tag("key", hex(key)); txn->remove(key); } + + // update table versions + // Todo: use int64_t instead of protobuf as a value so that we can use atomic_add + std::string ver_key = table_version_key({instance_id, request->db_id(), request->table_id()}); + std::string ver_val_str; + err = txn->get(ver_key, &ver_val_str); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + msg = fmt::format( "failed to get table version, table_id {}, key {}", request->table_id(), hex(ver_key)); + return; + } + VersionPB version_pb; + { + version_pb.ParseFromString(ver_val_str); + int version = version_pb.version(); + version_pb.set_version(version + 1); Review Comment: Use int64 as table version value rather than `VersionPB` and call `Transaction::atomic_add` ########## cloud/src/recycler/recycler.cpp: ########## @@ -872,38 +871,29 @@ int InstanceRecycler::recycle_versions() { int num_scanned = 0; int num_recycled = 0; - LOG_INFO("begin to recycle partition versions").tag("instance_id", instance_id_); + LOG_INFO("begin to recycle table and partition versions").tag("instance_id", instance_id_); auto start_time = steady_clock::now(); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { auto cost = duration<float>(steady_clock::now() - start_time).count(); - LOG_INFO("recycle partition versions finished, cost={}s", cost) + LOG_INFO("recycle table and partition versions finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) .tag("num_recycled", num_recycled); }); - auto version_key_begin = version_key({instance_id_, 0, 0, 0}); - auto version_key_end = version_key({instance_id_, INT64_MAX, 0, 0}); - int64_t last_scanned_table_id = 0; - bool 
is_recycled = false; // Is last scanned kv recycled - auto recycle_func = [&num_scanned, &num_recycled, &last_scanned_table_id, &is_recycled, this]( - std::string_view k, std::string_view) { + auto version_key_begin = table_version_key({instance_id_, 0, 0}); + auto version_key_end = table_version_key({instance_id_, INT64_MAX, 0}); + auto recycle_func = [&num_scanned, &num_recycled, this](std::string_view k, std::string_view) { Review Comment: 这样改对已经支持 table version 的表是正确的,但是对于存量的表就无法回收 partition version kv 了 ########## fe/fe-core/src/main/java/org/apache/doris/cloud/qe/SnapshotProxy.java: ########## @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.qe; + +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.MetaServiceProxy; +import org.apache.doris.common.Config; +import org.apache.doris.rpc.RpcException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SnapshotProxy { + private static final Logger LOG = LogManager.getLogger(SnapshotProxy.class); + + public static Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest request) throws RpcException { + int tryTimes = 0; + while (tryTimes++ < Config.meta_service_rpc_retry_times) { + Cloud.GetVersionResponse resp = getVisibleVersionAsync(request, + Config.default_get_version_from_ms_timeout_second * 1000); + if (resp.hasStatus() && (resp.getStatus().getCode() == Cloud.MetaServiceCode.OK + || resp.getStatus().getCode() == Cloud.MetaServiceCode.VERSION_NOT_FOUND)) { + if (LOG.isDebugEnabled()) { + LOG.debug("get version from meta service, code: {}", resp.getStatus().getCode()); + } + return resp; + } + + LOG.warn("get version from meta service failed, status: {}, retry time: {}", + resp.getStatus(), tryTimes); + + // sleep random millis, retry rpc failed + if (tryTimes > Config.meta_service_rpc_retry_times / 2) { + sleepSeveralMs(500, 1000); + } else { + sleepSeveralMs(20, 200); + } + } + + LOG.warn("get version from meta service failed after retry {} times", tryTimes); + throw new RpcException("get version from meta service", "failed after retry n times"); + } + + public static Cloud.GetVersionResponse getVisibleVersionAsync(Cloud.GetVersionRequest request, int timeoutMs) { Review Comment: Async 接口应该返回的是 Future ? 
########## fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java: ########## @@ -2666,11 +2671,44 @@ public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersion // During `getNextVersion` and `updateVisibleVersionAndTime` period, // the write lock on the table should be held continuously public long getNextVersion() { - return tableAttributes.getNextVersion(); + if (!Config.isCloudMode()) { Review Comment: Cloud mode 是否有 getNextVersion 的需求?如果没有,但是却执行到了这个函数,可能得考虑打印报错日志。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org