This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 71fd40ed418 [fix](cloud) provide a conf to enable/disable streamload commit on be (#37858)
71fd40ed418 is described below

commit 71fd40ed418633fd27eb1599503ebcc6e573c558
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Tue Jul 16 01:42:17 2024 +0800

    [fix](cloud) provide a conf to enable/disable streamload commit on be (#37858)
    
    pick #37855
    
    Signed-off-by: freemandealer <freeman.zhang1...@gmail.com>
    Co-authored-by: freemandealer <freeman.zhang1...@gmail.com>
---
 be/src/cloud/cloud_stream_load_executor.cpp | 12 ++++++------
 be/src/common/config.cpp                    |  2 ++
 be/src/common/config.h                      |  2 ++
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp
index a87f37a5188..1b8167c96eb 100644
--- a/be/src/cloud/cloud_stream_load_executor.cpp
+++ b/be/src/cloud/cloud_stream_load_executor.cpp
@@ -60,7 +60,10 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
     Status st = Status::InternalError<false>("impossible branch reached, " + op_info);
 
     if (ctx->txn_operation.compare("commit") == 0) {
-        if (topt == TxnOpParamType::WITH_TXN_ID) {
+        if (!config::enable_stream_load_commit_txn_on_be) {
+            VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info;
+            st = StreamLoadExecutor::operate_txn_2pc(ctx);
+        } else if (topt == TxnOpParamType::WITH_TXN_ID) {
             VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info;
             st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
         } else if (topt == TxnOpParamType::WITH_LABEL) {
@@ -93,12 +96,9 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
 }
 
 Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
-    if (ctx->load_type == TLoadType::ROUTINE_LOAD) {
-        return StreamLoadExecutor::commit_txn(ctx);
-    }
-
     // forward to fe to excute commit transaction for MoW table
-    if (ctx->is_mow_table()) {
+    if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be ||
+        ctx->load_type == TLoadType::ROUTINE_LOAD) {
         Status st;
         int retry_times = 0;
         while (retry_times < config::mow_stream_load_commit_retry_times) {
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 92303473ad6..c8e62465b27 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -537,6 +537,8 @@ DEFINE_mInt32(stream_load_record_batch_size, "50");
 DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
+// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
+DEFINE_mBool(enable_stream_load_commit_txn_on_be, "false");
 // The buffer size to store stream table function schema info
 DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1a9e3291db5..9e348698929 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -586,6 +586,8 @@ DECLARE_mInt32(stream_load_record_batch_size);
 DECLARE_Int32(stream_load_record_expire_time_secs);
 // time interval to clean expired stream load records
 DECLARE_mInt64(clean_stream_load_record_interval_secs);
+// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
+DECLARE_mBool(enable_stream_load_commit_txn_on_be);
 // The buffer size to store stream table function schema info
 DECLARE_Int64(stream_tvf_buffer_size);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to