This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e910e29781 [refactor](simd_json_reader) refactor simd json reader to adapt to parse multi json (#27272)
3e910e29781 is described below

commit 3e910e297818f9df11f4c815ff1c96f5e117d41f
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Thu Nov 30 15:01:06 2023 +0800

    [refactor](simd_json_reader) refactor simd json reader to adapt to parse multi json (#27272)
---
 be/src/vec/exec/format/json/new_json_reader.cpp | 126 ++++++++++++++----------
 be/src/vec/exec/format/json/new_json_reader.h   |   9 +-
 2 files changed, 82 insertions(+), 53 deletions(-)

diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index f735b1a74ba..83b78946486 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1017,9 +1017,17 @@ Status 
NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
     size_t num_rows = block.rows();
     do {
         bool valid = false;
+        size_t size = 0;
+        simdjson::error_code error;
         try {
             if (_next_row >= _total_rows) { // parse json and generic document
-                Status st = _simdjson_parse_json(is_empty_row, eof);
+                RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
+                if (size == 0 || *eof) {
+                    *is_empty_row = true;
+                    return Status::OK();
+                }
+
+                Status st = _get_json_value(&size, eof, &error, is_empty_row);
                 if (st.is<DATA_QUALITY_ERROR>()) {
                     continue; // continue to read next
                 }
@@ -1126,9 +1134,16 @@ Status 
NewJsonReader::_simdjson_handle_flat_array_complex_json(
     size_t num_rows = block.rows();
     simdjson::ondemand::object cur;
     do {
+        size_t size = 0;
+        simdjson::error_code error;
         try {
             if (_next_row >= _total_rows) {
-                Status st = _simdjson_parse_json(is_empty_row, eof);
+                RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
+                if (size == 0 || *eof) {
+                    *is_empty_row = true;
+                    return Status::OK();
+                }
+                Status st = _get_json_value(&size, eof, &error, is_empty_row);
                 if (st.is<DATA_QUALITY_ERROR>()) {
                     continue; // continue to read next
                 }
@@ -1212,8 +1227,15 @@ Status 
NewJsonReader::_simdjson_handle_nested_complex_json(
     while (true) {
         size_t num_rows = block.rows();
         simdjson::ondemand::object cur;
+        size_t size = 0;
+        simdjson::error_code error;
         try {
-            Status st = _simdjson_parse_json(is_empty_row, eof);
+            RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
+            if (size == 0 || *eof) {
+                *is_empty_row = true;
+                return Status::OK();
+            }
+            Status st = _get_json_value(&size, eof, &error, is_empty_row);
             if (st.is<DATA_QUALITY_ERROR>()) {
                 continue; // continue to read next
             }
@@ -1451,48 +1473,26 @@ Status 
NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st
     return Status::OK();
 }
 
-Status NewJsonReader::_simdjson_parse_json(bool* is_empty_row, bool* eof) {
-    size_t size = 0;
-    RETURN_IF_ERROR(_simdjson_parse_json_doc(&size, eof));
-
-    // read all data, then return
-    if (size == 0 || *eof) {
-        *is_empty_row = true;
-        return Status::OK();
-    }
-
-    if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
-        _total_rows = _json_value.count_elements().value();
-        _next_row = 0;
-
-        if (_total_rows == 0) {
-            // meet an empty json array.
-            *is_empty_row = true;
-        }
-    }
-    return Status::OK();
-}
-Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) {
-    // read a whole message
+Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, 
bool* eof,
+                                           simdjson::error_code* error) {
     SCOPED_TIMER(_file_read_timer);
-    const uint8_t* json_str = nullptr;
-    std::unique_ptr<uint8_t[]> json_str_ptr;
+    // step1: read buf from pipe.
     if (_line_reader != nullptr) {
-        RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof, 
_io_ctx));
+        RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, 
_io_ctx));
     } else {
         size_t length = 0;
-        RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &length));
-        json_str = json_str_ptr.get();
+        RETURN_IF_ERROR(_read_one_message(&_json_str_ptr, &length));
+        _json_str = _json_str_ptr.get();
         *size = length;
         if (length == 0) {
             *eof = true;
         }
     }
-
-    _bytes_read_counter += *size;
     if (*eof) {
         return Status::OK();
     }
+
+    // step2: init json parser iterate.
     if (*size + simdjson::SIMDJSON_PADDING > _padded_size) {
         // For efficiency reasons, simdjson requires a string with a few bytes 
(simdjson::SIMDJSON_PADDING) at the end.
         // Hence, a re-allocation is needed if the space is not enough.
@@ -1501,19 +1501,42 @@ Status NewJsonReader::_simdjson_parse_json_doc(size_t* 
size, bool* eof) {
         _padded_size = *size + simdjson::SIMDJSON_PADDING;
     }
     // trim BOM since simdjson does not handle UTF-8 Unicode (with BOM)
-    if (*size >= 3 && static_cast<char>(json_str[0]) == '\xEF' &&
-        static_cast<char>(json_str[1]) == '\xBB' && 
static_cast<char>(json_str[2]) == '\xBF') {
+    if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' &&
+        static_cast<char>(_json_str[1]) == '\xBB' && 
static_cast<char>(_json_str[2]) == '\xBF') {
         // skip the first three BOM bytes
-        json_str += 3;
+        _json_str += 3;
         *size -= 3;
     }
-    memcpy(&_simdjson_ondemand_padding_buffer.front(), json_str, *size);
+    memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size);
     _original_doc_size = *size;
-    auto error =
-            _ondemand_json_parser
-                    
->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
-                              _padded_size)
-                    .get(_original_json_doc);
+    *error = _ondemand_json_parser
+                     
->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
+                               _padded_size)
+                     .get(_original_json_doc);
+    return Status::OK();
+}
+
+Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* 
is_empty_row) {
+    if (size == 0 || eof) {
+        *is_empty_row = true;
+        return Status::OK();
+    }
+
+    if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+        _total_rows = _json_value.count_elements().value();
+        _next_row = 0;
+
+        if (_total_rows == 0) {
+            // meet an empty json array.
+            *is_empty_row = true;
+        }
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_get_json_value(size_t* size, bool* eof, 
simdjson::error_code* error,
+                                      bool* is_empty_row) {
+    SCOPED_TIMER(_file_read_timer);
     auto return_quality_error = [&](fmt::memory_buffer& error_msg,
                                     const std::string& doc_info) -> Status {
         RETURN_IF_ERROR(_state->append_error_msg_to_file(
@@ -1529,25 +1552,25 @@ Status NewJsonReader::_simdjson_parse_json_doc(size_t* 
size, bool* eof) {
         }
         return Status::DataQualityError(fmt::to_string(error_msg));
     };
-    if (error != simdjson::error_code::SUCCESS) {
+    if (*error != simdjson::error_code::SUCCESS) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
-                       error, simdjson::error_message(error));
-        return return_quality_error(error_msg, std::string((char*)json_str, 
*size));
+                       *error, simdjson::error_message(*error));
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
     }
     auto type_res = _original_json_doc.type();
     if (type_res.error() != simdjson::error_code::SUCCESS) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
                        type_res.error(), 
simdjson::error_message(type_res.error()));
-        return return_quality_error(error_msg, std::string((char*)json_str, 
*size));
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
     }
     simdjson::ondemand::json_type type = type_res.value();
     if (type != simdjson::ondemand::json_type::object &&
         type != simdjson::ondemand::json_type::array) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "Not an json object or json array");
