Lchangliang commented on code in PR #37669: URL: https://github.com/apache/doris/pull/37669#discussion_r1694510821
########## cloud/src/meta-service/meta_service_txn.cpp: ########## @@ -2553,6 +2568,100 @@ void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controlle response->mutable_txn_info()->CopyFrom(txn_info); } +void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* controller, + const AbortTxnWithCoordinatorRequest* request, + AbortTxnWithCoordinatorResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(abort_txn_with_coordinator); + if (!request->has_id() || !request->has_ip() || !request->has_start_time()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "invalid coordinate id, coordinate ip or coordinate start time."; + return; + } + // TODO: For auth + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "cannot find instance_id with cloud_unique_id=" + << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id); + msg = ss.str(); + return; + } + RPC_RATE_LIMIT(abort_txn_with_coordinator); + std::string begin_info_key = txn_info_key({instance_id, 0, 0}); + std::string end_info_key = txn_info_key({instance_id, INT64_MAX, INT64_MAX}); + LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" << hex(end_info_key); + + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + msg = "failed to create txn"; + code = cast_as<ErrCategory::CREATE>(err); + return; + } + std::unique_ptr<RangeGetIterator> it; + int64_t abort_txn_cnt = 0; + int64_t total_iteration_cnt = 0; + bool need_commit = false; + do { + err = txn->get(begin_info_key, end_info_key, &it, true); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "failed to get txn info. 
err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + while (it->has_next()) { Review Comment: If this times out, it means there are too many old unfinished txns that need to be aborted. We can also abort them when checking for conflicting txns. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org