This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 0fdf8724d23 branch-3.0: [chore] Fix unhandled exceptions thrown by stoi on streamload #49714 (#50410) 0fdf8724d23 is described below commit 0fdf8724d23001b37a926b3509aadb728aea67a5 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Sun Apr 27 10:29:22 2025 +0800 branch-3.0: [chore] Fix unhandled exceptions thrown by stoi on streamload #49714 (#50410) Cherry-picked from #49714 Co-authored-by: 神技圈子 <songguang...@gmail.com> Co-authored-by: 宋光璠 <songguang...@sf.com> Co-authored-by: morningman <yun...@selectdb.com> Co-authored-by: morningman <morning...@163.com> --- be/src/http/action/adjust_log_level.cpp | 33 ++++++++---- be/src/http/action/stream_load.cpp | 29 +++++----- be/src/util/string_util.cpp | 11 ++++ be/src/util/string_util.h | 2 +- .../data/load_p0/stream_load/large_test_file.csv | 5 ++ .../load_p0/stream_load/test_stream_load.groovy | 6 +-- .../test_stream_load_illegal_skip_lines.groovy | 62 ++++++++++++++++++++++ .../test_stream_load_illegal_timeout.groovy | 61 +++++++++++++++++++++ 8 files changed, 178 insertions(+), 31 deletions(-) diff --git a/be/src/http/action/adjust_log_level.cpp b/be/src/http/action/adjust_log_level.cpp index a8644a0fb5f..2aca94571a6 100644 --- a/be/src/http/action/adjust_log_level.cpp +++ b/be/src/http/action/adjust_log_level.cpp @@ -22,12 +22,13 @@ #include "common/logging.h" #include "http/http_channel.h" #include "http/http_request.h" +#include "util/string_util.h" namespace doris { // **Note**: If the module_name does not exist in the vlog modules, vlog // would create corresponding module for it. -std::tuple<std::string, int, int> handle_request(HttpRequest* req) { +Result<std::tuple<std::string, int, int>> handle_request(HttpRequest* req) { auto parse_param = [&req](std::string param) { const auto& value = req->param(param); if (value.empty()) { @@ -38,22 +39,34 @@ std::tuple<std::string, int, int> handle_request(HttpRequest* req) { }; const auto& module = parse_param("module"); const auto& level = parse_param("level"); - int new_level = std::stoi(level); - return std::make_tuple(module, google::SetVLOGLevel(module.c_str(), new_level), new_level); + auto result = safe_stoi(level, "level"); + if (result.has_value()) { + return std::make_tuple(module, google::SetVLOGLevel(module.c_str(), result.value()), + result.value()); + } else { + return unexpected(std::move(result).error()); + } } void AdjustLogLevelAction::handle(HttpRequest* req) { try { auto handle_result = handle_request(req); - auto msg = - fmt::format("adjust vlog of {} from {} to {} succeed", std::get<0>(handle_result), - std::get<1>(handle_result), std::get<2>(handle_result)); - LOG(INFO) << msg; - HttpChannel::send_reply(req, msg); + if (handle_result.has_value()) { + auto msg = fmt::format( + "adjust vlog of {} from {} to {} succeed", std::get<0>(handle_result.value()), + std::get<1>(handle_result.value()), std::get<2>(handle_result.value())); + LOG(INFO) << msg; + HttpChannel::send_reply(req, msg); + } else { + LOG(WARNING) << "adjust log level failed, error: " << handle_result.error(); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + handle_result.error().to_string_no_stack()); + return; + } } catch (const std::exception& e) { - HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); LOG(WARNING) << "adjust log level failed, error: " << e.what(); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); return; } } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 55e20a764ba..e3abd5a8a5d 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -329,11 +329,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea } if (!http_req->header(HTTP_TIMEOUT).empty()) { - try { - ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT)); - } catch (const std::invalid_argument& e) { - return Status::InvalidArgument("Invalid timeout format, {}", e.what()); - } + ctx->timeout_second = DORIS_TRY(safe_stoi(http_req->header(HTTP_TIMEOUT), HTTP_TIMEOUT)); } if (!http_req->header(HTTP_COMMENT).empty()) { ctx->load_comment = http_req->header(HTTP_COMMENT); @@ -565,15 +561,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, } if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) { - try { - request.__set_send_batch_parallelism( - std::stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM))); - } catch (const std::invalid_argument& e) { - return Status::InvalidArgument("send_batch_parallelism must be an integer, {}", - e.what()); - } catch (const std::out_of_range& e) { - return Status::InvalidArgument("send_batch_parallelism out of range, {}", e.what()); - } + int parallelism = DORIS_TRY(safe_stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM), + HTTP_SEND_BATCH_PARALLELISM)); + request.__set_send_batch_parallelism(parallelism); } if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) { @@ -629,7 +619,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, } } if (!http_req->header(HTTP_SKIP_LINES).empty()) { - request.__set_skip_lines(std::stoi(http_req->header(HTTP_SKIP_LINES))); + int skip_lines = DORIS_TRY(safe_stoi(http_req->header(HTTP_SKIP_LINES), HTTP_SKIP_LINES)); + if (skip_lines < 0) { + return Status::InvalidArgument("Invalid 'skip_lines': {}", skip_lines); + } + request.__set_skip_lines(skip_lines); } if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) { if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) { @@ -650,8 +644,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_memtable_on_sink_node(value); } if (!http_req->header(HTTP_LOAD_STREAM_PER_NODE).empty()) { - int value = std::stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE)); - request.__set_stream_per_node(value); + int stream_per_node = DORIS_TRY( + safe_stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE), HTTP_LOAD_STREAM_PER_NODE)); + request.__set_stream_per_node(stream_per_node); } if (ctx->group_commit) { if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { diff --git a/be/src/util/string_util.cpp b/be/src/util/string_util.cpp index bbd30771678..0d23feb93e7 100644 --- a/be/src/util/string_util.cpp +++ b/be/src/util/string_util.cpp @@ -31,4 +31,15 @@ size_t hash_of_path(const std::string& identifier, const std::string& path) { return hash; } +Result<int> safe_stoi(const std::string& input, const std::string& name) { + try { + return std::stoi(input); + } catch (const std::invalid_argument& e) { + return ResultError(Status::Error<ErrorCode::INVALID_ARGUMENT>( + std::string("Invalid format of '{}': '{}', {}"), name, input, e.what())); + } catch (const std::out_of_range& e) { + return ResultError(Status::Error<ErrorCode::INVALID_ARGUMENT>( + std::string("'{}' value out of range: '{}', {}"), name, input, e.what())); + } +} } // namespace doris diff --git a/be/src/util/string_util.h b/be/src/util/string_util.h index a5837a538cc..f5edd08e64c 100644 --- a/be/src/util/string_util.h +++ b/be/src/util/string_util.h @@ -127,7 +127,7 @@ public: }; size_t hash_of_path(const std::string& identifier, const std::string& path); - +Result<int> safe_stoi(const std::string& input, const std::string& name); using StringCaseSet = std::set<std::string, StringCaseLess>; using StringCaseUnorderedSet = std::unordered_set<std::string, StringCaseHasher, StringCaseEqual>; template <class T> diff --git a/regression-test/data/load_p0/stream_load/large_test_file.csv b/regression-test/data/load_p0/stream_load/large_test_file.csv new file mode 100644 index 00000000000..2f33dfed7fa --- /dev/null +++ b/regression-test/data/load_p0/stream_load/large_test_file.csv @@ -0,0 +1,5 @@ +1 10 5 testA abc +2 20 7 testB def +3 30 9 testC ghi +4 40 2 testD jkl +5 50 4 testE mno diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy index a9cd807fe44..ca3cc83e4d0 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy @@ -1457,7 +1457,7 @@ suite("test_stream_load", "p0") { } log.info("Stream load result: ${result}".toString()) def json = parseJson(result) - assertEquals("[INVALID_ARGUMENT]send_batch_parallelism must be an integer, stoi", json.Message) + assertEquals("[INVALID_ARGUMENT]Invalid format of 'send_batch_parallelism': 'a', stoi", json.Message) } } @@ -1474,7 +1474,7 @@ suite("test_stream_load", "p0") { } log.info("Stream load result: ${result}".toString()) def json = parseJson(result) - assertEquals("[INVALID_ARGUMENT]send_batch_parallelism out of range, stoi", json.Message) + assertEquals("[INVALID_ARGUMENT]'send_batch_parallelism' value out of range: '21474836471', stoi", json.Message) } } @@ -1626,7 +1626,7 @@ suite("test_stream_load", "p0") { log.info(sql_result[0][0].toString()) log.info(sql_result[0][1].toString()) - log.info(sql_result[0].size.toString()) + log.info(sql_result.toString()) def beHost=sql_result[0][0] def beHttpPort=sql_result[0][1] diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_skip_lines.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_skip_lines.groovy new file mode 100644 index 00000000000..dcd97e783d6 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_skip_lines.groovy @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_stream_load_illegal_skip_lines", "p0") { + def tableName = "test_stream_load_illegal_skip_lines" + + def be_num = sql "show backends;" + if (be_num.size() > 1) { + // not suitable for multiple be cluster. + return + } + + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` bigint(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) SUM NULL, + `v2` tinyint(4) REPLACE NULL, + `v10` char(10) REPLACE_IF_NOT_NULL NULL, + `v11` varchar(6) REPLACE_IF_NOT_NULL NULL + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + streamLoad { + table "${tableName}" + set 'column_separator', '\t' + set 'columns', 'k1, k2, v2, v10, v11' + set 'strict_mode','true' + + file 'large_test_file.csv' + set 'skip_lines', '-3' + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + + assertEquals("fail", json.Status.toLowerCase()) + assertEquals("[INVALID_ARGUMENT]Invalid 'skip_lines': -3", json.Message) + } + } +} diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_timeout.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_timeout.groovy new file mode 100644 index 00000000000..e49b0cc8391 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_timeout.groovy @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_stream_load_illegal_timeout", "p0") { + def tableName = "test_stream_load_illegal_timeout"; + + def be_num = sql "show backends;" + if (be_num.size() > 1) { + // not suitable for multiple be cluster. + return + } + + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` bigint(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) SUM NULL, + `v2` tinyint(4) REPLACE NULL, + `v10` char(10) REPLACE_IF_NOT_NULL NULL, + `v11` varchar(6) REPLACE_IF_NOT_NULL NULL + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + streamLoad { + table "${tableName}" + set 'column_separator', '\t' + set 'columns', 'k1, k2, v2, v10, v11' + set 'strict_mode','true' + + file 'large_test_file.csv' + set 'timeout', 'abc' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + + assertEquals("fail", json.Status.toLowerCase()) + assertEquals("[INVALID_ARGUMENT]Invalid format of 'timeout': 'abc', stoi", json.Message) + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org