This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cb7e8bf1aa6 [fix](simd-json-reader) fix SIMD json reader lose data and 
support stream parser (#35781)
cb7e8bf1aa6 is described below

commit cb7e8bf1aa619e2c3b8b10d84e0721e543bc092a
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Tue Jun 4 10:06:31 2024 +0800

    [fix](simd-json-reader) fix SIMD json reader lose data and support stream 
parser (#35781)
    
    When loading JSON without setting read_json_by_line, only one JSON row is loaded.
    
    
![image](https://github.com/apache/doris/assets/77738092/01840ebd-1533-4a59-9c1b-9472e6811d44)
    
    But the input file contains three JSON rows:
    
    {"name":"Name1","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
    
    {"name":"Name2","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
    
    {"name":"Name3","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
    
    The other two rows are silently lost.
---
 be/src/vec/exec/format/json/new_json_reader.cpp    | 187 ++++++++++++---------
 be/src/vec/exec/format/json/new_json_reader.h      |   6 +-
 .../load_p0/stream_load/iterate_read_json.json     |   3 +
 .../data/load_p0/stream_load/test_json_load.out    |   5 +
 .../load_p0/stream_load/test_json_load.groovy      |  14 +-
 5 files changed, 129 insertions(+), 86 deletions(-)

diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index b658e15e467..a765149384d 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1058,6 +1058,7 @@ Status 
NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
     size_t size = 0;
     simdjson::error_code error;
     size_t num_rows = block.rows();
+
     try {
         // step1: get and parse buf to get json doc
         RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
@@ -1066,19 +1067,25 @@ Status 
NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
             return Status::OK();
         }
 
-        // step2: get json value by json doc
-        Status st = _get_json_value(&size, eof, &error, is_empty_row);
-        if (st.is<DATA_QUALITY_ERROR>()) {
-            return Status::OK();
-        }
-        RETURN_IF_ERROR(st);
-        if (*is_empty_row || *eof) {
-            return Status::OK();
-        }
+        for (_json_stream_iterator = _json_stream.begin();
+             _json_stream_iterator != _json_stream.end(); 
++_json_stream_iterator) {
+            if (_json_stream_iterator.current_index() >= _original_doc_size) {
+                break;
+            }
+            // step2: get json value by json doc
+            Status st = _get_json_value(&size, eof, &error, is_empty_row);
+            if (st.is<DATA_QUALITY_ERROR>()) {
+                return Status::OK();
+            }
+            RETURN_IF_ERROR(st);
+            if (*is_empty_row || *eof) {
+                return Status::OK();
+            }
 
-        // step 3: write columns by json value
-        RETURN_IF_ERROR(
-                _simdjson_handle_simple_json_write_columns(block, slot_descs, 
is_empty_row, eof));
+            // step 3: write columns by json value
+            RETURN_IF_ERROR(_simdjson_handle_simple_json_write_columns(block, 
slot_descs,
+                                                                       
is_empty_row, eof));
+        }
     } catch (simdjson::simdjson_error& e) {
         RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
         if (*_scanner_eof) {
@@ -1168,19 +1175,25 @@ Status 
NewJsonReader::_simdjson_handle_flat_array_complex_json(
             return Status::OK();
         }
 
-        // step2: get json value by json doc
-        Status st = _get_json_value(&size, eof, &error, is_empty_row);
-        if (st.is<DATA_QUALITY_ERROR>()) {
-            return Status::OK();
-        }
-        RETURN_IF_ERROR(st);
-        if (*is_empty_row) {
-            return Status::OK();
-        }
+        for (_json_stream_iterator = _json_stream.begin();
+             _json_stream_iterator != _json_stream.end(); 
++_json_stream_iterator) {
+            if (_json_stream_iterator.current_index() >= _original_doc_size) {
+                break;
+            }
+            // step2: get json value by json doc
+            Status st = _get_json_value(&size, eof, &error, is_empty_row);
+            if (st.is<DATA_QUALITY_ERROR>()) {
+                return Status::OK();
+            }
+            RETURN_IF_ERROR(st);
+            if (*is_empty_row) {
+                return Status::OK();
+            }
 
-        // step 3: write columns by json value
-        
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, 
slot_descs,
-                                                                               
is_empty_row, eof));
+            // step 3: write columns by json value
+            
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(
+                    block, slot_descs, is_empty_row, eof));
+        }
     } catch (simdjson::simdjson_error& e) {
         RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
         if (*_scanner_eof) {
@@ -1256,20 +1269,26 @@ Status 
NewJsonReader::_simdjson_handle_nested_complex_json(
         RuntimeState* /*state*/, Block& block, const 
std::vector<SlotDescriptor*>& slot_descs,
         bool* is_empty_row, bool* eof) {
     // nested complex json
-    while (true) {
-        size_t num_rows = block.rows();
-        simdjson::ondemand::object cur;
-        size_t size = 0;
-        simdjson::error_code error;
-        try {
-            RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
-            if (size == 0 || *eof) {
-                *is_empty_row = true;
-                return Status::OK();
+    size_t num_rows = block.rows();
+    simdjson::ondemand::object cur;
+    size_t size = 0;
+    simdjson::error_code error;
+
+    try {
+        RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
+        if (size == 0 || *eof) {
+            *is_empty_row = true;
+            return Status::OK();
+        }
+
+        for (_json_stream_iterator = _json_stream.begin();
+             _json_stream_iterator != _json_stream.end(); 
++_json_stream_iterator) {
+            if (_json_stream_iterator.current_index() >= _original_doc_size) {
+                break;
             }
             Status st = _get_json_value(&size, eof, &error, is_empty_row);
             if (st.is<DATA_QUALITY_ERROR>()) {
-                continue; // continue to read next
+                return Status::OK();
             }
             RETURN_IF_ERROR(st);
             if (*is_empty_row) {
@@ -1299,16 +1318,14 @@ Status 
NewJsonReader::_simdjson_handle_nested_complex_json(
                 // so that the caller will continue reading next line.
                 *is_empty_row = true;
             }
-            break; // read a valid row
-        } catch (simdjson::simdjson_error& e) {
-            RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
-            if (*_scanner_eof) {
-                // When _scanner_eof is true and valid is false, it means that 
we have encountered
-                // unqualified data and decided to stop the scan.
-                *is_empty_row = true;
-                return Status::OK();
-            }
-            continue;
+        }
+    } catch (simdjson::simdjson_error& e) {
+        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+        if (*_scanner_eof) {
+            // When _scanner_eof is true and valid is false, it means that we 
have encountered
+            // unqualified data and decided to stop the scan.
+            *is_empty_row = true;
+            return Status::OK();
         }
     }
     return Status::OK();
@@ -1515,14 +1532,13 @@ Status NewJsonReader::_simdjson_parse_json(size_t* 
size, bool* is_empty_row, boo
         return Status::OK();
     }
 
-    // step2: init json parser iterate.
-    if (*size + simdjson::SIMDJSON_PADDING > _padded_size) {
-        // For efficiency reasons, simdjson requires a string with a few bytes 
(simdjson::SIMDJSON_PADDING) at the end.
-        // Hence, a re-allocation is needed if the space is not enough.
-        _simdjson_ondemand_padding_buffer.resize(*size + 
simdjson::SIMDJSON_PADDING);
-        _simdjson_ondemand_unscape_padding_buffer.resize(*size + 
simdjson::SIMDJSON_PADDING);
-        _padded_size = *size + simdjson::SIMDJSON_PADDING;
-    }
+    // step2: init json stream iterate.
+    // For efficiency reasons, simdjson requires a string with a few bytes 
(simdjson::SIMDJSON_PADDING) at the end.
+    _simdjson_ondemand_padding_buffer.clear();
+    _padded_size = *size + simdjson::SIMDJSON_PADDING;
+    _simdjson_ondemand_padding_buffer.resize(_padded_size);
+    _simdjson_ondemand_unscape_padding_buffer.resize(_padded_size);
+
     // trim BOM since simdjson does not handle UTF-8 Unicode (with BOM)
     if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' &&
         static_cast<char>(_json_str[1]) == '\xBB' && 
static_cast<char>(_json_str[2]) == '\xBF') {
@@ -1532,10 +1548,16 @@ Status NewJsonReader::_simdjson_parse_json(size_t* 
size, bool* is_empty_row, boo
     }
     memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size);
     _original_doc_size = *size;
-    *error = _ondemand_json_parser
-                     
->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
-                               _padded_size)
-                     .get(_original_json_doc);
+
+    *error = 
_ondemand_json_parser->iterate_many(_simdjson_ondemand_padding_buffer)
+                     .get(_json_stream);
+    if (*error != simdjson::error_code::SUCCESS) {
+        fmt::memory_buffer error_msg;
+        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
+                       *error, simdjson::error_message(*error));
+        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
+    }
+
     return Status::OK();
 }
 
@@ -1557,43 +1579,40 @@ Status NewJsonReader::_judge_empty_row(size_t size, 
bool eof, bool* is_empty_row
     return Status::OK();
 }
 
+Status NewJsonReader::_return_quality_error(fmt::memory_buffer& error_msg,
+                                            const std::string& doc_info, bool* 
eof) {
+    RETURN_IF_ERROR(_state->append_error_msg_to_file(
+            [&]() -> std::string { return doc_info; },
+            [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
+    _counter->num_rows_filtered++;
+    if (*_scanner_eof) {
+        // Case A: if _scanner_eof is set to true in 
"append_error_msg_to_file", which means
+        // we meet enough invalid rows and the scanner should be stopped.
+        // So we set eof to true and return OK, the caller will stop the 
process as we meet the end of file.
+        *eof = true;
+        return Status::OK();
+    }
+    return Status::DataQualityError(fmt::to_string(error_msg));
+}
+
 Status NewJsonReader::_get_json_value(size_t* size, bool* eof, 
simdjson::error_code* error,
                                       bool* is_empty_row) {
     SCOPED_TIMER(_file_read_timer);
-    auto return_quality_error = [&](fmt::memory_buffer& error_msg,
-                                    const std::string& doc_info) -> Status {
-        RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                [&]() -> std::string { return doc_info; },
-                [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
-        _counter->num_rows_filtered++;
-        if (*_scanner_eof) {
-            // Case A: if _scanner_eof is set to true in 
"append_error_msg_to_file", which means
-            // we meet enough invalid rows and the scanner should be stopped.
-            // So we set eof to true and return OK, the caller will stop the 
process as we meet the end of file.
-            *eof = true;
-            return Status::OK();
-        }
-        return Status::DataQualityError(fmt::to_string(error_msg));
-    };
-    if (*error != simdjson::error_code::SUCCESS) {
-        fmt::memory_buffer error_msg;
-        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
-                       *error, simdjson::error_message(*error));
-        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
-    }
+    _original_json_doc = (*_json_stream_iterator).value();
+
     auto type_res = _original_json_doc.type();
     if (type_res.error() != simdjson::error_code::SUCCESS) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
                        type_res.error(), 
simdjson::error_message(type_res.error()));
-        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
+        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
     }
     simdjson::ondemand::json_type type = type_res.value();
     if (type != simdjson::ondemand::json_type::object &&
         type != simdjson::ondemand::json_type::array) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "Not an json object or json array");
-        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
+        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
     }
     if (!_parsed_json_root.empty() && type == 
simdjson::ondemand::json_type::object) {
         try {
@@ -1605,13 +1624,13 @@ Status NewJsonReader::_get_json_value(size_t* size, 
bool* eof, simdjson::error_c
             if (!st.ok()) {
                 fmt::memory_buffer error_msg;
                 fmt::format_to(error_msg, "{}", st.to_string());
-                return return_quality_error(error_msg, 
std::string((char*)_json_str, *size));
+                return _return_quality_error(error_msg, 
std::string((char*)_json_str, *size), eof);
             }
         } catch (simdjson::simdjson_error& e) {
             fmt::memory_buffer error_msg;
             fmt::format_to(error_msg, "Encounter error while 
extract_from_object, error: {}",
                            e.what());
-            return return_quality_error(error_msg, 
std::string((char*)_json_str, *size));
+            return _return_quality_error(error_msg, 
std::string((char*)_json_str, *size), eof);
         }
     } else {
         _json_value = _original_json_doc;
@@ -1621,14 +1640,14 @@ Status NewJsonReader::_get_json_value(size_t* size, 
bool* eof, simdjson::error_c
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "{}",
                        "JSON data is array-object, `strip_outer_array` must be 
TRUE.");
-        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
+        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
     }
 
     if (_json_value.type() != simdjson::ondemand::json_type::array && 
_strip_outer_array) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "{}",
                        "JSON data is not an array-object, `strip_outer_array` 
must be FALSE.");
-        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
+        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
     }
     RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
     return Status::OK();
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index f044e06e62e..2a8b428db9f 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -147,6 +147,8 @@ private:
     Status _simdjson_init_reader();
     Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
                                 simdjson::error_code* error);
+    Status _return_quality_error(fmt::memory_buffer& error_msg, const 
std::string& doc_info,
+                                 bool* eof);
     Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* 
error,
                            bool* is_empty_row);
     Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
@@ -273,8 +275,10 @@ private:
     std::string _simdjson_ondemand_padding_buffer;
     std::string _simdjson_ondemand_unscape_padding_buffer;
     // char _simdjson_ondemand_padding_buffer[_padded_size];
-    simdjson::ondemand::document _original_json_doc;
+    simdjson::ondemand::document_reference _original_json_doc;
     simdjson::ondemand::value _json_value;
+    simdjson::ondemand::document_stream _json_stream;
+    simdjson::ondemand::document_stream::iterator _json_stream_iterator;
     // for strip outer array
     // array_iter pointed to _array
     simdjson::ondemand::array_iterator _array_iter;
diff --git a/regression-test/data/load_p0/stream_load/iterate_read_json.json 
b/regression-test/data/load_p0/stream_load/iterate_read_json.json
new file mode 100644
index 00000000000..f6c0d634efe
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/iterate_read_json.json
@@ -0,0 +1,3 @@
+{"name":"Name1","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
+{"name":"Name2","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
+{"name":"Name3","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out 
b/regression-test/data/load_p0/stream_load/test_json_load.out
index 9f064572f13..4c5dfc81f87 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -245,6 +245,11 @@ John       30      New York        
{"email":"j...@example.com","phone":"+1-123-456-7890"}
 android        \N      \N      \N      \N      \N
 android        \N      \N      \N      \N      \N
 
+-- !iterate_read_json --
+Name1  21      5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
+Name2  21      5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
+Name3  21      5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
+
 -- !select28 --
 test   k2_value
 
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index da6bea1afcd..8971b27f662 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -787,7 +787,6 @@ suite("test_json_load", "p0") {
                 assertEquals("${reason}", "${out}")
             }
         }
-
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
     }
@@ -823,7 +822,20 @@ suite("test_json_load", "p0") {
                 assertEquals("${reason}", "${out}")
             }
         }
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+      
+    // iterate read json when read_json_by_line = false
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
 
+        create_json_test_table.call(testTable)
+        def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
+        load_json_data.call("${testTable}", test_load_label, 'false', 'false', 
'json', '', '', '', '', '', 'iterate_read_json.json')
+        sql "sync" 
+        
+        qt_iterate_read_json "select * from ${testTable} order by name"
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to