gavinchou commented on code in PR #37669:
URL: https://github.com/apache/doris/pull/37669#discussion_r1694295326


##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -2553,6 +2568,100 @@ void 
MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controlle
     response->mutable_txn_info()->CopyFrom(txn_info);
 }
 
+void 
MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* 
controller,
+                                                 const 
AbortTxnWithCoordinatorRequest* request,
+                                                 
AbortTxnWithCoordinatorResponse* response,
+                                                 ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(abort_txn_with_coordinator);
+    if (!request->has_id() || !request->has_ip() || 
!request->has_start_time()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid coordinate id, coordinate ip or coordinate start time.";
+        return;
+    }
+    // TODO: For auth
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+    RPC_RATE_LIMIT(abort_txn_with_coordinator);
+    std::string begin_info_key = txn_info_key({instance_id, 0, 0});
+    std::string end_info_key = txn_info_key({instance_id, INT64_MAX, 
INT64_MAX});
+    LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" 
<< hex(end_info_key);
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        msg = "failed to create txn";
+        code = cast_as<ErrCategory::CREATE>(err);
+        return;
+    }
+    std::unique_ptr<RangeGetIterator> it;
+    int64_t abort_txn_cnt = 0;
+    int64_t total_iteration_cnt = 0;
+    bool need_commit = false;
+    do {
+        err = txn->get(begin_info_key, end_info_key, &it, true);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get txn info. err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        while (it->has_next()) {
+            total_iteration_cnt++;
+            auto [k, v] = it->next();
+            LOG(INFO) << "check txn info txn_info_key=" << hex(k);
+            TxnInfoPB info_pb;
+            if (!info_pb.ParseFromArray(v.data(), v.size())) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "malformed txn running info";
+                msg = ss.str();
+                ss << " key=" << hex(k);
+                LOG(WARNING) << ss.str();
+                return;
+            }
+            const auto& coordinate = info_pb.coordinator();
+            if (info_pb.status() == TxnStatusPB::TXN_STATUS_PREPARED &&
+                coordinate.sourcetype() == TXN_SOURCE_TYPE_BE && 
coordinate.id() == request->id() &&
+                coordinate.ip() == request->ip() &&

Review Comment:
   What if there are multiple BEs on the same host?



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -2640,10 +2750,43 @@ void 
MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont
                         running_table_ids.end(), result.begin());
                 result.resize(iter - result.begin());
                 if (result.size() > 0) {
-                    response->set_finished(false);
-                    LOG(INFO) << "skip timeout txn count: " << 
skip_timeout_txn_cnt
-                              << " total iteration count: " << 
total_iteration_cnt;
-                    return;
+                    finished = false;
+                    std::vector<std::tuple<std::variant<int64_t, std::string>, 
int, int>> out;
+                    std::string_view key_view = k;
+                    key_view.remove_prefix(1);
+                    int ret = decode_key(&key_view, &out);
+                    if (ret != 0) [[unlikely]] {
+                        // decode version key error means this is something 
wrong,
+                        // we can not continue this txn
+                        LOG(WARNING) << "failed to decode key, ret=" << ret << 
" key=" << hex(k);
+                    } else {
+                        DCHECK(out.size() == 5) << " key=" << hex(k) << " " << 
out.size();
+                        const std::string& decode_instance_id = 
std::get<1>(std::get<0>(out[1]));
+                        int64_t db_id = std::get<0>(std::get<0>(out[3]));
+                        int64_t txn_id = std::get<0>(std::get<0>(out[4]));
+                        std::string conflict_txn_info_key =
+                                txn_info_key({decode_instance_id, db_id, 
txn_id});

Review Comment:
   Extract this part into a separate function named 
`get_txn_info_key_from_txn_running_key()`.



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -2553,6 +2568,100 @@ void 
MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controlle
     response->mutable_txn_info()->CopyFrom(txn_info);
 }
 
+void 
MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* 
controller,
+                                                 const 
AbortTxnWithCoordinatorRequest* request,
+                                                 
AbortTxnWithCoordinatorResponse* response,
+                                                 ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(abort_txn_with_coordinator);
+    if (!request->has_id() || !request->has_ip() || 
!request->has_start_time()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid coordinate id, coordinate ip or coordinate start time.";
+        return;
+    }
+    // TODO: For auth
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+    RPC_RATE_LIMIT(abort_txn_with_coordinator);
+    std::string begin_info_key = txn_info_key({instance_id, 0, 0});
+    std::string end_info_key = txn_info_key({instance_id, INT64_MAX, 
INT64_MAX});
+    LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" 
<< hex(end_info_key);
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        msg = "failed to create txn";
+        code = cast_as<ErrCategory::CREATE>(err);
+        return;
+    }
+    std::unique_ptr<RangeGetIterator> it;
+    int64_t abort_txn_cnt = 0;
+    int64_t total_iteration_cnt = 0;
+    bool need_commit = false;
+    do {
+        err = txn->get(begin_info_key, end_info_key, &it, true);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get txn info. err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        while (it->has_next()) {

Review Comment:
   Is it possible that this loop takes more than 5 seconds, which would lead to a 
"TxnTimeout Error"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to