This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d0da94e22b2 [refactor](streamload) refactor stream load executor (#25615)
d0da94e22b2 is described below

commit d0da94e22b20afdaaf9fba1316dce70a6060d95d
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Mon Oct 23 14:34:26 2023 +0800

    [refactor](streamload) refactor stream load executor (#25615)
---
 .../runtime/stream_load/stream_load_executor.cpp   | 226 +++++++--------------
 1 file changed, 74 insertions(+), 152 deletions(-)

diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 32e4d76dc7c..3dd39e56f65 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -71,161 +71,83 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
     LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" 
<< ctx->txn_id
               << ", query_id=" << 
print_id(ctx->put_result.params.params.query_id);
     Status st;
+    auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
+        if (ctx->group_commit) {
+            ctx->label = state->import_label();
+            ctx->txn_id = state->wal_id();
+        }
+        ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+        ctx->commit_infos = std::move(state->tablet_commit_infos());
+        if (status->ok()) {
+            ctx->number_total_rows = state->num_rows_load_total();
+            ctx->number_loaded_rows = state->num_rows_load_success();
+            ctx->number_filtered_rows = state->num_rows_load_filtered();
+            ctx->number_unselected_rows = state->num_rows_load_unselected();
+
+            int64_t num_selected_rows = ctx->number_total_rows - 
ctx->number_unselected_rows;
+            if (!ctx->group_commit && num_selected_rows > 0 &&
+                (double)ctx->number_filtered_rows / num_selected_rows > 
ctx->max_filter_ratio) {
+                // NOTE: Do not modify the error message here, for historical 
reasons,
+                // some users may rely on this error message.
+                *status = Status::InternalError("too many filtered rows");
+            }
+            if (ctx->number_filtered_rows > 0 && 
!state->get_error_log_file_path().empty()) {
+                ctx->error_url = 
to_load_error_http_path(state->get_error_log_file_path());
+            }
+
+            if (status->ok()) {
+                
DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
+                DorisMetrics::instance()->stream_load_rows_total->increment(
+                        ctx->number_loaded_rows);
+            }
+        } else {
+            LOG(WARNING) << "fragment execute failed"
+                         << ", query_id=" << 
UniqueId(ctx->put_result.params.params.query_id)
+                         << ", err_msg=" << status->to_string() << ", " << 
ctx->brief();
+            // cancel body_sink, make sender known it
+            if (ctx->body_sink != nullptr) {
+                ctx->body_sink->cancel(status->to_string());
+            }
+
+            switch (ctx->load_src_type) {
+            // reset the stream load ctx's kafka commit offset
+            case TLoadSourceType::KAFKA:
+                ctx->kafka_info->reset_offset();
+                break;
+            default:
+                break;
+            }
+        }
+        ctx->write_data_cost_nanos = MonotonicNanos() - 
ctx->start_write_data_nanos;
+        ctx->promise.set_value(*status);
+
+        if (!status->ok() && ctx->body_sink != nullptr) {
+            // In some cases, the load execution is exited early.
+            // For example, when max_filter_ratio is 0 and illegal data is 
encountered
+            // during stream loading, the entire load process is terminated 
early.
+            // However, the http connection may still be sending data to 
stream_load_pipe
+            // and waiting for it to be consumed.
+            // Therefore, we need to actively cancel to end the pipe.
+            ctx->body_sink->cancel(status->to_string());
+        }
+
+        if (ctx->need_commit_self && ctx->body_sink != nullptr) {
+            if (ctx->body_sink->cancelled() || !status->ok()) {
+                ctx->status = *status;
+                this->rollback_txn(ctx.get());
+            } else {
+                static_cast<void>(this->commit_txn(ctx.get()));
+            }
+        }
+    };
+
     if (ctx->put_result.__isset.params) {
-        st = _exec_env->fragment_mgr()->exec_plan_fragment(
-                ctx->put_result.params, [ctx, this](RuntimeState* state, 
Status* status) {
-                    if (ctx->group_commit) {
-                        ctx->label = state->import_label();
-                        ctx->txn_id = state->wal_id();
-                    }
-                    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
-                    ctx->commit_infos = 
std::move(state->tablet_commit_infos());
-                    if (status->ok()) {
-                        ctx->number_total_rows = state->num_rows_load_total();
-                        ctx->number_loaded_rows = 
state->num_rows_load_success();
-                        ctx->number_filtered_rows = 
state->num_rows_load_filtered();
-                        ctx->number_unselected_rows = 
state->num_rows_load_unselected();
-
-                        int64_t num_selected_rows =
-                                ctx->number_total_rows - 
ctx->number_unselected_rows;
-                        if (!ctx->group_commit && num_selected_rows > 0 &&
-                            (double)ctx->number_filtered_rows / 
num_selected_rows >
-                                    ctx->max_filter_ratio) {
-                            // NOTE: Do not modify the error message here, for 
historical reasons,
-                            // some users may rely on this error message.
-                            *status = Status::InternalError("too many filtered 
rows");
-                        }
-                        if (ctx->number_filtered_rows > 0 &&
-                            !state->get_error_log_file_path().empty()) {
-                            ctx->error_url =
-                                    
to_load_error_http_path(state->get_error_log_file_path());
-                        }
-
-                        if (status->ok()) {
-                            
DorisMetrics::instance()->stream_receive_bytes_total->increment(
-                                    ctx->receive_bytes);
-                            
DorisMetrics::instance()->stream_load_rows_total->increment(
-                                    ctx->number_loaded_rows);
-                        }
-                    } else {
-                        LOG(WARNING)
-                                << "fragment execute failed"
-                                << ", query_id=" << 
UniqueId(ctx->put_result.params.params.query_id)
-                                << ", err_msg=" << status->to_string() << ", " 
<< ctx->brief();
-                        // cancel body_sink, make sender known it
-                        if (ctx->body_sink != nullptr) {
-                            ctx->body_sink->cancel(status->to_string());
-                        }
-
-                        switch (ctx->load_src_type) {
-                        // reset the stream load ctx's kafka commit offset
-                        case TLoadSourceType::KAFKA:
-                            ctx->kafka_info->reset_offset();
-                            break;
-                        default:
-                            break;
-                        }
-                    }
-                    ctx->write_data_cost_nanos = MonotonicNanos() - 
ctx->start_write_data_nanos;
-                    ctx->promise.set_value(*status);
-
-                    if (!status->ok() && ctx->body_sink != nullptr) {
-                        // In some cases, the load execution is exited early.
-                        // For example, when max_filter_ratio is 0 and illegal 
data is encountered
-                        // during stream loading, the entire load process is 
terminated early.
-                        // However, the http connection may still be sending 
data to stream_load_pipe
-                        // and waiting for it to be consumed.
-                        // Therefore, we need to actively cancel to end the 
pipe.
-                        ctx->body_sink->cancel(status->to_string());
-                    }
-
-                    if (ctx->need_commit_self && ctx->body_sink != nullptr) {
-                        if (ctx->body_sink->cancelled() || !status->ok()) {
-                            ctx->status = *status;
-                            this->rollback_txn(ctx.get());
-                        } else {
-                            static_cast<void>(this->commit_txn(ctx.get()));
-                        }
-                    }
-                });
+        st = 
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, 
exec_fragment);
     } else {
-        st = _exec_env->fragment_mgr()->exec_plan_fragment(
-                ctx->put_result.pipeline_params, [ctx, this](RuntimeState* 
state, Status* status) {
-                    if (ctx->group_commit) {
-                        ctx->label = state->import_label();
-                        ctx->txn_id = state->wal_id();
-                    }
-                    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
-                    ctx->commit_infos = 
std::move(state->tablet_commit_infos());
-                    if (status->ok()) {
-                        ctx->number_total_rows = state->num_rows_load_total();
-                        ctx->number_loaded_rows = 
state->num_rows_load_success();
-                        ctx->number_filtered_rows = 
state->num_rows_load_filtered();
-                        ctx->number_unselected_rows = 
state->num_rows_load_unselected();
-
-                        int64_t num_selected_rows =
-                                ctx->number_total_rows - 
ctx->number_unselected_rows;
-                        if (!ctx->group_commit && num_selected_rows > 0 &&
-                            (double)ctx->number_filtered_rows / 
num_selected_rows >
-                                    ctx->max_filter_ratio) {
-                            // NOTE: Do not modify the error message here, for 
historical reasons,
-                            // some users may rely on this error message.
-                            *status = Status::InternalError("too many filtered 
rows");
-                        }
-                        if (ctx->number_filtered_rows > 0 &&
-                            !state->get_error_log_file_path().empty()) {
-                            ctx->error_url =
-                                    
to_load_error_http_path(state->get_error_log_file_path());
-                        }
-
-                        if (status->ok()) {
-                            
DorisMetrics::instance()->stream_receive_bytes_total->increment(
-                                    ctx->receive_bytes);
-                            
DorisMetrics::instance()->stream_load_rows_total->increment(
-                                    ctx->number_loaded_rows);
-                        }
-                    } else {
-                        LOG(WARNING)
-                                << "fragment execute failed"
-                                << ", query_id=" << 
UniqueId(ctx->put_result.params.params.query_id)
-                                << ", err_msg=" << status->to_string() << ", " 
<< ctx->brief();
-                        // cancel body_sink, make sender known it
-                        if (ctx->body_sink != nullptr) {
-                            ctx->body_sink->cancel(status->to_string());
-                        }
-
-                        switch (ctx->load_src_type) {
-                        // reset the stream load ctx's kafka commit offset
-                        case TLoadSourceType::KAFKA:
-                            ctx->kafka_info->reset_offset();
-                            break;
-                        default:
-                            break;
-                        }
-                    }
-                    ctx->write_data_cost_nanos = MonotonicNanos() - 
ctx->start_write_data_nanos;
-                    ctx->promise.set_value(*status);
-
-                    if (!status->ok() && ctx->body_sink != nullptr) {
-                        // In some cases, the load execution is exited early.
-                        // For example, when max_filter_ratio is 0 and illegal 
data is encountered
-                        // during stream loading, the entire load process is 
terminated early.
-                        // However, the http connection may still be sending 
data to stream_load_pipe
-                        // and waiting for it to be consumed.
-                        // Therefore, we need to actively cancel to end the 
pipe.
-                        ctx->body_sink->cancel(status->to_string());
-                    }
-
-                    if (ctx->need_commit_self && ctx->body_sink != nullptr) {
-                        if (ctx->body_sink->cancelled() || !status->ok()) {
-                            ctx->status = *status;
-                            this->rollback_txn(ctx.get());
-                        } else {
-                            static_cast<void>(this->commit_txn(ctx.get()));
-                        }
-                    }
-                });
+        st = 
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params,
+                                                           exec_fragment);
     }
+
     if (!st.ok()) {
         // no need to check unref's return value
         return st;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to