This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new fb9e48a34a [fix](vstream load) Fix bug when load json with jsonpath (#12660)
fb9e48a34a is described below

commit fb9e48a34a5c0a7f52fe032836de889094ef3121
Author: yinzhijian <373141...@qq.com>
AuthorDate: Mon Sep 19 10:13:18 2022 +0800

    [fix](vstream load) Fix bug when load json with jsonpath (#12660)
---
 be/src/vec/exec/vjson_scanner.cpp                  |  70 ++++++++-----
 be/test/vec/exec/vjson_scanner_test.cpp            |  13 +--
 .../stream_load/load_json_with_jsonpath.out        |  11 +++
 .../stream_load/test_load_with_jsonpath.json       |   1 +
 .../stream_load/load_json_with_jsonpath.groovy     | 109 +++++++++++++++++++++
 5 files changed, 168 insertions(+), 36 deletions(-)
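
For readers skimming the patch: both the rapidjson and simdjson readers now make two passes over a row. The first pass writes only the fields whose jsonpath actually resolved (rejecting the row if a non-nullable column has no match, or if nothing matched at all); the second pass pads every column that did not advance with a default NULL, so each column ends the row at exactly cur_row_count + 1 entries, which is the invariant the new DCHECKs assert. The following is a minimal standalone sketch of that pattern, not the actual Doris code: Column, append_row, and the std::optional-based storage are illustrative stand-ins for Doris's MutableColumnPtr machinery.

    #include <cassert>
    #include <optional>
    #include <string>
    #include <vector>

    // A nullable string column; std::nullopt stands in for a SQL NULL cell.
    using Column = std::vector<std::optional<std::string>>;

    // Appends one row. extracted[i] holds the value the i-th jsonpath resolved,
    // or std::nullopt when the path had no match in the JSON object.
    // Returns false when the whole row must be discarded (nothing matched).
    bool append_row(std::vector<Column>& columns,
                    const std::vector<std::optional<std::string>>& extracted) {
        const size_t cur_row_count = columns[0].size();
        bool has_valid_value = false;

        // Pass 1: write only the fields whose jsonpath actually matched.
        for (size_t i = 0; i < columns.size(); ++i) {
            if (extracted[i].has_value()) {
                columns[i].push_back(extracted[i]);
                has_valid_value = true;
            }
        }
        if (!has_valid_value) {
            return false; // "All fields is null or not matched" -> invalid row
        }

        // Pass 2: fill the missing slots so every column advances by exactly one row.
        for (auto& column : columns) {
            if (column.size() < cur_row_count + 1) {
                assert(column.size() == cur_row_count);
                column.push_back(std::nullopt); // insert_default() -> NULL
            }
            assert(column.size() == cur_row_count + 1);
        }
        return true;
    }

Deferring the NULL padding to a second pass, and counting only materialized slots when indexing columns, is what keeps every column at the same length for the row; judging from the removed lines, the earlier interleaved insert_default()/break flow could leave columns at different lengths.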

diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp
index b0f8ed5ba8..95f4c8658a 100644
--- a/be/src/vec/exec/vjson_scanner.cpp
+++ b/be/src/vec/exec/vjson_scanner.cpp
@@ -434,12 +434,15 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
                                                const std::vector<SlotDescriptor*>& slot_descs,
                                                std::vector<MutableColumnPtr>& columns,
                                                bool* valid) {
-    int nullcount = 0;
     int ctx_idx = 0;
-    size_t column_num = slot_descs.size();
-    for (size_t i = 0; i < column_num; i++) {
-        int dest_index = ctx_idx++;
-        auto* column_ptr = columns[dest_index].get();
+    bool has_valid_value = false;
+    size_t cur_row_count = columns[0]->size();
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int i = ctx_idx++;
+        auto* column_ptr = columns[i].get();
         rapidjson::Value* json_values = nullptr;
         bool wrap_explicitly = false;
         if (LIKELY(i < _parsed_jsonpaths.size())) {
@@ -450,16 +453,12 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
 
         if (json_values == nullptr) {
             // not match in jsondata.
-            if (slot_descs[i]->is_nullable()) {
-                auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
-                nullable_column->insert_default();
-                nullcount++;
-            } else {
+            if (!slot_descs[i]->is_nullable()) {
                 RETURN_IF_ERROR(_append_error_msg(
                         objectValue,
                         "The column `{}` is not nullable, but it's not found 
in jsondata.",
                         slot_descs[i]->col_name(), valid));
-                break;
+                return Status::OK();
             }
         } else {
             CHECK(json_values->IsArray());
@@ -471,15 +470,29 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
             }
             RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], column_ptr, valid));
             if (!(*valid)) {
-                break;
+                return Status::OK();
             }
+            has_valid_value = true;
         }
     }
-
-    if (nullcount == column_num) {
+    if (!has_valid_value) {
         RETURN_IF_ERROR(_append_error_msg(
                objectValue, "All fields is null or not matched, this is a invalid row.", "",
                 valid));
+        return Status::OK();
+    }
+    ctx_idx = 0;
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int dest_index = ctx_idx++;
+        auto* column_ptr = columns[dest_index].get();
+        if (column_ptr->size() < cur_row_count + 1) {
+            DCHECK(column_ptr->size() == cur_row_count);
+            column_ptr->assume_mutable()->insert_default();
+        }
+        DCHECK(column_ptr->size() == cur_row_count + 1);
     }
     return Status::OK();
 }
@@ -1062,9 +1075,10 @@ Status VSIMDJsonReader::_generate_json_paths(const std::string& jsonpath,
 Status VSIMDJsonReader::_write_columns_by_jsonpath(simdjson::ondemand::value value,
                                                    const std::vector<SlotDescriptor*>& slot_descs,
                                                    Block& block, bool* valid) {
-    int nullcount = 0;
     size_t column_num = slot_descs.size();
     auto object_value = value.get_object();
+    bool has_valid_value = false;
+    size_t cur_row_count = block.rows();
     for (size_t i = 0; i < column_num; i++) {
        auto* column_ptr = block.get_by_position(i).column->assume_mutable().get();
         simdjson::simdjson_result<simdjson::ondemand::value> json_value;
@@ -1074,29 +1088,35 @@ Status VSIMDJsonReader::_write_columns_by_jsonpath(simdjson::ondemand::value val
         }
        if (i >= _parsed_jsonpaths.size() || json_value.error() != simdjson::error_code::SUCCESS) {
             // not match in jsondata.
-            if (slot_descs[i]->is_nullable()) {
-                auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
-                nullable_column->insert_default();
-                nullcount++;
-            } else {
+            if (!slot_descs[i]->is_nullable()) {
                 RETURN_IF_ERROR(_append_error_msg(
                         "The column `{}` is not nullable, but it's not found 
in jsondata.",
                         slot_descs[i]->col_name(), valid));
-                break;
+                return Status::OK();
             }
         } else {
             RETURN_IF_ERROR(
                    _write_data_to_column(json_value.value(), slot_descs[i], column_ptr, valid));
             if (!(*valid)) {
-                break;
+                return Status::OK();
             }
+            has_valid_value = true;
         }
         object_value.reset();
     }
+    if (!has_valid_value) {
+        RETURN_IF_ERROR(_append_error_msg("All fields is null, this is a invalid row.", "", valid));
+        return Status::OK();
+    }
 
-    if (nullcount == column_num) {
-        RETURN_IF_ERROR(_append_error_msg(
-                "All fields is null or not matched, this is a invalid row.", "", valid));
+    // fill missing slot
+    for (const auto& column_type_name : block) {
+        auto column = column_type_name.column;
+        if (column->size() < cur_row_count + 1) {
+            DCHECK(column->size() == cur_row_count);
+            column->assume_mutable()->insert_default();
+        }
+        DCHECK(column->size() == cur_row_count + 1);
     }
     return Status::OK();
 }
diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp
index 673199cc92..f06b8233d6 100644
--- a/be/test/vec/exec/vjson_scanner_test.cpp
+++ b/be/test/vec/exec/vjson_scanner_test.cpp
@@ -790,17 +790,8 @@ TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) {
         vectorized::Block block;
         status = scan_node.get_next(&_runtime_state, &block, &eof);
         EXPECT_TRUE(status.ok());
-        EXPECT_EQ(2, block.rows());
-        EXPECT_EQ(6, block.columns());
-
-        auto columns = block.get_columns_with_type_and_name();
-        ASSERT_EQ(columns.size(), 6);
-        ASSERT_EQ(columns[0].to_string(0), "NULL");
-        ASSERT_EQ(columns[0].to_string(1), "NULL");
-        ASSERT_EQ(columns[1].to_string(0), "NULL");
-        ASSERT_EQ(columns[1].to_string(1), "NULL");
-        ASSERT_EQ(columns[2].to_string(0), "NULL");
-        ASSERT_EQ(columns[2].to_string(1), "NULL");
+        EXPECT_EQ(0, block.rows());
+        EXPECT_EQ(0, block.columns());
         block.clear();
         scan_node.close(&_runtime_state);
     };
diff --git a/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out
new file mode 100644
index 0000000000..43037b624d
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+22     \N
+1000   7395.231067
+2000   \N
+
+-- !select --
+22     \N
+1000   7395.231067
+2000   \N
+
diff --git a/regression-test/data/load_p0/stream_load/test_load_with_jsonpath.json b/regression-test/data/load_p0/stream_load/test_load_with_jsonpath.json
new file mode 100644
index 0000000000..b18d3a0f4b
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_load_with_jsonpath.json
@@ -0,0 +1 @@
+ [{"v1": "7395.231067", "k1": "1000"}, {"v2": "7291.703724", "k1": "2000"}, 
{"k1": "22", "v2": "7291.703724", "k2": "2000"}, {"tinyint_key": null}]
diff --git a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
new file mode 100644
index 0000000000..c9e8732960
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_load_json_with_jsonpath", "p0") {
+    // define a sql table
+    def testTable = "tbl_test_json_load"
+    def dbName = "test_query_db"
+
+    def create_test_table = {enable_vectorized_flag ->
+        if (enable_vectorized_flag) {
+            sql """ set enable_vectorized_engine = true """
+        } else {
+            sql """ set enable_vectorized_engine = false """
+        }
+
+        def result1 = sql """
+            CREATE TABLE IF NOT EXISTS ${testTable} (
+              `k1` INT NULL COMMENT "",
+              `v1` DOUBLE NULL COMMENT ""
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "storage_format" = "V2"
+            )
+            """
+    }
+
+    def load_array_data = {table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
+                            json_root, where_expr, fuzzy_flag, column_sep, file_name ->
+        // load the json data
+        streamLoad {
+            table table_name
+
+            // set http request header params
+            set 'strip_outer_array', strip_flag
+            set 'read_json_by_line', read_flag
+            set 'format', format_flag
+            set 'columns', exprs
+            set 'jsonpaths', json_paths
+            set 'json_root', json_root
+            set 'where', where_expr
+            set 'fuzzy_parse', fuzzy_flag
+            set 'column_separator', column_sep
+            set 'max_filter_ratio', '1'
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+
+            // if declared a check callback, the default check condition will ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows
+                             + json.NumberFilteredRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+
+    // case1: import array data in json format and enable vectorized engine
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table.call(true)
+
+        load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+
+        // select the table and check whether the data is correct
+        qt_select "select * from ${testTable} order by k1"
+
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+
+    // case2: import array data in json format and disable vectorized engine
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table.call(false)
+
+        load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+
+        // select the table and check whether the data is correct
+        qt_select "select * from ${testTable} order by k1"
+
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+}

