This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 18a9b7bc790 [Opt](streamload) Commit txn on BE coordinator in cloud mode to avoid RPC and lock overhead (#36237)
18a9b7bc790 is described below

commit 18a9b7bc790602cffa9d05e5df10451f0ce501dc
Author: Gavin Chou <gavineaglec...@gmail.com>
AuthorDate: Sat Jun 15 11:04:57 2024 +0800

    [Opt](streamload) Commit txn on BE coordinator in cloud mode to avoid RPC and lock overhead (#36237)
    
    In cloud mode, FE master is not mandatory for committing load txn, and
    there is overhead (locks and RPC) if we commit load txn via FE master.
---
 be/src/cloud/cloud_stream_load_executor.cpp        | 71 +++++++++++++++++-----
 be/src/cloud/cloud_stream_load_executor.h          | 10 ++-
 be/src/runtime/stream_load/stream_load_context.cpp |  7 +++
 be/src/runtime/stream_load/stream_load_context.h   |  2 +
 be/src/runtime/stream_load/stream_load_executor.h  |  8 +--
 .../transaction/CloudGlobalTransactionMgr.java     | 45 ++++++++------
 6 files changed, 106 insertions(+), 37 deletions(-)

diff --git a/be/src/cloud/cloud_stream_load_executor.cpp 
b/be/src/cloud/cloud_stream_load_executor.cpp
index 7381e4de069..b7d428e59a4 100644
--- a/be/src/cloud/cloud_stream_load_executor.cpp
+++ b/be/src/cloud/cloud_stream_load_executor.cpp
@@ -17,6 +17,8 @@
 
 #include "cloud/cloud_stream_load_executor.h"
 
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
 #include "cloud/config.h"
 #include "common/logging.h"
 #include "common/status.h"
@@ -29,23 +31,62 @@ CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* 
exec_env)
 
 CloudStreamLoadExecutor::~CloudStreamLoadExecutor() = default;
 
+Status CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
+    auto st = 
_exec_env->storage_engine().to_cloud().meta_mgr().precommit_txn(*ctx);
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to precommit txn: " << st << ", " << 
ctx->brief();
+        return st;
+    }
+    ctx->need_rollback = false;
+    return st;
+}
+
+Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
+    VLOG_DEBUG << "operate_txn_2pc, op: " << ctx->txn_operation;
+    if (ctx->txn_operation.compare("commit") == 0) {
+        return 
_exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
+    } else {
+        // 2pc abort
+        return 
_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx);
+    }
+}
+
 Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
-    // forward to fe to execute commit transaction for MoW table
-    Status st;
-    int retry_times = 0;
-    // mow table will retry when DELETE_BITMAP_LOCK_ERROR occurs
-    do {
-        st = StreamLoadExecutor::commit_txn(ctx);
-        if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
-            break;
+    if (ctx->load_type == TLoadType::ROUTINE_LOAD) {
+        return StreamLoadExecutor::commit_txn(ctx);
+    }
+
+    // forward to fe to excute commit transaction for MoW table
+    if (ctx->is_mow_table()) {
+        Status st;
+        int retry_times = 0;
+        while (retry_times < config::mow_stream_load_commit_retry_times) {
+            st = StreamLoadExecutor::commit_txn(ctx);
+            // DELETE_BITMAP_LOCK_ERROR will be retried
+            if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
+                break;
+            }
+            LOG_WARNING("Failed to commit txn")
+                    .tag("txn_id", ctx->txn_id)
+                    .tag("retry_times", retry_times)
+                    .error(st);
+            retry_times++;
         }
-        LOG_WARNING("Failed to commit txn")
-                .tag("txn_id", ctx->txn_id)
-                .tag("retry_times", retry_times)
-                .error(st);
-        retry_times++;
-    } while (retry_times < config::mow_stream_load_commit_retry_times);
+        return st;
+    }
+
+    auto st = 
_exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, false);
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to commit txn: " << st << ", " << ctx->brief();
+        return st;
+    }
+    ctx->need_rollback = false;
     return st;
 }
 
-} // namespace doris
\ No newline at end of file
+void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
+    
WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx),
+                  "Failed to rollback txn");
+}
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_stream_load_executor.h 
b/be/src/cloud/cloud_stream_load_executor.h
index 32cdba9db9b..b0cb91d06ac 100644
--- a/be/src/cloud/cloud_stream_load_executor.h
+++ b/be/src/cloud/cloud_stream_load_executor.h
@@ -19,12 +19,20 @@
 #include "runtime/stream_load/stream_load_executor.h"
 
 namespace doris {
+
 class CloudStreamLoadExecutor final : public StreamLoadExecutor {
 public:
     CloudStreamLoadExecutor(ExecEnv* exec_env);
 
     ~CloudStreamLoadExecutor() override;
 
+    Status pre_commit_txn(StreamLoadContext* ctx) override;
+
+    Status operate_txn_2pc(StreamLoadContext* ctx) override;
+
     Status commit_txn(StreamLoadContext* ctx) override;
+
+    void rollback_txn(StreamLoadContext* ctx) override;
 };
-} // namespace doris
\ No newline at end of file
+
+} // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp 
b/be/src/runtime/stream_load/stream_load_context.cpp
index 0cd1f0e3d59..cec015fe92c 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -350,4 +350,11 @@ std::string StreamLoadContext::brief(bool detail) const {
     return ss.str();
 }
 
