This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d1e66aa4033 [fix](cloud) fix tablet stats for versioned keys (#59193)
d1e66aa4033 is described below
commit d1e66aa4033c08fe332f787d527c0e78fa610daa
Author: walter <[email protected]>
AuthorDate: Tue Dec 23 02:48:52 2025 +0800
[fix](cloud) fix tablet stats for versioned keys (#59193)
---
cloud/src/meta-service/meta_service.cpp | 38 ++++-
cloud/src/meta-service/meta_service.h | 2 +-
cloud/src/meta-service/meta_service_http.cpp | 5 +-
.../src/meta-service/meta_service_tablet_stats.cpp | 174 ++++++++++++++++++++-
cloud/src/meta-service/meta_service_tablet_stats.h | 7 +-
5 files changed, 218 insertions(+), 8 deletions(-)
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 7798d7e3bbb..129d1cd4e2d 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -5528,16 +5528,48 @@ std::pair<std::string, std::string>
init_key_pair(std::string instance_id, int64
}
MetaServiceResponseStatus MetaServiceImpl::fix_tablet_stats(std::string
cloud_unique_id_str,
- std::string
table_id_str) {
+ std::string
table_id_str,
+ std::string
tablet_id_str) {
// parse params
int64_t table_id;
+ int64_t tablet_id = -1;
std::string instance_id;
- MetaServiceResponseStatus st = parse_fix_tablet_stats_param(
- resource_mgr_, table_id_str, cloud_unique_id_str, table_id,
instance_id);
+ MetaServiceResponseStatus st =
+ parse_fix_tablet_stats_param(resource_mgr_, table_id_str,
cloud_unique_id_str,
+ tablet_id_str, table_id, instance_id,
tablet_id);
if (st.code() != MetaServiceCode::OK) {
return st;
}
+ bool is_versioned_read = is_version_read_enabled(instance_id);
+ bool is_versioned_write = is_version_write_enabled(instance_id);
+ if (is_versioned_write) {
+ if (tablet_id < 0) {
+ st.set_code(MetaServiceCode::INVALID_ARGUMENT);
+ st.set_msg(
+ "cannot fix tablet stats for all tablets of a table when
versioned write is "
+ "enabled, consider specifying tablet_id");
+ return st;
+ }
+
+ TabletIndexPB tablet_idx;
+ CloneChainReader reader(instance_id, txn_kv_.get(),
resource_mgr_.get());
+ TxnErrorCode err = reader.get_tablet_index(tablet_id, &tablet_idx);
+ if (err != TxnErrorCode::TXN_OK) {
+ st.set_code(cast_as<ErrCategory::READ>(err));
+ st.set_msg(fmt::format("failed to get tablet index for
tablet_id={}, err={}", tablet_id,
+ err));
+ return st;
+ }
+
+ auto&& [code, msg] = fix_versioned_tablet_stats_internal(
+ txn_kv_.get(), instance_id, tablet_idx, is_versioned_read,
is_versioned_write,
+ resource_mgr_.get());
+ st.set_code(code);
+ st.set_msg(std::move(msg));
+ return st;
+ }
+
std::pair<std::string, std::string> key_pair = init_key_pair(instance_id,
table_id);
std::string old_begin_key;
while (old_begin_key < key_pair.first) {
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index e255ffebe3a..44027ed6316 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -368,7 +368,7 @@ public:
InstanceInfoPB*
instance);
MetaServiceResponseStatus fix_tablet_stats(std::string cloud_unique_id_str,
- std::string table_id_str);
+ std::string table_id_str,
std::string tablet_id_str);
std::pair<MetaServiceCode, std::string> fix_tablet_db_id(const
std::string& instance_id,
int64_t
tablet_id, int64_t db_id);
diff --git a/cloud/src/meta-service/meta_service_http.cpp
b/cloud/src/meta-service/meta_service_http.cpp
index 071fb9b5e95..ff2c4ce080d 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -609,9 +609,10 @@ static HttpResponse
process_fix_tablet_stats(MetaServiceImpl* service, brpc::Con
auto& uri = ctrl->http_request().uri();
std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id");
std::string_view table_id = http_query(uri, "table_id");
+ std::string_view tablet_id = http_query(uri, "tablet_id");
- MetaServiceResponseStatus st =
- service->fix_tablet_stats(std::string(cloud_unique_id),
std::string(table_id));
+ MetaServiceResponseStatus st = service->fix_tablet_stats(
+ std::string(cloud_unique_id), std::string(table_id),
std::string(tablet_id));
return http_text_reply(st, st.DebugString());
}
diff --git a/cloud/src/meta-service/meta_service_tablet_stats.cpp
b/cloud/src/meta-service/meta_service_tablet_stats.cpp
index 9543cd724c1..1b73efc2ba1 100644
--- a/cloud/src/meta-service/meta_service_tablet_stats.cpp
+++ b/cloud/src/meta-service/meta_service_tablet_stats.cpp
@@ -32,8 +32,10 @@
#include "meta-service/meta_service_helper.h"
#include "meta-store/clone_chain_reader.h"
#include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
+#include "meta-store/versioned_value.h"
namespace doris::cloud {
@@ -302,7 +304,8 @@ void internal_get_load_tablet_stats_batch(MetaServiceCode&
code, std::string& ms
MetaServiceResponseStatus parse_fix_tablet_stats_param(
std::shared_ptr<ResourceManager> resource_mgr, const std::string&
table_id_str,
- const std::string& cloud_unique_id_str, int64_t& table_id,
std::string& instance_id) {
+ const std::string& cloud_unique_id_str, const std::string&
tablet_id_str, int64_t& table_id,
+ std::string& instance_id, int64_t& tablet_id) {
MetaServiceCode code = MetaServiceCode::OK;
std::string msg;
MetaServiceResponseStatus st;
@@ -317,6 +320,16 @@ MetaServiceResponseStatus parse_fix_tablet_stats_param(
return st;
}
+ if (!tablet_id_str.empty()) {
+ try {
+ tablet_id = std::stoll(tablet_id_str);
+ } catch (...) {
+ st.set_code(MetaServiceCode::INVALID_ARGUMENT);
+ st.set_msg("Invalid tablet_id, tablet_id: " + tablet_id_str);
+ return st;
+ }
+ }
+
instance_id = get_instance_id(resource_mgr, cloud_unique_id_str);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
@@ -636,4 +649,163 @@ MetaServiceResponseStatus check_new_tablet_stats(
return st;
}
+// Recomputes the disk-size statistics of one tablet from its rowset metas and writes the
+// corrected stats back, all inside a single transaction.
+//
+// Steps:
+//   1. Load the current tablet stats and every rowset meta of the tablet. With
+//      `is_versioned_read` both are read through CloneChainReader (compact stats and load stats
+//      are merged); otherwise internal_get_tablet_stats / internal_get_rowset are used.
+//   2. Sum total/index/data disk sizes over the rowsets and overwrite data_size, index_size and
+//      segment_size in a copy of the loaded stats. All other fields are carried over unchanged.
+//   3. Put the aggregated single-version stats key and remove the six detached counter keys
+//      (num_segs, num_rows, num_rowsets, data_size, index_size, segment_size).
+//   4. With `is_versioned_write`, additionally put the fixed stats as the versioned compact
+//      stats and a load-stats record that carries only the idx, each with the versionstamp
+//      obtained while reading.
+//
+// @param txn_kv              store used to create the transaction
+// @param instance_id         instance owning the tablet
+// @param tablet_idx          index of the tablet; supplies tablet_id and, when no stats record
+//                            carrying an idx is found, the table/index/partition ids
+// @param is_versioned_read   read stats and rowsets through the versioned reader
+// @param is_versioned_write  also write the versioned compact/load stats keys
+// @param resource_mgr        forwarded to CloneChainReader
+// @return {MetaServiceCode::OK, ""} on success, otherwise the failing code and a message
+std::pair<MetaServiceCode, std::string> fix_versioned_tablet_stats_internal(
+        TxnKv* txn_kv, const std::string& instance_id, const TabletIndexPB& tablet_idx,
+        bool is_versioned_read, bool is_versioned_write, ResourceManager* resource_mgr) {
+    int64_t tablet_id = tablet_idx.tablet_id();
+    std::unique_ptr<Transaction> txn;
+    MetaServiceCode code = MetaServiceCode::OK;
+    std::string msg;
+
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        msg = "failed to create txn";
+        return {code, msg};
+    }
+
+    TabletStatsPB original_tablet_stat;
+    TabletStatsPB existing_compact_stats;
+    TabletStatsPB existing_load_stats;
+    Versionstamp compact_versionstamp;
+    Versionstamp load_versionstamp;
+    GetRowsetResponse resp;
+
+    CloneChainReader meta_reader(instance_id, resource_mgr);
+    if (is_versioned_read) {
+        // Get existing compact stats. A missing key is tolerated: the stats then stay default.
+        err = meta_reader.get_tablet_compact_stats(txn.get(), tablet_id, &existing_compact_stats,
+                                                   &compact_versionstamp, true);
+        if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get versioned compact stats, tablet_id={}, err={}",
+                              tablet_id, err);
+            return {code, msg};
+        }
+
+        // Get existing load stats. A missing key is tolerated here as well.
+        err = meta_reader.get_tablet_load_stats(txn.get(), tablet_id, &existing_load_stats,
+                                                &load_versionstamp, true);
+        if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get versioned load stats, tablet_id={}, err={}", tablet_id,
+                              err);
+            return {code, msg};
+        }
+        MetaReader::merge_tablet_stats(existing_compact_stats, existing_load_stats,
+                                       &original_tablet_stat);
+
+        std::vector<RowsetMetaCloudPB> rowset_metas;
+        int64_t start = 0, end = std::numeric_limits<int64_t>::max() - 1;
+        err = meta_reader.get_rowset_metas(txn.get(), tablet_id, start, end, &rowset_metas);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get versioned rowset, err={}, tablet_id={}", err,
+                              tablet_id);
+            return {code, msg};
+        }
+
+        // Hand the rowsets over to `resp` so both read paths share the accumulation loop below.
+        std::move(rowset_metas.begin(), rowset_metas.end(),
+                  google::protobuf::RepeatedPtrFieldBackInserter(resp.mutable_rowset_meta()));
+    } else {
+        internal_get_tablet_stats(code, msg, txn.get(), instance_id, tablet_idx,
+                                  original_tablet_stat, true);
+        if (code != MetaServiceCode::OK) {
+            return {code, msg};
+        }
+        // get rowsets in tablet and accumulate disk size
+        internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id,
+                            tablet_id, code, msg, &resp);
+        if (code != MetaServiceCode::OK) {
+            return {code, msg};
+        }
+    }
+
+    int64_t total_disk_size = 0;
+    int64_t index_disk_size = 0;
+    int64_t data_disk_size = 0;
+    for (const auto& rs_meta : resp.rowset_meta()) {
+        total_disk_size += rs_meta.total_disk_size();
+        index_disk_size += rs_meta.index_disk_size();
+        data_disk_size += rs_meta.data_disk_size();
+    }
+
+    // set new disk size to tabletPB and write it back
+    TabletStatsPB tablet_stat;
+    tablet_stat.CopyFrom(original_tablet_stat);
+    if (tablet_stat.idx().tablet_id() != tablet_id) {
+        // The loaded stats carry no idx for this tablet (e.g. both versioned stats keys were
+        // missing). Building keys from a default idx would write and remove keys with zero ids,
+        // so fall back to the ids of the tablet index supplied by the caller.
+        auto* idx = tablet_stat.mutable_idx();
+        idx->set_table_id(tablet_idx.table_id());
+        idx->set_index_id(tablet_idx.index_id());
+        idx->set_partition_id(tablet_idx.partition_id());
+        idx->set_tablet_id(tablet_id);
+    }
+    tablet_stat.set_data_size(total_disk_size);
+    tablet_stat.set_index_size(index_disk_size);
+    tablet_stat.set_segment_size(data_disk_size);
+
+    int64_t table_id = tablet_stat.idx().table_id();
+    int64_t index_id = tablet_stat.idx().index_id();
+    int64_t partition_id = tablet_stat.idx().partition_id();
+
+    // Write single version stats
+    std::string tablet_stat_key;
+    std::string tablet_stat_value;
+    tablet_stat_key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
+    if (!tablet_stat.SerializeToString(&tablet_stat_value)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        msg = "failed to serialize tablet stat";
+        return {code, msg};
+    }
+    txn->put(tablet_stat_key, tablet_stat_value);
+
+    // Drop the detached counter keys of this tablet; the aggregated value written above now
+    // holds the recomputed sizes.
+    std::string num_segs_key =
+            stats_tablet_num_segs_key({instance_id, table_id, index_id, partition_id, tablet_id});
+    std::string num_rows_key =
+            stats_tablet_num_rows_key({instance_id, table_id, index_id, partition_id, tablet_id});
+    std::string num_rowsets_key = stats_tablet_num_rowsets_key(
+            {instance_id, table_id, index_id, partition_id, tablet_id});
+    std::string data_size_key =
+            stats_tablet_data_size_key({instance_id, table_id, index_id, partition_id, tablet_id});
+    std::string index_size_key =
+            stats_tablet_index_size_key({instance_id, table_id, index_id, partition_id, tablet_id});
+    std::string segment_size_key = stats_tablet_segment_size_key(
+            {instance_id, table_id, index_id, partition_id, tablet_id});
+    txn->remove(num_segs_key);
+    txn->remove(num_rows_key);
+    txn->remove(num_rowsets_key);
+    txn->remove(data_size_key);
+    txn->remove(index_size_key);
+    txn->remove(segment_size_key);
+
+    if (is_versioned_write) {
+        // Write compact stats (aggregate stats with accurate disk sizes). The serialized fixed
+        // stats are reused as the value.
+        // NOTE(review): if the compact/load stats key was not found above, the versionstamp
+        // passed here is still default-constructed -- confirm versioned_put handles that.
+        std::string compact_stats_key =
+                versioned::tablet_compact_stats_key({instance_id, tablet_id});
+        versioned_put(txn.get(), compact_stats_key, compact_versionstamp, tablet_stat_value);
+        LOG(INFO) << "put versioned tablet compact stats key=" << hex(compact_stats_key)
+                  << " tablet_id=" << tablet_id << " with existing versionstamp";
+
+        // Write load stats (detached stats, set to 0 since we recalculated from rowsets)
+        std::string load_stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
+        TabletStatsPB load_stats;
+        load_stats.mutable_idx()->CopyFrom(tablet_stat.idx());
+
+        std::string load_stats_value;
+        if (!load_stats.SerializeToString(&load_stats_value)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            msg = "failed to serialize load stats";
+            return {code, msg};
+        }
+
+        // Overwrite with existing versionstamp
+        versioned_put(txn.get(), load_stats_key, load_versionstamp, load_stats_value);
+        LOG(INFO) << "put versioned tablet load stats key=" << hex(load_stats_key)
+                  << " tablet_id=" << tablet_id << " with existing versionstamp";
+    }
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        msg = "failed to commit txn";
+        return {code, msg};
+    }
+
+    return {MetaServiceCode::OK, ""};
+}
+
} // namespace doris::cloud
diff --git a/cloud/src/meta-service/meta_service_tablet_stats.h
b/cloud/src/meta-service/meta_service_tablet_stats.h
index 7eb1c616fe3..177f0ab0200 100644
--- a/cloud/src/meta-service/meta_service_tablet_stats.h
+++ b/cloud/src/meta-service/meta_service_tablet_stats.h
@@ -104,13 +104,18 @@ void
internal_get_load_tablet_stats_batch(MetaServiceCode& code, std::string& ms
MetaServiceResponseStatus parse_fix_tablet_stats_param(
std::shared_ptr<ResourceManager> resource_mgr, const std::string&
table_id_str,
- const std::string& cloud_unique_id_str, int64_t& table_id,
std::string& instance_id);
+ const std::string& cloud_unique_id_str, const std::string&
tablet_id_str, int64_t& table_id,
+ std::string& instance_id, int64_t& tablet_id);
MetaServiceResponseStatus fix_tablet_stats_internal(
std::shared_ptr<TxnKv> txn_kv, std::pair<std::string, std::string>&
key_pair,
std::vector<std::shared_ptr<TabletStatsPB>>&
tablet_stat_shared_ptr_vec_batch,
const std::string& instance_id, size_t batch_size = 20);
+std::pair<MetaServiceCode, std::string> fix_versioned_tablet_stats_internal(
+ TxnKv* txn_kv, const std::string& instance_id, const TabletIndexPB&
tablet_idx,
+ bool is_versioned_read, bool is_versioned_write, ResourceManager*
resource_mgr);
+
MetaServiceResponseStatus check_new_tablet_stats(
std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id,
const std::vector<std::shared_ptr<TabletStatsPB>>&
tablet_stat_shared_ptr_vec_batch);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]