This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch revert-36772-stream_load_hang in repository https://gitbox.apache.org/repos/asf/doris.git
commit d99f466aa936779526159c0f1f0c6852612a7a29 Author: Xin Liao <liaoxin...@126.com> AuthorDate: Tue Jun 25 16:00:28 2024 +0800 Revert "[Fix](stream-load) Fix stream load stuck under high concurrency (#36772)" This reverts commit ea39daa2d32ba9466a969cf7d39090491bfbc62c. --- be/src/http/action/stream_load.cpp | 93 +++++++++------------- be/src/http/action/stream_load.h | 4 +- .../runtime/stream_load/stream_load_executor.cpp | 11 +-- be/src/runtime/stream_load/stream_load_executor.h | 5 -- 4 files changed, 41 insertions(+), 72 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 0f14270da02..eb8d364cf59 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -33,7 +33,6 @@ #include <time.h> #include <algorithm> -#include <functional> #include <future> #include <map> #include <sstream> @@ -109,67 +108,20 @@ void StreamLoadAction::handle(HttpRequest* req) { // status already set to fail if (ctx->status.ok()) { - ctx->status = _handle(ctx, req); + ctx->status = _handle(ctx); if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { LOG(WARNING) << "handle streaming load failed, id=" << ctx->id << ", errmsg=" << ctx->status; - _send_reply(ctx, req); } } -} - -Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) { - if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { - LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes - << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; - return Status::InternalError("receive body don't equal with body bytes"); - } - - // if we use non-streaming, MessageBodyFileSink.finish will close the file - RETURN_IF_ERROR(ctx->body_sink->finish()); - if (!ctx->use_streaming) { - // we need to close file first, then execute_plan_fragment here - ctx->body_sink.reset(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment( - ctx, 
- [req, this](std::shared_ptr<StreamLoadContext> ctx) { _on_finish(ctx, req); })); - } - - return Status::OK(); -} - -void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) { - ctx->status = ctx->future.get(); - if (ctx->status.ok()) { - if (ctx->group_commit) { - LOG(INFO) << "skip commit because this is group commit, pipe_id=" - << ctx->id.to_string(); - } else if (ctx->two_phase_commit) { - int64_t pre_commit_start_time = MonotonicNanos(); - ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get()); - ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; - } else { - // If put file success we need commit this load - int64_t commit_and_publish_start_time = MonotonicNanos(); - ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get()); - ctx->commit_and_publish_txn_cost_nanos = - MonotonicNanos() - commit_and_publish_start_time; - } - } - _send_reply(ctx, req); -} - -void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) { ctx->load_cost_millis = UnixMillis() - ctx->start_millis; if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { - LOG(WARNING) << "handle streaming load failed, id=" << ctx->id - << ", errmsg=" << ctx->status; if (ctx->need_rollback) { _exec_env->stream_load_executor()->rollback_txn(ctx.get()); ctx->need_rollback = false; } - if (ctx->body_sink != nullptr) { + if (ctx->body_sink.get() != nullptr) { ctx->body_sink->cancel(ctx->status.to_string()); } } @@ -189,6 +141,42 @@ void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpR streaming_load_duration_ms->increment(ctx->load_cost_millis); } +Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { + LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes + << ", receive_bytes=" << ctx->receive_bytes << ", id=" << 
ctx->id; + return Status::InternalError("receive body don't equal with body bytes"); + } + + // if we use non-streaming, MessageBodyFileSink.finish will close the file + RETURN_IF_ERROR(ctx->body_sink->finish()); + if (!ctx->use_streaming) { + // we need to close file first, then execute_plan_fragment here + ctx->body_sink.reset(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx)); + } + + // wait stream load finish + RETURN_IF_ERROR(ctx->future.get()); + + if (ctx->group_commit) { + LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); + return Status::OK(); + } + + if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); + ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; + } + return Status::OK(); +} + int StreamLoadAction::on_header(HttpRequest* req) { streaming_load_current_processing->increment(1); @@ -693,10 +681,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, return Status::OK(); } - return _exec_env->stream_load_executor()->execute_plan_fragment( - ctx, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) { - _on_finish(ctx, http_req); - }); + return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); } Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path) { diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index f91334e7305..d1de89c9397 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -46,13 +46,11 @@ public: private: Status 
_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx); - Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req); + Status _handle(std::shared_ptr<StreamLoadContext> ctx); Status _data_saved_path(HttpRequest* req, std::string* file_path); Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx); void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str); Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx); - void _on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req); - void _send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req); private: ExecEnv* _exec_env; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 62be00f1a7b..d26beb66827 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -27,10 +27,8 @@ #include <glog/logging.h> #include <stdint.h> -#include <functional> #include <future> #include <map> -#include <memory> #include <ostream> #include <string> #include <utility> @@ -68,19 +66,13 @@ bvar::LatencyRecorder g_stream_load_precommit_txn_latency("stream_load", "precom bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", "commit_txn"); Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx) { - return execute_plan_fragment(ctx, [](std::shared_ptr<StreamLoadContext> ctx) {}); -} - -Status StreamLoadExecutor::execute_plan_fragment( - std::shared_ptr<StreamLoadContext> ctx, - const std::function<void(std::shared_ptr<StreamLoadContext> ctx)>& cb) { // submit this params #ifndef BE_TEST ctx->start_write_data_nanos = MonotonicNanos(); LOG(INFO) << "begin to execute stream load. 
label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id; Status st; - auto exec_fragment = [ctx, cb, this](RuntimeState* state, Status* status) { + auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) { if (ctx->group_commit) { ctx->label = state->import_label(); ctx->txn_id = state->wal_id(); @@ -150,7 +142,6 @@ Status StreamLoadExecutor::execute_plan_fragment( << (ctx->receive_and_read_data_cost_nanos - ctx->read_data_cost_nanos) / 1000000 << ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000 << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000; - cb(ctx); }; if (ctx->put_result.__isset.params) { diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index acdfe00a262..1364bbbf31b 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -17,7 +17,6 @@ #pragma once -#include <functional> #include <memory> #include "common/factory_creator.h" @@ -52,10 +51,6 @@ public: Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx); - Status execute_plan_fragment( - std::shared_ptr<StreamLoadContext> ctx, - const std::function<void(std::shared_ptr<StreamLoadContext> ctx)>& cb); - protected: // collect the load statistics from context and set them to stat // return true if stat is set, otherwise, return false --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org