This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new fa8ed2bccc [fix](array-type) fix the invalid format load for stream load (#12424) fa8ed2bccc is described below commit fa8ed2bccccfd20eb9d217dfe0c8d0c452c9365f Author: carlvinhust2012 <huchengha...@126.com> AuthorDate: Mon Sep 19 08:52:59 2022 +0800 [fix](array-type) fix the invalid format load for stream load (#12424) This PR fixes how stream load handles rows whose array column has an invalid format. Before this change, loading a file that contained an invalid array value failed the whole load with an error. The original file to load: 1 [1, 2, 3] 2 [4, 5, 6] 3 \N 4 [7, \N, 8] 5 10, 11, 12 [hugo@xafj-palo]$ sh curl_cmd.sh { "TxnId": 11035, "Label": "11c9f111-188e-4616-9a50-aec8b7814513", "TwoPhaseCommit": "false", "Status": "Fail", "Message": "Array does not start with '[' character, found '1'", "NumberTotalRows": 0, "NumberLoadedRows": 0, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 55, "LoadTimeMs": 7, "BeginTxnTimeMs": 0, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 0, "WriteDataTimeMs": 3, "CommitAndPublishTimeMs": 0 } After this change, the load succeeds, the invalid row is filtered out, and the response includes an error URL that reports the offending line. 
[hugo@xafj-palo]$ sh curl_cmd.sh { "TxnId": 11046, "Label": "249808ee-55f4-4c08-b671-b3d82689d614", "TwoPhaseCommit": "false", "Status": "Success", "Message": "OK", "NumberTotalRows": 5, "NumberLoadedRows": 4, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 55, "LoadTimeMs": 39, "BeginTxnTimeMs": 0, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 0, "WriteDataTimeMs": 19, "CommitAndPublishTimeMs": 16, "ErrorURL": "http://10.81.85.89:8502/api/_load_error_log?file=__shard_3/error_log_insert_stmt_8d4130f0c18aeb0a-ad7ffd4233c41893_8d4130f0c18aeb0a_ad7ffd4233c41893" } the sql select result: MySQL [example_db]> select * from array_test06; +------+--------------+ | k1 | k2 | +------+--------------+ | 1 | [1, 2, 3] | | 2 | [4, 5, 6] | | 3 | NULL | | 4 | [7, NULL, 8] | +------+--------------+ 4 rows in set (0.019 sec) the url page show us: "Reason: Invalid format for array column(k2). src line [10, 11, 12]; " Issue Number: #7570 --- be/src/exec/base_scanner.cpp | 34 ++++++++++++++++++++++ be/src/exec/base_scanner.h | 4 +++ be/src/exec/broker_scanner.cpp | 8 ++--- be/src/runtime/types.h | 2 ++ be/src/vec/exec/vbroker_scanner.cpp | 8 ++--- .../data/load_p0/broker_load/simple_array.data | 4 ++- .../load_p0/broker_load/test_array_load.groovy | 3 +- 7 files changed, 53 insertions(+), 10 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 69edbc4b05..864d86b72e 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -487,4 +487,38 @@ void BaseScanner::_fill_columns_from_path() { } } +bool BaseScanner::is_null(const Slice& slice) { + return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N'; +} + +bool BaseScanner::is_array(const Slice& slice) { + return slice.size > 1 && slice.data[0] == '[' && slice.data[slice.size - 1] == ']'; +} + +bool BaseScanner::check_array_format(std::vector<Slice>& split_values) { + // if not the array format, filter this line and return error url + auto 
dest_slot_descs = _dest_tuple_desc->slots(); + for (int j = 0; j < split_values.size() && j < dest_slot_descs.size(); ++j) { + auto dest_slot_desc = dest_slot_descs[j]; + if (!dest_slot_desc->is_materialized()) { + continue; + } + const Slice& value = split_values[j]; + if (dest_slot_desc->type().is_array_type() && !is_null(value) && !is_array(value)) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { return std::string(value.data, value.size); }, + [&]() -> std::string { + fmt::memory_buffer err_msg; + fmt::format_to(err_msg, "Invalid format for array column({})", + dest_slot_desc->col_name()); + return fmt::to_string(err_msg); + }, + &_scanner_eof)); + _counter->num_rows_filtered++; + return false; + } + } + return true; +} + } // namespace doris diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 80157c939c..6711836e53 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -92,6 +92,10 @@ protected: Status _fill_dest_block(vectorized::Block* dest_block, bool* eof); virtual Status _init_src_block(); + bool is_null(const Slice& slice); + bool is_array(const Slice& slice); + bool check_array_format(std::vector<Slice>& split_values); + RuntimeState* _state; const TBrokerScanRangeParams& _params; diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index aed2acd7b8..1c7961f15a 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -389,10 +389,6 @@ bool BrokerScanner::check_decimal_input(const Slice& slice, int precision, int s return true; } -bool is_null(const Slice& slice) { - return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N'; -} - // Convert one row to this tuple Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple) { @@ -494,6 +490,10 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) { return Status::OK(); } + if (!check_array_format(_split_values)) 
{ + return Status::OK(); + } + for (int i = 0; i < _split_values.size(); ++i) { auto slot_desc = _src_slot_descs[i]; const Slice& value = _split_values[i]; diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h index 9c20e4092f..583bf28c10 100644 --- a/be/src/runtime/types.h +++ b/be/src/runtime/types.h @@ -185,6 +185,8 @@ struct TypeDescriptor { bool is_collection_type() const { return type == TYPE_ARRAY || type == TYPE_MAP; } + bool is_array_type() const { return type == TYPE_ARRAY; } + /// Returns the byte size of this type. Returns 0 for variable length types. int get_byte_size() const { return ::doris::get_byte_size(type); } diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp index 01a9f3255e..0679a1859b 100644 --- a/be/src/vec/exec/vbroker_scanner.cpp +++ b/be/src/vec/exec/vbroker_scanner.cpp @@ -30,10 +30,6 @@ namespace doris::vectorized { -bool is_null(const Slice& slice) { - return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N'; -} - VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges, @@ -95,6 +91,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line, return Status::OK(); } + if (!check_array_format(_split_values)) { + return Status::OK(); + } + int idx = 0; for (int i = 0; i < _split_values.size(); ++i) { int dest_index = idx++; diff --git a/regression-test/data/load_p0/broker_load/simple_array.data b/regression-test/data/load_p0/broker_load/simple_array.data index 7501722c69..88eb710915 100644 --- a/regression-test/data/load_p0/broker_load/simple_array.data +++ b/regression-test/data/load_p0/broker_load/simple_array.data @@ -1,3 +1,5 @@ 1/[1,2,3,4,5]/[32767,32768,32769]/[65534,65535,65536]/["a","b","c","d","e"]/["hello","world"]/["1991-01-01"]/["1991-01-01 00:00:00"]/[0.33,0.67]/[3.1415926,0.878787878]/[1,1.2,1.3] 
2/[1,2,3,4,5]/[32767,32768,32769]/[65534,65535,65536]/["a","b","c","d","e"]/["hello","world"]/\N/\N/\N/\N/[1,\N,1.3] -3/\N/\N/\N/\N/\N/\N/\N/\N/\N/\N \ No newline at end of file +3/\N/\N/\N/\N/\N/\N/\N/\N/\N/\N +4/1,2,3,4,5/\N/\N/\N/\N/\N/\N/\N/\N/\N +5/[1,2,3,4,5/\N/\N/\N/\N/\N/\N/\N/\N/\N \ No newline at end of file diff --git a/regression-test/suites/load_p0/broker_load/test_array_load.groovy b/regression-test/suites/load_p0/broker_load/test_array_load.groovy index 35d5a54589..b71722b425 100644 --- a/regression-test/suites/load_p0/broker_load/test_array_load.groovy +++ b/regression-test/suites/load_p0/broker_load/test_array_load.groovy @@ -124,6 +124,7 @@ suite("test_array_load", "p0") { set 'where', where_expr set 'fuzzy_parse', fuzzy_flag set 'column_separator', column_sep + set 'max_filter_ratio', '0.6' file file_name // import json file time 10000 // limit inflight 10s @@ -136,7 +137,7 @@ suite("test_array_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + json.NumberFilteredRows) assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org