This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new b9b9a11 [Bug] Fix invalid rollback for stream load txn (#3054) b9b9a11 is described below commit b9b9a11eae97249163c9d193745ee9bff2a889b6 Author: WingC <1018957...@qq.com> AuthorDate: Mon Mar 9 09:07:36 2020 -0500 [Bug] Fix invalid rollback for stream load txn (#3054) --- .../runtime/stream_load/stream_load_executor.cpp | 215 +++++++++++---------- 1 file changed, 111 insertions(+), 104 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 6fc4912..14e17be 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -25,8 +25,8 @@ #include "runtime/plan_fragment_executor.h" #include "runtime/runtime_state.h" #include "runtime/stream_load/stream_load_context.h" -#include "util/thrift_rpc_helper.h" #include "util/doris_metrics.h" +#include "util/thrift_rpc_helper.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/FrontendService_types.h" @@ -44,65 +44,67 @@ Status k_stream_load_plan_status; Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { DorisMetrics::txn_exec_plan_total.increment(1); - // submit this params +// submit this params #ifndef BE_TEST ctx->ref(); - LOG(INFO) << "begin to execute job. label=" << ctx->label - << ", txn_id=" << ctx->txn_id + LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << print_id(ctx->put_result.params.params.query_id); auto st = _exec_env->fragment_mgr()->exec_plan_fragment( - ctx->put_result.params, - [ctx] (PlanFragmentExecutor* executor) { - ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos()); - Status status = executor->status(); - if (status.ok()) { - ctx->number_total_rows = executor->runtime_state()->num_rows_load_total(); - ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_success(); - ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered(); - ctx->number_unselected_rows = executor->runtime_state()->num_rows_load_unselected(); + ctx->put_result.params, [ctx](PlanFragmentExecutor* executor) { + ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos()); + Status status = executor->status(); + if (status.ok()) { + ctx->number_total_rows = executor->runtime_state()->num_rows_load_total(); + ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_success(); + ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered(); + ctx->number_unselected_rows = + executor->runtime_state()->num_rows_load_unselected(); - int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows; - if ((double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) { - // NOTE: Do not modify the error message here, for historical reasons, - // some users may rely on this error message. - status = Status::InternalError("too many filtered rows"); - } else if(ctx->number_loaded_rows == 0){ - status = Status::InternalError("all partitions have no load data"); - } - if (ctx->number_filtered_rows > 0 && - !executor->runtime_state()->get_error_log_file_path().empty()) { - ctx->error_url = to_load_error_http_path( - executor->runtime_state()->get_error_log_file_path()); - } + int64_t num_selected_rows = + ctx->number_total_rows - ctx->number_unselected_rows; + if ((double)ctx->number_filtered_rows / num_selected_rows > + ctx->max_filter_ratio) { + // NOTE: Do not modify the error message here, for historical + // reasons, + // some users may rely on this error message. + status = Status::InternalError("too many filtered rows"); + } else if (ctx->number_loaded_rows == 0) { + status = Status::InternalError("all partitions have no load data"); + } + if (ctx->number_filtered_rows > 0 && + !executor->runtime_state()->get_error_log_file_path().empty()) { + ctx->error_url = to_load_error_http_path( + executor->runtime_state()->get_error_log_file_path()); + } - if (status.ok()) { - DorisMetrics::stream_receive_bytes_total.increment(ctx->receive_bytes); - DorisMetrics::stream_load_rows_total.increment(ctx->number_loaded_rows); - } - } else { - LOG(WARNING) << "fragment execute failed" - << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id) - << ", err_msg=" << status.get_error_msg() - << ctx->brief(); - // cancel body_sink, make sender known it - if (ctx->body_sink != nullptr) { - ctx->body_sink->cancel(); - } + if (status.ok()) { + DorisMetrics::stream_receive_bytes_total.increment(ctx->receive_bytes); + DorisMetrics::stream_load_rows_total.increment(ctx->number_loaded_rows); + } + } else { + LOG(WARNING) << "fragment execute failed" + << ", query_id=" + << UniqueId(ctx->put_result.params.params.query_id) + << ", err_msg=" << status.get_error_msg() << ctx->brief(); + // cancel body_sink, make sender known it + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel(); + } - switch(ctx->load_src_type) { + switch (ctx->load_src_type) { // reset the stream load ctx's kafka commit offset case TLoadSourceType::KAFKA: ctx->kafka_info->reset_offset(); break; default: break; + } } - } - ctx->promise.set_value(status); - if (ctx->unref()) { - delete ctx; - } - }); + ctx->promise.set_value(status); + if (ctx->unref()) { + delete ctx; + } + }); if (!st.ok()) { // no need to check unref's return value ctx->unref(); @@ -134,7 +136,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { #ifndef BE_TEST RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, - [&request, &result] (FrontendServiceConnection& client) { + [&request, &result](FrontendServiceConnection& client) { client->loadTxnBegin(result, request); })); #else @@ -143,7 +145,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { Status status(result.status); if (!status.ok()) { LOG(WARNING) << "begin transaction failed, errmsg=" << status.get_error_msg() - << ctx->brief(); + << ctx->brief(); if (result.__isset.job_status) { ctx->existing_job_status = result.job_status; } @@ -178,19 +180,24 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { TLoadTxnCommitResult result; #ifndef BE_TEST RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( - master_addr.hostname, master_addr.port, - [&request, &result] (FrontendServiceConnection& client) { - client->loadTxnCommit(result, request); - }, config::txn_commit_rpc_timeout_ms)); + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->loadTxnCommit(result, request); + }, + config::txn_commit_rpc_timeout_ms)); #else result = k_stream_load_commit_result; #endif - // Return if this transaction is committed successful; otherwise, we need try to + // Return if this transaction is committed successful; otherwise, we need try + // to // rollback this transaction Status status(result.status); if (!status.ok()) { LOG(WARNING) << "commit transaction failed, errmsg=" << status.get_error_msg() - << ctx->brief(); + << ctx->brief(); + if (status.code() == TStatusCode::PUBLISH_TIMEOUT) { + ctx->need_rollback = false; + } return status; } // commit success, set need_rollback to false @@ -219,13 +226,13 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { TLoadTxnRollbackResult result; #ifndef BE_TEST auto rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>( - master_addr.hostname, master_addr.port, - [&request, &result] (FrontendServiceConnection& client) { - client->loadTxnRollback(result, request); - }); + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->loadTxnRollback(result, request); + }); if (!rpc_st.ok()) { LOG(WARNING) << "transaction rollback failed. errmsg=" << rpc_st.get_error_msg() - << ctx->brief(); + << ctx->brief(); } #else result = k_stream_load_rollback_result; @@ -237,60 +244,60 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt // currently, only routine load and mini load need to be set attachment return false; } - switch(ctx->load_type) { - case TLoadType::MINI_LOAD: { - attach->loadType = TLoadType::MINI_LOAD; - - TMiniLoadTxnCommitAttachment ml_attach; - ml_attach.loadedRows = ctx->number_loaded_rows; - ml_attach.filteredRows = ctx->number_filtered_rows; - if (!ctx->error_url.empty()) { - ml_attach.__set_errorLogUrl(ctx->error_url); - } + switch (ctx->load_type) { + case TLoadType::MINI_LOAD: { + attach->loadType = TLoadType::MINI_LOAD; - attach->mlTxnCommitAttachment = std::move(ml_attach); - attach->__isset.mlTxnCommitAttachment = true; - break; + TMiniLoadTxnCommitAttachment ml_attach; + ml_attach.loadedRows = ctx->number_loaded_rows; + ml_attach.filteredRows = ctx->number_filtered_rows; + if (!ctx->error_url.empty()) { + ml_attach.__set_errorLogUrl(ctx->error_url); } - case TLoadType::ROUTINE_LOAD: { - attach->loadType = TLoadType::ROUTINE_LOAD; - TRLTaskTxnCommitAttachment rl_attach; - rl_attach.jobId = ctx->job_id; - rl_attach.id = ctx->id.to_thrift(); - rl_attach.__set_loadedRows(ctx->number_loaded_rows); - rl_attach.__set_filteredRows(ctx->number_filtered_rows); - rl_attach.__set_unselectedRows(ctx->number_unselected_rows); - rl_attach.__set_receivedBytes(ctx->receive_bytes); - rl_attach.__set_loadedBytes(ctx->loaded_bytes); - rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000); + attach->mlTxnCommitAttachment = std::move(ml_attach); + attach->__isset.mlTxnCommitAttachment = true; + break; + } + case TLoadType::ROUTINE_LOAD: { + attach->loadType = TLoadType::ROUTINE_LOAD; - attach->rlTaskTxnCommitAttachment = std::move(rl_attach); - attach->__isset.rlTaskTxnCommitAttachment = true; - break; - } - default: - // unknown load type, should not happend - return false; + TRLTaskTxnCommitAttachment rl_attach; + rl_attach.jobId = ctx->job_id; + rl_attach.id = ctx->id.to_thrift(); + rl_attach.__set_loadedRows(ctx->number_loaded_rows); + rl_attach.__set_filteredRows(ctx->number_filtered_rows); + rl_attach.__set_unselectedRows(ctx->number_unselected_rows); + rl_attach.__set_receivedBytes(ctx->receive_bytes); + rl_attach.__set_loadedBytes(ctx->loaded_bytes); + rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000); + + attach->rlTaskTxnCommitAttachment = std::move(rl_attach); + attach->__isset.rlTaskTxnCommitAttachment = true; + break; + } + default: + // unknown load type, should not happend + return false; } - switch(ctx->load_src_type) { - case TLoadSourceType::KAFKA: { - TRLTaskTxnCommitAttachment &rl_attach = attach->rlTaskTxnCommitAttachment; - rl_attach.loadSourceType = TLoadSourceType::KAFKA; + switch (ctx->load_src_type) { + case TLoadSourceType::KAFKA: { + TRLTaskTxnCommitAttachment& rl_attach = attach->rlTaskTxnCommitAttachment; + rl_attach.loadSourceType = TLoadSourceType::KAFKA; - TKafkaRLTaskProgress kafka_progress; - kafka_progress.partitionCmtOffset = ctx->kafka_info->cmt_offset; + TKafkaRLTaskProgress kafka_progress; + kafka_progress.partitionCmtOffset = ctx->kafka_info->cmt_offset; - rl_attach.kafkaRLTaskProgress = std::move(kafka_progress); - rl_attach.__isset.kafkaRLTaskProgress = true; - if (!ctx->error_url.empty()) { - rl_attach.__set_errorLogUrl(ctx->error_url); - } - return true; + rl_attach.kafkaRLTaskProgress = std::move(kafka_progress); + rl_attach.__isset.kafkaRLTaskProgress = true; + if (!ctx->error_url.empty()) { + rl_attach.__set_errorLogUrl(ctx->error_url); } - default: - return true; + return true; + } + default: + return true; } return false; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org