gavinchou commented on code in PR #37669:
URL: https://github.com/apache/doris/pull/37669#discussion_r1694690972


##########
fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java:
##########
@@ -442,6 +444,56 @@ public boolean existCommittedTxns(Long dbId, Long tableId, 
Long partitionId) {
         return false;
     }
 
+    public static boolean checkFailedTxnsByCoordinator(TransactionState txn) {
+        TxnCoordinator coordinator = txn.getCoordinator();
+        if (coordinator.sourceType == TransactionState.TxnSourceType.FE) {
+            List<Frontend> frontends = Env.getCurrentEnv().getFrontends(null);
+            for (Frontend fe : frontends) {
+                if (fe.getHost().equals(coordinator.ip) && 
fe.getLastStartupTime() > coordinator.startTime) {
+                    return true;
+                }
+            }
+        } else if (coordinator.sourceType == 
TransactionState.TxnSourceType.BE) {
+            Backend be = Env.getCurrentSystemInfo().getBackend(coordinator.id);
+            if (be.getHost().equals(coordinator.ip) && (be.getLastStartTime() 
> coordinator.startTime
+                    || (!be.isAlive() && System.currentTimeMillis() - 
be.getLastUpdateMs()
+                                >= 
Config.abort_txn_after_lost_heartbeat_time_second * 1000L))) {
+                return true;
+            }
+        }

Review Comment:
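   Add a check for a coordinator FE/BE that has been dropped (i.e. is dead). For the BE branch, `Env.getCurrentSystemInfo().getBackend(coordinator.id)` presumably returns null once the backend is dropped, so `be.getHost()` would throw an NPE. Such a transaction should probably be treated as failed as well.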



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -2553,6 +2568,119 @@ void 
MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controlle
     response->mutable_txn_info()->CopyFrom(txn_info);
 }
 
+void 
MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* 
controller,
+                                                 const 
AbortTxnWithCoordinatorRequest* request,
+                                                 
AbortTxnWithCoordinatorResponse* response,
+                                                 ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(abort_txn_with_coordinator);
+    if (!request->has_id() || !request->has_ip() || 
!request->has_start_time()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid coordinate id, coordinate ip or coordinate start time.";
+        return;
+    }
+    // TODO: For auth
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+    RPC_RATE_LIMIT(abort_txn_with_coordinator);
+    std::string begin_info_key = txn_info_key({instance_id, 0, 0});
+    std::string end_info_key = txn_info_key({instance_id, INT64_MAX, 
INT64_MAX});
+    LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" 
<< hex(end_info_key);
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        msg = "failed to create txn";
+        code = cast_as<ErrCategory::CREATE>(err);
+        return;
+    }
+    std::unique_ptr<RangeGetIterator> it;
+    int64_t abort_txn_cnt = 0;
+    int64_t total_iteration_cnt = 0;
+    bool need_commit = false;
+    do {
+        err = txn->get(begin_info_key, end_info_key, &it, true);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get txn info. err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        while (it->has_next()) {
+            total_iteration_cnt++;
+            auto [k, v] = it->next();
+            LOG(INFO) << "check txn info txn_info_key=" << hex(k);

Review Comment:
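   Use VLOG instead of LOG(INFO) here. This line runs once per scanned txn info key, so logging at INFO level is too noisy.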



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -2553,6 +2568,100 @@ void 
MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controlle
     response->mutable_txn_info()->CopyFrom(txn_info);
 }
 
+void 
MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* 
controller,
+                                                 const 
AbortTxnWithCoordinatorRequest* request,
+                                                 
AbortTxnWithCoordinatorResponse* response,
+                                                 ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(abort_txn_with_coordinator);
+    if (!request->has_id() || !request->has_ip() || 
!request->has_start_time()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid coordinate id, coordinate ip or coordinate start time.";
+        return;
+    }
+    // TODO: For auth
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+    RPC_RATE_LIMIT(abort_txn_with_coordinator);
+    std::string begin_info_key = txn_info_key({instance_id, 0, 0});
+    std::string end_info_key = txn_info_key({instance_id, INT64_MAX, 
INT64_MAX});
+    LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" 
<< hex(end_info_key);
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        msg = "failed to create txn";
+        code = cast_as<ErrCategory::CREATE>(err);
+        return;
+    }
+    std::unique_ptr<RangeGetIterator> it;
+    int64_t abort_txn_cnt = 0;
+    int64_t total_iteration_cnt = 0;
+    bool need_commit = false;
+    do {
+        err = txn->get(begin_info_key, end_info_key, &it, true);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get txn info. err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        while (it->has_next()) {

Review Comment:
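   Re-create the transaction (`create_txn`) every 3 seconds while iterating. This scan covers every txn info key of the instance in a single transaction, which may exceed the KV store's transaction duration limit (please confirm the exact limit).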



##########
fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java:
##########
@@ -442,6 +444,56 @@ public boolean existCommittedTxns(Long dbId, Long tableId, 
Long partitionId) {
         return false;
     }
 
+    public static boolean checkFailedTxnsByCoordinator(TransactionState txn) {
+        TxnCoordinator coordinator = txn.getCoordinator();
+        if (coordinator.sourceType == TransactionState.TxnSourceType.FE) {
+            List<Frontend> frontends = Env.getCurrentEnv().getFrontends(null);
+            for (Frontend fe : frontends) {
+                if (fe.getHost().equals(coordinator.ip) && 
fe.getLastStartupTime() > coordinator.startTime) {
+                    return true;
+                }
+            }
+        } else if (coordinator.sourceType == 
TransactionState.TxnSourceType.BE) {
+            Backend be = Env.getCurrentSystemInfo().getBackend(coordinator.id);
+            if (be.getHost().equals(coordinator.ip) && (be.getLastStartTime() 
> coordinator.startTime
+                    || (!be.isAlive() && System.currentTimeMillis() - 
be.getLastUpdateMs()
+                                >= 
Config.abort_txn_after_lost_heartbeat_time_second * 1000L))) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static List<TransactionState> 
checkFailedTxns(List<TransactionState> conflictTxns) {
+        List<TransactionState> failedTxns = new ArrayList<>();
+        for (TransactionState txn : conflictTxns) {
+            boolean failed = false;
+            if (!failed) {
+                failed = checkFailedTxnsByCoordinator(txn);
+            }
+            if (failed) {
+                failedTxns.add(txn);
+            }

Review Comment:
   ```suggestion
               if (checkFailedTxnsByCoordinator(txn)) { // failed
                   failedTxns.add(txn);
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to