This is an automated email from the ASF dual-hosted git repository.

xuyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0732eb54bc [feature](struct-type) support csv format stream load for struct type (#17143)
0732eb54bc is described below

commit 0732eb54bc48c2375877ff13d7c5a926bb47c27a
Author: xy720 <22125576+xy...@users.noreply.github.com>
AuthorDate: Wed Mar 1 15:48:48 2023 +0800

    [feature](struct-type) support csv format stream load for struct type (#17143)
    
    Refactor from_string method in data_type_struct.cpp to support csv format stream load for struct type.
---
 be/src/vec/data_types/data_type_map.cpp            |   4 +-
 be/src/vec/data_types/data_type_struct.cpp         | 188 +++++++++++++++++----
 be/src/vec/exprs/vexpr.cpp                         |   7 +
 .../data/load_p0/stream_load/struct_malformat.csv  |   5 +
 .../data/load_p0/stream_load/struct_normal.csv     |  13 ++
 .../data/load_p0/stream_load/test_stream_load.out  |  22 +++
 .../load_p0/stream_load/test_stream_load.groovy    | 104 ++++++++++++
 7 files changed, 305 insertions(+), 38 deletions(-)

diff --git a/be/src/vec/data_types/data_type_map.cpp 
b/be/src/vec/data_types/data_type_map.cpp
index 4895a48c4c..5c362f5b90 100644
--- a/be/src/vec/data_types/data_type_map.cpp
+++ b/be/src/vec/data_types/data_type_map.cpp
@@ -156,11 +156,11 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* 
column) const {
     auto* map_column = assert_cast<ColumnMap*>(column);
 
     if (*rb.position() != '{') {
-        return Status::InvalidArgument("map does not start with '{' character, 
found '{}'",
+        return Status::InvalidArgument("map does not start with '{}' 
character, found '{}'", "{",
                                        *rb.position());
     }
     if (*(rb.end() - 1) != '}') {
-        return Status::InvalidArgument("map does not end with '}' character, 
found '{}'",
+        return Status::InvalidArgument("map does not end with '{}' character, 
found '{}'", "}",
                                        *(rb.end() - 1));
     }
 
diff --git a/be/src/vec/data_types/data_type_struct.cpp 
b/be/src/vec/data_types/data_type_struct.cpp
index 886ac5342c..66954687fe 100644
--- a/be/src/vec/data_types/data_type_struct.cpp
+++ b/be/src/vec/data_types/data_type_struct.cpp
@@ -55,8 +55,6 @@ DataTypeStruct::DataTypeStruct(const DataTypes& elems_, const 
Strings& names_)
     }
 
     Status st = check_tuple_names(names);
-    //if (!st.ok()) {
-    //}
 }
 
 std::string DataTypeStruct::do_get_name() const {
@@ -68,11 +66,6 @@ std::string DataTypeStruct::do_get_name() const {
         if (i != 0) {
             s << ", ";
         }
-
-        // if (have_explicit_names) {
-        //     s << back_quote_if_need(names[i]) << ' ';
-        // }
-
         s << elems[i]->get_name();
     }
     s << ")";
@@ -80,16 +73,84 @@ std::string DataTypeStruct::do_get_name() const {
     return s.str();
 }
 
+bool next_slot_from_string(ReadBuffer& rb, StringRef& output, bool& is_name, 
bool& has_quota) {
+    StringRef element(rb.position(), 0);
+    has_quota = false;
+    is_name = false;
+    if (rb.eof()) {
+        return false;
+    }
+
+    // ltrim
+    while (!rb.eof() && isspace(*rb.position())) {
+        ++rb.position();
+        element.data = rb.position();
+    }
+
+    // parse string
+    if (*rb.position() == '"' || *rb.position() == '\'') {
+        const char str_sep = *rb.position();
+        size_t str_len = 1;
+        // search until next '"' or '\''
+        while (str_len < rb.count() && *(rb.position() + str_len) != str_sep) {
+            ++str_len;
+        }
+        // invalid string
+        if (str_len >= rb.count()) {
+            rb.position() = rb.end();
+            return false;
+        }
+        has_quota = true;
+        rb.position() += str_len + 1;
+        element.size += str_len + 1;
+    }
+
+    // parse element until separator ':' or ',' or end '}'
+    while (!rb.eof() && (*rb.position() != ':') && (*rb.position() != ',') &&
+           (rb.count() != 1 || *rb.position() != '}')) {
+        if (has_quota && !isspace(*rb.position())) {
+            return false;
+        }
+        ++rb.position();
+        ++element.size;
+    }
+    // invalid element
+    if (rb.eof()) {
+        return false;
+    }
+
+    if (*rb.position() == ':') {
+        is_name = true;
+    }
+
+    // adjust read buffer position to first char of next element
+    ++rb.position();
+
+    // rtrim
+    while (element.size > 0 && isspace(element.data[element.size - 1])) {
+        --element.size;
+    }
+
+    // trim '"' and '\'' for string
+    if (element.size >= 2 && (element.data[0] == '"' || element.data[0] == 
'\'') &&
+        element.data[0] == element.data[element.size - 1]) {
+        ++element.data;
+        element.size -= 2;
+    }
+    output = element;
+    return true;
+}
+
 Status DataTypeStruct::from_string(ReadBuffer& rb, IColumn* column) const {
     DCHECK(!rb.eof());
     auto* struct_column = assert_cast<ColumnStruct*>(column);
 
     if (*rb.position() != '{') {
-        return Status::InvalidArgument("Struct does not start with '{' 
character, found '{}'",
+        return Status::InvalidArgument("Struct does not start with '{}' 
character, found '{}'", "{",
                                        *rb.position());
     }
-    if (rb.count() < 2 || *(rb.end() - 1) != '}') {
-        return Status::InvalidArgument("Struct does not end with '}' 
character, found '{}'",
+    if (*(rb.end() - 1) != '}') {
+        return Status::InvalidArgument("Struct does not end with '{}' 
character, found '{}'", "}",
                                        *(rb.end() - 1));
     }
 
@@ -99,43 +160,98 @@ Status DataTypeStruct::from_string(ReadBuffer& rb, 
IColumn* column) const {
     }
 
     ++rb.position();
+
+    bool is_explicit_names = false;
+    std::vector<std::string> field_names;
     std::vector<ReadBuffer> field_rbs;
-    field_rbs.reserve(elems.size());
+    std::vector<size_t> field_pos;
 
-    // here get the value "jack" and 20 from {"name":"jack","age":20}
     while (!rb.eof()) {
-        size_t field_len = 0;
-        auto start = rb.position();
-        while (!rb.eof() && *start != ',' && *start != '}') {
-            field_len++;
-            start++;
+        StringRef slot(rb.position(), rb.count());
+        bool has_quota = false;
+        bool is_name = false;
+        if (!next_slot_from_string(rb, slot, is_name, has_quota)) {
+            return Status::InvalidArgument("Cannot read struct field from text 
'{}'",
+                                           slot.to_string());
         }
-        if (field_len >= rb.count()) {
-            return Status::InvalidArgument("Invalid Length");
-        }
-        ReadBuffer field_rb(rb.position(), field_len);
-
-        size_t len = 0;
-        auto start_rb = field_rb.position();
-        while (!field_rb.eof() && *start_rb != ':') {
-            len++;
-            start_rb++;
-        }
-        ReadBuffer field(field_rb.position() + len + 1, field_rb.count() - len 
- 1);
+        if (is_name) {
+            std::string name = slot.to_string();
+            if (!next_slot_from_string(rb, slot, is_name, has_quota)) {
+                return Status::InvalidArgument("Cannot read struct field from 
text '{}'",
+                                               slot.to_string());
+            }
+            ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size);
+            field_names.push_back(name);
+            field_rbs.push_back(field_rb);
 
-        if (field.count() >= 2 && ((*field.position() == '"' && *(field.end() 
- 1) == '"') ||
-                                   (*field.position() == '\'' && *(field.end() 
- 1) == '\''))) {
-            ReadBuffer field_no_quote(field.position() + 1, field.count() - 2);
-            field_rbs.push_back(field_no_quote);
+            if (!is_explicit_names) {
+                is_explicit_names = true;
+            }
         } else {
-            field_rbs.push_back(field);
+            ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size);
+            field_rbs.push_back(field_rb);
         }
+    }
 
-        rb.position() += field_len + 1;
+    // TODO: should we support insert default field value when actual field 
number is less than
+    // schema field number?
+    if (field_rbs.size() != elems.size()) {
+        std::string cmp_str = field_rbs.size() > elems.size() ? "more" : 
"less";
+        return Status::InvalidArgument(
+                "Actual struct field number {} is {} than schema field number 
{}.",
+                field_rbs.size(), cmp_str, elems.size());
+    }
+
+    if (is_explicit_names) {
+        if (field_names.size() != field_rbs.size()) {
+            return Status::InvalidArgument(
+                    "Struct field name number {} is not equal to field number 
{}.",
+                    field_names.size(), field_rbs.size());
+        }
+        std::unordered_set<std::string> name_set;
+        for (size_t i = 0; i < field_names.size(); i++) {
+            // check duplicate fields
+            auto ret = name_set.insert(field_names[i]);
+            if (!ret.second) {
+                return Status::InvalidArgument("Struct field name {} is 
duplicate with others.",
+                                               field_names[i]);
+            }
+            // check name valid
+            auto idx = try_get_position_by_name(field_names[i]);
+            if (idx == std::nullopt) {
+                return Status::InvalidArgument("Cannot find struct field name 
{} in schema.",
+                                               field_names[i]);
+            }
+            field_pos.push_back(idx.value());
+        }
+    } else {
+        for (size_t i = 0; i < field_rbs.size(); i++) {
+            field_pos.push_back(i);
+        }
     }
 
     for (size_t idx = 0; idx < elems.size(); idx++) {
-        elems[idx]->from_string(field_rbs[idx], 
&struct_column->get_column(idx));
+        auto field_rb = field_rbs[field_pos[idx]];
+        // handle empty element
+        if (field_rb.count() == 0) {
+            struct_column->get_column(idx).insert_default();
+            continue;
+        }
+        // handle null element
+        if (field_rb.count() == 4 && strncmp(field_rb.position(), "null", 4) 
== 0) {
+            auto& nested_null_col =
+                    
reinterpret_cast<ColumnNullable&>(struct_column->get_column(idx));
+            nested_null_col.insert_null_elements(1);
+            continue;
+        }
+        auto st = elems[idx]->from_string(field_rb, 
&struct_column->get_column(idx));
+        if (!st.ok()) {
+            // we should do column revert if error
+            for (size_t j = 0; j < idx; j++) {
+                struct_column->get_column(j).pop_back(1);
+            }
+            return st;
+        }
     }
 
     return Status::OK();
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index f90993884d..4e71afbf31 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -381,6 +381,13 @@ FunctionContext::TypeDesc 
VExpr::column_type_to_type_desc(const TypeDescriptor&
             out.children.push_back(VExpr::column_type_to_type_desc(t));
         }
         break;
+    case TYPE_STRUCT:
+        CHECK(type.children.size() >= 1);
+        out.type = FunctionContext::TYPE_STRUCT;
+        for (const auto& t : type.children) {
+            out.children.push_back(VExpr::column_type_to_type_desc(t));
+        }
+        break;
     case TYPE_STRING:
         out.type = FunctionContext::TYPE_STRING;
         out.len = type.len;
diff --git a/regression-test/data/load_p0/stream_load/struct_malformat.csv 
b/regression-test/data/load_p0/stream_load/struct_malformat.csv
new file mode 100644
index 0000000000..8af8629e9e
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/struct_malformat.csv
@@ -0,0 +1,5 @@
+1|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", 
"f7":"2023-02-26 17:58", "f8":1.01, "f9":3.1415926, "f10":1.1}
+2|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", 
"f7":null, "f8":null, "f9":null, "f10":1.1}
+3|\N
+4|"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", 
"f7":null, "f8":null, "f9":null, "f10":1.1
+5|{f1:1, f2:100, f3:100000, f4:'a', f5:"doris", f6:"2023-02-26", 
f7:"2023-02-26 17:58", f8:1.01, f9:3.1415926, f10:1.1
diff --git a/regression-test/data/load_p0/stream_load/struct_normal.csv 
b/regression-test/data/load_p0/stream_load/struct_normal.csv
new file mode 100644
index 0000000000..fe82889afd
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/struct_normal.csv
@@ -0,0 +1,13 @@
+1|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", 
"f7":"2023-02-26 17:58", "f8":1.01, "f9":3.1415926, "f10":1.1}
+2|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", 
"f7":null, "f8":null, "f9":null, "f10":1.1}
+3|{'f1':1, 'f2':100, 'f3':100000, 'f4':'a', 'f5':"doris", 'f6':"2023-02-26", 
'f7':null, 'f8':null, 'f9':null, 'f10':1.1}
+4|{f1:1, f2:100, f3:100000, f4:'a', f5:"doris", f6:"2023-02-26", 
f7:"2023-02-26 17:58", f8:1.01, f9:3.1415926, f10:1.1}
+5|{f1: 1, f2: 100, f3: 100000, f4: a, f5: doris, f6: 2023-02-26, f7: 
"2023-02-26 17:58", f8: 1.01, f9: 3.1415926, f10: 1.1}
+6|{"f10":1.1, "f9":3.1415926, "f8":1.01, "f7":"2023-02-26 17:58", 
"f6":"2023-02-26", "f5":"doris", "f4":'a', "f3":100000, "f2":100, "f1":1}
+7|{f10:1.1, f9:3.1415926, f8:1.01, f7:"2023-02-26 17:58", f6:2023-02-26, 
f5:doris, f4:a, f3:100000, f2:100, f1:1}
+8|{f10:1.1, f9:3.1415926, f8:1.01, f7:"2023-02-26 17:58", f6:2023-02-26, 
f5:doris, f4:null, f3:null, f2:null, f1:1}
+9|{"f1":null, "f2":null, "f3":null, "f4":null, "f5":null, "f6":null, 
"f7":null, "f8":null, "f9":null, "f10":null}
+10|{1, 100, 100000, 'a', "doris", "2023-02-26", "2023-02-26 17:58", 1.01, 
3.1415926, 1.1}
+11|{1, 100, 100000, 'a', "doris", "2023-02-26", null, null, null, 1.1}
+12|{null, null, null, null, null, null, null, null, null, null}
+13|\N
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load.out 
b/regression-test/data/load_p0/stream_load/test_stream_load.out
index 72bc763d81..3e4ccd3fc5 100644
--- a/regression-test/data/load_p0/stream_load/test_stream_load.out
+++ b/regression-test/data/load_p0/stream_load/test_stream_load.out
@@ -69,6 +69,28 @@
 7      [1, 2, 3, 4, 5] \N      \N      \N      \N      \N      \N      \N      
\N      \N
 8      [1, 2, 3, 4, 5] \N      \N      \N      \N      \N      [NULL]  \N      
[NULL]  \N
 
+-- !all111 --
+1      {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.1}
+2      {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
+3      \N
+4      \N
+5      \N
+
+-- !all112 --
+1      {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.1}
+2      {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
+3      {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
+4      {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.1}
+5      {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.1}
+6      {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.1}
+7      {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.1}
+8      {1, NULL, NULL, NULL, 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.1}
+9      {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
+10     {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.1}
+11     {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
+12     {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
+13     \N
+
 -- !sql1 --
 -2     -50     1       \N      44
 2      -51     1       2       \N
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index a04d25622e..351bab3afd 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -186,12 +186,14 @@ suite("test_stream_load", "p0") {
     def tableName6 = "test_unique_key"
     def tableName7 = "test_unique_key_with_delete"
     def tableName8 = "test_array"
+    def tableName10 = "test_struct"
     sql """ DROP TABLE IF EXISTS ${tableName3} """
     sql """ DROP TABLE IF EXISTS ${tableName4} """
     sql """ DROP TABLE IF EXISTS ${tableName5} """
     sql """ DROP TABLE IF EXISTS ${tableName6} """
     sql """ DROP TABLE IF EXISTS ${tableName7} """
     sql """ DROP TABLE IF EXISTS ${tableName8} """
+    sql """ DROP TABLE IF EXISTS ${tableName10} """
     sql """
     CREATE TABLE IF NOT EXISTS ${tableName3} (
       `k1` int(11) NULL,
@@ -287,6 +289,28 @@ suite("test_stream_load", "p0") {
     "replication_allocation" = "tag.location.default: 1"
     );
     """
+    sql """ADMIN SET FRONTEND CONFIG ('enable_struct_type' = 'true');"""
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName10} (
+      `k1` INT(11) NULL COMMENT "",
+      `k2` STRUCT<
+               f1:SMALLINT,
+               f2:INT(11),
+               f3:BIGINT,
+               f4:CHAR,
+               f5:VARCHAR(20),
+               f6:DATE,
+               f7:DATETIME,
+               f8:FLOAT,
+               f9:DOUBLE,
+               f10:DECIMAL(20, 6)> NULL COMMENT ""
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`k1`)
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+    """
 
     // load all columns
     streamLoad {
@@ -673,6 +697,86 @@ suite("test_stream_load", "p0") {
     }
     sql "sync"
 
+    // ===== test struct stream load
+    // malformat without strictmode
+    streamLoad {
+        table "${tableName10}"
+
+        set 'column_separator', '|'
+
+        file 'struct_malformat.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(5, json.NumberTotalRows)
+            assertEquals(5, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    qt_all111 "SELECT * from ${tableName10} order by k1" // 5
+    sql """truncate table ${tableName10}"""
+    sql """sync"""
+
+    // malformat with strictmode
+    streamLoad {
+        table "${tableName10}"
+
+        set 'column_separator', '|'
+        set 'strict_mode', 'true'
+
+        file 'struct_malformat.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertEquals(5, json.NumberTotalRows)
+            assertEquals(3, json.NumberLoadedRows)
+            assertEquals(2, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+
+    // normal load
+    streamLoad {
+        table "${tableName10}"
+
+        set 'column_separator', '|'
+
+        file 'struct_normal.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(13, json.NumberTotalRows)
+            assertEquals(13, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    qt_all112 "SELECT * from ${tableName10} order by k1" // 10
+    sql """truncate table ${tableName10}"""
+    sql """sync"""
+
     // test immutable partition success
     def tableName9 = "test_immutable_partition"
     sql """ DROP TABLE IF EXISTS ${tableName9} """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to