-        return return_quality_error(error_msg, std::string((char*)json_str, 
*size));
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
     }
     if (!_parsed_json_root.empty() && type == 
simdjson::ondemand::json_type::object) {
         try {
@@ -1559,13 +1582,13 @@ Status NewJsonReader::_simdjson_parse_json_doc(size_t* 
size, bool* eof) {
             if (!st.ok()) {
                 fmt::memory_buffer error_msg;
                 fmt::format_to(error_msg, "{}", st.to_string());
-                return return_quality_error(error_msg, 
std::string((char*)json_str, *size));
+                return return_quality_error(error_msg, 
std::string((char*)_json_str, *size));
             }
         } catch (simdjson::simdjson_error& e) {
             fmt::memory_buffer error_msg;
             fmt::format_to(error_msg, "Encounter error while 
extract_from_object, error: {}",
                            e.what());
-            return return_quality_error(error_msg, 
std::string((char*)json_str, *size));
+            return return_quality_error(error_msg, 
std::string((char*)_json_str, *size));
         }
     } else {
         _json_value = _original_json_doc;
@@ -1575,15 +1598,16 @@ Status NewJsonReader::_simdjson_parse_json_doc(size_t* 
size, bool* eof) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "{}",
                        "JSON data is array-object, `strip_outer_array` must be 
TRUE.");
-        return return_quality_error(error_msg, std::string((char*)json_str, 
*size));
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
     }
 
     if (_json_value.type() != simdjson::ondemand::json_type::array && 
_strip_outer_array) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "{}",
                        "JSON data is not an array-object, `strip_outer_array` 
must be FALSE.");
-        return return_quality_error(error_msg, std::string((char*)json_str, 
*size));
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
     }
+    RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index 708bceef934..33a0ae0ba58 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -141,8 +141,11 @@ private:
 
     // simdjson, replace none simdjson function if it is ready
     Status _simdjson_init_reader();
-    Status _simdjson_parse_json(bool* is_empty_row, bool* eof);
-    Status _simdjson_parse_json_doc(size_t* size, bool* eof);
+    Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
+                                simdjson::error_code* error);
+    Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* 
error,
+                           bool* is_empty_row);
+    Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
 
     Status _simdjson_handle_simple_json(RuntimeState* state, Block& block,
                                         const std::vector<SlotDescriptor*>& 
slot_descs,
@@ -246,6 +249,8 @@ private:
     /// Set of columns which already met in row. Exception is thrown if there 
are more than one column with the same name.
     std::vector<UInt8> _seen_columns;
     // simdjson
+    std::unique_ptr<uint8_t[]> _json_str_ptr;
+    const uint8_t* _json_str = nullptr;
     static constexpr size_t _init_buffer_size = 1024 * 1024 * 8;
     size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING;
     size_t _original_doc_size = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to