This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch revert-36772-stream_load_hang
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d99f466aa936779526159c0f1f0c6852612a7a29
Author: Xin Liao <liaoxin...@126.com>
AuthorDate: Tue Jun 25 16:00:28 2024 +0800

    Revert "[Fix](stream-load) Fix stream load stuck under high concurrency (#36772)"
    
    This reverts commit ea39daa2d32ba9466a969cf7d39090491bfbc62c.
---
 be/src/http/action/stream_load.cpp                 | 93 +++++++++-------------
 be/src/http/action/stream_load.h                   |  4 +-
 .../runtime/stream_load/stream_load_executor.cpp   | 11 +--
 be/src/runtime/stream_load/stream_load_executor.h  |  5 --
 4 files changed, 41 insertions(+), 72 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 0f14270da02..eb8d364cf59 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -33,7 +33,6 @@
 #include <time.h>
 
 #include <algorithm>
-#include <functional>
 #include <future>
 #include <map>
 #include <sstream>
@@ -109,67 +108,20 @@ void StreamLoadAction::handle(HttpRequest* req) {
 
     // status already set to fail
     if (ctx->status.ok()) {
-        ctx->status = _handle(ctx, req);
+        ctx->status = _handle(ctx);
         if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
             LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
                          << ", errmsg=" << ctx->status;
-            _send_reply(ctx, req);
         }
     }
-}
-
-Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, 
HttpRequest* req) {
-    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
-        LOG(WARNING) << "recevie body don't equal with body bytes, 
body_bytes=" << ctx->body_bytes
-                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << 
ctx->id;
-        return Status::InternalError("receive body don't equal with body 
bytes");
-    }
-
-    // if we use non-streaming, MessageBodyFileSink.finish will close the file
-    RETURN_IF_ERROR(ctx->body_sink->finish());
-    if (!ctx->use_streaming) {
-        // we need to close file first, then execute_plan_fragment here
-        ctx->body_sink.reset();
-        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(
-                ctx,
-                [req, this](std::shared_ptr<StreamLoadContext> ctx) { 
_on_finish(ctx, req); }));
-    }
-
-    return Status::OK();
-}
-
-void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, 
HttpRequest* req) {
-    ctx->status = ctx->future.get();
-    if (ctx->status.ok()) {
-        if (ctx->group_commit) {
-            LOG(INFO) << "skip commit because this is group commit, pipe_id="
-                      << ctx->id.to_string();
-        } else if (ctx->two_phase_commit) {
-            int64_t pre_commit_start_time = MonotonicNanos();
-            ctx->status = 
_exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
-            ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - 
pre_commit_start_time;
-        } else {
-            // If put file success we need commit this load
-            int64_t commit_and_publish_start_time = MonotonicNanos();
-            ctx->status = 
_exec_env->stream_load_executor()->commit_txn(ctx.get());
-            ctx->commit_and_publish_txn_cost_nanos =
-                    MonotonicNanos() - commit_and_publish_start_time;
-        }
-    }
-    _send_reply(ctx, req);
-}
-
-void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, 
HttpRequest* req) {
     ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
 
     if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
-        LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
-                     << ", errmsg=" << ctx->status;
         if (ctx->need_rollback) {
             _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             ctx->need_rollback = false;
         }
-        if (ctx->body_sink != nullptr) {
+        if (ctx->body_sink.get() != nullptr) {
             ctx->body_sink->cancel(ctx->status.to_string());
         }
     }
@@ -189,6 +141,42 @@ void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpR
     streaming_load_duration_ms->increment(ctx->load_cost_millis);
 }
 
+Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
+    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+        LOG(WARNING) << "recevie body don't equal with body bytes, 
body_bytes=" << ctx->body_bytes
+                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << 
ctx->id;
+        return Status::InternalError("receive body don't equal with body 
bytes");
+    }
+
+    // if we use non-streaming, MessageBodyFileSink.finish will close the file
+    RETURN_IF_ERROR(ctx->body_sink->finish());
+    if (!ctx->use_streaming) {
+        // we need to close file first, then execute_plan_fragment here
+        ctx->body_sink.reset();
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx));
+    }
+
+    // wait stream load finish
+    RETURN_IF_ERROR(ctx->future.get());
+
+    if (ctx->group_commit) {
+        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << 
ctx->id.to_string();
+        return Status::OK();
+    }
+
+    if (ctx->two_phase_commit) {
+        int64_t pre_commit_start_time = MonotonicNanos();
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - 
pre_commit_start_time;
+    } else {
+        // If put file success we need commit this load
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - 
commit_and_publish_start_time;
+    }
+    return Status::OK();
+}
+
 int StreamLoadAction::on_header(HttpRequest* req) {
     streaming_load_current_processing->increment(1);
 
@@ -693,10 +681,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         return Status::OK();
     }
 
-    return _exec_env->stream_load_executor()->execute_plan_fragment(
-            ctx, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) {
-                _on_finish(ctx, http_req);
-            });
+    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
 }
 
 Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* 
file_path) {
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index f91334e7305..d1de89c9397 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -46,13 +46,11 @@ public:
 
 private:
     Status _on_header(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
-    Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
+    Status _handle(std::shared_ptr<StreamLoadContext> ctx);
     Status _data_saved_path(HttpRequest* req, std::string* file_path);
     Status _process_put(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
     void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, 
const std::string& str);
     Status _handle_group_commit(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
-    void _on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
-    void _send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
 
 private:
     ExecEnv* _exec_env;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 62be00f1a7b..d26beb66827 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -27,10 +27,8 @@
 #include <glog/logging.h>
 #include <stdint.h>
 
-#include <functional>
 #include <future>
 #include <map>
-#include <memory>
 #include <ostream>
 #include <string>
 #include <utility>
@@ -68,19 +66,13 @@ bvar::LatencyRecorder g_stream_load_precommit_txn_latency("stream_load", "precom
 bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", 
"commit_txn");
 
 Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> 
ctx) {
-    return execute_plan_fragment(ctx, [](std::shared_ptr<StreamLoadContext> 
ctx) {});
-}
-
-Status StreamLoadExecutor::execute_plan_fragment(
-        std::shared_ptr<StreamLoadContext> ctx,
-        const std::function<void(std::shared_ptr<StreamLoadContext> ctx)>& cb) 
{
 // submit this params
 #ifndef BE_TEST
     ctx->start_write_data_nanos = MonotonicNanos();
     LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ", 
txn_id=" << ctx->txn_id
               << ", query_id=" << ctx->id;
     Status st;
-    auto exec_fragment = [ctx, cb, this](RuntimeState* state, Status* status) {
+    auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
         if (ctx->group_commit) {
             ctx->label = state->import_label();
             ctx->txn_id = state->wal_id();
@@ -150,7 +142,6 @@ Status StreamLoadExecutor::execute_plan_fragment(
                   << (ctx->receive_and_read_data_cost_nanos - 
ctx->read_data_cost_nanos) / 1000000
                   << ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 
1000000
                   << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 
1000000;
-        cb(ctx);
     };
 
     if (ctx->put_result.__isset.params) {
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h
index acdfe00a262..1364bbbf31b 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include <functional>
 #include <memory>
 
 #include "common/factory_creator.h"
@@ -52,10 +51,6 @@ public:
 
     Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx);
 
-    Status execute_plan_fragment(
-            std::shared_ptr<StreamLoadContext> ctx,
-            const std::function<void(std::shared_ptr<StreamLoadContext> ctx)>& 
cb);
-
 protected:
     // collect the load statistics from context and set them to stat
     // return true if stat is set, otherwise, return false


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to