This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3e910e29781 [refactor](simd_json_reader) refactor simd json reader to adapt to parse multi json (#27272) 3e910e29781 is described below commit 3e910e297818f9df11f4c815ff1c96f5e117d41f Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Thu Nov 30 15:01:06 2023 +0800 [refactor](simd_json_reader) refactor simd json reader to adapt to parse multi json (#27272) --- be/src/vec/exec/format/json/new_json_reader.cpp | 126 ++++++++++++++---------- be/src/vec/exec/format/json/new_json_reader.h | 9 +- 2 files changed, 82 insertions(+), 53 deletions(-) diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index f735b1a74ba..83b78946486 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -1017,9 +1017,17 @@ Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc size_t num_rows = block.rows(); do { bool valid = false; + size_t size = 0; + simdjson::error_code error; try { if (_next_row >= _total_rows) { // parse json and generic document - Status st = _simdjson_parse_json(is_empty_row, eof); + RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + + Status st = _get_json_value(&size, eof, &error, is_empty_row); if (st.is<DATA_QUALITY_ERROR>()) { continue; // continue to read next } @@ -1126,9 +1134,16 @@ Status NewJsonReader::_simdjson_handle_flat_array_complex_json( size_t num_rows = block.rows(); simdjson::ondemand::object cur; do { + size_t size = 0; + simdjson::error_code error; try { if (_next_row >= _total_rows) { - Status st = _simdjson_parse_json(is_empty_row, eof); + RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + Status st = _get_json_value(&size, eof, &error, is_empty_row); if (st.is<DATA_QUALITY_ERROR>()) { continue; // continue to read next } @@ -1212,8 +1227,15 @@ Status NewJsonReader::_simdjson_handle_nested_complex_json( while (true) { size_t num_rows = block.rows(); simdjson::ondemand::object cur; + size_t size = 0; + simdjson::error_code error; try { - Status st = _simdjson_parse_json(is_empty_row, eof); + RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + Status st = _get_json_value(&size, eof, &error, is_empty_row); if (st.is<DATA_QUALITY_ERROR>()) { continue; // continue to read next } @@ -1451,48 +1473,26 @@ Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st return Status::OK(); } -Status NewJsonReader::_simdjson_parse_json(bool* is_empty_row, bool* eof) { - size_t size = 0; - RETURN_IF_ERROR(_simdjson_parse_json_doc(&size, eof)); - - // read all data, then return - if (size == 0 || *eof) { - *is_empty_row = true; - return Status::OK(); - } - - if (!_parsed_jsonpaths.empty() && _strip_outer_array) { - _total_rows = _json_value.count_elements().value(); - _next_row = 0; - - if (_total_rows == 0) { - // meet an empty json array. - *is_empty_row = true; - } - } - return Status::OK(); -} -Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) { - // read a whole message +Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof, + simdjson::error_code* error) { SCOPED_TIMER(_file_read_timer); - const uint8_t* json_str = nullptr; - std::unique_ptr<uint8_t[]> json_str_ptr; + // step1: read buf from pipe. if (_line_reader != nullptr) { - RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof, _io_ctx)); + RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, _io_ctx)); } else { size_t length = 0; - RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &length)); - json_str = json_str_ptr.get(); + RETURN_IF_ERROR(_read_one_message(&_json_str_ptr, &length)); + _json_str = _json_str_ptr.get(); *size = length; if (length == 0) { *eof = true; } } - - _bytes_read_counter += *size; if (*eof) { return Status::OK(); } + + // step2: init json parser iterate. if (*size + simdjson::SIMDJSON_PADDING > _padded_size) { // For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end. // Hence, a re-allocation is needed if the space is not enough. @@ -1501,19 +1501,42 @@ Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) { _padded_size = *size + simdjson::SIMDJSON_PADDING; } // trim BOM since simdjson does not handle UTF-8 Unicode (with BOM) - if (*size >= 3 && static_cast<char>(json_str[0]) == '\xEF' && - static_cast<char>(json_str[1]) == '\xBB' && static_cast<char>(json_str[2]) == '\xBF') { + if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' && + static_cast<char>(_json_str[1]) == '\xBB' && static_cast<char>(_json_str[2]) == '\xBF') { // skip the first three BOM bytes - json_str += 3; + _json_str += 3; *size -= 3; } - memcpy(&_simdjson_ondemand_padding_buffer.front(), json_str, *size); + memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size); _original_doc_size = *size; - auto error = - _ondemand_json_parser - ->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size), - _padded_size) - .get(_original_json_doc); + *error = _ondemand_json_parser + ->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size), + _padded_size) + .get(_original_json_doc); + return Status::OK(); +} + +Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row) { + if (size == 0 || eof) { + *is_empty_row = true; + return Status::OK(); + } + + if (!_parsed_jsonpaths.empty() && _strip_outer_array) { + _total_rows = _json_value.count_elements().value(); + _next_row = 0; + + if (_total_rows == 0) { + // meet an empty json array. + *is_empty_row = true; + } + } + return Status::OK(); +} + +Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error, + bool* is_empty_row) { + SCOPED_TIMER(_file_read_timer); auto return_quality_error = [&](fmt::memory_buffer& error_msg, const std::string& doc_info) -> Status { RETURN_IF_ERROR(_state->append_error_msg_to_file( @@ -1529,25 +1552,25 @@ Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) { } return Status::DataQualityError(fmt::to_string(error_msg)); }; - if (error != simdjson::error_code::SUCCESS) { + if (*error != simdjson::error_code::SUCCESS) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}", - error, simdjson::error_message(error)); - return return_quality_error(error_msg, std::string((char*)json_str, *size)); + *error, simdjson::error_message(*error)); + return return_quality_error(error_msg, std::string((char*)_json_str, *size)); } auto type_res = _original_json_doc.type(); if (type_res.error() != simdjson::error_code::SUCCESS) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}", type_res.error(), simdjson::error_message(type_res.error())); - return return_quality_error(error_msg, std::string((char*)json_str, *size)); + return return_quality_error(error_msg, std::string((char*)_json_str, *size)); } simdjson::ondemand::json_type type = type_res.value(); if (type != simdjson::ondemand::json_type::object && type != simdjson::ondemand::json_type::array) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Not an json object or json array"); - return return_quality_error(error_msg, std::string((char*)json_str, *size)); + return return_quality_error(error_msg, std::string((char*)_json_str, *size)); } if (!_parsed_json_root.empty() && type == simdjson::ondemand::json_type::object) { try { @@ -1559,13 +1582,13 @@ Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) { if (!st.ok()) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", st.to_string()); - return return_quality_error(error_msg, std::string((char*)json_str, *size)); + return return_quality_error(error_msg, std::string((char*)_json_str, *size)); } } catch (simdjson::simdjson_error& e) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Encounter error while extract_from_object, error: {}", e.what()); - return return_quality_error(error_msg, std::string((char*)json_str, *size)); + return return_quality_error(error_msg, std::string((char*)_json_str, *size)); } } else { _json_value = _original_json_doc; @@ -1575,15 +1598,16 @@ Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON data is array-object, `strip_outer_array` must be TRUE."); - return return_quality_error(error_msg, std::string((char*)json_str, *size)); + return return_quality_error(error_msg, std::string((char*)_json_str, *size)); } if (_json_value.type() != simdjson::ondemand::json_type::array && _strip_outer_array) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON data is not an array-object, `strip_outer_array` must be FALSE."); - return return_quality_error(error_msg, std::string((char*)json_str, *size)); + return return_quality_error(error_msg, std::string((char*)_json_str, *size)); } + RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row)); return Status::OK(); } diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 708bceef934..33a0ae0ba58 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -141,8 +141,11 @@ private: // simdjson, replace none simdjson function if it is ready Status _simdjson_init_reader(); - Status _simdjson_parse_json(bool* is_empty_row, bool* eof); - Status _simdjson_parse_json_doc(size_t* size, bool* eof); + Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof, + simdjson::error_code* error); + Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* error, + bool* is_empty_row); + Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row); Status _simdjson_handle_simple_json(RuntimeState* state, Block& block, const std::vector<SlotDescriptor*>& slot_descs, @@ -246,6 +249,8 @@ private: /// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name. std::vector<UInt8> _seen_columns; // simdjson + std::unique_ptr<uint8_t[]> _json_str_ptr; + const uint8_t* _json_str = nullptr; static constexpr size_t _init_buffer_size = 1024 * 1024 * 8; size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING; size_t _original_doc_size = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org