This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new fb9e48a34a [fix](vstream load) Fix bug when load json with jsonpath (#12660) fb9e48a34a is described below commit fb9e48a34a5c0a7f52fe032836de889094ef3121 Author: yinzhijian <373141...@qq.com> AuthorDate: Mon Sep 19 10:13:18 2022 +0800 [fix](vstream load) Fix bug when load json with jsonpath (#12660) --- be/src/vec/exec/vjson_scanner.cpp | 70 ++++++++----- be/test/vec/exec/vjson_scanner_test.cpp | 13 +-- .../stream_load/load_json_with_jsonpath.out | 11 +++ .../stream_load/test_load_with_jsonpath.json | 1 + .../stream_load/load_json_with_jsonpath.groovy | 109 +++++++++++++++++++++ 5 files changed, 168 insertions(+), 36 deletions(-) diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index b0f8ed5ba8..95f4c8658a 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -434,12 +434,15 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, const std::vector<SlotDescriptor*>& slot_descs, std::vector<MutableColumnPtr>& columns, bool* valid) { - int nullcount = 0; int ctx_idx = 0; - size_t column_num = slot_descs.size(); - for (size_t i = 0; i < column_num; i++) { - int dest_index = ctx_idx++; - auto* column_ptr = columns[dest_index].get(); + bool has_valid_value = false; + size_t cur_row_count = columns[0]->size(); + for (auto slot_desc : slot_descs) { + if (!slot_desc->is_materialized()) { + continue; + } + int i = ctx_idx++; + auto* column_ptr = columns[i].get(); rapidjson::Value* json_values = nullptr; bool wrap_explicitly = false; if (LIKELY(i < _parsed_jsonpaths.size())) { @@ -450,16 +453,12 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, if (json_values == nullptr) { // not match in jsondata. - if (slot_descs[i]->is_nullable()) { - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr); - nullable_column->insert_default(); - nullcount++; - } else { + if (!slot_descs[i]->is_nullable()) { RETURN_IF_ERROR(_append_error_msg( objectValue, "The column `{}` is not nullable, but it's not found in jsondata.", slot_descs[i]->col_name(), valid)); - break; + return Status::OK(); } } else { CHECK(json_values->IsArray()); @@ -471,15 +470,29 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, } RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], column_ptr, valid)); if (!(*valid)) { - break; + return Status::OK(); } + has_valid_value = true; } } - - if (nullcount == column_num) { + if (!has_valid_value) { RETURN_IF_ERROR(_append_error_msg( objectValue, "All fields is null or not matched, this is a invalid row.", "", valid)); + return Status::OK(); + } + ctx_idx = 0; + for (auto slot_desc : slot_descs) { + if (!slot_desc->is_materialized()) { + continue; + } + int dest_index = ctx_idx++; + auto* column_ptr = columns[dest_index].get(); + if (column_ptr->size() < cur_row_count + 1) { + DCHECK(column_ptr->size() == cur_row_count); + column_ptr->assume_mutable()->insert_default(); + } + DCHECK(column_ptr->size() == cur_row_count + 1); } return Status::OK(); } @@ -1062,9 +1075,10 @@ Status VSIMDJsonReader::_generate_json_paths(const std::string& jsonpath, Status VSIMDJsonReader::_write_columns_by_jsonpath(simdjson::ondemand::value value, const std::vector<SlotDescriptor*>& slot_descs, Block& block, bool* valid) { - int nullcount = 0; size_t column_num = slot_descs.size(); auto object_value = value.get_object(); + bool has_valid_value = false; + size_t cur_row_count = block.rows(); for (size_t i = 0; i < column_num; i++) { auto* column_ptr = block.get_by_position(i).column->assume_mutable().get(); simdjson::simdjson_result<simdjson::ondemand::value> json_value; @@ -1074,29 +1088,35 @@ Status VSIMDJsonReader::_write_columns_by_jsonpath(simdjson::ondemand::value val } if (i >= _parsed_jsonpaths.size() || json_value.error() != simdjson::error_code::SUCCESS) { // not match in jsondata. - if (slot_descs[i]->is_nullable()) { - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr); - nullable_column->insert_default(); - nullcount++; - } else { + if (!slot_descs[i]->is_nullable()) { RETURN_IF_ERROR(_append_error_msg( "The column `{}` is not nullable, but it's not found in jsondata.", slot_descs[i]->col_name(), valid)); - break; + return Status::OK(); } } else { RETURN_IF_ERROR( _write_data_to_column(json_value.value(), slot_descs[i], column_ptr, valid)); if (!(*valid)) { - break; + return Status::OK(); } + has_valid_value = true; } object_value.reset(); } + if (!has_valid_value) { + RETURN_IF_ERROR(_append_error_msg("All fields is null, this is a invalid row.", "", valid)); + return Status::OK(); + } - if (nullcount == column_num) { - RETURN_IF_ERROR(_append_error_msg( - "All fields is null or not matched, this is a invalid row.", "", valid)); + // fill missing slot + for (const auto& column_type_name : block) { + auto column = column_type_name.column; + if (column->size() < cur_row_count + 1) { + DCHECK(column->size() == cur_row_count); + column->assume_mutable()->insert_default(); + } + DCHECK(column->size() == cur_row_count + 1); } return Status::OK(); } diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp index 673199cc92..f06b8233d6 100644 --- a/be/test/vec/exec/vjson_scanner_test.cpp +++ b/be/test/vec/exec/vjson_scanner_test.cpp @@ -790,17 +790,8 @@ TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) { vectorized::Block block; status = scan_node.get_next(&_runtime_state, &block, &eof); EXPECT_TRUE(status.ok()); - EXPECT_EQ(2, block.rows()); - EXPECT_EQ(6, block.columns()); - - auto columns = block.get_columns_with_type_and_name(); - ASSERT_EQ(columns.size(), 6); - ASSERT_EQ(columns[0].to_string(0), "NULL"); - ASSERT_EQ(columns[0].to_string(1), "NULL"); - ASSERT_EQ(columns[1].to_string(0), "NULL"); - ASSERT_EQ(columns[1].to_string(1), "NULL"); - ASSERT_EQ(columns[2].to_string(0), "NULL"); - ASSERT_EQ(columns[2].to_string(1), "NULL"); + EXPECT_EQ(0, block.rows()); + EXPECT_EQ(0, block.columns()); block.clear(); scan_node.close(&_runtime_state); }; diff --git a/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out new file mode 100644 index 0000000000..43037b624d --- /dev/null +++ b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +22 \N +1000 7395.231067 +2000 \N + +-- !select -- +22 \N +1000 7395.231067 +2000 \N + diff --git a/regression-test/data/load_p0/stream_load/test_load_with_jsonpath.json b/regression-test/data/load_p0/stream_load/test_load_with_jsonpath.json new file mode 100644 index 0000000000..b18d3a0f4b --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_load_with_jsonpath.json @@ -0,0 +1 @@ + [{"v1": "7395.231067", "k1": "1000"}, {"v2": "7291.703724", "k1": "2000"}, {"k1": "22", "v2": "7291.703724", "k2": "2000"}, {"tinyint_key": null}] diff --git a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy new file mode 100644 index 0000000000..c9e8732960 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_load_json_with_jsonpath", "p0") { + // define a sql table + def testTable = "tbl_test_json_load" + def dbName = "test_query_db" + + def create_test_table = {enable_vectorized_flag -> + if (enable_vectorized_flag) { + sql """ set enable_vectorized_engine = true """ + } else { + sql """ set enable_vectorized_engine = false """ + } + + def result1 = sql """ + CREATE TABLE IF NOT EXISTS ${testTable} ( + `k1` INT NULL COMMENT "", + `v1` DOUBLE NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2" + ) + """ + } + + def load_array_data = {table_name, strip_flag, read_flag, format_flag, exprs, json_paths, + json_root, where_expr, fuzzy_flag, column_sep, file_name -> + // load the json data + streamLoad { + table table_name + + // set http request header params + set 'strip_outer_array', strip_flag + set 'read_json_by_line', read_flag + set 'format', format_flag + set 'columns', exprs + set 'jsonpaths', json_paths + set 'json_root', json_root + set 'where', where_expr + set 'fuzzy_parse', fuzzy_flag + set 'column_separator', column_sep + set 'max_filter_ratio', '1' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + + json.NumberFilteredRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + // case1: import array data in json format and enable vectorized engine + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table.call(true) + + load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json') + + // select the table and check whether the data is correct + qt_select "select * from ${testTable} order by k1" + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case2: import array data in json format and disable vectorized engine + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table.call(false) + + load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json') + + // select the table and check whether the data is correct + qt_select "select * from ${testTable} order by k1" + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org