This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 71fd40ed418 [fix](cloud) provide a conf to enable/disable streamload commit on be (#37858) 71fd40ed418 is described below commit 71fd40ed418633fd27eb1599503ebcc6e573c558 Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Tue Jul 16 01:42:17 2024 +0800 [fix](cloud) provide a conf to enable/disable streamload commit on be (#37858) pick #37855 Signed-off-by: freemandealer <freeman.zhang1...@gmail.com> Co-authored-by: freemandealer <freeman.zhang1...@gmail.com> --- be/src/cloud/cloud_stream_load_executor.cpp | 12 ++++++------ be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp index a87f37a5188..1b8167c96eb 100644 --- a/be/src/cloud/cloud_stream_load_executor.cpp +++ b/be/src/cloud/cloud_stream_load_executor.cpp @@ -60,7 +60,10 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { Status st = Status::InternalError<false>("impossible branch reached, " + op_info); if (ctx->txn_operation.compare("commit") == 0) { - if (topt == TxnOpParamType::WITH_TXN_ID) { + if (!config::enable_stream_load_commit_txn_on_be) { + VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info; + st = StreamLoadExecutor::operate_txn_2pc(ctx); + } else if (topt == TxnOpParamType::WITH_TXN_ID) { VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info; st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true); } else if (topt == TxnOpParamType::WITH_LABEL) { @@ -93,12 +96,9 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { } Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { - if (ctx->load_type == TLoadType::ROUTINE_LOAD) { - return StreamLoadExecutor::commit_txn(ctx); - } - // forward to fe to excute commit transaction for MoW table - if (ctx->is_mow_table()) { + if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be || + ctx->load_type == TLoadType::ROUTINE_LOAD) { Status st; int retry_times = 0; while (retry_times < config::mow_stream_load_commit_retry_times) { diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 92303473ad6..c8e62465b27 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -537,6 +537,8 @@ DEFINE_mInt32(stream_load_record_batch_size, "50"); DEFINE_Int32(stream_load_record_expire_time_secs, "28800"); // time interval to clean expired stream load records DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800"); +// enable stream load commit txn on BE directly, bypassing FE. Only for cloud. +DEFINE_mBool(enable_stream_load_commit_txn_on_be, "false"); // The buffer size to store stream table function schema info DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB diff --git a/be/src/common/config.h b/be/src/common/config.h index 1a9e3291db5..9e348698929 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -586,6 +586,8 @@ DECLARE_mInt32(stream_load_record_batch_size); DECLARE_Int32(stream_load_record_expire_time_secs); // time interval to clean expired stream load records DECLARE_mInt64(clean_stream_load_record_interval_secs); +// enable stream load commit txn on BE directly, bypassing FE. Only for cloud. +DECLARE_mBool(enable_stream_load_commit_txn_on_be); // The buffer size to store stream table function schema info DECLARE_Int64(stream_tvf_buffer_size); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org