This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new d0da94e22b2 [refactor](streamload) refactor stream load executor (#25615)
d0da94e22b2 is described below

commit d0da94e22b20afdaaf9fba1316dce70a6060d95d
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Mon Oct 23 14:34:26 2023 +0800

    [refactor](streamload) refactor stream load executor (#25615)
---
 .../runtime/stream_load/stream_load_executor.cpp | 226 +++++++--------------
 1 file changed, 74 insertions(+), 152 deletions(-)

diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 32e4d76dc7c..3dd39e56f65 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -71,161 +71,83 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
     LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
               << ", query_id=" << print_id(ctx->put_result.params.params.query_id);
     Status st;
+    auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
+        if (ctx->group_commit) {
+            ctx->label = state->import_label();
+            ctx->txn_id = state->wal_id();
+        }
+        ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+        ctx->commit_infos = std::move(state->tablet_commit_infos());
+        if (status->ok()) {
+            ctx->number_total_rows = state->num_rows_load_total();
+            ctx->number_loaded_rows = state->num_rows_load_success();
+            ctx->number_filtered_rows = state->num_rows_load_filtered();
+            ctx->number_unselected_rows = state->num_rows_load_unselected();
+
+            int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
+            if (!ctx->group_commit && num_selected_rows > 0 &&
+                (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
+                // NOTE: Do not modify the error message here, for historical reasons,
+                // some users may rely on this error message.
+                *status = Status::InternalError("too many filtered rows");
+            }
+            if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
+                ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
+            }
+
+            if (status->ok()) {
+                DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
+                DorisMetrics::instance()->stream_load_rows_total->increment(
+                        ctx->number_loaded_rows);
+            }
+        } else {
+            LOG(WARNING) << "fragment execute failed"
+                         << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
+                         << ", err_msg=" << status->to_string() << ", " << ctx->brief();
+            // cancel body_sink, make sender known it
+            if (ctx->body_sink != nullptr) {
+                ctx->body_sink->cancel(status->to_string());
+            }
+
+            switch (ctx->load_src_type) {
+            // reset the stream load ctx's kafka commit offset
+            case TLoadSourceType::KAFKA:
+                ctx->kafka_info->reset_offset();
+                break;
+            default:
+                break;
+            }
+        }
+        ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
+        ctx->promise.set_value(*status);
+
+        if (!status->ok() && ctx->body_sink != nullptr) {
+            // In some cases, the load execution is exited early.
+            // For example, when max_filter_ratio is 0 and illegal data is encountered
+            // during stream loading, the entire load process is terminated early.
+            // However, the http connection may still be sending data to stream_load_pipe
+            // and waiting for it to be consumed.
+            // Therefore, we need to actively cancel to end the pipe.
+            ctx->body_sink->cancel(status->to_string());
+        }
+
+        if (ctx->need_commit_self && ctx->body_sink != nullptr) {
+            if (ctx->body_sink->cancelled() || !status->ok()) {
+                ctx->status = *status;
+                this->rollback_txn(ctx.get());
+            } else {
+                static_cast<void>(this->commit_txn(ctx.get()));
+            }
+        }
+    };
+
     if (ctx->put_result.__isset.params) {
-        st = _exec_env->fragment_mgr()->exec_plan_fragment(
-                ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
-                    if (ctx->group_commit) {
-                        ctx->label = state->import_label();
-                        ctx->txn_id = state->wal_id();
-                    }
-                    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
-                    ctx->commit_infos = std::move(state->tablet_commit_infos());
-                    if (status->ok()) {
-                        ctx->number_total_rows = state->num_rows_load_total();
-                        ctx->number_loaded_rows = state->num_rows_load_success();
-                        ctx->number_filtered_rows = state->num_rows_load_filtered();
-                        ctx->number_unselected_rows = state->num_rows_load_unselected();
-
-                        int64_t num_selected_rows =
-                                ctx->number_total_rows - ctx->number_unselected_rows;
-                        if (!ctx->group_commit && num_selected_rows > 0 &&
-                            (double)ctx->number_filtered_rows / num_selected_rows >
-                                    ctx->max_filter_ratio) {
-                            // NOTE: Do not modify the error message here, for historical reasons,
-                            // some users may rely on this error message.
-                            *status = Status::InternalError("too many filtered rows");
-                        }
-                        if (ctx->number_filtered_rows > 0 &&
-                            !state->get_error_log_file_path().empty()) {
-                            ctx->error_url =
-                                    to_load_error_http_path(state->get_error_log_file_path());
-                        }
-
-                        if (status->ok()) {
-                            DorisMetrics::instance()->stream_receive_bytes_total->increment(
-                                    ctx->receive_bytes);
-                            DorisMetrics::instance()->stream_load_rows_total->increment(
-                                    ctx->number_loaded_rows);
-                        }
-                    } else {
-                        LOG(WARNING)
-                                << "fragment execute failed"
-                                << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
-                                << ", err_msg=" << status->to_string() << ", " << ctx->brief();
-                        // cancel body_sink, make sender known it
-                        if (ctx->body_sink != nullptr) {
-                            ctx->body_sink->cancel(status->to_string());
-                        }
-
-                        switch (ctx->load_src_type) {
-                        // reset the stream load ctx's kafka commit offset
-                        case TLoadSourceType::KAFKA:
-                            ctx->kafka_info->reset_offset();
-                            break;
-                        default:
-                            break;
-                        }
-                    }
-                    ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-                    ctx->promise.set_value(*status);
-
-                    if (!status->ok() && ctx->body_sink != nullptr) {
-                        // In some cases, the load execution is exited early.
-                        // For example, when max_filter_ratio is 0 and illegal data is encountered
-                        // during stream loading, the entire load process is terminated early.
-                        // However, the http connection may still be sending data to stream_load_pipe
-                        // and waiting for it to be consumed.
-                        // Therefore, we need to actively cancel to end the pipe.
-                        ctx->body_sink->cancel(status->to_string());
-                    }
-
-                    if (ctx->need_commit_self && ctx->body_sink != nullptr) {
-                        if (ctx->body_sink->cancelled() || !status->ok()) {
-                            ctx->status = *status;
-                            this->rollback_txn(ctx.get());
-                        } else {
-                            static_cast<void>(this->commit_txn(ctx.get()));
-                        }
-                    }
-                });
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, exec_fragment);
     } else {
-        st = _exec_env->fragment_mgr()->exec_plan_fragment(
-                ctx->put_result.pipeline_params, [ctx, this](RuntimeState* state, Status* status) {
-                    if (ctx->group_commit) {
-                        ctx->label = state->import_label();
-                        ctx->txn_id = state->wal_id();
-                    }
-                    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
-                    ctx->commit_infos = std::move(state->tablet_commit_infos());
-                    if (status->ok()) {
-                        ctx->number_total_rows = state->num_rows_load_total();
-                        ctx->number_loaded_rows = state->num_rows_load_success();
-                        ctx->number_filtered_rows = state->num_rows_load_filtered();
-                        ctx->number_unselected_rows = state->num_rows_load_unselected();
-
-                        int64_t num_selected_rows =
-                                ctx->number_total_rows - ctx->number_unselected_rows;
-                        if (!ctx->group_commit && num_selected_rows > 0 &&
-                            (double)ctx->number_filtered_rows / num_selected_rows >
-                                    ctx->max_filter_ratio) {
-                            // NOTE: Do not modify the error message here, for historical reasons,
-                            // some users may rely on this error message.
-                            *status = Status::InternalError("too many filtered rows");
-                        }
-                        if (ctx->number_filtered_rows > 0 &&
-                            !state->get_error_log_file_path().empty()) {
-                            ctx->error_url =
-                                    to_load_error_http_path(state->get_error_log_file_path());
-                        }
-
-                        if (status->ok()) {
-                            DorisMetrics::instance()->stream_receive_bytes_total->increment(
-                                    ctx->receive_bytes);
-                            DorisMetrics::instance()->stream_load_rows_total->increment(
-                                    ctx->number_loaded_rows);
-                        }
-                    } else {
-                        LOG(WARNING)
-                                << "fragment execute failed"
-                                << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
-                                << ", err_msg=" << status->to_string() << ", " << ctx->brief();
-                        // cancel body_sink, make sender known it
-                        if (ctx->body_sink != nullptr) {
-                            ctx->body_sink->cancel(status->to_string());
-                        }
-
-                        switch (ctx->load_src_type) {
-                        // reset the stream load ctx's kafka commit offset
-                        case TLoadSourceType::KAFKA:
-                            ctx->kafka_info->reset_offset();
-                            break;
-                        default:
-                            break;
-                        }
-                    }
-                    ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-                    ctx->promise.set_value(*status);
-
-                    if (!status->ok() && ctx->body_sink != nullptr) {
-                        // In some cases, the load execution is exited early.
-                        // For example, when max_filter_ratio is 0 and illegal data is encountered
-                        // during stream loading, the entire load process is terminated early.
-                        // However, the http connection may still be sending data to stream_load_pipe
-                        // and waiting for it to be consumed.
-                        // Therefore, we need to actively cancel to end the pipe.
-                        ctx->body_sink->cancel(status->to_string());
-                    }
-
-                    if (ctx->need_commit_self && ctx->body_sink != nullptr) {
-                        if (ctx->body_sink->cancelled() || !status->ok()) {
-                            ctx->status = *status;
-                            this->rollback_txn(ctx.get());
-                        } else {
-                            static_cast<void>(this->commit_txn(ctx.get()));
-                        }
-                    }
-                });
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params,
+                                                           exec_fragment);
     }
+
     if (!st.ok()) {
         // no need to check unref's return value
         return st;
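
The gist of the refactor above: the completion callback that was previously duplicated inline, once for ctx->put_result.params and once for ctx->put_result.pipeline_params, is hoisted into a single exec_fragment lambda shared by both exec_plan_fragment call sites. Below is a minimal standalone sketch of the same pattern; the types and functions are illustrative stand-ins, not the real Doris APIs.

    #include <functional>
    #include <iostream>
    #include <string>

    // Illustrative stand-ins for Doris' RuntimeState and Status types.
    struct RuntimeState {
        int rows_loaded = 0;
    };
    struct Status {
        bool ok = true;
        std::string msg;
    };

    using FinishCallback = std::function<void(RuntimeState*, Status*)>;

    // Stand-in for FragmentMgr::exec_plan_fragment: executes a fragment
    // described by `params`, then reports the result through the callback.
    void exec_plan_fragment(const std::string& params, const FinishCallback& cb) {
        RuntimeState state{/*rows_loaded=*/42};
        Status status;
        cb(&state, &status);
    }

    int main() {
        // Mirrors the ctx->put_result.__isset.params branch in the commit.
        bool has_non_pipeline_params = true;

        // Before the refactor each branch carried its own full copy of this
        // lambda; after it, the callback is defined once and shared.
        auto exec_fragment = [](RuntimeState* state, Status* status) {
            if (status->ok) {
                std::cout << "loaded rows: " << state->rows_loaded << '\n';
            } else {
                std::cout << "load failed: " << status->msg << '\n';
            }
        };

        if (has_non_pipeline_params) {
            exec_plan_fragment("params", exec_fragment);
        } else {
            exec_plan_fragment("pipeline_params", exec_fragment);
        }
        return 0;
    }

Two properties of the original code carry over unchanged: the real lambda captures ctx (a std::shared_ptr<StreamLoadContext>) by value, so the load context stays alive until whichever fragment runs the callback, and defining the callback once means the two branches can no longer drift apart.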