This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cb9a6f63ab8 [refactor](simd_json_reader) refactor simd json parse to adapt stream parse (#27972)
cb9a6f63ab8 is described below

commit cb9a6f63ab87055bfa3633390c187404a10754e2
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Thu Dec 7 14:45:15 2023 +0800

    [refactor](simd_json_reader) refactor simd json parse to adapt stream parse (#27972)
---
 be/src/vec/exec/format/json/new_json_reader.cpp | 310 ++++++++++++------------
 be/src/vec/exec/format/json/new_json_reader.h   |  11 +
 2 files changed, 164 insertions(+), 157 deletions(-)

diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 83b78946486..5c62f284f96 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1009,158 +1009,189 @@ Status NewJsonReader::_simdjson_init_reader() {
     return Status::OK();
 }
 
+Status NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, 
Block& block,
+                                             size_t num_rows, bool* eof) {
+    fmt::memory_buffer error_msg;
+    fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: 
{}", error.error(),
+                   error.what());
+    RETURN_IF_ERROR(_state->append_error_msg_to_file(
+            [&]() -> std::string {
+                return std::string(_simdjson_ondemand_padding_buffer.data(), 
_original_doc_size);
+            },
+            [&]() -> std::string { return fmt::to_string(error_msg); }, eof));
+    _counter->num_rows_filtered++;
+    // Before continuing to process other rows, we need to first clean the 
fail parsed row.
+    for (int i = 0; i < block.columns(); ++i) {
+        auto column = block.get_by_position(i).column->assume_mutable();
+        if (column->size() > num_rows) {
+            column->pop_back(column->size() - num_rows);
+        }
+    }
+
+    return Status::OK();
+}
+
 Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, 
Block& block,
                                                    const 
std::vector<SlotDescriptor*>& slot_descs,
                                                    bool* is_empty_row, bool* 
