This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 30da34a9280 [fix](cloud) fix inconsistent response between cloud mode 
and local mode for streamload 2PC (#38076)
30da34a9280 is described below

commit 30da34a92800a8f72e1ee13dc5805d0aefc7cd23
Author: Xin Liao <liaoxin...@126.com>
AuthorDate: Fri Jul 19 09:04:35 2024 +0800

    [fix](cloud) fix inconsistent response between cloud mode and local mode for 
streamload 2PC (#38076)
    
    Fix the flink_connector_response regression test case for cloud p0.
---
 cloud/src/meta-service/meta_service_txn.cpp           | 19 ++++++++++++++-----
 .../cloud/transaction/CloudGlobalTransactionMgr.java  |  7 ++++---
 2 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 4d48d7c9df4..0a3439e94f7 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -826,7 +826,12 @@ void commit_txn_immediately(
     if (err != TxnErrorCode::TXN_OK) {
         code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
                                                       : 
cast_as<ErrCategory::READ>(err);
-        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            ss << "transaction [" << txn_id << "] not found, db_id=" << db_id;
+        } else {
+            ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id
+               << " err=" << err;
+        }
         msg = ss.str();
         LOG(WARNING) << msg;
         return;
@@ -845,7 +850,7 @@ void commit_txn_immediately(
     DCHECK(txn_info.txn_id() == txn_id);
     if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
         code = MetaServiceCode::TXN_ALREADY_ABORTED;
-        ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" 
<< txn_id;
+        ss << "transaction [" << txn_id << "] is already aborted, db_id=" << 
db_id;
         msg = ss.str();
         LOG(WARNING) << msg;
         return;
@@ -1868,7 +1873,11 @@ void 
MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
             if (err != TxnErrorCode::TXN_OK) {
                 code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
                                                               : 
cast_as<ErrCategory::READ>(err);
-                ss << "failed to get db id, txn_id=" << txn_id << " err=" << 
err;
+                if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                    ss << "transaction [" << txn_id << "] not found";
+                } else {
+                    ss << "failed to get txn info, txn_id=" << txn_id << " 
err=" << err;
+                }
                 msg = ss.str();
                 return;
             }
@@ -1911,13 +1920,13 @@ void 
MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
         //check state is valid.
         if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
             code = MetaServiceCode::TXN_ALREADY_ABORTED;
-            ss << "transaction is already abort db_id=" << db_id << "txn_id=" 
<< txn_id;
+            ss << "transaction [" << txn_id << "] is already aborted, db_id=" 
<< db_id;
             msg = ss.str();
             return;
         }
         if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
             code = MetaServiceCode::TXN_ALREADY_VISIBLE;
-            ss << "transaction is already visible db_id=" << db_id << 
"txn_id=" << txn_id;
+            ss << "transaction [" << txn_id << "] is already VISIBLE, db_id=" 
<< db_id;
             msg = ss.str();
             return;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 727192b4e57..a4d0e582471 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -529,6 +529,10 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             throw new UserException("commitTxn() failed, errMsg:" + 
e.getMessage());
         }
 
+        if (is2PC && (commitTxnResponse.getStatus().getCode() == 
MetaServiceCode.TXN_ALREADY_VISIBLE
+                || commitTxnResponse.getStatus().getCode() == 
MetaServiceCode.TXN_ALREADY_ABORTED)) {
+            throw new UserException(commitTxnResponse.getStatus().getMsg());
+        }
         if (commitTxnResponse.getStatus().getCode() != MetaServiceCode.OK
                 && commitTxnResponse.getStatus().getCode() != 
MetaServiceCode.TXN_ALREADY_VISIBLE) {
             LOG.warn("commitTxn failed, transactionId:{}, retryTime:{}, 
commitTxnResponse:{}",
@@ -545,9 +549,6 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             internalMsgBuilder.append(commitTxnResponse.getStatus().getCode());
             throw new UserException("internal error, " + 
internalMsgBuilder.toString());
         }
-        if (is2PC && commitTxnResponse.getStatus().getCode() == 
MetaServiceCode.TXN_ALREADY_VISIBLE) {
-            throw new UserException(commitTxnResponse.getStatus().getMsg());
-        }
 
         TransactionState txnState = 
TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
         TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to