This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new cf847b8c711 [fix](load) return DataQualityError when filtered rows exceeds limit (#47617) (#49289) cf847b8c711 is described below commit cf847b8c711207254ba386c1fa7cd2d7a2921299 Author: Kaijie Chen <chenkai...@selectdb.com> AuthorDate: Wed Apr 2 19:09:21 2025 +0800 [fix](load) return DataQualityError when filtered rows exceeds limit (#47617) (#49289) backport #47617 --- be/src/runtime/runtime_state.cpp | 8 +- be/src/runtime/runtime_state.h | 2 +- be/src/vec/exec/format/csv/csv_reader.cpp | 21 +++-- be/src/vec/exec/format/json/new_json_reader.cpp | 57 ++++++------- be/src/vec/exec/scan/vfile_scanner.cpp | 12 ++- be/src/vec/sink/group_commit_block_sink.cpp | 14 ++-- be/src/vec/sink/vrow_distribution.cpp | 18 ++--- be/src/vec/sink/vtablet_block_convertor.cpp | 47 +++++------ be/src/vec/sink/vtablet_block_convertor.h | 13 ++- be/src/vec/sink/vtablet_finder.cpp | 16 ++-- be/src/vec/sink/vtablet_finder.h | 4 +- .../load_p0/stream_load/test_stream_load.groovy | 2 +- .../test_stream_load_with_filtered_rows.groovy | 94 ++++++++++++++++++++++ 13 files changed, 192 insertions(+), 116 deletions(-) diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index cdb5a65a977..5b95fbf822a 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -407,8 +407,7 @@ Status RuntimeState::create_error_log_file() { Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line, std::function<std::string()> error_msg, - bool* stop_processing, bool is_summary) { - *stop_processing = false; + bool is_summary) { if (query_type() != TQueryType::LOAD) { return Status::OK(); } @@ -431,7 +430,10 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line, if (_num_print_error_rows.fetch_add(1, std::memory_order_relaxed) > MAX_ERROR_NUM && !is_summary) { if (_load_zero_tolerance) { - *stop_processing = true; + return Status::DataQualityError( + "Encountered unqualified data, stop processing. 
Please check if the source " + "data matches the schema, and consider disabling strict mode or increasing " + "max_filter_ratio."); } return Status::OK(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 6f1a4b8d189..0f368748101 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -288,7 +288,7 @@ public: // is_summary is true, means we are going to write the summary line // If we need to stop the processing, set stop_processing to true Status append_error_msg_to_file(std::function<std::string()> line, - std::function<std::string()> error_msg, bool* stop_processing, + std::function<std::string()> error_msg, bool is_summary = false); int64_t num_bytes_load_total() { return _num_bytes_load_total.load(); } diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 77a5b65d512..660e25b2b72 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -719,6 +719,8 @@ Status CsvReader::_validate_line(const Slice& line, bool* success) { if (!_is_load) { return Status::InternalError<false>("Only support csv data in utf8 codec"); } else { + _counter->num_rows_filtered++; + *success = false; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return std::string(line.data, line.size); }, [&]() -> std::string { @@ -727,10 +729,7 @@ Status CsvReader::_validate_line(const Slice& line, bool* success) { "Unable to display, only support csv data in utf8 codec", ", please check the data encoding"); return fmt::to_string(error_msg); - }, - &_line_reader_eof)); - _counter->num_rows_filtered++; - *success = false; + })); return Status::OK(); } } @@ -748,6 +747,8 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { if (_split_values.size() != _file_slot_descs.size()) { std::string cmp_str = _split_values.size() > _file_slot_descs.size() ? 
"more than" : "less than"; + _counter->num_rows_filtered++; + *success = false; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return std::string(line.data, line.size); }, [&]() -> std::string { @@ -771,10 +772,7 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { } fmt::format_to(error_msg, "result values:[{}]", fmt::to_string(values)); return fmt::to_string(error_msg); - }, - &_line_reader_eof)); - _counter->num_rows_filtered++; - *success = false; + })); return Status::OK(); } } @@ -797,6 +795,8 @@ Status CsvReader::_check_array_format(std::vector<Slice>& split_values, bool* is } const Slice& value = split_values[j]; if (slot_desc->type().is_array_type() && !_is_null(value) && !_is_array(value)) { + _counter->num_rows_filtered++; + *is_success = false; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return std::string(value.data, value.size); }, [&]() -> std::string { @@ -804,10 +804,7 @@ Status CsvReader::_check_array_format(std::vector<Slice>& split_values, bool* is fmt::format_to(err_msg, "Invalid format for array column({})", slot_desc->col_name()); return fmt::to_string(err_msg); - }, - &_line_reader_eof)); - _counter->num_rows_filtered++; - *is_success = false; + })); return Status::OK(); } } diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 6a75a135793..455410863d9 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -666,10 +666,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) { fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}", _origin_json_doc.GetParseError(), rapidjson::GetParseError_En(_origin_json_doc.GetParseError())); + _counter->num_rows_filtered++; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return std::string((char*)json_str, *size); }, - [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); - _counter->num_rows_filtered++; + [&]() -> std::string { return fmt::to_string(error_msg); })); if (*_scanner_eof) { // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means // we meet enough invalid rows and the scanner should be stopped. 
@@ -687,10 +687,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) { if (_json_doc == nullptr) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON Root not found."); + _counter->num_rows_filtered++; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return _print_json_value(_origin_json_doc); }, - [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); - _counter->num_rows_filtered++; + [&]() -> std::string { return fmt::to_string(error_msg); })); if (*_scanner_eof) { // Same as Case A *eof = true; @@ -706,10 +706,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON data is array-object, `strip_outer_array` must be TRUE."); + _counter->num_rows_filtered++; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return _print_json_value(_origin_json_doc); }, - [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); - _counter->num_rows_filtered++; + [&]() -> std::string { return fmt::to_string(error_msg); })); if (*_scanner_eof) { // Same as Case A *eof = true; @@ -722,10 +722,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON data is not an array-object, `strip_outer_array` must be FALSE."); + _counter->num_rows_filtered++; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return _print_json_value(_origin_json_doc); }, - [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); - _counter->num_rows_filtered++; + [&]() -> std::string { return fmt::to_string(error_msg); })); if (*_scanner_eof) { // Same as Case A *eof = true; @@ -954,20 +954,21 @@ Status NewJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std err_msg = error_msg; } + _counter->num_rows_filtered++; + if (valid != nullptr) { + // current row is invalid + *valid = false; + } + RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return NewJsonReader::_print_json_value(objectValue); }, - [&]() -> std::string { return err_msg; }, _scanner_eof)); + [&]() -> std::string { return err_msg; })); // TODO(ftw): check here? if (*_scanner_eof) { _reader_eof = true; } - _counter->num_rows_filtered++; - if (valid != nullptr) { - // current row is invalid - *valid = false; - } return Status::OK(); } @@ -1088,11 +1089,6 @@ Status NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Bl fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: {}", error.error(), error.what()); - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return std::string(_simdjson_ondemand_padding_buffer.data(), _original_doc_size); - }, - [&]() -> std::string { return fmt::to_string(error_msg); }, eof)); _counter->num_rows_filtered++; // Before continuing to process other rows, we need to first clean the fail parsed row. 
for (int i = 0; i < block.columns(); ++i) { @@ -1102,6 +1098,11 @@ Status NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Bl } } + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { + return std::string(_simdjson_ondemand_padding_buffer.data(), _original_doc_size); + }, + [&]() -> std::string { return fmt::to_string(error_msg); })); return Status::OK(); } @@ -1534,6 +1535,12 @@ Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st err_msg = error_msg; } + _counter->num_rows_filtered++; + if (valid != nullptr) { + // current row is invalid + *valid = false; + } + RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { if (!obj) { @@ -1543,13 +1550,7 @@ Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st (void)!obj->raw_json().get(str_view); return std::string(str_view.data(), str_view.size()); }, - [&]() -> std::string { return err_msg; }, _scanner_eof)); - - _counter->num_rows_filtered++; - if (valid != nullptr) { - // current row is invalid - *valid = false; - } + [&]() -> std::string { return err_msg; })); return Status::OK(); } @@ -1619,10 +1620,10 @@ Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_c SCOPED_TIMER(_file_read_timer); auto return_quality_error = [&](fmt::memory_buffer& error_msg, const std::string& doc_info) -> Status { + _counter->num_rows_filtered++; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return doc_info; }, - [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); - _counter->num_rows_filtered++; + [&]() -> std::string { return fmt::to_string(error_msg); })); if (*_scanner_eof) { // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means // we meet enough invalid rows and the scanner should be stopped. diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index f4376cc1e70..dfd3397d270 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -306,7 +306,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo Status st = _get_block_wrapped(state, block, eof); if (!st.ok()) { // add cur path in error msg for easy debugging - return std::move(st.prepend("cur path: " + get_current_scan_range_name() + ". ")); + return std::move(st.append(". 
cur path: " + get_current_scan_range_name())); } return st; } @@ -629,6 +629,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) { if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) && !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]) .column->is_null_at(i)) { + filter_map[i] = false; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return _src_block_ptr->dump_one_line(i, @@ -648,10 +649,9 @@ Status VFileScanner::_convert_to_output_block(Block* block) { "src value is {}", slot_desc->col_name(), _strict_mode, raw_string); return fmt::to_string(error_msg); - }, - &_scanner_eof)); - filter_map[i] = false; + })); } else if (!slot_desc->is_nullable()) { + filter_map[i] = false; RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return _src_block_ptr->dump_one_line(i, @@ -664,9 +664,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) { "nullable", slot_desc->col_name()); return fmt::to_string(error_msg); - }, - &_scanner_eof)); - filter_map[i] = false; + })); } } } diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 40e4b97a6e8..c510299aaa9 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -167,24 +167,24 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_ for (int index = 0; index < rows; index++) { _vpartition->find_partition(block.get(), index, _partitions[index]); } - bool stop_processing = false; for (int row_index = 0; row_index < rows; row_index++) { if (_partitions[row_index] == nullptr) [[unlikely]] { _filter_bitmap.Set(row_index, true); LOG(WARNING) << "no partition for this tuple. tuple=" << block->dump_data(row_index, 1); - RETURN_IF_ERROR(state->append_error_msg_to_file( + _has_filtered_rows = true; + state->update_num_rows_load_filtered(1); + state->update_num_rows_load_total(-1); + // meiyi: we should ignore this error in group commit, + // as errors should no longer occur after the first 20,000 rows. + static_cast<void>(state->append_error_msg_to_file( []() -> std::string { return ""; }, [&]() -> std::string { fmt::memory_buffer buf; fmt::format_to(buf, "no partition for this tuple. 
tuple=\n{}", block->dump_data(row_index, 1)); return fmt::to_string(buf); - }, - &stop_processing)); - _has_filtered_rows = true; - state->update_num_rows_load_filtered(1); - state->update_num_rows_load_total(-1); + })); } } } diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 025e13edff0..e4f809988e7 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -291,9 +291,8 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition( std::vector<RowPartTabletIds>& row_part_tablet_ids) { auto num_rows = block->rows(); - bool stop_processing = false; RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, - _tablet_indexes, stop_processing, _skip)); + _tablet_indexes, _skip)); if (has_filtered_rows) { for (int i = 0; i < num_rows; i++) { _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; @@ -359,11 +358,9 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition( auto partition_col = block->get_by_position(partition_keys[0]); _missing_map.clear(); _missing_map.reserve(partition_col.column->size()); - bool stop_processing = false; RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, - _tablet_indexes, stop_processing, _skip, - &_missing_map)); + _tablet_indexes, _skip, &_missing_map)); // the missing vals for auto partition are also skipped. if (has_filtered_rows) { @@ -390,7 +387,6 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( // for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto // partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz // we already saved missing values. - bool stop_processing = false; if (_vpartition->is_auto_partition() && _state->query_options().enable_auto_create_when_overwrite) { // allow auto create partition for missing rows. @@ -400,8 +396,7 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( _missing_map.reserve(partition_col.column->size()); RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, - _tablet_indexes, stop_processing, _skip, - &_missing_map)); + _tablet_indexes, _skip, &_missing_map)); // allow and really need to create during auto-detect-overwriting. 
if (!_missing_map.empty()) { @@ -409,7 +404,7 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( } } else { RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, - _tablet_indexes, stop_processing, _skip)); + _tablet_indexes, _skip)); } RETURN_IF_ERROR(_replace_overwriting_partition()); @@ -419,8 +414,7 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( _state->query_options().enable_auto_create_when_overwrite) { // here _missing_map is just a placeholder RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, - _tablet_indexes, stop_processing, _skip, - &_missing_map)); + _tablet_indexes, _skip, &_missing_map)); if (VLOG_TRACE_IS_ON) { std::string tmp; for (auto v : _missing_map) { @@ -430,7 +424,7 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( } } else { RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, - _tablet_indexes, stop_processing, _skip)); + _tablet_indexes, _skip)); } if (has_filtered_rows) { for (int i = 0; i < num_rows; i++) { diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index 9a59d45568b..9680a930d0d 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -92,14 +92,11 @@ Status OlapTableBlockConvertor::validate_and_convert_block( SCOPED_RAW_TIMER(&_validate_data_ns); _filter_map.clear(); _filter_map.resize(rows, 0); - bool stop_processing = false; - RETURN_IF_ERROR(_validate_data(state, block.get(), rows, filtered_rows, &stop_processing)); + auto st = _validate_data(state, block.get(), rows, filtered_rows); _num_filtered_rows += filtered_rows; has_filtered_rows = filtered_rows > 0; - if (stop_processing) { - // should be returned after updating "_number_filtered_rows", to make sure that load job can be cancelled - // because of "data unqualified" - return Status::DataQualityError("Encountered unqualified data, stop processing"); + if (!st.ok()) { + return st; } _convert_to_dest_desc_block(block.get()); } @@ -185,9 +182,8 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip Status OlapTableBlockConvertor::_internal_validate_column( RuntimeState* state, const TypeDescriptor& type, bool is_nullable, - vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, - fmt::memory_buffer& error_prefix, const uint32_t row_count, - vectorized::IColumn::Permutation* rows) { + vectorized::ColumnPtr column, size_t slot_index, fmt::memory_buffer& error_prefix, + const uint32_t row_count, vectorized::IColumn::Permutation* rows) { DCHECK((rows == nullptr) || (rows->size() == row_count)); fmt::memory_buffer error_msg; auto set_invalid_and_append_error_msg = [&](int row) { @@ -196,8 +192,7 @@ Status OlapTableBlockConvertor::_internal_validate_column( [&error_prefix, &error_msg]() -> std::string { return fmt::to_string(error_prefix) + fmt::to_string(error_msg); - }, - stop_processing); + }); error_msg.clear(); return ret; }; @@ -396,8 +391,8 @@ Status OlapTableBlockConvertor::_internal_validate_column( } fmt::format_to(error_prefix, "ARRAY type failed: "); RETURN_IF_ERROR(_validate_column(state, nested_type, type.contains_nulls[0], - column_array->get_data_ptr(), slot_index, stop_processing, - error_prefix, permutation.size(), &permutation)); + column_array->get_data_ptr(), slot_index, error_prefix, + permutation.size(), &permutation)); break; } case TYPE_MAP: { @@ -414,11 
+409,11 @@ Status OlapTableBlockConvertor::_internal_validate_column( } fmt::format_to(error_prefix, "MAP type failed: "); RETURN_IF_ERROR(_validate_column(state, key_type, type.contains_nulls[0], - column_map->get_keys_ptr(), slot_index, stop_processing, - error_prefix, permutation.size(), &permutation)); + column_map->get_keys_ptr(), slot_index, error_prefix, + permutation.size(), &permutation)); RETURN_IF_ERROR(_validate_column(state, val_type, type.contains_nulls[1], - column_map->get_values_ptr(), slot_index, stop_processing, - error_prefix, permutation.size(), &permutation)); + column_map->get_values_ptr(), slot_index, error_prefix, + permutation.size(), &permutation)); break; } case TYPE_STRUCT: { @@ -429,7 +424,7 @@ Status OlapTableBlockConvertor::_internal_validate_column( for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) { RETURN_IF_ERROR(_validate_column(state, type.children[sc], type.contains_nulls[sc], column_struct->get_column_ptr(sc), slot_index, - stop_processing, error_prefix, + error_prefix, column_struct->get_column_ptr(sc)->size())); } break; @@ -457,8 +452,13 @@ Status OlapTableBlockConvertor::_internal_validate_column( } Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized::Block* block, - const uint32_t rows, int& filtered_rows, - bool* stop_processing) { + const uint32_t rows, int& filtered_rows) { + filtered_rows = 0; + Defer defer {[&] { + for (int i = 0; i < rows; ++i) { + filtered_rows += _filter_map[i]; + } + }}; for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) { SlotDescriptor* desc = _output_tuple_desc->slots()[i]; block->get_by_position(i).column = @@ -468,12 +468,7 @@ Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized:: fmt::memory_buffer error_prefix; fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name()); RETURN_IF_ERROR(_validate_column(state, desc->type(), desc->is_nullable(), column, i, - stop_processing, error_prefix, rows)); - } - - filtered_rows = 0; - for (int i = 0; i < rows; ++i) { - filtered_rows += _filter_map[i]; + error_prefix, rows)); } return Status::OK(); } diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 7f866c38032..fbeec07870f 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -67,27 +67,26 @@ private: DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type); Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, - vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, + vectorized::ColumnPtr column, size_t slot_index, fmt::memory_buffer& error_prefix, const uint32_t row_count, vectorized::IColumn::Permutation* rows = nullptr) { RETURN_IF_CATCH_EXCEPTION({ return _internal_validate_column(state, type, is_nullable, column, slot_index, - stop_processing, error_prefix, row_count, rows); + error_prefix, row_count, rows); }); } Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, vectorized::ColumnPtr column, - size_t slot_index, bool* stop_processing, - fmt::memory_buffer& error_prefix, const uint32_t row_count, + size_t slot_index, fmt::memory_buffer& error_prefix, + const uint32_t row_count, vectorized::IColumn::Permutation* rows = nullptr); // make input data valid for OLAP table // return number of invalid/filtered rows. // invalid row number is set in Bitmap - // set stop_processing if we want to stop the whole process now. 
     Status _validate_data(RuntimeState* state, vectorized::Block* block, const uint32_t rows,
-                          int& filtered_rows, bool* stop_processing);
+                          int& filtered_rows);
 
     // some output column of output expr may have different nullable property with dest slot desc
     // so here need to do the convert operation
