gavinchou commented on code in PR #53468: URL: https://github.com/apache/doris/pull/53468#discussion_r2222899321
########## cloud/src/recycler/recycler.cpp: ########## @@ -2505,6 +2518,107 @@ int InstanceRecycler::recycle_rowsets() { return ret; } +int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view key, + const RowsetMetaCloudPB& rowset_meta) { + constexpr int MAX_RETRY = 10; + int64_t tablet_id = rowset_meta.tablet_id(); + const std::string& rowset_id = rowset_meta.rowset_id_v2(); + for (int i = 0; i < MAX_RETRY; ++i) { + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + LOG_WARNING("failed to create txn").tag("err", err); + return -1; + } + + std::string rowset_ref_count_key = + versioned::data_rowset_ref_count_key({instance_id_, tablet_id, rowset_id}); + int64_t ref_count = 0; + { + std::string value; + TxnErrorCode err = txn->get(rowset_ref_count_key, &value); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + // This is the old version rowset, we could recycle it directly. + ref_count = 1; + } else if (err != TxnErrorCode::TXN_OK) { + LOG_WARNING("failed to get rowset ref count key").tag("err", err); + return -1; + } else if (!txn->decode_atomic_int(value, &ref_count)) { + LOG_WARNING("failed to decode rowset data ref count") + .tag("key", hex(key)) + .tag("rowset_id", rowset_id) + .tag("tablet_id", tablet_id) + .tag("value", hex(value)); + return -1; + } + }; + + if (ref_count == 1) { + // It would not be added since it is recycling. + if (delete_rowset_data(rowset_meta) != 0) { + LOG_WARNING("failed to delete rowset data") + .tag("tablet_id", tablet_id) + .tag("rowset_id", rowset_id) + .tag("key", hex(key)); + return -1; + } + + // Reset the transaction to avoid timeout. 
+ err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + LOG_WARNING("failed to create txn").tag("err", err); + return -1; + } + txn->remove(rowset_ref_count_key); + LOG_INFO("delete rowset data ref count key") + .tag("txn_id", rowset_meta.txn_id()) + .tag("key", hex(key)) + .tag("tablet_id", tablet_id) + .tag("rowset_id", rowset_id); + } else { + // Decrease the rowset ref count. + // + // The read conflict range will protect the rowset ref count key, if any conflict happens, + // we will retry and check whether the rowset ref count is 1 and the data need to be deleted. + txn->atomic_add(rowset_ref_count_key, -1); Review Comment: 这里需要构造足够多的case 能覆盖到所有ref count 加加减减的逻辑. 是不是只能是基于fdb的单测, 端到端的测试很难测到corner case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org