eof) {
     // simple json
+    size_t size = 0;
+    simdjson::error_code error;
+    size_t num_rows = block.rows();
+    try {
+        // step1: get and parse buf to get json doc
+        RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
+        if (size == 0 || *eof) {
+            *is_empty_row = true;
+            return Status::OK();
+        }
+
+        // step2: get json value by json doc
+        Status st = _get_json_value(&size, eof, &error, is_empty_row);
+        if (st.is<DATA_QUALITY_ERROR>()) {
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(st);
+        if (*is_empty_row) {
+            return Status::OK();
+        }
+
+        // step 3: write columns by json value
+        RETURN_IF_ERROR(
+                _simdjson_handle_simple_json_write_columns(block, slot_descs, 
is_empty_row, eof));
+    } catch (simdjson::simdjson_error& e) {
+        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+        if (*_scanner_eof) {
+            // When _scanner_eof is true and valid is false, it means that we 
have encountered
+            // unqualified data and decided to stop the scan.
+            *is_empty_row = true;
+            return Status::OK();
+        }
+    }
+
+    return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_simple_json_write_columns(
+        Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* 
is_empty_row,
+        bool* eof) {
     simdjson::ondemand::object objectValue;
     size_t num_rows = block.rows();
-    do {
-        bool valid = false;
-        size_t size = 0;
-        simdjson::error_code error;
-        try {
-            if (_next_row >= _total_rows) { // parse json and generic document
-                RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
-                if (size == 0 || *eof) {
+    bool valid = false;
+    try {
+        if (_json_value.type() == simdjson::ondemand::json_type::array) {
+            _array = _json_value.get_array();
+            if (_array.count_elements() == 0) {
+                // may be passing an empty json, such as "[]"
+                RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line", 
"", nullptr));
+                if (*_scanner_eof) {
                     *is_empty_row = true;
                     return Status::OK();
                 }
-
-                Status st = _get_json_value(&size, eof, &error, is_empty_row);
-                if (st.is<DATA_QUALITY_ERROR>()) {
-                    continue; // continue to read next
-                }
-                RETURN_IF_ERROR(st);
-                if (*is_empty_row) {
-                    return Status::OK();
-                }
-                if (_json_value.type() == 
simdjson::ondemand::json_type::array) {
-                    _array = _json_value.get_array();
-                    _array_iter = _array.begin();
-
-                    _total_rows = _array.count_elements();
-                    if (_total_rows == 0) {
-                        // may be passing an empty json, such as "[]"
-                        RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json 
line", "", nullptr));
-                        if (*_scanner_eof) {
-                            *is_empty_row = true;
-                            return Status::OK();
-                        }
-                        continue;
-                    }
-                } else {
-                    _total_rows = 1; // only one row
-                    objectValue = _json_value;
-                }
-                _next_row = 0;
+                return Status::OK();
             }
 
-            if (_json_value.type() == simdjson::ondemand::json_type::array) { 
// handle case 1
+            _array_iter = _array.begin();
+            while (true) {
                 objectValue = *_array_iter;
                 RETURN_IF_ERROR(
                         _simdjson_set_column_value(&objectValue, block, 
slot_descs, &valid));
+                if (!valid) {
+                    if (*_scanner_eof) {
+                        // When _scanner_eof is true and valid is false, it 
means that we have encountered
+                        // unqualified data and decided to stop the scan.
+                        *is_empty_row = true;
+                        return Status::OK();
+                    }
+                }
+                ++_array_iter;
                 if (_array_iter == _array.end()) {
                     // Hint to read next json doc
-                    _next_row = _total_rows + 1;
                     break;
                 }
-                ++_array_iter;
-            } else { // handle case 2
-                // objectValue = _json_value.get_object();
-                RETURN_IF_ERROR(
-                        _simdjson_set_column_value(&objectValue, block, 
slot_descs, &valid));
             }
-            _next_row++;
+        } else {
+            objectValue = _json_value;
+            RETURN_IF_ERROR(_simdjson_set_column_value(&objectValue, block, 
slot_descs, &valid));
             if (!valid) {
                 if (*_scanner_eof) {
-                    // When _scanner_eof is true and valid is false, it means 
that we have encountered
-                    // unqualified data and decided to stop the scan.
                     *is_empty_row = true;
                     return Status::OK();
                 }
-                continue;
             }
             *is_empty_row = false;
-            break; // get a valid row, then break
-        } catch (simdjson::simdjson_error& e) {
-            // prevent from endless loop
-            _next_row = _total_rows + 1;
-            fmt::memory_buffer error_msg;
-            fmt::format_to(error_msg, "Parse json data failed. code: {}, error 
info: {}", e.error(),
-                           e.what());
-            RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                    [&]() -> std::string {
-                        return 
std::string(_simdjson_ondemand_padding_buffer.data(),
-                                           _original_doc_size);
-                    },
-                    [&]() -> std::string { return fmt::to_string(error_msg); 
}, eof));
-            _counter->num_rows_filtered++;
-            // Before continuing to process other rows, we need to first clean 
the fail parsed row.
-            for (int i = 0; i < block.columns(); ++i) {
-                auto column = 
block.get_by_position(i).column->assume_mutable();
-                if (column->size() > num_rows) {
-                    column->pop_back(column->size() - num_rows);
-                }
-            }
-            if (!valid) {
-                if (*_scanner_eof) {
-                    // When _scanner_eof is true and valid is false, it means 
that we have encountered
-                    // unqualified data and decided to stop the scan.
-                    *is_empty_row = true;
-                    return Status::OK();
-                }
-                continue;
+        }
+    } catch (simdjson::simdjson_error& e) {
+        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+        if (!valid) {
+            if (*_scanner_eof) {
+                *is_empty_row = true;
+                return Status::OK();
             }
-            continue;
         }
-    } while (_next_row <= _total_rows);
+    }
     return Status::OK();
 }
 
 Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
         RuntimeState* /*state*/, Block& block, const 
std::vector<SlotDescriptor*>& slot_descs,
         bool* is_empty_row, bool* eof) {
+    // array complex json
+    size_t size = 0;
+    simdjson::error_code error;
+    size_t num_rows = block.rows();
+    try {
+        // step1: get and parse buf to get json doc
+        RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
+        if (size == 0 || *eof) {
+            *is_empty_row = true;
+            return Status::OK();
+        }
+
+        // step2: get json value by json doc
+        Status st = _get_json_value(&size, eof, &error, is_empty_row);
+        if (st.is<DATA_QUALITY_ERROR>()) {
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(st);
+        if (*is_empty_row) {
+            return Status::OK();
+        }
+
+        // step 3: write columns by json value
+        
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, 
slot_descs,
+                                                                               
is_empty_row, eof));
+    } catch (simdjson::simdjson_error& e) {
+        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+        if (*_scanner_eof) {
+            // When _scanner_eof is true and valid is false, it means that we 
have encountered
+            // unqualified data and decided to stop the scan.
+            *is_empty_row = true;
+            return Status::OK();
+        }
+    }
+
+    return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json_write_columns(
+        Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* 
is_empty_row,
+        bool* eof) {
 // Advance one row in array list, if it is the endpoint, stop advance and 
break the loop
 #define ADVANCE_ROW()                  \
+    ++_array_iter;                     \
     if (_array_iter == _array.end()) { \
-        _next_row = _total_rows + 1;   \
         break;                         \
-    }                                  \
-    ++_array_iter;                     \
-    ++_next_row;
+    }
 
-    // array complex json
-    size_t num_rows = block.rows();
     simdjson::ondemand::object cur;
-    do {
-        size_t size = 0;
-        simdjson::error_code error;
-        try {
-            if (_next_row >= _total_rows) {
-                RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
-                if (size == 0 || *eof) {
-                    *is_empty_row = true;
-                    return Status::OK();
-                }
-                Status st = _get_json_value(&size, eof, &error, is_empty_row);
-                if (st.is<DATA_QUALITY_ERROR>()) {
-                    continue; // continue to read next
-                }
-                RETURN_IF_ERROR(st);
-                if (*is_empty_row) {
-                    if (st.ok()) {
-                        return st;
-                    }
-                    if (_total_rows == 0) {
-                        continue;
-                    }
-                }
-                _array = _json_value.get_array();
-                _array_iter = _array.begin();
-            }
+    size_t num_rows = block.rows();
+    try {
+        bool valid = true;
+        _array = _json_value.get_array();
+        _array_iter = _array.begin();
 
-            bool valid = true;
+        while (true) {
             cur = (*_array_iter).get_object();
             // extract root
             if (!_parsed_json_root.empty()) {
@@ -1187,36 +1218,17 @@ Status 
NewJsonReader::_simdjson_handle_flat_array_complex_json(
                 continue; // process next line
             }
             *is_empty_row = false;
-            break; // get a valid row, then break
-        } catch (simdjson::simdjson_error& e) {
-            // prevent from endless loop
-            _next_row = _total_rows + 1;
-            fmt::memory_buffer error_msg;
-            fmt::format_to(error_msg, "Parse json data failed. code: {}, error 
info: {}", e.error(),
-                           e.what());
-            RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                    [&]() -> std::string {
-                        return 
std::string(_simdjson_ondemand_padding_buffer.data(),
-                                           _original_doc_size);
-                    },
-                    [&]() -> std::string { return fmt::to_string(error_msg); 
}, eof));
-            _counter->num_rows_filtered++;
-            // Before continuing to process other rows, we need to first clean 
the fail parsed row.
-            for (int i = 0; i < block.columns(); ++i) {
-                auto column = 
block.get_by_position(i).column->assume_mutable();
-                if (column->size() > num_rows) {
-                    column->pop_back(column->size() - num_rows);
-                }
-            }
-            if (*_scanner_eof) {
-                // When _scanner_eof is true and valid is false, it means that 
we have encountered
-                // unqualified data and decided to stop the scan.
-                *is_empty_row = true;
-                return Status::OK();
-            }
-            continue;
         }
-    } while (_next_row <= _total_rows);
+    } catch (simdjson::simdjson_error& e) {
+        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+        if (*_scanner_eof) {
+            // When _scanner_eof is true and valid is false, it means that we 
have encountered
+            // unqualified data and decided to stop the scan.
+            *is_empty_row = true;
+            return Status::OK();
+        }
+    }
+
     return Status::OK();
 }
 
@@ -1269,23 +1281,7 @@ Status 
NewJsonReader::_simdjson_handle_nested_complex_json(
             }
             break; // read a valid row
         } catch (simdjson::simdjson_error& e) {
-            fmt::memory_buffer error_msg;
-            fmt::format_to(error_msg, "Parse json data failed. code: {}, error 
info: {}", e.error(),
-                           e.what());
-            RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                    [&]() -> std::string {
-                        return 
std::string(_simdjson_ondemand_padding_buffer.data(),
-                                           _original_doc_size);
-                    },
-                    [&]() -> std::string { return fmt::to_string(error_msg); 
}, eof));
-            _counter->num_rows_filtered++;
-            // Before continuing to process other rows, we need to first clean 
the fail parsed row.
-            for (int i = 0; i < block.columns(); ++i) {
-                auto column = 
block.get_by_position(i).column->assume_mutable();
-                if (column->size() > num_rows) {
-                    column->pop_back(column->size() - num_rows);
-                }
-            }
+            RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
             if (*_scanner_eof) {
                 // When _scanner_eof is true and valid is false, it means that 
we have encountered
                 // unqualified data and decided to stop the scan.
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index 33a0ae0ba58..92c36c3b283 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -147,14 +147,25 @@ private:
                            bool* is_empty_row);
     Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
 
+    Status _handle_simdjson_error(simdjson::simdjson_error& error, Block& 
block, size_t num_rows,
+                                  bool* eof);
+
     Status _simdjson_handle_simple_json(RuntimeState* state, Block& block,
                                         const std::vector<SlotDescriptor*>& 
slot_descs,
                                         bool* is_empty_row, bool* eof);
 
+    Status _simdjson_handle_simple_json_write_columns(
+            Block& block, const std::vector<SlotDescriptor*>& slot_descs, 
bool* is_empty_row,
+            bool* eof);
+
     Status _simdjson_handle_flat_array_complex_json(RuntimeState* state, 
Block& block,
                                                     const 
std::vector<SlotDescriptor*>& slot_descs,
                                                     bool* is_empty_row, bool* 
eof);
 
+    Status _simdjson_handle_flat_array_complex_json_write_columns(
+            Block& block, const std::vector<SlotDescriptor*>& slot_descs, 
bool* is_empty_row,
+            bool* eof);
+
     Status _simdjson_handle_nested_complex_json(RuntimeState* state, Block& 
block,
                                                 const 
std::vector<SlotDescriptor*>& slot_descs,
                                                 bool* is_empty_row, bool* eof);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to