@@ -123,4 +122,4 @@ private:
     bool _is_partial_update_and_auto_inc = false;
 };
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vtablet_finder.cpp b/be/src/vec/sink/vtablet_finder.cpp
index 3bfd5bb4d22..a605a005b5f 100644
--- a/be/src/vec/sink/vtablet_finder.cpp
+++ b/be/src/vec/sink/vtablet_finder.cpp
@@ -34,8 +34,8 @@ namespace doris::vectorized {
 
 Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows,
                                       std::vector<VOlapTablePartition*>& partitions,
-                                      std::vector<uint32_t>& tablet_index, bool& stop_processing,
-                                      std::vector<bool>& skip, std::vector<int64_t>* miss_rows) {
+                                      std::vector<uint32_t>& tablet_index, std::vector<bool>& skip,
+                                      std::vector<int64_t>* miss_rows) {
     for (int index = 0; index < rows; index++) {
         _vpartition->find_partition(block, index, partitions[index]);
     }
@@ -50,6 +50,9 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row
                 skip[row_index] = true;
                 continue;
             }
+            _num_filtered_rows++;
+            _filter_bitmap.Set(row_index, true);
+            skip[row_index] = true;
             RETURN_IF_ERROR(state->append_error_msg_to_file(
                     []() -> std::string { return ""; },
                     [&]() -> std::string {
@@ -57,14 +60,7 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row
                         fmt::format_to(buf, "no partition for this tuple. tuple=\n{}",
                                        block->dump_data(row_index, 1));
                         return fmt::to_string(buf);
-                    },
-                    &stop_processing));
-            _num_filtered_rows++;
-            _filter_bitmap.Set(row_index, true);
-            if (stop_processing) {
-                return Status::DataQualityError("Encountered unqualified data, stop processing");
-            }
-            skip[row_index] = true;
+                    }));
             continue;
         }
         if (!partitions[row_index]->is_mutable) [[unlikely]] {
diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h
index 24f8e357e28..52c2b3db7e5 100644
--- a/be/src/vec/sink/vtablet_finder.h
+++ b/be/src/vec/sink/vtablet_finder.h
@@ -44,8 +44,8 @@ public:
 
     Status find_tablets(RuntimeState* state, vectorized::Block* block, int rows,
                         std::vector<VOlapTablePartition*>& partitions,
-                        std::vector<uint32_t>& tablet_index, bool& filtered,
-                        std::vector<bool>& skip, std::vector<int64_t>* miss_rows = nullptr);
+                        std::vector<uint32_t>& tablet_index, std::vector<bool>& skip,
+                        std::vector<int64_t>* miss_rows = nullptr);
 
     bool is_find_tablet_every_sink() {
         return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 27b40471a69..60b0710116d 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1606,7 +1606,7 @@ suite("test_stream_load", "p0") {
                 def json = parseJson(result)
                 assertEquals("fail", json.Status.toLowerCase())
                 assertTrue(result.contains("ErrorURL"))
-                assertTrue(json.Message.contains("Encountered unqualified data, stop processing"))
+                assertTrue(json.Message.contains("Encountered unqualified data, stop processing. Please"))
             }
         }
     } finally {
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_with_filtered_rows.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_with_filtered_rows.groovy
new file mode 100644
index 00000000000..1801d2be52d
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_with_filtered_rows.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+import java.text.SimpleDateFormat
+
+suite("test_stream_load_with_filtered_rows", "p2") {
+    sql "show tables"
+
+    // test length of input is too long than schema.
+    def tableName = "test_large_file_with_many_filtered_rows"
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE `${tableName}` (
+            `k1` int NULL,
+            `k2` tinyint NULL,
+            `k3` smallint NULL,
+            `k4` bigint NULL,
+            `k5` largeint NULL,
+            `k6` float NULL,
+            `k7` double NULL,
+            `k8` decimal(9,0) NULL,
+            `k9` char(10) NULL,
+            `k10` varchar(1024) NULL,
+            `k11` text NULL,
+            `k12` date NULL,
+            `k13` datetime NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`, `k2`, `k3`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS AUTO
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "disable_auto_compaction" = "false"
+        );
+        """
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', '|'
+            file """${getS3Url()}/regression/load_p2/stream_load/test_stream_load_with_dbgen_progress.csv"""
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(result.contains("ErrorURL"))
+                assertTrue(json.Message.contains("Encountered unqualified data, stop processing. Please"))
+            }
+        }
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', '|'
+            file """${getS3Url()}/regression/load_p2/stream_load/test_stream_load_with_dbgen_progress.json"""
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(result.contains("ErrorURL"))
+                assertTrue(json.Message.contains("Encountered unqualified data, stop processing. Please"))
+            }
+        }
+
+    } finally {
+        //sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+    }
+
+}
+