This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 201602d3d1f [streamload](2pc) Fix 2pc stream load txn in cloud mode (#37033)
201602d3d1f is described below

commit 201602d3d1f50eedb8934d808034cb904c379cfb
Author: Gavin Chou <gavineaglec...@gmail.com>
AuthorDate: Sun Jun 30 20:37:05 2024 +0800

    [streamload](2pc) Fix 2pc stream load txn in cloud mode (#37033)
    
    A load txn abort that carries only a label should be forwarded to the
    FE master for handling, because the db id is not available.
---
 be/src/cloud/cloud_meta_mgr.cpp                    |  6 +-
 be/src/cloud/cloud_stream_load_executor.cpp        | 69 ++++++++++++++++++++--
 be/src/common/config.cpp                           |  2 +-
 .../runtime/stream_load/stream_load_executor.cpp   | 42 ++++++-------
 .../apache/doris/service/FrontendServiceImpl.java  |  2 +-
 .../load_p0/stream_load/test_stream_load.groovy    |  1 +
 6 files changed, 89 insertions(+), 33 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index f0a377cba67..732f3023e91 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -839,8 +839,12 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext& 
ctx) {
     if (ctx.db_id > 0 && !ctx.label.empty()) {
         req.set_db_id(ctx.db_id);
         req.set_label(ctx.label);
-    } else {
+    } else if (ctx.txn_id > 0) {
         req.set_txn_id(ctx.txn_id);
+    } else {
+        LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << 
ctx.db_id
+                     << " txn_id=" << ctx.txn_id << " label=" << ctx.label;
+        return Status::InternalError<false>("failed to abort txn");
     }
     return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn);
 }
diff --git a/be/src/cloud/cloud_stream_load_executor.cpp 
b/be/src/cloud/cloud_stream_load_executor.cpp
index b7d428e59a4..92fb73eacc1 100644
--- a/be/src/cloud/cloud_stream_load_executor.cpp
+++ b/be/src/cloud/cloud_stream_load_executor.cpp
@@ -26,6 +26,12 @@
 
 namespace doris {
 
+enum class TxnOpParamType : int {
+    ILLEGAL,
+    WITH_TXN_ID,
+    WITH_LABEL,
+};
+
 CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* exec_env)
         : StreamLoadExecutor(exec_env) {}
 
@@ -42,13 +48,48 @@ Status 
CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
 }
 
 Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
-    VLOG_DEBUG << "operate_txn_2pc, op: " << ctx->txn_operation;
+    std::stringstream ss;
+    ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << 
ctx->label
+       << " txn_2pc_op=" << ctx->txn_operation;
+    std::string op_info = ss.str();
+    VLOG_DEBUG << "operate_txn_2pc " << op_info;
+    TxnOpParamType topt = ctx->txn_id > 0       ? TxnOpParamType::WITH_TXN_ID
+                          : !ctx->label.empty() ? TxnOpParamType::WITH_LABEL
+                                                : TxnOpParamType::ILLEGAL;
+
+    Status st = Status::InternalError<false>("impossible branch reached, " + 
op_info);
+
     if (ctx->txn_operation.compare("commit") == 0) {
-        return 
_exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
+        if (topt == TxnOpParamType::WITH_TXN_ID) {
+            VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info;
+            st = 
_exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
+        } else if (topt == TxnOpParamType::WITH_LABEL) {
+            VLOG_DEBUG << "2pc commit stream load txn with FE support: " << 
op_info;
+            st = StreamLoadExecutor::operate_txn_2pc(ctx);
+        } else {
+            st = Status::InternalError<false>(
+                    "failed to 2pc commit txn, with TxnOpParamType::illegal 
input, " + op_info);
+        }
+    } else if (ctx->txn_operation.compare("abort") == 0) {
+        if (topt == TxnOpParamType::WITH_TXN_ID) {
+            LOG(INFO) << "2pc abort stream load txn directly: " << op_info;
+            st = 
_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx);
+            WARN_IF_ERROR(st, "failed to rollback txn " + op_info);
+        } else if (topt == TxnOpParamType::WITH_LABEL) { // maybe a label send 
to FE to abort
+            VLOG_DEBUG << "2pc abort stream load txn with FE support: " << 
op_info;
+            StreamLoadExecutor::rollback_txn(ctx);
+            st = Status::OK();
+        } else {
+            st = Status::InternalError<false>("failed abort txn, with illegal 
input, " + op_info);
+        }
     } else {
-        // 2pc abort
-        return 
_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx);
+        std::string msg =
+                "failed to operate_txn_2pc, unrecognized operation: " + 
ctx->txn_operation;
+        LOG(WARNING) << msg << " " << op_info;
+        st = Status::InternalError<false>(msg + " " + op_info);
     }
+    WARN_IF_ERROR(st, "failed to operate_txn_2pc " + op_info)
+    return st;
 }
 
 Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
@@ -85,8 +126,24 @@ Status 
CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
 }
 
 void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
