This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 747faeed173b39fd1d0527eeab0fc029f109dc7b
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Tue Feb 27 17:31:40 2024 +0800

    [Enhancement](group commit) optimize some group commit code (#31392)
    
    This PR optimizes some of the logic related to group commit:
    1. Improved the error message reported when there is insufficient WAL space during import; it now includes the size of the load.
    2. Handled a negative content length during import: it is now parsed as a signed 64-bit value and rejected with an error.
    3. Added a missing warning log in `group_commit_mgr.cpp` for when the create-group-commit-load RPC fails.
---
 be/src/http/action/http_stream.cpp  | 17 +++++++++++++----
 be/src/http/action/stream_load.cpp  | 17 +++++++++++++----
 be/src/http/utils.cpp               |  2 +-
 be/src/http/utils.h                 |  2 +-
 be/src/runtime/group_commit_mgr.cpp |  3 +++
 be/test/http/stream_load_test.cpp   |  2 --
 6 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp 
b/be/src/http/action/http_stream.cpp
index 6304c44462f..bb5ce729df8 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -378,9 +378,16 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* 
req,
     if (config::wait_internal_group_commit_finish) {
         group_commit_mode = "sync_mode";
     }
-    size_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
-                                    ? 0
-                                    : 
std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
+    int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
+                                     ? 0
+                                     : 
std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
+    if (content_length < 0) {
+        std::stringstream ss;
+        ss << "This http load content length <0 (" << content_length
+           << "), please check your content length.";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
     if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || 
content_length == 0) {
         // off_mode and empty
         ctx->group_commit = false;
@@ -399,7 +406,9 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* 
req,
         if (iequal(group_commit_mode, "async_mode")) {
             if (!load_size_smaller_than_wal_limit(content_length)) {
                 std::stringstream ss;
-                ss << "There is no space for group commit http load async WAL. 
WAL dir info: "
+                ss << "There is no space for group commit http load async WAL. 
This http load "
+                      "size is "
+                   << content_length << ". WAL dir info: "
                    << 
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
                 LOG(WARNING) << ss.str();
                 return Status::Error<EXCEEDED_LIMIT>(ss.str());
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 9e9db1bd37e..a447b8d4f93 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -710,9 +710,16 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* 
req,
     if (config::wait_internal_group_commit_finish) {
         group_commit_mode = "sync_mode";
     }
-    size_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
-                                    ? 0
-                                    : 
std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
+    int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
+                                     ? 0
+                                     : 
std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
+    if (content_length < 0) {
+        std::stringstream ss;
+        ss << "This stream load content length <0 (" << content_length
+           << "), please check your content length.";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
     if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || 
content_length == 0) {
         // off_mode and empty
         ctx->group_commit = false;
@@ -731,7 +738,9 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* 
req,
         if (iequal(group_commit_mode, "async_mode")) {
             if (!load_size_smaller_than_wal_limit(content_length)) {
                 std::stringstream ss;
-                ss << "There is no space for group commit stream load async 
WAL. WAL dir info: "
+                ss << "There is no space for group commit stream load async 
WAL. This stream load "
+                      "size is "
+                   << content_length << ". WAL dir info: "
                    << 
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
                 LOG(WARNING) << ss.str();
                 return Status::Error<EXCEEDED_LIMIT>(ss.str());
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index 1e477530ebf..49f9d2c4993 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -193,7 +193,7 @@ void do_dir_response(const std::string& dir_path, 
HttpRequest* req) {
     HttpChannel::send_reply(req, result_str);
 }
 
-bool load_size_smaller_than_wal_limit(size_t content_length) {
+bool load_size_smaller_than_wal_limit(int64_t content_length) {
     // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload 
content length. If it is empty or equels to 0, it means this streamload
     // is a chunked streamload and we are not sure its size.
     // 2. if streamload content length is too large, like larger than 80% of 
the WAL constrain.
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index e20e68c5b88..254d59cf13d 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -43,5 +43,5 @@ void do_dir_response(const std::string& dir_path, 
HttpRequest* req);
 
 std::string get_content_type(const std::string& file_name);
 
-bool load_size_smaller_than_wal_limit(size_t content_length);
+bool load_size_smaller_than_wal_limit(int64_t content_length);
 } // namespace doris
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 42c0e83f4b6..032c5bc525f 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -276,6 +276,9 @@ Status GroupCommitTable::_create_group_commit_load(
                 client->streamLoadPut(result, request);
             },
             10000L);
+    if (!st.ok()) {
+        LOG(WARNING) << "create group commit load rpc error, st=" << 
st.to_string();
+    }
     RETURN_IF_ERROR(st);
     st = Status::create<false>(result.status);
     if (!st.ok()) {
diff --git a/be/test/http/stream_load_test.cpp 
b/be/test/http/stream_load_test.cpp
index 90b6cbe6380..d797c081f41 100644
--- a/be/test/http/stream_load_test.cpp
+++ b/be/test/http/stream_load_test.cpp
@@ -64,7 +64,6 @@ TEST_F(StreamLoadTest, TestHeader) {
         auto* evhttp_req = evhttp_request_new(nullptr, nullptr);
         HttpRequest req(evhttp_req);
         EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH).empty(), true);
-        EXPECT_EQ(load_size_smaller_than_wal_limit(-1), false);
         evhttp_request_free(evhttp_req);
     }
 
@@ -80,7 +79,6 @@ TEST_F(StreamLoadTest, TestHeader) {
         HttpRequest req(evhttp_req);
         req.init_from_evhttp();
         EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH).empty(), true);
-        EXPECT_EQ(load_size_smaller_than_wal_limit(-1), false);
         evhttp_uri_free(evhttp_req->uri_elems);
         evhttp_req->uri = nullptr;
         evhttp_req->uri_elems = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to