w41ter commented on code in PR #38243: URL: https://github.com/apache/doris/pull/38243#discussion_r1715102836
########## cloud/src/recycler/recycler.cpp: ########## @@ -1707,6 +1777,62 @@ int InstanceRecycler::recycle_rowsets() { return ret; } +bool check_txn_abort(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, + int64_t txn_id) { + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to create txn, txn_id=" << txn_id << " instance_id=" << instance_id; + return false; + } + + std::string index_val; + const std::string index_key = txn_index_key({instance_id, txn_id}); + err = txn->get(index_key, &index_val); + if (err != TxnErrorCode::TXN_OK) { + if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) { + // txn has been recycled; + return true; + } + LOG(WARNING) << "failed to get txn index key, txn_id=" << txn_id + << " instance_id=" << instance_id << " key=" << hex(index_key); + return false; + } + + TxnIndexPB index_pb; + if (!index_pb.ParseFromString(index_val)) { + LOG(WARNING) << "failed to parse txn_index_pb, txn_id=" << txn_id + << " instance_id=" << instance_id; + return false; + } + + DCHECK(index_pb.has_tablet_index() == true); + DCHECK(index_pb.tablet_index().has_db_id() == true); + int64_t db_id = index_pb.tablet_index().db_id(); Review Comment: For compatibility, check that the DB id is present before using it, rather than relying only on the `DCHECK`s. For example: ```c++ if (!index_pb.tablet_index().has_db_id()) { return false; } ``` ########## cloud/src/recycler/recycler.cpp: ########## @@ -1707,6 +1777,62 @@ int InstanceRecycler::recycle_rowsets() { return ret; } +bool check_txn_abort(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, + int64_t txn_id) { + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to create txn, txn_id=" << txn_id << " instance_id=" << instance_id; + return false; + } + + std::string index_val; + const std::string index_key = txn_index_key({instance_id, txn_id}); + err = 
txn->get(index_key, &index_val); + if (err != TxnErrorCode::TXN_OK) { + if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) { + // txn has been recycled; + return true; + } + LOG(WARNING) << "failed to get txn index key, txn_id=" << txn_id + << " instance_id=" << instance_id << " key=" << hex(index_key); + return false; + } + + TxnIndexPB index_pb; + if (!index_pb.ParseFromString(index_val)) { + LOG(WARNING) << "failed to parse txn_index_pb, txn_id=" << txn_id + << " instance_id=" << instance_id; + return false; + } + + DCHECK(index_pb.has_tablet_index() == true); + DCHECK(index_pb.tablet_index().has_db_id() == true); + int64_t db_id = index_pb.tablet_index().db_id(); Review Comment: A new kind of task needs to be added. It should scan all partition keys, parse the `db_id` from each, and fill in the `db_id` field of `TabletIndexPB`. ########## cloud/src/recycler/recycler.cpp: ########## @@ -1918,42 +2043,69 @@ int InstanceRecycler::abort_timeout_txn() { LOG_WARNING("failed to parse txn info").tag("key", hex(k)); return -1; } - txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED); - txn_info.set_finish_time(current_time); - txn_info.set_reason("timeout"); - VLOG_DEBUG << "txn_info=" << txn_info.DebugString(); - txn_inf_val.clear(); - if (!txn_info.SerializeToString(&txn_inf_val)) { - LOG_WARNING("failed to serialize txn info").tag("key", hex(k)); - return -1; - } - txn->put(txn_inf_key, txn_inf_val); - VLOG_DEBUG << "txn->put, txn_inf_key=" << hex(txn_inf_key); - // Put recycle txn key - std::string recyc_txn_key, recyc_txn_val; - recycle_txn_key({instance_id_, db_id, txn_id}, &recyc_txn_key); - RecycleTxnPB recycle_txn_pb; - recycle_txn_pb.set_creation_time(current_time); - recycle_txn_pb.set_label(txn_info.label()); - if (!recycle_txn_pb.SerializeToString(&recyc_txn_val)) { - LOG_WARNING("failed to serialize txn recycle info") - .tag("key", hex(k)) - .tag("db_id", db_id) - .tag("txn_id", txn_id); - return -1; - } - txn->put(recyc_txn_key, recyc_txn_val); - // Remove txn running key - 
txn->remove(k); - err = txn->commit(); - if (err != TxnErrorCode::TXN_OK) { - LOG_WARNING("failed to commit txn err={}", err) - .tag("key", hex(k)) - .tag("db_id", db_id) - .tag("txn_id", txn_id); - return -1; + + if (TxnStatusPB::TXN_STATUS_COMMITTED == txn_info.status()) { + txn.reset(); + std::shared_ptr<TxnLazyCommitTask> task = + txn_lazy_committer_->submit(instance_id_, txn_info.txn_id()); + std::pair<MetaServiceCode, std::string> ret = task->wait(); + if (ret.first != MetaServiceCode::OK) { + LOG(WARNING) << "lazy commit txn failed txn_id=" << txn_id << " code=" << ret.first + << "msg=" << ret.second; + return -1; + } + ++num_advance; + return 0; + } else { Review Comment: Suggestion: remove this `else {`. The `if` branch always returns, so the `else` is redundant, and dropping it will make the diff clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org