This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new cb9a6f63ab8 [refactor](simd_json_reader) refactor simd json parse to adapt stream parse (#27972) cb9a6f63ab8 is described below commit cb9a6f63ab87055bfa3633390c187404a10754e2 Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Thu Dec 7 14:45:15 2023 +0800 [refactor](simd_json_reader) refactor simd json parse to adapt stream parse (#27972) --- be/src/vec/exec/format/json/new_json_reader.cpp | 310 ++++++++++++------------ be/src/vec/exec/format/json/new_json_reader.h | 11 + 2 files changed, 164 insertions(+), 157 deletions(-) diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 83b78946486..5c62f284f96 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -1009,158 +1009,189 @@ Status NewJsonReader::_simdjson_init_reader() { return Status::OK(); } +Status NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Block& block, + size_t num_rows, bool* eof) { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: {}", error.error(), + error.what()); + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { + return std::string(_simdjson_ondemand_padding_buffer.data(), _original_doc_size); + }, + [&]() -> std::string { return fmt::to_string(error_msg); }, eof)); + _counter->num_rows_filtered++; + // Before continuing to process other rows, we need to first clean the fail parsed row. + for (int i = 0; i < block.columns(); ++i) { + auto column = block.get_by_position(i).column->assume_mutable(); + if (column->size() > num_rows) { + column->pop_back(column->size() - num_rows); + } + } + + return Status::OK(); +} + Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof) { // simple json + size_t size = 0; + simdjson::error_code error; + size_t num_rows = block.rows(); + try { + // step1: get and parse buf to get json doc + RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + + // step2: get json value by json doc + Status st = _get_json_value(&size, eof, &error, is_empty_row); + if (st.is<DATA_QUALITY_ERROR>()) { + return Status::OK(); + } + RETURN_IF_ERROR(st); + if (*is_empty_row) { + return Status::OK(); + } + + // step 3: write columns by json value + RETURN_IF_ERROR( + _simdjson_handle_simple_json_write_columns(block, slot_descs, is_empty_row, eof)); + } catch (simdjson::simdjson_error& e) { + RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); + if (*_scanner_eof) { + // When _scanner_eof is true and valid is false, it means that we have encountered + // unqualified data and decided to stop the scan. + *is_empty_row = true; + return Status::OK(); + } + } + + return Status::OK(); +} + +Status NewJsonReader::_simdjson_handle_simple_json_write_columns( + Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, + bool* eof) { simdjson::ondemand::object objectValue; size_t num_rows = block.rows(); - do { - bool valid = false; - size_t size = 0; - simdjson::error_code error; - try { - if (_next_row >= _total_rows) { // parse json and generic document - RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); - if (size == 0 || *eof) { + bool valid = false; + try { + if (_json_value.type() == simdjson::ondemand::json_type::array) { + _array = _json_value.get_array(); + if (_array.count_elements() == 0) { + // may be passing an empty json, such as "[]" + RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line", "", nullptr)); + if (*_scanner_eof) { *is_empty_row = true; return Status::OK(); } - - Status st = _get_json_value(&size, eof, &error, is_empty_row); - if (st.is<DATA_QUALITY_ERROR>()) { - continue; // continue to read next - } - RETURN_IF_ERROR(st); - if (*is_empty_row) { - return Status::OK(); - } - if (_json_value.type() == simdjson::ondemand::json_type::array) { - _array = _json_value.get_array(); - _array_iter = _array.begin(); - - _total_rows = _array.count_elements(); - if (_total_rows == 0) { - // may be passing an empty json, such as "[]" - RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line", "", nullptr)); - if (*_scanner_eof) { - *is_empty_row = true; - return Status::OK(); - } - continue; - } - } else { - _total_rows = 1; // only one row - objectValue = _json_value; - } - _next_row = 0; + return Status::OK(); } - if (_json_value.type() == simdjson::ondemand::json_type::array) { // handle case 1 + _array_iter = _array.begin(); + while (true) { objectValue = *_array_iter; RETURN_IF_ERROR( _simdjson_set_column_value(&objectValue, block, slot_descs, &valid)); + if (!valid) { + if (*_scanner_eof) { + // When _scanner_eof is true and valid is false, it means that we have encountered + // unqualified data and decided to stop the scan. + *is_empty_row = true; + return Status::OK(); + } + } + ++_array_iter; if (_array_iter == _array.end()) { // Hint to read next json doc - _next_row = _total_rows + 1; break; } - ++_array_iter; - } else { // handle case 2 - // objectValue = _json_value.get_object(); - RETURN_IF_ERROR( - _simdjson_set_column_value(&objectValue, block, slot_descs, &valid)); } - _next_row++; + } else { + objectValue = _json_value; + RETURN_IF_ERROR(_simdjson_set_column_value(&objectValue, block, slot_descs, &valid)); if (!valid) { if (*_scanner_eof) { - // When _scanner_eof is true and valid is false, it means that we have encountered - // unqualified data and decided to stop the scan. *is_empty_row = true; return Status::OK(); } - continue; } *is_empty_row = false; - break; // get a valid row, then break - } catch (simdjson::simdjson_error& e) { - // prevent from endless loop - _next_row = _total_rows + 1; - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: {}", e.error(), - e.what()); - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return std::string(_simdjson_ondemand_padding_buffer.data(), - _original_doc_size); - }, - [&]() -> std::string { return fmt::to_string(error_msg); }, eof)); - _counter->num_rows_filtered++; - // Before continuing to process other rows, we need to first clean the fail parsed row. - for (int i = 0; i < block.columns(); ++i) { - auto column = block.get_by_position(i).column->assume_mutable(); - if (column->size() > num_rows) { - column->pop_back(column->size() - num_rows); - } - } - if (!valid) { - if (*_scanner_eof) { - // When _scanner_eof is true and valid is false, it means that we have encountered - // unqualified data and decided to stop the scan. - *is_empty_row = true; - return Status::OK(); - } - continue; + } + } catch (simdjson::simdjson_error& e) { + RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); + if (!valid) { + if (*_scanner_eof) { + *is_empty_row = true; + return Status::OK(); } - continue; } - } while (_next_row <= _total_rows); + } return Status::OK(); } Status NewJsonReader::_simdjson_handle_flat_array_complex_json( RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof) { + // array complex json + size_t size = 0; + simdjson::error_code error; + size_t num_rows = block.rows(); + try { + // step1: get and parse buf to get json doc + RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + + // step2: get json value by json doc + Status st = _get_json_value(&size, eof, &error, is_empty_row); + if (st.is<DATA_QUALITY_ERROR>()) { + return Status::OK(); + } + RETURN_IF_ERROR(st); + if (*is_empty_row) { + return Status::OK(); + } + + // step 3: write columns by json value + RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, slot_descs, + is_empty_row, eof)); + } catch (simdjson::simdjson_error& e) { + RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); + if (*_scanner_eof) { + // When _scanner_eof is true and valid is false, it means that we have encountered + // unqualified data and decided to stop the scan. + *is_empty_row = true; + return Status::OK(); + } + } + + return Status::OK(); +} + +Status NewJsonReader::_simdjson_handle_flat_array_complex_json_write_columns( + Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, + bool* eof) { // Advance one row in array list, if it is the endpoint, stop advance and break the loop #define ADVANCE_ROW() \ + ++_array_iter; \ if (_array_iter == _array.end()) { \ - _next_row = _total_rows + 1; \ break; \ - } \ - ++_array_iter; \ - ++_next_row; + } - // array complex json - size_t num_rows = block.rows(); simdjson::ondemand::object cur; - do { - size_t size = 0; - simdjson::error_code error; - try { - if (_next_row >= _total_rows) { - RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); - if (size == 0 || *eof) { - *is_empty_row = true; - return Status::OK(); - } - Status st = _get_json_value(&size, eof, &error, is_empty_row); - if (st.is<DATA_QUALITY_ERROR>()) { - continue; // continue to read next - } - RETURN_IF_ERROR(st); - if (*is_empty_row) { - if (st.ok()) { - return st; - } - if (_total_rows == 0) { - continue; - } - } - _array = _json_value.get_array(); - _array_iter = _array.begin(); - } + size_t num_rows = block.rows(); + try { + bool valid = true; + _array = _json_value.get_array(); + _array_iter = _array.begin(); - bool valid = true; + while (true) { cur = (*_array_iter).get_object(); // extract root if (!_parsed_json_root.empty()) { @@ -1187,36 +1218,17 @@ Status NewJsonReader::_simdjson_handle_flat_array_complex_json( continue; // process next line } *is_empty_row = false; - break; // get a valid row, then break - } catch (simdjson::simdjson_error& e) { - // prevent from endless loop - _next_row = _total_rows + 1; - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: {}", e.error(), - e.what()); - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return std::string(_simdjson_ondemand_padding_buffer.data(), - _original_doc_size); - }, - [&]() -> std::string { return fmt::to_string(error_msg); }, eof)); - _counter->num_rows_filtered++; - // Before continuing to process other rows, we need to first clean the fail parsed row. - for (int i = 0; i < block.columns(); ++i) { - auto column = block.get_by_position(i).column->assume_mutable(); - if (column->size() > num_rows) { - column->pop_back(column->size() - num_rows); - } - } - if (*_scanner_eof) { - // When _scanner_eof is true and valid is false, it means that we have encountered - // unqualified data and decided to stop the scan. - *is_empty_row = true; - return Status::OK(); - } - continue; } - } while (_next_row <= _total_rows); + } catch (simdjson::simdjson_error& e) { + RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); + if (*_scanner_eof) { + // When _scanner_eof is true and valid is false, it means that we have encountered + // unqualified data and decided to stop the scan. + *is_empty_row = true; + return Status::OK(); + } + } + return Status::OK(); } @@ -1269,23 +1281,7 @@ Status NewJsonReader::_simdjson_handle_nested_complex_json( } break; // read a valid row } catch (simdjson::simdjson_error& e) { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: {}", e.error(), - e.what()); - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return std::string(_simdjson_ondemand_padding_buffer.data(), - _original_doc_size); - }, - [&]() -> std::string { return fmt::to_string(error_msg); }, eof)); - _counter->num_rows_filtered++; - // Before continuing to process other rows, we need to first clean the fail parsed row. - for (int i = 0; i < block.columns(); ++i) { - auto column = block.get_by_position(i).column->assume_mutable(); - if (column->size() > num_rows) { - column->pop_back(column->size() - num_rows); - } - } + RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); if (*_scanner_eof) { // When _scanner_eof is true and valid is false, it means that we have encountered // unqualified data and decided to stop the scan. diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 33a0ae0ba58..92c36c3b283 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -147,14 +147,25 @@ private: bool* is_empty_row); Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row); + Status _handle_simdjson_error(simdjson::simdjson_error& error, Block& block, size_t num_rows, + bool* eof); + Status _simdjson_handle_simple_json(RuntimeState* state, Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof); + Status _simdjson_handle_simple_json_write_columns( + Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, + bool* eof); + Status _simdjson_handle_flat_array_complex_json(RuntimeState* state, Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof); + Status _simdjson_handle_flat_array_complex_json_write_columns( + Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, + bool* eof); + Status _simdjson_handle_nested_complex_json(RuntimeState* state, Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org