This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0fdf8724d23 branch-3.0: [chore] Fix unhandled exceptions thrown by stoi on streamload #49714 (#50410)
0fdf8724d23 is described below

commit 0fdf8724d23001b37a926b3509aadb728aea67a5
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Apr 27 10:29:22 2025 +0800

    branch-3.0: [chore] Fix unhandled exceptions thrown by stoi on streamload #49714 (#50410)
    
    Cherry-picked from #49714
    
    Co-authored-by: 神技圈子 <songguang...@gmail.com>
    Co-authored-by: 宋光璠 <songguang...@sf.com>
    Co-authored-by: morningman <yun...@selectdb.com>
    Co-authored-by: morningman <morning...@163.com>
---
 be/src/http/action/adjust_log_level.cpp            | 33 ++++++++----
 be/src/http/action/stream_load.cpp                 | 29 +++++-----
 be/src/util/string_util.cpp                        | 11 ++++
 be/src/util/string_util.h                          |  2 +-
 .../data/load_p0/stream_load/large_test_file.csv   |  5 ++
 .../load_p0/stream_load/test_stream_load.groovy    |  6 +--
 .../test_stream_load_illegal_skip_lines.groovy     | 62 ++++++++++++++++++++++
 .../test_stream_load_illegal_timeout.groovy        | 61 +++++++++++++++++++++
 8 files changed, 178 insertions(+), 31 deletions(-)
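
For context, the core of the change is the safe_stoi helper added in
be/src/util/string_util.cpp: instead of letting std::stoi's std::invalid_argument or
std::out_of_range escape the HTTP handlers, a parse failure becomes an INVALID_ARGUMENT
result that the caller can send back to the client. A minimal, self-contained sketch of
that pattern follows; the Status and ParseResult types and the safe_stoi_sketch name are
illustrative stand-ins, not the actual doris::Status / doris::Result API used in the patch.

    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <variant>

    // Illustrative stand-ins for Doris's Status/Result types (assumption, not the real API).
    struct Status {
        std::string msg;
    };
    using ParseResult = std::variant<int, Status>;

    // Same idea as the patch's safe_stoi: turn std::stoi exceptions into an error value.
    ParseResult safe_stoi_sketch(const std::string& input, const std::string& name) {
        try {
            return std::stoi(input);
        } catch (const std::invalid_argument& e) {
            return Status{"Invalid format of '" + name + "': '" + input + "', " + e.what()};
        } catch (const std::out_of_range& e) {
            return Status{"'" + name + "' value out of range: '" + input + "', " + e.what()};
        }
    }

    int main() {
        const std::string headers[] = {"300", "abc", "21474836471"};
        for (const auto& header : headers) {
            auto result = safe_stoi_sketch(header, "timeout");
            if (std::holds_alternative<int>(result)) {
                std::cout << "parsed timeout = " << std::get<int>(result) << '\n';
            } else {
                std::cout << "rejected: " << std::get<Status>(result).msg << '\n';
            }
        }
        return 0;
    }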

diff --git a/be/src/http/action/adjust_log_level.cpp b/be/src/http/action/adjust_log_level.cpp
index a8644a0fb5f..2aca94571a6 100644
--- a/be/src/http/action/adjust_log_level.cpp
+++ b/be/src/http/action/adjust_log_level.cpp
@@ -22,12 +22,13 @@
 #include "common/logging.h"
 #include "http/http_channel.h"
 #include "http/http_request.h"
+#include "util/string_util.h"
 
 namespace doris {
 
 // **Note**: If the module_name does not exist in the vlog modules, vlog
 // would create corresponding module for it.
-std::tuple<std::string, int, int> handle_request(HttpRequest* req) {
+Result<std::tuple<std::string, int, int>> handle_request(HttpRequest* req) {
     auto parse_param = [&req](std::string param) {
         const auto& value = req->param(param);
         if (value.empty()) {
@@ -38,22 +39,34 @@ std::tuple<std::string, int, int> handle_request(HttpRequest* req) {
     };
     const auto& module = parse_param("module");
     const auto& level = parse_param("level");
-    int new_level = std::stoi(level);
-    return std::make_tuple(module, google::SetVLOGLevel(module.c_str(), new_level), new_level);
+    auto result = safe_stoi(level, "level");
+    if (result.has_value()) {
+        return std::make_tuple(module, google::SetVLOGLevel(module.c_str(), result.value()),
+                               result.value());
+    } else {
+        return unexpected(std::move(result).error());
+    }
 }
 
 void AdjustLogLevelAction::handle(HttpRequest* req) {
     try {
         auto handle_result = handle_request(req);
-        auto msg =
-                fmt::format("adjust vlog of {} from {} to {} succeed", std::get<0>(handle_result),
-                            std::get<1>(handle_result), std::get<2>(handle_result));
-        LOG(INFO) << msg;
-        HttpChannel::send_reply(req, msg);
+        if (handle_result.has_value()) {
+            auto msg = fmt::format(
+                    "adjust vlog of {} from {} to {} succeed", 
std::get<0>(handle_result.value()),
+                    std::get<1>(handle_result.value()), 
std::get<2>(handle_result.value()));
+            LOG(INFO) << msg;
+            HttpChannel::send_reply(req, msg);
+        } else {
+            LOG(WARNING) << "adjust log level failed, error: " << handle_result.error();
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                    handle_result.error().to_string_no_stack());
+            return;
+        }
     } catch (const std::exception& e) {
-        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
         LOG(WARNING) << "adjust log level failed, error: " << e.what();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
         return;
     }
 }
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 55e20a764ba..e3abd5a8a5d 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -329,11 +329,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
     }
 
     if (!http_req->header(HTTP_TIMEOUT).empty()) {
-        try {
-            ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
-        } catch (const std::invalid_argument& e) {
-            return Status::InvalidArgument("Invalid timeout format, {}", 
e.what());
-        }
+        ctx->timeout_second = 
DORIS_TRY(safe_stoi(http_req->header(HTTP_TIMEOUT), HTTP_TIMEOUT));
     }
     if (!http_req->header(HTTP_COMMENT).empty()) {
         ctx->load_comment = http_req->header(HTTP_COMMENT);
@@ -565,15 +561,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     }
 
     if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) {
-        try {
-            request.__set_send_batch_parallelism(
-                    std::stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM)));
-        } catch (const std::invalid_argument& e) {
-            return Status::InvalidArgument("send_batch_parallelism must be an 
integer, {}",
-                                           e.what());
-        } catch (const std::out_of_range& e) {
-            return Status::InvalidArgument("send_batch_parallelism out of 
range, {}", e.what());
-        }
+        int parallelism = DORIS_TRY(safe_stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM),
+                                              HTTP_SEND_BATCH_PARALLELISM));
+        request.__set_send_batch_parallelism(parallelism);
     }
 
     if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) {
@@ -629,7 +619,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         }
     }
     if (!http_req->header(HTTP_SKIP_LINES).empty()) {
-        request.__set_skip_lines(std::stoi(http_req->header(HTTP_SKIP_LINES)));
+        int skip_lines = DORIS_TRY(safe_stoi(http_req->header(HTTP_SKIP_LINES), HTTP_SKIP_LINES));
+        if (skip_lines < 0) {
+            return Status::InvalidArgument("Invalid 'skip_lines': {}", 
skip_lines);
+        }
+        request.__set_skip_lines(skip_lines);
     }
     if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) {
         if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) {
@@ -650,8 +644,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         request.__set_memtable_on_sink_node(value);
     }
     if (!http_req->header(HTTP_LOAD_STREAM_PER_NODE).empty()) {
-        int value = std::stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE));
-        request.__set_stream_per_node(value);
+        int stream_per_node = DORIS_TRY(
+                safe_stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE), HTTP_LOAD_STREAM_PER_NODE));
+        request.__set_stream_per_node(stream_per_node);
     }
     if (ctx->group_commit) {
         if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
diff --git a/be/src/util/string_util.cpp b/be/src/util/string_util.cpp
index bbd30771678..0d23feb93e7 100644
--- a/be/src/util/string_util.cpp
+++ b/be/src/util/string_util.cpp
@@ -31,4 +31,15 @@ size_t hash_of_path(const std::string& identifier, const std::string& path) {
     return hash;
 }
 
+Result<int> safe_stoi(const std::string& input, const std::string& name) {
+    try {
+        return std::stoi(input);
+    } catch (const std::invalid_argument& e) {
+        return ResultError(Status::Error<ErrorCode::INVALID_ARGUMENT>(
+                std::string("Invalid format of '{}': '{}', {}"), name, input, 
e.what()));
+    } catch (const std::out_of_range& e) {
+        return ResultError(Status::Error<ErrorCode::INVALID_ARGUMENT>(
+                std::string("'{}' value out of range: '{}', {}"), name, input, 
e.what()));
+    }
+}
 } // namespace doris
diff --git a/be/src/util/string_util.h b/be/src/util/string_util.h
index a5837a538cc..f5edd08e64c 100644
--- a/be/src/util/string_util.h
+++ b/be/src/util/string_util.h
@@ -127,7 +127,7 @@ public:
 };
 
 size_t hash_of_path(const std::string& identifier, const std::string& path);
-
+Result<int> safe_stoi(const std::string& input, const std::string& name);
 using StringCaseSet = std::set<std::string, StringCaseLess>;
 using StringCaseUnorderedSet = std::unordered_set<std::string, StringCaseHasher, StringCaseEqual>;
 template <class T>
diff --git a/regression-test/data/load_p0/stream_load/large_test_file.csv b/regression-test/data/load_p0/stream_load/large_test_file.csv
new file mode 100644
index 00000000000..2f33dfed7fa
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/large_test_file.csv
@@ -0,0 +1,5 @@
+1      10      5       testA   abc
+2      20      7       testB   def
+3      30      9       testC   ghi
+4      40      2       testD   jkl
+5      50      4       testE   mno
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index a9cd807fe44..ca3cc83e4d0 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1457,7 +1457,7 @@ suite("test_stream_load", "p0") {
             }
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
-            assertEquals("[INVALID_ARGUMENT]send_batch_parallelism must be an 
integer, stoi", json.Message)
+            assertEquals("[INVALID_ARGUMENT]Invalid format of 
'send_batch_parallelism': 'a', stoi", json.Message)
         }
     }
 
@@ -1474,7 +1474,7 @@ suite("test_stream_load", "p0") {
             }
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
-            assertEquals("[INVALID_ARGUMENT]send_batch_parallelism out of 
range, stoi", json.Message)
+            assertEquals("[INVALID_ARGUMENT]'send_batch_parallelism' value out 
of range: '21474836471', stoi", json.Message)
         }
     }
 
@@ -1626,7 +1626,7 @@ suite("test_stream_load", "p0") {
   
    log.info(sql_result[0][0].toString())
    log.info(sql_result[0][1].toString())
-   log.info(sql_result[0].size.toString())
+   log.info(sql_result.toString())
 
    def beHost=sql_result[0][0]
    def beHttpPort=sql_result[0][1]
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_skip_lines.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_skip_lines.groovy
new file mode 100644
index 00000000000..dcd97e783d6
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_skip_lines.groovy
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_illegal_skip_lines", "p0") {
+    def tableName = "test_stream_load_illegal_skip_lines"
+
+    def be_num = sql "show backends;"
+    if (be_num.size() > 1) {
+        // not suitable for multiple be cluster.
+        return
+    }
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` bigint(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4) SUM NULL,
+            `v2` tinyint(4) REPLACE NULL,
+            `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+            `v11` varchar(6) REPLACE_IF_NOT_NULL NULL
+        ) ENGINE=OLAP
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '\t'
+        set 'columns', 'k1, k2, v2, v10, v11'
+        set 'strict_mode','true'
+
+        file 'large_test_file.csv'
+        set 'skip_lines', '-3'
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+
+            assertEquals("fail", json.Status.toLowerCase())
+            assertEquals("[INVALID_ARGUMENT]Invalid 'skip_lines': -3", 
json.Message)
+        }
+    }
+}
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_timeout.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_timeout.groovy
new file mode 100644
index 00000000000..e49b0cc8391
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_timeout.groovy
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_illegal_timeout", "p0") {
+    def tableName = "test_stream_load_illegal_timeout";
+
+   def be_num = sql "show backends;"
+    if (be_num.size() > 1) {
+        // not suitable for multiple be cluster.
+        return
+    }
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` bigint(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4) SUM NULL,
+            `v2` tinyint(4) REPLACE NULL,
+            `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+            `v11` varchar(6) REPLACE_IF_NOT_NULL NULL
+        ) ENGINE=OLAP
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '\t'
+        set 'columns', 'k1, k2, v2, v10, v11'
+        set 'strict_mode','true'
+
+        file 'large_test_file.csv'
+        set 'timeout', 'abc'
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+
+            assertEquals("fail", json.Status.toLowerCase())
+            assertEquals("[INVALID_ARGUMENT]Invalid format of 'timeout': 
'abc', stoi", json.Message)
+        }
+    }
+
+}
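
The call sites in stream_load.cpp above unwrap the Result with DORIS_TRY: on a parse error
the enclosing Status-returning function returns immediately, otherwise the parsed value is
used (and, for skip_lines, additionally rejected if negative). A rough stand-alone sketch of
that call-site shape, reusing the illustrative Status/ParseResult/safe_stoi_sketch names from
the sketch above (an approximation of the control flow, not the actual DORIS_TRY expansion):

    // Mirrors the shape of the patched skip_lines handling in _process_put: a malformed
    // header becomes an error Status instead of an uncaught exception, and a negative
    // value is rejected explicitly. An empty msg means OK in this sketch.
    Status parse_skip_lines_header(const std::string& header, int* skip_lines_out) {
        if (header.empty()) {
            return Status{};  // header absent: nothing to parse
        }
        auto result = safe_stoi_sketch(header, "skip_lines");
        if (std::holds_alternative<Status>(result)) {
            return std::get<Status>(result);  // early return on parse error, like DORIS_TRY
        }
        int skip_lines = std::get<int>(result);
        if (skip_lines < 0) {
            return Status{"Invalid 'skip_lines': " + std::to_string(skip_lines)};
        }
        *skip_lines_out = skip_lines;
        return Status{};
    }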


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
