This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new cb7e8bf1aa6 [fix](simd-json-reader) fix SIMD json reader lose data and support stream parser (#35781) cb7e8bf1aa6 is described below commit cb7e8bf1aa619e2c3b8b10d84e0721e543bc092a Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Tue Jun 4 10:06:31 2024 +0800 [fix](simd-json-reader) fix SIMD json reader lose data and support stream parser (#35781) When loading JSON without setting read_json_by_line, only one JSON object was loaded, even though the input contained three rows of JSON: {"name":"Name1","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"} {"name":"Name2","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"} {"name":"Name3","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"} The other two rows were lost. --- be/src/vec/exec/format/json/new_json_reader.cpp | 187 ++++++++++++--------- be/src/vec/exec/format/json/new_json_reader.h | 6 +- .../load_p0/stream_load/iterate_read_json.json | 3 + .../data/load_p0/stream_load/test_json_load.out | 5 + .../load_p0/stream_load/test_json_load.groovy | 14 +- 5 files changed, 129 insertions(+), 86 deletions(-) diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index b658e15e467..a765149384d 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -1058,6 +1058,7 @@ Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc size_t size = 0; simdjson::error_code error; size_t num_rows = block.rows(); + try { // step1: get and parse buf to get json doc RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); @@ -1066,19 +1067,25 @@ Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc return Status::OK(); } - // step2: get json value by json doc - Status st = _get_json_value(&size, eof, &error, is_empty_row); - if (st.is<DATA_QUALITY_ERROR>()) { - return 
Status::OK(); - } - RETURN_IF_ERROR(st); - if (*is_empty_row || *eof) { - return Status::OK(); - } + for (_json_stream_iterator = _json_stream.begin(); + _json_stream_iterator != _json_stream.end(); ++_json_stream_iterator) { + if (_json_stream_iterator.current_index() >= _original_doc_size) { + break; + } + // step2: get json value by json doc + Status st = _get_json_value(&size, eof, &error, is_empty_row); + if (st.is<DATA_QUALITY_ERROR>()) { + return Status::OK(); + } + RETURN_IF_ERROR(st); + if (*is_empty_row || *eof) { + return Status::OK(); + } - // step 3: write columns by json value - RETURN_IF_ERROR( - _simdjson_handle_simple_json_write_columns(block, slot_descs, is_empty_row, eof)); + // step 3: write columns by json value + RETURN_IF_ERROR(_simdjson_handle_simple_json_write_columns(block, slot_descs, + is_empty_row, eof)); + } } catch (simdjson::simdjson_error& e) { RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); if (*_scanner_eof) { @@ -1168,19 +1175,25 @@ Status NewJsonReader::_simdjson_handle_flat_array_complex_json( return Status::OK(); } - // step2: get json value by json doc - Status st = _get_json_value(&size, eof, &error, is_empty_row); - if (st.is<DATA_QUALITY_ERROR>()) { - return Status::OK(); - } - RETURN_IF_ERROR(st); - if (*is_empty_row) { - return Status::OK(); - } + for (_json_stream_iterator = _json_stream.begin(); + _json_stream_iterator != _json_stream.end(); ++_json_stream_iterator) { + if (_json_stream_iterator.current_index() >= _original_doc_size) { + break; + } + // step2: get json value by json doc + Status st = _get_json_value(&size, eof, &error, is_empty_row); + if (st.is<DATA_QUALITY_ERROR>()) { + return Status::OK(); + } + RETURN_IF_ERROR(st); + if (*is_empty_row) { + return Status::OK(); + } - // step 3: write columns by json value - RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, slot_descs, - is_empty_row, eof)); + // step 3: write columns by json value + 
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns( + block, slot_descs, is_empty_row, eof)); + } } catch (simdjson::simdjson_error& e) { RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); if (*_scanner_eof) { @@ -1256,20 +1269,26 @@ Status NewJsonReader::_simdjson_handle_nested_complex_json( RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof) { // nested complex json - while (true) { - size_t num_rows = block.rows(); - simdjson::ondemand::object cur; - size_t size = 0; - simdjson::error_code error; - try { - RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); - if (size == 0 || *eof) { - *is_empty_row = true; - return Status::OK(); + size_t num_rows = block.rows(); + simdjson::ondemand::object cur; + size_t size = 0; + simdjson::error_code error; + + try { + RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + + for (_json_stream_iterator = _json_stream.begin(); + _json_stream_iterator != _json_stream.end(); ++_json_stream_iterator) { + if (_json_stream_iterator.current_index() >= _original_doc_size) { + break; } Status st = _get_json_value(&size, eof, &error, is_empty_row); if (st.is<DATA_QUALITY_ERROR>()) { - continue; // continue to read next + return Status::OK(); } RETURN_IF_ERROR(st); if (*is_empty_row) { @@ -1299,16 +1318,14 @@ Status NewJsonReader::_simdjson_handle_nested_complex_json( // so that the caller will continue reading next line. *is_empty_row = true; } - break; // read a valid row - } catch (simdjson::simdjson_error& e) { - RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); - if (*_scanner_eof) { - // When _scanner_eof is true and valid is false, it means that we have encountered - // unqualified data and decided to stop the scan. 
- *is_empty_row = true; - return Status::OK(); - } - continue; + } + } catch (simdjson::simdjson_error& e) { + RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); + if (*_scanner_eof) { + // When _scanner_eof is true and valid is false, it means that we have encountered + // unqualified data and decided to stop the scan. + *is_empty_row = true; + return Status::OK(); } } return Status::OK(); @@ -1515,14 +1532,13 @@ Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, boo return Status::OK(); } - // step2: init json parser iterate. - if (*size + simdjson::SIMDJSON_PADDING > _padded_size) { - // For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end. - // Hence, a re-allocation is needed if the space is not enough. - _simdjson_ondemand_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING); - _simdjson_ondemand_unscape_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING); - _padded_size = *size + simdjson::SIMDJSON_PADDING; - } + // step2: init json stream iterate. + // For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end. 
+ _simdjson_ondemand_padding_buffer.clear(); + _padded_size = *size + simdjson::SIMDJSON_PADDING; + _simdjson_ondemand_padding_buffer.resize(_padded_size); + _simdjson_ondemand_unscape_padding_buffer.resize(_padded_size); + // trim BOM since simdjson does not handle UTF-8 Unicode (with BOM) if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' && static_cast<char>(_json_str[1]) == '\xBB' && static_cast<char>(_json_str[2]) == '\xBF') { @@ -1532,10 +1548,16 @@ Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, boo } memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size); _original_doc_size = *size; - *error = _ondemand_json_parser - ->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size), - _padded_size) - .get(_original_json_doc); + + *error = _ondemand_json_parser->iterate_many(_simdjson_ondemand_padding_buffer) + .get(_json_stream); + if (*error != simdjson::error_code::SUCCESS) { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}", + *error, simdjson::error_message(*error)); + return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof); + } + return Status::OK(); } @@ -1557,43 +1579,40 @@ Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row return Status::OK(); } +Status NewJsonReader::_return_quality_error(fmt::memory_buffer& error_msg, + const std::string& doc_info, bool* eof) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { return doc_info; }, + [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); + _counter->num_rows_filtered++; + if (*_scanner_eof) { + // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means + // we meet enough invalid rows and the scanner should be stopped. + // So we set eof to true and return OK, the caller will stop the process as we meet the end of file. 
+ *eof = true; + return Status::OK(); + } + return Status::DataQualityError(fmt::to_string(error_msg)); +} + Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error, bool* is_empty_row) { SCOPED_TIMER(_file_read_timer); - auto return_quality_error = [&](fmt::memory_buffer& error_msg, - const std::string& doc_info) -> Status { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { return doc_info; }, - [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); - _counter->num_rows_filtered++; - if (*_scanner_eof) { - // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means - // we meet enough invalid rows and the scanner should be stopped. - // So we set eof to true and return OK, the caller will stop the process as we meet the end of file. - *eof = true; - return Status::OK(); - } - return Status::DataQualityError(fmt::to_string(error_msg)); - }; - if (*error != simdjson::error_code::SUCCESS) { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}", - *error, simdjson::error_message(*error)); - return return_quality_error(error_msg, std::string((char*)_json_str, *size)); - } + _original_json_doc = (*_json_stream_iterator).value(); + auto type_res = _original_json_doc.type(); if (type_res.error() != simdjson::error_code::SUCCESS) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Parse json data for JsonDoc failed. 
code: {}, error info: {}", type_res.error(), simdjson::error_message(type_res.error())); - return return_quality_error(error_msg, std::string((char*)_json_str, *size)); + return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof); } simdjson::ondemand::json_type type = type_res.value(); if (type != simdjson::ondemand::json_type::object && type != simdjson::ondemand::json_type::array) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Not an json object or json array"); - return return_quality_error(error_msg, std::string((char*)_json_str, *size)); + return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof); } if (!_parsed_json_root.empty() && type == simdjson::ondemand::json_type::object) { try { @@ -1605,13 +1624,13 @@ Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_c if (!st.ok()) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", st.to_string()); - return return_quality_error(error_msg, std::string((char*)_json_str, *size)); + return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof); } } catch (simdjson::simdjson_error& e) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Encounter error while extract_from_object, error: {}", e.what()); - return return_quality_error(error_msg, std::string((char*)_json_str, *size)); + return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof); } } else { _json_value = _original_json_doc; @@ -1621,14 +1640,14 @@ Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_c fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON data is array-object, `strip_outer_array` must be TRUE."); - return return_quality_error(error_msg, std::string((char*)_json_str, *size)); + return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof); } if (_json_value.type() != simdjson::ondemand::json_type::array && _strip_outer_array) { 
fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON data is not an array-object, `strip_outer_array` must be FALSE."); - return return_quality_error(error_msg, std::string((char*)_json_str, *size)); + return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof); } RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row)); return Status::OK(); diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index f044e06e62e..2a8b428db9f 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -147,6 +147,8 @@ private: Status _simdjson_init_reader(); Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof, simdjson::error_code* error); + Status _return_quality_error(fmt::memory_buffer& error_msg, const std::string& doc_info, + bool* eof); Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* error, bool* is_empty_row); Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row); @@ -273,8 +275,10 @@ private: std::string _simdjson_ondemand_padding_buffer; std::string _simdjson_ondemand_unscape_padding_buffer; // char _simdjson_ondemand_padding_buffer[_padded_size]; - simdjson::ondemand::document _original_json_doc; + simdjson::ondemand::document_reference _original_json_doc; simdjson::ondemand::value _json_value; + simdjson::ondemand::document_stream _json_stream; + simdjson::ondemand::document_stream::iterator _json_stream_iterator; // for strip outer array // array_iter pointed to _array simdjson::ondemand::array_iterator _array_iter; diff --git a/regression-test/data/load_p0/stream_load/iterate_read_json.json b/regression-test/data/load_p0/stream_load/iterate_read_json.json new file mode 100644 index 00000000000..f6c0d634efe --- /dev/null +++ b/regression-test/data/load_p0/stream_load/iterate_read_json.json @@ -0,0 +1,3 @@ 
+{"name":"Name1","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"} +{"name":"Name2","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"} +{"name":"Name3","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"} \ No newline at end of file diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out b/regression-test/data/load_p0/stream_load/test_json_load.out index 9f064572f13..4c5dfc81f87 100644 --- a/regression-test/data/load_p0/stream_load/test_json_load.out +++ b/regression-test/data/load_p0/stream_load/test_json_load.out @@ -245,6 +245,11 @@ John 30 New York {"email":"j...@example.com","phone":"+1-123-456-7890"} android \N \N \N \N \N android \N \N \N \N \N +-- !iterate_read_json -- +Name1 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525 +Name2 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525 +Name3 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525 + -- !select28 -- test k2_value diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_json_load.groovy index da6bea1afcd..8971b27f662 100644 --- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy @@ -787,7 +787,6 @@ suite("test_json_load", "p0") { assertEquals("${reason}", "${out}") } } - } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") } @@ -823,7 +822,20 @@ suite("test_json_load", "p0") { assertEquals("${reason}", "${out}") } } + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // iterate read json when read_json_by_line = false + try { + sql "DROP TABLE IF EXISTS ${testTable}" + create_json_test_table.call(testTable) + def test_load_label = UUID.randomUUID().toString().replaceAll("-", "") + load_json_data.call("${testTable}", test_load_label, 'false', 'false', 'json', '', '', '', '', '', 'iterate_read_json.json') + sql "sync" + + qt_iterate_read_json "select * from ${testTable} order by name" } finally { try_sql("DROP TABLE 
IF EXISTS ${testTable}") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org