This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 18a9b7bc790 [Opt](streamload) Commit txn on BE coordinator in cloud mode to avoid RPC and lock overhead (#36237) 18a9b7bc790 is described below commit 18a9b7bc790602cffa9d05e5df10451f0ce501dc Author: Gavin Chou <gavineaglec...@gmail.com> AuthorDate: Sat Jun 15 11:04:57 2024 +0800 [Opt](streamload) Commit txn on BE coordinator in cloud mode to avoid RPC and lock overhead (#36237) In cloud mode, FE master is not mandatory for committing load txn, and there is overhead (locks and RPC) if we commit load txn via FE master. --- be/src/cloud/cloud_stream_load_executor.cpp | 71 +++++++++++++++++----- be/src/cloud/cloud_stream_load_executor.h | 10 ++- be/src/runtime/stream_load/stream_load_context.cpp | 7 +++ be/src/runtime/stream_load/stream_load_context.h | 2 + be/src/runtime/stream_load/stream_load_executor.h | 8 +-- .../transaction/CloudGlobalTransactionMgr.java | 45 ++++++++------ 6 files changed, 106 insertions(+), 37 deletions(-) diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp index 7381e4de069..b7d428e59a4 100644 --- a/be/src/cloud/cloud_stream_load_executor.cpp +++ b/be/src/cloud/cloud_stream_load_executor.cpp @@ -17,6 +17,8 @@ #include "cloud/cloud_stream_load_executor.h" +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" #include "cloud/config.h" #include "common/logging.h" #include "common/status.h" @@ -29,23 +31,62 @@ CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* exec_env) CloudStreamLoadExecutor::~CloudStreamLoadExecutor() = default; +Status CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) { + auto st = _exec_env->storage_engine().to_cloud().meta_mgr().precommit_txn(*ctx); + if (!st.ok()) { + LOG(WARNING) << "Failed to precommit txn: " << st << ", " << ctx->brief(); + return st; + } + ctx->need_rollback = false; + return st; +} + +Status 
CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { + VLOG_DEBUG << "operate_txn_2pc, op: " << ctx->txn_operation; + if (ctx->txn_operation.compare("commit") == 0) { + return _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true); + } else { + // 2pc abort + return _exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx); + } +} + Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { - // forward to fe to execute commit transaction for MoW table - Status st; - int retry_times = 0; - // mow table will retry when DELETE_BITMAP_LOCK_ERROR occurs - do { - st = StreamLoadExecutor::commit_txn(ctx); - if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) { - break; + if (ctx->load_type == TLoadType::ROUTINE_LOAD) { + return StreamLoadExecutor::commit_txn(ctx); + } + + // forward to fe to excute commit transaction for MoW table + if (ctx->is_mow_table()) { + Status st; + int retry_times = 0; + while (retry_times < config::mow_stream_load_commit_retry_times) { + st = StreamLoadExecutor::commit_txn(ctx); + // DELETE_BITMAP_LOCK_ERROR will be retried + if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) { + break; + } + LOG_WARNING("Failed to commit txn") + .tag("txn_id", ctx->txn_id) + .tag("retry_times", retry_times) + .error(st); + retry_times++; } - LOG_WARNING("Failed to commit txn") - .tag("txn_id", ctx->txn_id) - .tag("retry_times", retry_times) - .error(st); - retry_times++; - } while (retry_times < config::mow_stream_load_commit_retry_times); + return st; + } + + auto st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, false); + if (!st.ok()) { + LOG(WARNING) << "Failed to commit txn: " << st << ", " << ctx->brief(); + return st; + } + ctx->need_rollback = false; return st; } -} // namespace doris \ No newline at end of file +void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { + 
WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx), + "Failed to rollback txn"); +} + +} // namespace doris diff --git a/be/src/cloud/cloud_stream_load_executor.h b/be/src/cloud/cloud_stream_load_executor.h index 32cdba9db9b..b0cb91d06ac 100644 --- a/be/src/cloud/cloud_stream_load_executor.h +++ b/be/src/cloud/cloud_stream_load_executor.h @@ -19,12 +19,20 @@ #include "runtime/stream_load/stream_load_executor.h" namespace doris { + class CloudStreamLoadExecutor final : public StreamLoadExecutor { public: CloudStreamLoadExecutor(ExecEnv* exec_env); ~CloudStreamLoadExecutor() override; + Status pre_commit_txn(StreamLoadContext* ctx) override; + + Status operate_txn_2pc(StreamLoadContext* ctx) override; + Status commit_txn(StreamLoadContext* ctx) override; + + void rollback_txn(StreamLoadContext* ctx) override; }; -} // namespace doris \ No newline at end of file + +} // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index 0cd1f0e3d59..cec015fe92c 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -350,4 +350,11 @@ std::string StreamLoadContext::brief(bool detail) const { return ss.str(); } +bool StreamLoadContext::is_mow_table() const { + return (put_result.__isset.params && put_result.params.__isset.is_mow_table && + put_result.params.is_mow_table) || + (put_result.__isset.pipeline_params && put_result.pipeline_params.__isset.is_mow_table && + put_result.pipeline_params.is_mow_table); +} + } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 1461e863d5d..db210b350e7 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -118,6 +118,8 @@ public: // also print the load source info if detail is set to true std::string brief(bool detail 
= false) const; + bool is_mow_table() const; + public: static const int default_txn_id = -1; // load type, eg: ROUTINE LOAD/MANUAL LOAD diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index 0d8056d9141..1364bbbf31b 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -39,19 +39,19 @@ public: Status begin_txn(StreamLoadContext* ctx); - Status pre_commit_txn(StreamLoadContext* ctx); + virtual Status pre_commit_txn(StreamLoadContext* ctx); - Status operate_txn_2pc(StreamLoadContext* ctx); + virtual Status operate_txn_2pc(StreamLoadContext* ctx); virtual Status commit_txn(StreamLoadContext* ctx); void get_commit_request(StreamLoadContext* ctx, TLoadTxnCommitRequest& request); - void rollback_txn(StreamLoadContext* ctx); + virtual void rollback_txn(StreamLoadContext* ctx); Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx); -private: +protected: // collect the load statistics from context and set them to stat // return true if stat is set, otherwise, return false bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 297ac9f1746..f0ab74ccd6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -337,7 +337,16 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); } + /** + * Post process of commitTxn + * 1. update some stats + * 2. 
produce event for further processes like async MV + * @param commitTxnResponse commit txn call response from meta-service + */ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) { + // ======================================== + // update some table stats + // ======================================== long dbId = commitTxnResponse.getTxnInfo().getDbId(); long txnId = commitTxnResponse.getTxnInfo().getTxnId(); // 1. update rowCountfor AnalysisManager @@ -389,6 +398,25 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { if (sb.length() > 0) { LOG.info("notify partition first load. {}", sb); } + + // ======================================== + // produce event + // ======================================== + List<Long> tableList = commitTxnResponse.getTxnInfo().getTableIdsList() + .stream().distinct().collect(Collectors.toList()); + // Here, we only wait for the EventProcessor to finish processing the event, + // but regardless of the success or failure of the result, + // it does not affect the logic of transaction + try { + for (Long tableId : tableList) { + Env.getCurrentEnv().getEventProcessor().processEvent( + new DataChangeEvent(InternalCatalog.INTERNAL_CATALOG_ID, dbId, tableId)); + } + } catch (Throwable t) { + // According to normal logic, no exceptions will be thrown, + // but in order to avoid bugs affecting the original logic, all exceptions are caught + LOG.warn("produceEvent failed, db {}, tables {} ", dbId, tableList, t); + } } private Set<Long> getBaseTabletsFromTables(List<Table> tableList, List<TabletCommitInfo> tabletCommitInfos) @@ -528,23 +556,6 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() - txnState.getPrepareTime()); } afterCommitTxnResp(commitTxnResponse); - // Here, we only wait for the EventProcessor to finish processing the event, - // but regardless of the success or failure of the result, - // it 
does not affect the logic of transaction - try { - produceEvent(dbId, tableList); - } catch (Throwable t) { - // According to normal logic, no exceptions will be thrown, - // but in order to avoid bugs affecting the original logic, all exceptions are caught - LOG.warn("produceEvent failed: ", t); - } - } - - private void produceEvent(long dbId, List<Table> tableList) { - for (Table table : tableList) { - Env.getCurrentEnv().getEventProcessor().processEvent( - new DataChangeEvent(InternalCatalog.INTERNAL_CATALOG_ID, dbId, table.getId())); - } } private List<OlapTable> getMowTableList(List<Table> tableList) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org