This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 30da34a9280 [fix](cloud) fix inconsistent response between cloud mode and local mode for streamload 2PC (#38076) 30da34a9280 is described below commit 30da34a92800a8f72e1ee13dc5805d0aefc7cd23 Author: Xin Liao <liaoxin...@126.com> AuthorDate: Fri Jul 19 09:04:35 2024 +0800 [fix](cloud) fix inconsistent response between cloud mode and local mode for streamload 2PC (#38076) fix the regression test case of flink_connector_response for cloud p0. --- cloud/src/meta-service/meta_service_txn.cpp | 19 ++++++++++++++----- .../cloud/transaction/CloudGlobalTransactionMgr.java | 7 ++++--- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 4d48d7c9df4..0a3439e94f7 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -826,7 +826,12 @@ void commit_txn_immediately( if (err != TxnErrorCode::TXN_OK) { code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND : cast_as<ErrCategory::READ>(err); - ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "transaction [" << txn_id << "] not found, db_id=" << db_id; + } else { + ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id + << " err=" << err; + } msg = ss.str(); LOG(WARNING) << msg; return; @@ -845,7 +850,7 @@ void commit_txn_immediately( DCHECK(txn_info.txn_id() == txn_id); if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { code = MetaServiceCode::TXN_ALREADY_ABORTED; - ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id; + ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id; msg = ss.str(); LOG(WARNING) << msg; return; @@ -1868,7 +1873,11 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, if (err != TxnErrorCode::TXN_OK) { code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND : cast_as<ErrCategory::READ>(err); - ss << "failed to get db id, txn_id=" << txn_id << " err=" << err; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "transaction [" << txn_id << "] not found"; + } else { + ss << "failed to get txn info, txn_id=" << txn_id << " err=" << err; + } msg = ss.str(); return; } @@ -1911,13 +1920,13 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, //check state is valid. 
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { code = MetaServiceCode::TXN_ALREADY_ABORTED; - ss << "transaction is already abort db_id=" << db_id << "txn_id=" << txn_id; + ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id; msg = ss.str(); return; } if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { code = MetaServiceCode::TXN_ALREADY_VISIBLE; - ss << "transaction is already visible db_id=" << db_id << "txn_id=" << txn_id; + ss << "transaction [" << txn_id << "] is already VISIBLE, db_id=" << db_id; msg = ss.str(); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 727192b4e57..a4d0e582471 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -529,6 +529,10 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { throw new UserException("commitTxn() failed, errMsg:" + e.getMessage()); } + if (is2PC && (commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE + || commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_ABORTED)) { + throw new UserException(commitTxnResponse.getStatus().getMsg()); + } if (commitTxnResponse.getStatus().getCode() != MetaServiceCode.OK && commitTxnResponse.getStatus().getCode() != MetaServiceCode.TXN_ALREADY_VISIBLE) { LOG.warn("commitTxn failed, transactionId:{}, retryTime:{}, commitTxnResponse:{}", @@ -545,9 +549,6 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { internalMsgBuilder.append(commitTxnResponse.getStatus().getCode()); throw new UserException("internal error, " + internalMsgBuilder.toString()); } - if (is2PC && commitTxnResponse.getStatus().getCode() == 
MetaServiceCode.TXN_ALREADY_VISIBLE) { - throw new UserException(commitTxnResponse.getStatus().getMsg()); - } TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo()); TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org