gavinchou commented on code in PR #58459:
URL: https://github.com/apache/doris/pull/58459#discussion_r2626777132
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -1671,6 +1592,207 @@ int64_t calculate_restore_job_expired_time(
return final_expiration;
}
+int InstanceRecycler::abort_txn_for_related_rowset(int64_t txn_id) {
+ AbortTxnRequest req;
+ TxnInfoPB txn_info;
+ MetaServiceCode code = MetaServiceCode::OK;
+ std::string msg;
+ std::stringstream ss;
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to create txn").tag("err", err);
+ return -1;
+ }
+
+ // get txn index
+ TxnIndexPB txn_idx_pb;
+ auto index_key = txn_index_key({instance_id_, txn_id});
+ std::string index_val;
+ err = txn->get(index_key, &index_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get txn index").tag("err", err);
+ return -1;
+ }
+ if (!txn_idx_pb.ParseFromString(index_val)) {
+ LOG_WARNING("failed to parse txn index").tag("err", err);
+ return -1;
+ }
+
+ auto info_key = txn_info_key({instance_id_,
txn_idx_pb.tablet_index().db_id(), txn_id});
+ std::string info_val;
+ err = txn->get(info_key, &info_val);
Review Comment:
We could keep a txn_id -> txn_info cache, populated when a txn is marked as
aborted during recycling; this may eliminate a large number of txn info reads.
Please add a TODO/FIXME here.
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -2567,30 +2567,50 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
return;
}
- // Check if the compaction/sc tablet job has finished
- bool is_versioned_read = is_version_read_enabled(instance_id);
- if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
- !request->tablet_job_id().empty()) {
- if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id,
rowset_id,
- request->tablet_job_id(), is_versioned_read,
resource_mgr_.get())) {
+ auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id,
rowset_id});
+ // Check recycle_rowset_key to ensure idempotency for commit_rowset
operation.
+ // The precondition for commit_rowset is that prepare_rowset has been
successfully executed,
+ // which creates the recycle_rowset_key. Therefore, we only need to check
if the
+ // recycle_rowset_key exists to determine if this is a duplicate request:
+ // - If key not found: commit_rowset has already been executed and remove
the key,
+ // this is a duplicate request and should be rejected.
+ // - If key exists but marked as recycled: the rowset data has been
recycled by recycler,
+ // this request should be rejected to prevent data inconsistency.
+ // - If key exists and not marked: this is a valid commit_rowset request,
proceed normally.
+ // Note: We don't need to check txn/job status separately because
prepare_rowset has already
+ // validated them, and the existence of recycle_rowset_key is sufficient
to guarantee idempotency.
+ if (config::enable_recycle_rowset_key_check) {
+ std::string recycle_rs_val;
+ err = txn->get(recycle_rs_key, &recycle_rs_val);
+ if (err == TxnErrorCode::TXN_OK) {
+ RecycleRowsetPB recycle_rowset;
+ if (!recycle_rowset.ParseFromString(recycle_rs_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("malformed recycle rowset value. key={}",
hex(recycle_rs_key));
+ return;
+ }
+ auto rs_meta = recycle_rowset.rowset_meta();
+ if (rs_meta.has_recycled_marked() && rs_meta.recycled_marked()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("rowset has already been marked as recycled,
key={}, rs_meta={}",
+ hex(recycle_rs_key),
rs_meta.ShortDebugString());
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
Review Comment:
We should not perform the "delete rowset" check (the TXN_KEY_NOT_FOUND branch here) for requests coming from older-version FE/BE.
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -1671,6 +1592,207 @@ int64_t calculate_restore_job_expired_time(
return final_expiration;
}
+int InstanceRecycler::abort_txn_for_related_rowset(int64_t txn_id) {
+ AbortTxnRequest req;
+ TxnInfoPB txn_info;
+ MetaServiceCode code = MetaServiceCode::OK;
+ std::string msg;
+ std::stringstream ss;
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to create txn").tag("err", err);
+ return -1;
+ }
+
+ // get txn index
+ TxnIndexPB txn_idx_pb;
+ auto index_key = txn_index_key({instance_id_, txn_id});
+ std::string index_val;
+ err = txn->get(index_key, &index_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get txn index").tag("err", err);
+ return -1;
+ }
+ if (!txn_idx_pb.ParseFromString(index_val)) {
+ LOG_WARNING("failed to parse txn index").tag("err", err);
+ return -1;
+ }
+
+ auto info_key = txn_info_key({instance_id_,
txn_idx_pb.tablet_index().db_id(), txn_id});
+ std::string info_val;
+ err = txn->get(info_key, &info_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get txn info").tag("err", err);
+ return -1;
+ }
+ if (!txn_info.ParseFromString(info_val)) {
+ LOG_WARNING("failed to parse txn info").tag("err", err);
+ return -1;
+ }
+
+ if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+ LOG_INFO("txn is not prepared status, txn_id={}", txn_id);
+ return 0;
+ }
+
+ req.set_txn_id(txn_id);
+
+ LOG(INFO) << "begin abort txn for related rowset, txn_id=" << txn_id
+ << " instance_id=" << instance_id_ << " txn_info=" <<
txn_info.ShortDebugString();
+
+ _abort_txn(instance_id_, &req, txn.get(), txn_info, ss, code, msg);
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to commit kv txn, txn_id=" << txn_info.txn_id() << "
err=" << err;
+ msg = ss.str();
+ return -1;
+ }
+
+ LOG(INFO) << "finish abort txn for related rowset, txn_id=" << txn_id
+ << " instance_id=" << instance_id_ << " txn_info=" <<
txn_info.ShortDebugString()
+ << " code=" << code << " msg=" << msg;
+
+ return 0;
+}
+
+int InstanceRecycler::abort_job_for_related_rowset(const RowsetMetaCloudPB&
rowset_meta) {
+ FinishTabletJobRequest req;
+ FinishTabletJobResponse res;
+ req.set_action(FinishTabletJobRequest::ABORT);
+ MetaServiceCode code = MetaServiceCode::OK;
+ std::string msg;
+ std::stringstream ss;
+
+ TabletIndexPB tablet_idx;
+ int ret = get_tablet_idx(txn_kv_.get(), instance_id_,
rowset_meta.tablet_id(), tablet_idx);
+ if (ret != 0) {
+ LOG(WARNING) << "failed to get tablet index, tablet_id=" <<
rowset_meta.tablet_id()
+ << " instance_id=" << instance_id_ << " ret=" << ret;
+ return ret;
+ }
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to create txn, instance_id=" << instance_id_
<< " err=" << err;
+ return -1;
+ }
+
+ std::string job_key =
+ job_tablet_key({instance_id_, tablet_idx.table_id(),
tablet_idx.index_id(),
+ tablet_idx.partition_id(),
tablet_idx.tablet_id()});
+ std::string job_val;
+ err = txn->get(job_key, &job_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ LOG(INFO) << "job not exists, instance_id=" << instance_id_
+ << " tablet_id=" << tablet_idx.tablet_id();
+ return 0;
+ }
+ LOG(WARNING) << "failed to get job, instance_id=" << instance_id_
+ << " tablet_id=" << tablet_idx.tablet_id() << " err=" <<
err;
+ return -1;
+ }
+
+ TabletJobInfoPB job_pb;
+ if (!job_pb.ParseFromString(job_val)) {
+ LOG(WARNING) << "failed to parse job, instance_id=" << instance_id_
+ << " tablet_id=" << tablet_idx.tablet_id();
+ return -1;
+ }
+
+ std::string job_id {};
+ if (!job_pb.compaction().empty()) {
+ for (const auto& c : job_pb.compaction()) {
+ if (c.id() == rowset_meta.job_id()) {
+ job_id = c.id();
+ break;
+ }
+ }
+ } else if (job_pb.has_schema_change()) {
+ job_id = job_pb.schema_change().id();
+ }
+
+ if (!job_id.empty() && rowset_meta.job_id() == job_id) {
+ LOG(INFO) << "begin to abort job for related rowset, job_id=" <<
rowset_meta.job_id()
+ << " instance_id=" << instance_id_ << " tablet_id=" <<
tablet_idx.tablet_id();
+ req.mutable_job()->CopyFrom(job_pb);
+ req.set_action(FinishTabletJobRequest::ABORT);
+ _finish_tablet_job(&req, &res, instance_id_, txn, txn_kv_.get(),
+ delete_bitmap_lock_white_list_.get(),
resource_mgr_.get(), code, msg,
+ ss);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "failed to abort job, instance_id=" << instance_id_
+ << " tablet_id=" << tablet_idx.tablet_id() << "
code=" << code
+ << " msg=" << msg;
+ return -1;
+ }
+ LOG(INFO) << "finish abort job for related rowset, job_id=" <<
rowset_meta.job_id()
+ << " instance_id=" << instance_id_ << " tablet_id=" <<
tablet_idx.tablet_id()
+ << " code=" << code << " msg=" << msg;
+ } else {
+ LOG(INFO) << "there is no job for related rowset, directly recycle
rowset data, "
+ "instance_id="
+ << instance_id_ << " tablet_id=" << tablet_idx.tablet_id();
+ }
+
+ return 0;
+}
+
+int InstanceRecycler::abort_txn_or_job_for_recycle(const RowsetMetaCloudPB&
rowset_meta) {
Review Comment:
We should not proceed when the rowset is an input of a compaction, i.e. when
its recycle type is COMPACT.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]