+bool StreamLoadContext::is_mow_table() const {
+    return (put_result.__isset.params && 
put_result.params.__isset.is_mow_table &&
+            put_result.params.is_mow_table) ||
+           (put_result.__isset.pipeline_params && 
put_result.pipeline_params.__isset.is_mow_table &&
+            put_result.pipeline_params.is_mow_table);
+}
+
 } // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_context.h 
b/be/src/runtime/stream_load/stream_load_context.h
index 1461e863d5d..db210b350e7 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -118,6 +118,8 @@ public:
     // also print the load source info if detail is set to true
     std::string brief(bool detail = false) const;
 
+    bool is_mow_table() const;
+
 public:
     static const int default_txn_id = -1;
     // load type, eg: ROUTINE LOAD/MANUAL LOAD
diff --git a/be/src/runtime/stream_load/stream_load_executor.h 
b/be/src/runtime/stream_load/stream_load_executor.h
index 0d8056d9141..1364bbbf31b 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -39,19 +39,19 @@ public:
 
     Status begin_txn(StreamLoadContext* ctx);
 
-    Status pre_commit_txn(StreamLoadContext* ctx);
+    virtual Status pre_commit_txn(StreamLoadContext* ctx);
 
-    Status operate_txn_2pc(StreamLoadContext* ctx);
+    virtual Status operate_txn_2pc(StreamLoadContext* ctx);
 
     virtual Status commit_txn(StreamLoadContext* ctx);
 
     void get_commit_request(StreamLoadContext* ctx, TLoadTxnCommitRequest& 
request);
 
-    void rollback_txn(StreamLoadContext* ctx);
+    virtual void rollback_txn(StreamLoadContext* ctx);
 
     Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx);
 
-private:
+protected:
     // collect the load statistics from context and set them to stat
     // return true if stat is set, otherwise, return false
     bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* 
attachment);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 297ac9f1746..f0ab74ccd6a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -337,7 +337,16 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, 
txnCommitAttachment, false);
     }
 
+    /**
+     * Post process of commitTxn
+     * 1. update some stats
+     * 2. produce event for further processes like async MV
+     * @param commitTxnResponse commit txn call response from meta-service
+     */
     public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
+        // ========================================
+        // update some table stats
+        // ========================================
         long dbId = commitTxnResponse.getTxnInfo().getDbId();
         long txnId = commitTxnResponse.getTxnInfo().getTxnId();
         // 1. update rowCountfor AnalysisManager
@@ -389,6 +398,25 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         if (sb.length() > 0) {
             LOG.info("notify partition first load. {}", sb);
         }
+
+        // ========================================
+        // produce event
+        // ========================================
+        List<Long> tableList = commitTxnResponse.getTxnInfo().getTableIdsList()
+                .stream().distinct().collect(Collectors.toList());
+        // Here, we only wait for the EventProcessor to finish processing the 
event,
+        // but regardless of the success or failure of the result,
+        // it does not affect the logic of transaction
+        try {
+            for (Long tableId : tableList) {
+                Env.getCurrentEnv().getEventProcessor().processEvent(
+                    new DataChangeEvent(InternalCatalog.INTERNAL_CATALOG_ID, 
dbId, tableId));
+            }
+        } catch (Throwable t) {
+            // According to normal logic, no exceptions will be thrown,
+            // but in order to avoid bugs affecting the original logic, all 
exceptions are caught
+            LOG.warn("produceEvent failed, db {}, tables {} ", dbId, 
tableList, t);
+        }
     }
 
     private Set<Long> getBaseTabletsFromTables(List<Table> tableList, 
List<TabletCommitInfo> tabletCommitInfos)
@@ -528,23 +556,6 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() 
- txnState.getPrepareTime());
         }
         afterCommitTxnResp(commitTxnResponse);
-        // Here, we only wait for the EventProcessor to finish processing the 
event,
-        // but regardless of the success or failure of the result,
-        // it does not affect the logic of transaction
-        try {
-            produceEvent(dbId, tableList);
-        } catch (Throwable t) {
-            // According to normal logic, no exceptions will be thrown,
-            // but in order to avoid bugs affecting the original logic, all 
exceptions are caught
-            LOG.warn("produceEvent failed: ", t);
-        }
-    }
-
-    private void produceEvent(long dbId, List<Table> tableList) {
-        for (Table table : tableList) {
-            Env.getCurrentEnv().getEventProcessor().processEvent(
-                    new DataChangeEvent(InternalCatalog.INTERNAL_CATALOG_ID, 
dbId, table.getId()));
-        }
     }
 
     private List<OlapTable> getMowTableList(List<Table> tableList) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to