This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fa8ed2bccc [fix](array-type) fix the invalid format load for stream load (#12424)
fa8ed2bccc is described below

commit fa8ed2bccccfd20eb9d217dfe0c8d0c452c9365f
Author: carlvinhust2012 <huchengha...@126.com>
AuthorDate: Mon Sep 19 08:52:59 2022 +0800

    [fix](array-type) fix the invalid format load for stream load (#12424)
    
    This PR fixes stream load of data whose array column has an invalid format.
    Before this change, loading data with an invalid array format failed the
    whole load with an error.
    The original file to load:
    1 [1, 2, 3]
    2 [4, 5, 6]
    3 \N
    4 [7, \N, 8]
    5 10, 11, 12
    [hugo@xafj-palo]$ sh curl_cmd.sh
    {
    "TxnId": 11035,
    "Label": "11c9f111-188e-4616-9a50-aec8b7814513",
    "TwoPhaseCommit": "false",
    "Status": "Fail",
    "Message": "Array does not start with '[' character, found '1'",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 55,
    "LoadTimeMs": 7,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 0
    }
    After this change, the load succeeds, and the response includes an error URL
    that reports the rejected line.
    [hugo@xafj-palo]$ sh curl_cmd.sh
    {
    "TxnId": 11046,
    "Label": "249808ee-55f4-4c08-b671-b3d82689d614",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 4,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 55,
    "LoadTimeMs": 39,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 19,
    "CommitAndPublishTimeMs": 16,
    "ErrorURL": "http://10.81.85.89:8502/api/_load_error_log?file=__shard_3/error_log_insert_stmt_8d4130f0c18aeb0a-ad7ffd4233c41893_8d4130f0c18aeb0a_ad7ffd4233c41893";
    }
    
    The SQL select result:
    MySQL [example_db]> select * from array_test06;
    +------+--------------+
    | k1 | k2 |
    +------+--------------+
    | 1 | [1, 2, 3] |
    | 2 | [4, 5, 6] |
    | 3 | NULL |
    | 4 | [7, NULL, 8] |
    +------+--------------+
    4 rows in set (0.019 sec)
    
    The error URL page shows:
    "Reason: Invalid format for array column(k2). src line [10, 11, 12]; "
    
    Issue Number: #7570
---
 be/src/exec/base_scanner.cpp                       | 34 ++++++++++++++++++++++
 be/src/exec/base_scanner.h                         |  4 +++
 be/src/exec/broker_scanner.cpp                     |  8 ++---
 be/src/runtime/types.h                             |  2 ++
 be/src/vec/exec/vbroker_scanner.cpp                |  8 ++---
 .../data/load_p0/broker_load/simple_array.data     |  4 ++-
 .../load_p0/broker_load/test_array_load.groovy     |  3 +-
 7 files changed, 53 insertions(+), 10 deletions(-)

diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 69edbc4b05..864d86b72e 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -487,4 +487,38 @@ void BaseScanner::_fill_columns_from_path() {
     }
 }
 
+bool BaseScanner::is_null(const Slice& slice) {
+    return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N';
+}
+
+bool BaseScanner::is_array(const Slice& slice) {
+    return slice.size > 1 && slice.data[0] == '[' && slice.data[slice.size - 1] == ']';
+}
+
+bool BaseScanner::check_array_format(std::vector<Slice>& split_values) {
+    // if not the array format, filter this line and return error url
+    auto dest_slot_descs = _dest_tuple_desc->slots();
+    for (int j = 0; j < split_values.size() && j < dest_slot_descs.size(); ++j) {
+        auto dest_slot_desc = dest_slot_descs[j];
+        if (!dest_slot_desc->is_materialized()) {
+            continue;
+        }
+        const Slice& value = split_values[j];
+        if (dest_slot_desc->type().is_array_type() && !is_null(value) && !is_array(value)) {
+            RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                    [&]() -> std::string { return std::string(value.data, value.size); },
+                    [&]() -> std::string {
+                        fmt::memory_buffer err_msg;
+                        fmt::format_to(err_msg, "Invalid format for array column({})",
+                                       dest_slot_desc->col_name());
+                        return fmt::to_string(err_msg);
+                    },
+                    &_scanner_eof));
+            _counter->num_rows_filtered++;
+            return false;
+        }
+    }
+    return true;
+}
+
 } // namespace doris
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 80157c939c..6711836e53 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -92,6 +92,10 @@ protected:
     Status _fill_dest_block(vectorized::Block* dest_block, bool* eof);
     virtual Status _init_src_block();
 