-    
WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx),
-                  "Failed to rollback txn");
+    std::stringstream ss;
+    ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << 
ctx->label;
+    std::string op_info = ss.str();
+    LOG(INFO) << "rollback stream laod txn " << op_info;
+    TxnOpParamType topt = ctx->txn_id > 0       ? TxnOpParamType::WITH_TXN_ID
+                          : !ctx->label.empty() ? TxnOpParamType::WITH_LABEL
+                                                : TxnOpParamType::ILLEGAL;
+
+    if (topt == TxnOpParamType::WITH_TXN_ID) {
+        VLOG_DEBUG << "abort stream load txn directly: " << op_info;
+        
WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx),
+                      "failed to rollback txn " + op_info);
+    } else { // maybe a label send to FE to abort
+        // does not care about the return status
+        // ctx->db_id > 0 && !ctx->label.empty()
+        VLOG_DEBUG << "abort stream load txn with FE support: " << op_info;
+        StreamLoadExecutor::rollback_txn(ctx);
+    }
 }
 
 } // namespace doris
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9df75b97bd6..c2274fd169b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -139,7 +139,7 @@ DEFINE_mBool(enable_stacktrace_in_allocator_check_failed, 
"false");
 
 DEFINE_mInt64(large_memory_check_bytes, "2147483648");
 
-DEFINE_mBool(enable_memory_orphan_check, "true");
+DEFINE_mBool(enable_memory_orphan_check, "false");
 
 // The maximum time a thread waits for full GC. Currently only query will wait 
for full gc.
 DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index d26beb66827..2bd1c16199d 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -166,9 +166,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* 
ctx) {
 
     TLoadTxnBeginRequest request;
     set_request_auth(&request, ctx->auth);
-    request.db = ctx->db;
-    request.tbl = ctx->table;
-    request.label = ctx->label;
+    request.__set_db(ctx->db);
+    request.__set_tbl(ctx->table);
+    request.__set_label(ctx->label);
     // set timestamp
     request.__set_timestamp(GetCurrentTimeMicros());
     if (ctx->timeout_second != -1) {
@@ -286,27 +286,23 @@ Status 
StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
 void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
                                             TLoadTxnCommitRequest& request) {
     set_request_auth(&request, ctx->auth);
-    request.db = ctx->db;
+    request.__set_db(ctx->db);
     if (ctx->db_id > 0) {
-        request.db_id = ctx->db_id;
-        request.__isset.db_id = true;
+        request.__set_db_id(ctx->db_id);
     }
-    request.tbl = ctx->table;
-    request.txnId = ctx->txn_id;
-    request.sync = true;
-    request.commitInfos = ctx->commit_infos;
-    request.__isset.commitInfos = true;
+    request.__set_tbl(ctx->table);
+    request.__set_txnId(ctx->txn_id);
+    request.__set_sync(true);
+    request.__set_commitInfos(ctx->commit_infos);
     request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);
-    request.tbls = ctx->table_list;
-    request.__isset.tbls = true;
+    request.__set_tbls(ctx->table_list);
 
     VLOG_DEBUG << "commit txn request:" << 
apache::thrift::ThriftDebugString(request);
 
     // set attachment if has
     TTxnCommitAttachment attachment;
     if (collect_load_stat(ctx, &attachment)) {
-        request.txnCommitAttachment = attachment;
-        request.__isset.txnCommitAttachment = true;
+        request.__set_txnCommitAttachment(attachment);
     }
 }
 
@@ -353,22 +349,20 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* 
ctx) {
     TNetworkAddress master_addr = _exec_env->master_info()->network_address;
     TLoadTxnRollbackRequest request;
     set_request_auth(&request, ctx->auth);
-    request.db = ctx->db;
+    request.__set_db(ctx->db);
     if (ctx->db_id > 0) {
-        request.db_id = ctx->db_id;
-        request.__isset.db_id = true;
+        request.__set_db_id(ctx->db_id);
     }
-    request.tbl = ctx->table;
-    request.txnId = ctx->txn_id;
+    request.__set_tbl(ctx->table);
+    request.__set_txnId(ctx->txn_id);
     request.__set_reason(ctx->status.to_string());
-    request.tbls = ctx->table_list;
-    request.__isset.tbls = true;
+    request.__set_tbls(ctx->table_list);
+    request.__set_label(ctx->label);
 
     // set attachment if has
     TTxnCommitAttachment attachment;
     if (collect_load_stat(ctx, &attachment)) {
-        request.txnCommitAttachment = attachment;
-        request.__isset.txnCommitAttachment = true;
+        request.__set_txnCommitAttachment(attachment);
     }
 
     TLoadTxnRollbackResult result;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 75a1987eb23..b7e11be47be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1823,7 +1823,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             throw new MetaNotFoundException("db " + request.getDb() + " does 
not exist");
         }
         long dbId = db.getId();
-        if (request.getTxnId() != 0) { // txnId is required in thrift
+        if (request.getTxnId() > 0) { // txnId is required in thrift
             TransactionState transactionState = 
Env.getCurrentGlobalTransactionMgr()
                     .getTransactionState(dbId, request.getTxnId());
             if (transactionState == null) {
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 07decba0950..660ad50bd9d 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1166,6 +1166,7 @@ suite("test_stream_load", "p0") {
             requestBuilder.setHeader("Expect", "100-Continue")
             requestBuilder.setHeader("label", "${label}")
             requestBuilder.setHeader("txn_operation", "${txn_operation}")
+            log.info("stream load request " + requestBuilder.toString())
 
             String backendStreamLoadUri = null
             client.execute(requestBuilder.build()).withCloseable { resp ->


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to