This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 201602d3d1f [streamload](2pc) Fix 2pc stream load txn in cloud mode (#37033) 201602d3d1f is described below commit 201602d3d1f50eedb8934d808034cb904c379cfb Author: Gavin Chou <gavineaglec...@gmail.com> AuthorDate: Sun Jun 30 20:37:05 2024 +0800 [streamload](2pc) Fix 2pc stream load txn in cloud mode (#37033) A request to abort a load txn that is identified only by its label should be forwarded to the FE master to handle, because the db id is not available. --- be/src/cloud/cloud_meta_mgr.cpp | 6 +- be/src/cloud/cloud_stream_load_executor.cpp | 69 ++++++++++++++++++++-- be/src/common/config.cpp | 2 +- .../runtime/stream_load/stream_load_executor.cpp | 42 ++++++------- .../apache/doris/service/FrontendServiceImpl.java | 2 +- .../load_p0/stream_load/test_stream_load.groovy | 1 + 6 files changed, 89 insertions(+), 33 deletions(-) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index f0a377cba67..732f3023e91 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -839,8 +839,12 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { if (ctx.db_id > 0 && !ctx.label.empty()) { req.set_db_id(ctx.db_id); req.set_label(ctx.label); - } else { + } else if (ctx.txn_id > 0) { req.set_txn_id(ctx.txn_id); + } else { + LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << ctx.db_id + << " txn_id=" << ctx.txn_id << " label=" << ctx.label; + return Status::InternalError<false>("failed to abort txn"); } return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn); } diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp index b7d428e59a4..92fb73eacc1 100644 --- a/be/src/cloud/cloud_stream_load_executor.cpp +++ b/be/src/cloud/cloud_stream_load_executor.cpp @@ -26,6 +26,12 @@ namespace doris { +enum class TxnOpParamType : int { + ILLEGAL, + WITH_TXN_ID, + WITH_LABEL, +}; + CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* 
exec_env) : StreamLoadExecutor(exec_env) {} @@ -42,13 +48,48 @@ Status CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) { } Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { - VLOG_DEBUG << "operate_txn_2pc, op: " << ctx->txn_operation; + std::stringstream ss; + ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label + << " txn_2pc_op=" << ctx->txn_operation; + std::string op_info = ss.str(); + VLOG_DEBUG << "operate_txn_2pc " << op_info; + TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID + : !ctx->label.empty() ? TxnOpParamType::WITH_LABEL + : TxnOpParamType::ILLEGAL; + + Status st = Status::InternalError<false>("impossible branch reached, " + op_info); + if (ctx->txn_operation.compare("commit") == 0) { - return _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true); + if (topt == TxnOpParamType::WITH_TXN_ID) { + VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info; + st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true); + } else if (topt == TxnOpParamType::WITH_LABEL) { + VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info; + st = StreamLoadExecutor::operate_txn_2pc(ctx); + } else { + st = Status::InternalError<false>( + "failed to 2pc commit txn, with TxnOpParamType::illegal input, " + op_info); + } + } else if (ctx->txn_operation.compare("abort") == 0) { + if (topt == TxnOpParamType::WITH_TXN_ID) { + LOG(INFO) << "2pc abort stream load txn directly: " << op_info; + st = _exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx); + WARN_IF_ERROR(st, "failed to rollback txn " + op_info); + } else if (topt == TxnOpParamType::WITH_LABEL) { // maybe a label send to FE to abort + VLOG_DEBUG << "2pc abort stream load txn with FE support: " << op_info; + StreamLoadExecutor::rollback_txn(ctx); + st = Status::OK(); + } else { + st = Status::InternalError<false>("failed abort txn, with illegal input, " + 
op_info); + } } else { - // 2pc abort - return _exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx); + std::string msg = + "failed to operate_txn_2pc, unrecognized operation: " + ctx->txn_operation; + LOG(WARNING) << msg << " " << op_info; + st = Status::InternalError<false>(msg + " " + op_info); } + WARN_IF_ERROR(st, "failed to operate_txn_2pc " + op_info) + return st; } Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { @@ -85,8 +126,24 @@ Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { } void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { - WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx), - "Failed to rollback txn"); + std::stringstream ss; + ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label; + std::string op_info = ss.str(); + LOG(INFO) << "rollback stream laod txn " << op_info; + TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID + : !ctx->label.empty() ? 
TxnOpParamType::WITH_LABEL + : TxnOpParamType::ILLEGAL; + + if (topt == TxnOpParamType::WITH_TXN_ID) { + VLOG_DEBUG << "abort stream load txn directly: " << op_info; + WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx), + "failed to rollback txn " + op_info); + } else { // maybe a label send to FE to abort + // does not care about the return status + // ctx->db_id > 0 && !ctx->label.empty() + VLOG_DEBUG << "abort stream load txn with FE support: " << op_info; + StreamLoadExecutor::rollback_txn(ctx); + } } } // namespace doris diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 9df75b97bd6..c2274fd169b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -139,7 +139,7 @@ DEFINE_mBool(enable_stacktrace_in_allocator_check_failed, "false"); DEFINE_mInt64(large_memory_check_bytes, "2147483648"); -DEFINE_mBool(enable_memory_orphan_check, "true"); +DEFINE_mBool(enable_memory_orphan_check, "false"); // The maximum time a thread waits for full GC. Currently only query will wait for full gc. 
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000"); diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index d26beb66827..2bd1c16199d 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -166,9 +166,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { TLoadTxnBeginRequest request; set_request_auth(&request, ctx->auth); - request.db = ctx->db; - request.tbl = ctx->table; - request.label = ctx->label; + request.__set_db(ctx->db); + request.__set_tbl(ctx->table); + request.__set_label(ctx->label); // set timestamp request.__set_timestamp(GetCurrentTimeMicros()); if (ctx->timeout_second != -1) { @@ -286,27 +286,23 @@ Status StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx, TLoadTxnCommitRequest& request) { set_request_auth(&request, ctx->auth); - request.db = ctx->db; + request.__set_db(ctx->db); if (ctx->db_id > 0) { - request.db_id = ctx->db_id; - request.__isset.db_id = true; + request.__set_db_id(ctx->db_id); } - request.tbl = ctx->table; - request.txnId = ctx->txn_id; - request.sync = true; - request.commitInfos = ctx->commit_infos; - request.__isset.commitInfos = true; + request.__set_tbl(ctx->table); + request.__set_txnId(ctx->txn_id); + request.__set_sync(true); + request.__set_commitInfos(ctx->commit_infos); request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); - request.tbls = ctx->table_list; - request.__isset.tbls = true; + request.__set_tbls(ctx->table_list); VLOG_DEBUG << "commit txn request:" << apache::thrift::ThriftDebugString(request); // set attachment if has TTxnCommitAttachment attachment; if (collect_load_stat(ctx, &attachment)) { - request.txnCommitAttachment = attachment; - request.__isset.txnCommitAttachment = true; + request.__set_txnCommitAttachment(attachment); } } @@ -353,22 +349,20 @@ 
void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { TNetworkAddress master_addr = _exec_env->master_info()->network_address; TLoadTxnRollbackRequest request; set_request_auth(&request, ctx->auth); - request.db = ctx->db; + request.__set_db(ctx->db); if (ctx->db_id > 0) { - request.db_id = ctx->db_id; - request.__isset.db_id = true; + request.__set_db_id(ctx->db_id); } - request.tbl = ctx->table; - request.txnId = ctx->txn_id; + request.__set_tbl(ctx->table); + request.__set_txnId(ctx->txn_id); request.__set_reason(ctx->status.to_string()); - request.tbls = ctx->table_list; - request.__isset.tbls = true; + request.__set_tbls(ctx->table_list); + request.__set_label(ctx->label); // set attachment if has TTxnCommitAttachment attachment; if (collect_load_stat(ctx, &attachment)) { - request.txnCommitAttachment = attachment; - request.__isset.txnCommitAttachment = true; + request.__set_txnCommitAttachment(attachment); } TLoadTxnRollbackResult result; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 75a1987eb23..b7e11be47be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1823,7 +1823,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw new MetaNotFoundException("db " + request.getDb() + " does not exist"); } long dbId = db.getId(); - if (request.getTxnId() != 0) { // txnId is required in thrift + if (request.getTxnId() > 0) { // txnId is required in thrift TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() .getTransactionState(dbId, request.getTxnId()); if (transactionState == null) { diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy index 07decba0950..660ad50bd9d 100644 --- 
a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy @@ -1166,6 +1166,7 @@ suite("test_stream_load", "p0") { requestBuilder.setHeader("Expect", "100-Continue") requestBuilder.setHeader("label", "${label}") requestBuilder.setHeader("txn_operation", "${txn_operation}") + log.info("stream load request " + requestBuilder.toString()) String backendStreamLoadUri = null client.execute(requestBuilder.build()).withCloseable { resp -> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org