+    bool is_null(const Slice& slice);
+    bool is_array(const Slice& slice);
+    bool check_array_format(std::vector<Slice>& split_values);
+
     RuntimeState* _state;
     const TBrokerScanRangeParams& _params;
 
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index aed2acd7b8..1c7961f15a 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -389,10 +389,6 @@ bool BrokerScanner::check_decimal_input(const Slice& slice, int precision, int s
     return true;
 }
 
-bool is_null(const Slice& slice) {
-    return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N';
-}
-
 // Convert one row to this tuple
 Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool,
                                        bool* fill_tuple) {
@@ -494,6 +490,10 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
         return Status::OK();
     }
 
+    if (!check_array_format(_split_values)) {
+        return Status::OK();
+    }
+
     for (int i = 0; i < _split_values.size(); ++i) {
         auto slot_desc = _src_slot_descs[i];
         const Slice& value = _split_values[i];
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 9c20e4092f..583bf28c10 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -185,6 +185,8 @@ struct TypeDescriptor {
 
     bool is_collection_type() const { return type == TYPE_ARRAY || type == TYPE_MAP; }
 
+    bool is_array_type() const { return type == TYPE_ARRAY; }
+
     /// Returns the byte size of this type.  Returns 0 for variable length types.
     int get_byte_size() const { return ::doris::get_byte_size(type); }
 
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index 01a9f3255e..0679a1859b 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -30,10 +30,6 @@
 
 namespace doris::vectorized {
 
-bool is_null(const Slice& slice) {
-    return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N';
-}
-
 VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
                                const TBrokerScanRangeParams& params,
                                const std::vector<TBrokerRangeDesc>& ranges,
@@ -95,6 +91,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
         return Status::OK();
     }
 
+    if (!check_array_format(_split_values)) {
+        return Status::OK();
+    }
+
     int idx = 0;
     for (int i = 0; i < _split_values.size(); ++i) {
         int dest_index = idx++;
diff --git a/regression-test/data/load_p0/broker_load/simple_array.data b/regression-test/data/load_p0/broker_load/simple_array.data
index 7501722c69..88eb710915 100644
--- a/regression-test/data/load_p0/broker_load/simple_array.data
+++ b/regression-test/data/load_p0/broker_load/simple_array.data
@@ -1,3 +1,5 @@
 
1/[1,2,3,4,5]/[32767,32768,32769]/[65534,65535,65536]/["a","b","c","d","e"]/["hello","world"]/["1991-01-01"]/["1991-01-01
 00:00:00"]/[0.33,0.67]/[3.1415926,0.878787878]/[1,1.2,1.3]
 
2/[1,2,3,4,5]/[32767,32768,32769]/[65534,65535,65536]/["a","b","c","d","e"]/["hello","world"]/\N/\N/\N/\N/[1,\N,1.3]
-3/\N/\N/\N/\N/\N/\N/\N/\N/\N/\N
\ No newline at end of file
+3/\N/\N/\N/\N/\N/\N/\N/\N/\N/\N
+4/1,2,3,4,5/\N/\N/\N/\N/\N/\N/\N/\N/\N
+5/[1,2,3,4,5/\N/\N/\N/\N/\N/\N/\N/\N/\N
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/broker_load/test_array_load.groovy b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
index 35d5a54589..b71722b425 100644
--- a/regression-test/suites/load_p0/broker_load/test_array_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
@@ -124,6 +124,7 @@ suite("test_array_load", "p0") {
             set 'where', where_expr
             set 'fuzzy_parse', fuzzy_flag
             set 'column_separator', column_sep
+            set 'max_filter_ratio', '0.6'
             file file_name // import json file
             time 10000 // limit inflight 10s
 
@@ -136,7 +137,7 @@ suite("test_array_load", "p0") {
                 log.info("Stream load result: ${result}".toString())
                 def json = parseJson(result)
                 assertEquals("success", json.Status.toLowerCase())
-                assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + json.NumberFilteredRows)
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to