This is an automated email from the ASF dual-hosted git repository. xuyang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0732eb54bc [feature](struct-type) support csv format stream load for struct type (#17143) 0732eb54bc is described below commit 0732eb54bc48c2375877ff13d7c5a926bb47c27a Author: xy720 <22125576+xy...@users.noreply.github.com> AuthorDate: Wed Mar 1 15:48:48 2023 +0800 [feature](struct-type) support csv format stream load for struct type (#17143) Refactor from_string method in data_type_struct.cpp to support csv format stream load for struct type. --- be/src/vec/data_types/data_type_map.cpp | 4 +- be/src/vec/data_types/data_type_struct.cpp | 188 +++++++++++++++++---- be/src/vec/exprs/vexpr.cpp | 7 + .../data/load_p0/stream_load/struct_malformat.csv | 5 + .../data/load_p0/stream_load/struct_normal.csv | 13 ++ .../data/load_p0/stream_load/test_stream_load.out | 22 +++ .../load_p0/stream_load/test_stream_load.groovy | 104 ++++++++++++ 7 files changed, 305 insertions(+), 38 deletions(-) diff --git a/be/src/vec/data_types/data_type_map.cpp b/be/src/vec/data_types/data_type_map.cpp index 4895a48c4c..5c362f5b90 100644 --- a/be/src/vec/data_types/data_type_map.cpp +++ b/be/src/vec/data_types/data_type_map.cpp @@ -156,11 +156,11 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* column) const { auto* map_column = assert_cast<ColumnMap*>(column); if (*rb.position() != '{') { - return Status::InvalidArgument("map does not start with '{' character, found '{}'", + return Status::InvalidArgument("map does not start with '{}' character, found '{}'", "{", *rb.position()); } if (*(rb.end() - 1) != '}') { - return Status::InvalidArgument("map does not end with '}' character, found '{}'", + return Status::InvalidArgument("map does not end with '{}' character, found '{}'", "}", *(rb.end() - 1)); } diff --git a/be/src/vec/data_types/data_type_struct.cpp b/be/src/vec/data_types/data_type_struct.cpp index 886ac5342c..66954687fe 100644 --- a/be/src/vec/data_types/data_type_struct.cpp +++ b/be/src/vec/data_types/data_type_struct.cpp @@ -55,8 +55,6 @@ DataTypeStruct::DataTypeStruct(const DataTypes& elems_, const Strings& names_) } Status st = check_tuple_names(names); - //if (!st.ok()) { - //} } std::string DataTypeStruct::do_get_name() const { @@ -68,11 +66,6 @@ std::string DataTypeStruct::do_get_name() const { if (i != 0) { s << ", "; } - - // if (have_explicit_names) { - // s << back_quote_if_need(names[i]) << ' '; - // } - s << elems[i]->get_name(); } s << ")"; @@ -80,16 +73,84 @@ std::string DataTypeStruct::do_get_name() const { return s.str(); } +bool next_slot_from_string(ReadBuffer& rb, StringRef& output, bool& is_name, bool& has_quota) { + StringRef element(rb.position(), 0); + has_quota = false; + is_name = false; + if (rb.eof()) { + return false; + } + + // ltrim + while (!rb.eof() && isspace(*rb.position())) { + ++rb.position(); + element.data = rb.position(); + } + + // parse string + if (*rb.position() == '"' || *rb.position() == '\'') { + const char str_sep = *rb.position(); + size_t str_len = 1; + // search until next '"' or '\'' + while (str_len < rb.count() && *(rb.position() + str_len) != str_sep) { + ++str_len; + } + // invalid string + if (str_len >= rb.count()) { + rb.position() = rb.end(); + return false; + } + has_quota = true; + rb.position() += str_len + 1; + element.size += str_len + 1; + } + + // parse element until separator ':' or ',' or end '}' + while (!rb.eof() && (*rb.position() != ':') && (*rb.position() != ',') && + (rb.count() != 1 || *rb.position() != '}')) { + if (has_quota && !isspace(*rb.position())) { + return false; + } + ++rb.position(); + ++element.size; + } + // invalid element + if (rb.eof()) { + return false; + } + + if (*rb.position() == ':') { + is_name = true; + } + + // adjust read buffer position to first char of next element + ++rb.position(); + + // rtrim + while (element.size > 0 && isspace(element.data[element.size - 1])) { + --element.size; + } + + // trim '"' and '\'' for string + if (element.size >= 2 && (element.data[0] == '"' || element.data[0] == '\'') && + element.data[0] == element.data[element.size - 1]) { + ++element.data; + element.size -= 2; + } + output = element; + return true; +} + Status DataTypeStruct::from_string(ReadBuffer& rb, IColumn* column) const { DCHECK(!rb.eof()); auto* struct_column = assert_cast<ColumnStruct*>(column); if (*rb.position() != '{') { - return Status::InvalidArgument("Struct does not start with '{' character, found '{}'", + return Status::InvalidArgument("Struct does not start with '{}' character, found '{}'", "{", *rb.position()); } - if (rb.count() < 2 || *(rb.end() - 1) != '}') { - return Status::InvalidArgument("Struct does not end with '}' character, found '{}'", + if (*(rb.end() - 1) != '}') { + return Status::InvalidArgument("Struct does not end with '{}' character, found '{}'", "}", *(rb.end() - 1)); } @@ -99,43 +160,98 @@ Status DataTypeStruct::from_string(ReadBuffer& rb, IColumn* column) const { } ++rb.position(); + + bool is_explicit_names = false; + std::vector<std::string> field_names; std::vector<ReadBuffer> field_rbs; - field_rbs.reserve(elems.size()); + std::vector<size_t> field_pos; - // here get the value "jack" and 20 from {"name":"jack","age":20} while (!rb.eof()) { - size_t field_len = 0; - auto start = rb.position(); - while (!rb.eof() && *start != ',' && *start != '}') { - field_len++; - start++; + StringRef slot(rb.position(), rb.count()); + bool has_quota = false; + bool is_name = false; + if (!next_slot_from_string(rb, slot, is_name, has_quota)) { + return Status::InvalidArgument("Cannot read struct field from text '{}'", + slot.to_string()); } - if (field_len >= rb.count()) { - return Status::InvalidArgument("Invalid Length"); - } - ReadBuffer field_rb(rb.position(), field_len); - - size_t len = 0; - auto start_rb = field_rb.position(); - while (!field_rb.eof() && *start_rb != ':') { - len++; - start_rb++; - } - ReadBuffer field(field_rb.position() + len + 1, field_rb.count() - len - 1); + if (is_name) { + std::string name = slot.to_string(); + if (!next_slot_from_string(rb, slot, is_name, has_quota)) { + return Status::InvalidArgument("Cannot read struct field from text '{}'", + slot.to_string()); + } + ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size); + field_names.push_back(name); + field_rbs.push_back(field_rb); - if (field.count() >= 2 && ((*field.position() == '"' && *(field.end() - 1) == '"') || - (*field.position() == '\'' && *(field.end() - 1) == '\''))) { - ReadBuffer field_no_quote(field.position() + 1, field.count() - 2); - field_rbs.push_back(field_no_quote); + if (!is_explicit_names) { + is_explicit_names = true; + } } else { - field_rbs.push_back(field); + ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size); + field_rbs.push_back(field_rb); } + } - rb.position() += field_len + 1; + // TODO: should we support insert default field value when actual field number is less than + // schema field number? + if (field_rbs.size() != elems.size()) { + std::string cmp_str = field_rbs.size() > elems.size() ? "more" : "less"; + return Status::InvalidArgument( + "Actual struct field number {} is {} than schema field number {}.", + field_rbs.size(), cmp_str, elems.size()); + } + + if (is_explicit_names) { + if (field_names.size() != field_rbs.size()) { + return Status::InvalidArgument( + "Struct field name number {} is not equal to field number {}.", + field_names.size(), field_rbs.size()); + } + std::unordered_set<std::string> name_set; + for (size_t i = 0; i < field_names.size(); i++) { + // check duplicate fields + auto ret = name_set.insert(field_names[i]); + if (!ret.second) { + return Status::InvalidArgument("Struct field name {} is duplicate with others.", + field_names[i]); + } + // check name valid + auto idx = try_get_position_by_name(field_names[i]); + if (idx == std::nullopt) { + return Status::InvalidArgument("Cannot find struct field name {} in schema.", + field_names[i]); + } + field_pos.push_back(idx.value()); + } + } else { + for (size_t i = 0; i < field_rbs.size(); i++) { + field_pos.push_back(i); + } } for (size_t idx = 0; idx < elems.size(); idx++) { - elems[idx]->from_string(field_rbs[idx], &struct_column->get_column(idx)); + auto field_rb = field_rbs[field_pos[idx]]; + // handle empty element + if (field_rb.count() == 0) { + struct_column->get_column(idx).insert_default(); + continue; + } + // handle null element + if (field_rb.count() == 4 && strncmp(field_rb.position(), "null", 4) == 0) { + auto& nested_null_col = + reinterpret_cast<ColumnNullable&>(struct_column->get_column(idx)); + nested_null_col.insert_null_elements(1); + continue; + } + auto st = elems[idx]->from_string(field_rb, &struct_column->get_column(idx)); + if (!st.ok()) { + // we should do column revert if error + for (size_t j = 0; j < idx; j++) { + struct_column->get_column(j).pop_back(1); + } + return st; + } } return Status::OK(); diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index f90993884d..4e71afbf31 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -381,6 +381,13 @@ FunctionContext::TypeDesc VExpr::column_type_to_type_desc(const TypeDescriptor& out.children.push_back(VExpr::column_type_to_type_desc(t)); } break; + case TYPE_STRUCT: + CHECK(type.children.size() >= 1); + out.type = FunctionContext::TYPE_STRUCT; + for (const auto& t : type.children) { + out.children.push_back(VExpr::column_type_to_type_desc(t)); + } + break; case TYPE_STRING: out.type = FunctionContext::TYPE_STRING; out.len = type.len; diff --git a/regression-test/data/load_p0/stream_load/struct_malformat.csv b/regression-test/data/load_p0/stream_load/struct_malformat.csv new file mode 100644 index 0000000000..8af8629e9e --- /dev/null +++ b/regression-test/data/load_p0/stream_load/struct_malformat.csv @@ -0,0 +1,5 @@ +1|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58", "f8":1.01, "f9":3.1415926, "f10":1.1} +2|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.1} +3|\N +4|"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.1 +5|{f1:1, f2:100, f3:100000, f4:'a', f5:"doris", f6:"2023-02-26", f7:"2023-02-26 17:58", f8:1.01, f9:3.1415926, f10:1.1 diff --git a/regression-test/data/load_p0/stream_load/struct_normal.csv b/regression-test/data/load_p0/stream_load/struct_normal.csv new file mode 100644 index 0000000000..fe82889afd --- /dev/null +++ b/regression-test/data/load_p0/stream_load/struct_normal.csv @@ -0,0 +1,13 @@ +1|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58", "f8":1.01, "f9":3.1415926, "f10":1.1} +2|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.1} +3|{'f1':1, 'f2':100, 'f3':100000, 'f4':'a', 'f5':"doris", 'f6':"2023-02-26", 'f7':null, 'f8':null, 'f9':null, 'f10':1.1} +4|{f1:1, f2:100, f3:100000, f4:'a', f5:"doris", f6:"2023-02-26", f7:"2023-02-26 17:58", f8:1.01, f9:3.1415926, f10:1.1} +5|{f1: 1, f2: 100, f3: 100000, f4: a, f5: doris, f6: 2023-02-26, f7: "2023-02-26 17:58", f8: 1.01, f9: 3.1415926, f10: 1.1} +6|{"f10":1.1, "f9":3.1415926, "f8":1.01, "f7":"2023-02-26 17:58", "f6":"2023-02-26", "f5":"doris", "f4":'a', "f3":100000, "f2":100, "f1":1} +7|{f10:1.1, f9:3.1415926, f8:1.01, f7:"2023-02-26 17:58", f6:2023-02-26, f5:doris, f4:a, f3:100000, f2:100, f1:1} +8|{f10:1.1, f9:3.1415926, f8:1.01, f7:"2023-02-26 17:58", f6:2023-02-26, f5:doris, f4:null, f3:null, f2:null, f1:1} +9|{"f1":null, "f2":null, "f3":null, "f4":null, "f5":null, "f6":null, "f7":null, "f8":null, "f9":null, "f10":null} +10|{1, 100, 100000, 'a', "doris", "2023-02-26", "2023-02-26 17:58", 1.01, 3.1415926, 1.1} +11|{1, 100, 100000, 'a', "doris", "2023-02-26", null, null, null, 1.1} +12|{null, null, null, null, null, null, null, null, null, null} +13|\N diff --git a/regression-test/data/load_p0/stream_load/test_stream_load.out b/regression-test/data/load_p0/stream_load/test_stream_load.out index 72bc763d81..3e4ccd3fc5 100644 --- a/regression-test/data/load_p0/stream_load/test_stream_load.out +++ b/regression-test/data/load_p0/stream_load/test_stream_load.out @@ -69,6 +69,28 @@ 7 [1, 2, 3, 4, 5] \N \N \N \N \N \N \N \N \N 8 [1, 2, 3, 4, 5] \N \N \N \N \N [NULL] \N [NULL] \N +-- !all111 -- +1 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1} +2 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1} +3 \N +4 \N +5 \N + +-- !all112 -- +1 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1} +2 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1} +3 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1} +4 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1} +5 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1} +6 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1} +7 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1} +8 {1, NULL, NULL, NULL, 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1} +9 {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL} +10 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1} +11 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1} +12 {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL} +13 \N + -- !sql1 -- -2 -50 1 \N 44 2 -51 1 2 \N diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy index a04d25622e..351bab3afd 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy @@ -186,12 +186,14 @@ suite("test_stream_load", "p0") { def tableName6 = "test_unique_key" def tableName7 = "test_unique_key_with_delete" def tableName8 = "test_array" + def tableName10 = "test_struct" sql """ DROP TABLE IF EXISTS ${tableName3} """ sql """ DROP TABLE IF EXISTS ${tableName4} """ sql """ DROP TABLE IF EXISTS ${tableName5} """ sql """ DROP TABLE IF EXISTS ${tableName6} """ sql """ DROP TABLE IF EXISTS ${tableName7} """ sql """ DROP TABLE IF EXISTS ${tableName8} """ + sql """ DROP TABLE IF EXISTS ${tableName10} """ sql """ CREATE TABLE IF NOT EXISTS ${tableName3} ( `k1` int(11) NULL, @@ -287,6 +289,28 @@ suite("test_stream_load", "p0") { "replication_allocation" = "tag.location.default: 1" ); """ + sql """ADMIN SET FRONTEND CONFIG ('enable_struct_type' = 'true');""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName10} ( + `k1` INT(11) NULL COMMENT "", + `k2` STRUCT< + f1:SMALLINT, + f2:INT(11), + f3:BIGINT, + f4:CHAR, + f5:VARCHAR(20), + f6:DATE, + f7:DATETIME, + f8:FLOAT, + f9:DOUBLE, + f10:DECIMAL(20, 6)> NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ // load all columns streamLoad { @@ -673,6 +697,86 @@ suite("test_stream_load", "p0") { } sql "sync" + // ===== test struct stream load + // malformat without strictmode + streamLoad { + table "${tableName10}" + + set 'column_separator', '|' + + file 'struct_malformat.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(5, json.NumberTotalRows) + assertEquals(5, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + sql "sync" + qt_all111 "SELECT * from ${tableName10} order by k1" // 5 + sql """truncate table ${tableName10}""" + sql """sync""" + + // malformat with strictmode + streamLoad { + table "${tableName10}" + + set 'column_separator', '|' + set 'strict_mode', 'true' + + file 'struct_malformat.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertEquals(5, json.NumberTotalRows) + assertEquals(3, json.NumberLoadedRows) + assertEquals(2, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + sql "sync" + + // normal load + streamLoad { + table "${tableName10}" + + set 'column_separator', '|' + + file 'struct_normal.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(13, json.NumberTotalRows) + assertEquals(13, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + sql "sync" + qt_all112 "SELECT * from ${tableName10} order by k1" // 10 + sql """truncate table ${tableName10}""" + sql """sync""" + // test immutable partition success def tableName9 = "test_immutable_partition" sql """ DROP TABLE IF EXISTS ${tableName9} """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org