gavinchou commented on code in PR #53681: URL: https://github.com/apache/doris/pull/53681#discussion_r2224993828
########## cloud/src/meta-service/meta_service_txn.cpp: ########## @@ -2460,6 +2621,56 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* tablet_idx.partition_id(), tablet_id}; update_tablet_stats(info, stats); if (code != MetaServiceCode::OK) return; + + if (is_versioned_write) { + std::string stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id}); + + // Try to read existing versioned tablet stats + std::string stats_val; + TabletStatsPB stats_pb; + TxnErrorCode err = txn->get(stats_key, &stats_val); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + // If versioned stats doesn't exist, read from single version + TabletStats detached_stats; + internal_get_tablet_stats(code, msg, txn.get(), instance_id, tablet_idx, stats_pb, + detached_stats, false); + if (code != MetaServiceCode::OK) { + LOG(WARNING) << "failed to get tablet stats for versioned write: " << msg; Review Comment: log with some useful info like txn_id or tablet_id ########## cloud/src/meta-service/meta_service_txn.cpp: ########## @@ -2312,12 +2445,38 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* return; } + bool is_versioned_write = is_version_write_enabled(instance_id); + // Save rowset meta for (auto& i : rowsets) { - size_t rowset_size = i.first.size() + i.second.size(); - txn->put(i.first, i.second); - LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << txn_id + auto [tablet_id, version] = i.first; + std::string rowset_key = meta_rowset_key({instance_id, tablet_id, version}); + std::string val; + if (!i.second.SerializeToString(&val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize rowset_meta, txn_id=" << txn_id; + msg = ss.str(); + return; + } + size_t rowset_size = rowset_key.size() + val.size(); + txn->put(rowset_key, val); + LOG(INFO) << "xxx put rowset_key=" << hex(rowset_key) << " txn_id=" << txn_id << " rowset_size=" << rowset_size; + + if 
(is_versioned_write) { + std::string versioned_rowset_key = + versioned::meta_rowset_load_key({instance_id, tablet_id, version}); + if (!versioned::document_put(txn.get(), versioned_rowset_key, std::move(i.second))) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to put versioned rowset meta, txn_id=" << txn_id + << " key=" << hex(versioned_rowset_key); + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + LOG(INFO) << "xxx put versioned rowset meta key=" << hex(versioned_rowset_key) Review Comment: ditto ########## cloud/src/meta-service/meta_service_txn.cpp: ########## @@ -2312,12 +2445,38 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* return; } + bool is_versioned_write = is_version_write_enabled(instance_id); + // Save rowset meta for (auto& i : rowsets) { - size_t rowset_size = i.first.size() + i.second.size(); - txn->put(i.first, i.second); - LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << txn_id + auto [tablet_id, version] = i.first; + std::string rowset_key = meta_rowset_key({instance_id, tablet_id, version}); + std::string val; + if (!i.second.SerializeToString(&val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize rowset_meta, txn_id=" << txn_id; + msg = ss.str(); + return; + } + size_t rowset_size = rowset_key.size() + val.size(); + txn->put(rowset_key, val); + LOG(INFO) << "xxx put rowset_key=" << hex(rowset_key) << " txn_id=" << txn_id Review Comment: remove "xxx " by the way ########## cloud/src/meta-service/meta_service_txn.cpp: ########## @@ -2460,6 +2621,56 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* tablet_idx.partition_id(), tablet_id}; update_tablet_stats(info, stats); if (code != MetaServiceCode::OK) return; + + if (is_versioned_write) { + std::string stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id}); + + // Try to read existing versioned tablet stats + std::string 
stats_val; + TabletStatsPB stats_pb; + TxnErrorCode err = txn->get(stats_key, &stats_val); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + // If versioned stats doesn't exist, read from single version + TabletStats detached_stats; + internal_get_tablet_stats(code, msg, txn.get(), instance_id, tablet_idx, stats_pb, + detached_stats, false); + if (code != MetaServiceCode::OK) { + LOG(WARNING) << "failed to get tablet stats for versioned write: " << msg; + return; + } + } else if (err == TxnErrorCode::TXN_OK) { + // Parse existing versioned stats + if (!stats_pb.ParseFromString(stats_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format("malformed versioned tablet stats, key={}", hex(stats_key)); + LOG(WARNING) << msg; + return; + } + } else { + code = cast_as<ErrCategory::READ>(err); + msg = fmt::format("failed to get versioned tablet stats, err={}", err); Review Comment: log with some useful info like txn_id or tablet_id ########## cloud/src/meta-service/meta_service_txn.cpp: ########## @@ -2460,6 +2621,56 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* tablet_idx.partition_id(), tablet_id}; update_tablet_stats(info, stats); if (code != MetaServiceCode::OK) return; + + if (is_versioned_write) { Review Comment: better extract/move to a function ########## cloud/src/meta-service/txn_lazy_committer.cpp: ########## @@ -191,6 +274,58 @@ void convert_tmp_rowsets( tablet_idx.partition_id(), tablet_id}; update_tablet_stats(info, stats, txn, code, msg); if (code != MetaServiceCode::OK) return; + + if (is_versioned_write) { Review Comment: consider extract/move to separate function to reduce large for loop ########## cloud/src/meta-service/txn_lazy_committer.cpp: ########## @@ -25,27 +25,104 @@ #include "cpp/sync_point.h" #include "meta-service/meta_service_helper.h" #include "meta-service/meta_service_tablet_stats.h" +#include "meta-store/document_message.h" #include "meta-store/keys.h" +#include 
"meta-store/versioned_value.h" using namespace std::chrono; namespace doris::cloud { +void get_txn_db_id(TxnKv* txn_kv, const std::string& instance_id, int64_t txn_id, + MetaServiceCode& code, std::string& msg, int64_t* db_id); + void scan_tmp_rowset( const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv> txn_kv, - MetaServiceCode& code, std::string& msg, int64_t* db_id, + MetaServiceCode& code, std::string& msg, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>* tmp_rowsets_meta, KVStats* stats); void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stats, std::unique_ptr<Transaction>& txn, MetaServiceCode& code, std::string& msg); +// Get the versionstamp from the partition version key. +// +// The partition version key is constructed during the txn commit process, so the versionstamp +// can be retrieved from the key itself. +// +// In order to keep consistency, the pending_txn_id is used to ensure the correct versionstamp +// is returned. The is_partition_version_valid flag indicates whether the above condition is met. +std::pair<MetaServiceCode, std::string> get_partition_versionstamp( + TxnKv* txn_kv, std::string_view instance_id, int64_t txn_id, int64_t partition_id, + Versionstamp* versionstamp, bool* is_partition_version_valid) { + std::string partition_key = versioned::partition_version_key({instance_id, partition_id}); + + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + return {cast_as<ErrCategory::CREATE>(err), + fmt::format("failed to create txn, txn_id={}, err={}", txn_id, err)}; + } + + std::string partition_version_value; + err = versioned_get(txn.get(), partition_key, versionstamp, &partition_version_value); + if (err != TxnErrorCode::TXN_OK) { + MetaServiceCode code = err == TxnErrorCode::TXN_KEY_NOT_FOUND + ? 
MetaServiceCode::TXN_ID_NOT_FOUND + : cast_as<ErrCategory::READ>(err); + return {code, fmt::format("failed to get partition version, txn_id={}, key={} err={}", + txn_id, hex(partition_key), err)}; + } + + VersionPB partition_version; + if (!partition_version.ParseFromString(partition_version_value)) { + return {MetaServiceCode::PROTOBUF_PARSE_ERR, + fmt::format("failed to parse partition version pb, txn_id={}, key={}", txn_id, + hex(partition_key))}; + } + + *is_partition_version_valid = partition_version.pending_txn_ids_size() > 0 && Review Comment: should we LOG the existing txn_id and the expected txn_id if not valid? ########## cloud/src/meta-service/meta_service_txn.cpp: ########## @@ -2479,15 +2690,32 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* recycle_pb.set_creation_time(commit_time); recycle_pb.set_label(txn_info.label()); - if (!recycle_pb.SerializeToString(&recycle_val)) { - code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; - ss << "failed to serialize recycle_pb, txn_id=" << txn_id; - msg = ss.str(); - return; + if (is_versioned_write) { + commit_txn_log.mutable_recycle_txn()->Swap(&recycle_pb); + std::string log_key = versioned::log_key({instance_id}); + std::string operation_log_value; + OperationLogPB operation_log; + operation_log.mutable_commit_txn()->Swap(&commit_txn_log); + if (!operation_log.SerializeToString(&operation_log_value)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize operation_log, txn_id=" << txn_id; + msg = ss.str(); + return; + } + versioned_put(txn.get(), log_key, operation_log_value); Review Comment: record the operation log KV size in bytes and log it ########## cloud/src/meta-service/txn_lazy_committer.cpp: ########## @@ -25,27 +25,104 @@ #include "cpp/sync_point.h" #include "meta-service/meta_service_helper.h" #include "meta-service/meta_service_tablet_stats.h" +#include "meta-store/document_message.h" #include "meta-store/keys.h" +#include 
"meta-store/versioned_value.h" using namespace std::chrono; namespace doris::cloud { +void get_txn_db_id(TxnKv* txn_kv, const std::string& instance_id, int64_t txn_id, + MetaServiceCode& code, std::string& msg, int64_t* db_id); + void scan_tmp_rowset( const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv> txn_kv, - MetaServiceCode& code, std::string& msg, int64_t* db_id, + MetaServiceCode& code, std::string& msg, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>* tmp_rowsets_meta, KVStats* stats); void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stats, std::unique_ptr<Transaction>& txn, MetaServiceCode& code, std::string& msg); +// Get the versionstamp from the partition version key. +// +// The partition version key is constructed during the txn commit process, so the versionstamp +// can be retrieved from the key itself. +// +// In order to keep consistency, the pending_txn_id is used to ensure the correct versionstamp +// is returned. The is_partition_version_valid flag indicates whether the above condition is met. +std::pair<MetaServiceCode, std::string> get_partition_versionstamp( + TxnKv* txn_kv, std::string_view instance_id, int64_t txn_id, int64_t partition_id, + Versionstamp* versionstamp, bool* is_partition_version_valid) { + std::string partition_key = versioned::partition_version_key({instance_id, partition_id}); + + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + return {cast_as<ErrCategory::CREATE>(err), + fmt::format("failed to create txn, txn_id={}, err={}", txn_id, err)}; + } + + std::string partition_version_value; + err = versioned_get(txn.get(), partition_key, versionstamp, &partition_version_value); + if (err != TxnErrorCode::TXN_OK) { + MetaServiceCode code = err == TxnErrorCode::TXN_KEY_NOT_FOUND + ? 
MetaServiceCode::TXN_ID_NOT_FOUND + : cast_as<ErrCategory::READ>(err); + return {code, fmt::format("failed to get partition version, txn_id={}, key={} err={}", + txn_id, hex(partition_key), err)}; + } + + VersionPB partition_version; + if (!partition_version.ParseFromString(partition_version_value)) { + return {MetaServiceCode::PROTOBUF_PARSE_ERR, + fmt::format("failed to parse partition version pb, txn_id={}, key={}", txn_id, + hex(partition_key))}; + } + + *is_partition_version_valid = partition_version.pending_txn_ids_size() > 0 && Review Comment: log more info like partition_id -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org