This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new cf847b8c711 [fix](load) return DataQualityError when filtered rows 
exceeds limit (#47617) (#49289)
cf847b8c711 is described below

commit cf847b8c711207254ba386c1fa7cd2d7a2921299
Author: Kaijie Chen <chenkai...@selectdb.com>
AuthorDate: Wed Apr 2 19:09:21 2025 +0800

    [fix](load) return DataQualityError when filtered rows exceeds limit 
(#47617) (#49289)
    
    backport #47617
---
 be/src/runtime/runtime_state.cpp                   |  8 +-
 be/src/runtime/runtime_state.h                     |  2 +-
 be/src/vec/exec/format/csv/csv_reader.cpp          | 21 +++--
 be/src/vec/exec/format/json/new_json_reader.cpp    | 57 ++++++-------
 be/src/vec/exec/scan/vfile_scanner.cpp             | 12 ++-
 be/src/vec/sink/group_commit_block_sink.cpp        | 14 ++--
 be/src/vec/sink/vrow_distribution.cpp              | 18 ++---
 be/src/vec/sink/vtablet_block_convertor.cpp        | 47 +++++------
 be/src/vec/sink/vtablet_block_convertor.h          | 13 ++-
 be/src/vec/sink/vtablet_finder.cpp                 | 16 ++--
 be/src/vec/sink/vtablet_finder.h                   |  4 +-
 .../load_p0/stream_load/test_stream_load.groovy    |  2 +-
 .../test_stream_load_with_filtered_rows.groovy     | 94 ++++++++++++++++++++++
 13 files changed, 192 insertions(+), 116 deletions(-)

diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index cdb5a65a977..5b95fbf822a 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -407,8 +407,7 @@ Status RuntimeState::create_error_log_file() {
 
 Status RuntimeState::append_error_msg_to_file(std::function<std::string()> 
line,
                                               std::function<std::string()> 
error_msg,
-                                              bool* stop_processing, bool 
is_summary) {
-    *stop_processing = false;
+                                              bool is_summary) {
     if (query_type() != TQueryType::LOAD) {
         return Status::OK();
     }
@@ -431,7 +430,10 @@ Status 
RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
     if (_num_print_error_rows.fetch_add(1, std::memory_order_relaxed) > 
MAX_ERROR_NUM &&
         !is_summary) {
         if (_load_zero_tolerance) {
-            *stop_processing = true;
+            return Status::DataQualityError(
+                    "Encountered unqualified data, stop processing. Please 
check if the source "
+                    "data matches the schema, and consider disabling strict 
mode or increasing "
+                    "max_filter_ratio.");
         }
         return Status::OK();
     }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 6f1a4b8d189..0f368748101 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -288,7 +288,7 @@ public:
     // is_summary is true, means we are going to write the summary line
     // If we need to stop the processing, set stop_processing to true
     Status append_error_msg_to_file(std::function<std::string()> line,
-                                    std::function<std::string()> error_msg, 
bool* stop_processing,
+                                    std::function<std::string()> error_msg,
                                     bool is_summary = false);
 
     int64_t num_bytes_load_total() { return _num_bytes_load_total.load(); }
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 77a5b65d512..660e25b2b72 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -719,6 +719,8 @@ Status CsvReader::_validate_line(const Slice& line, bool* 
success) {
         if (!_is_load) {
             return Status::InternalError<false>("Only support csv data in utf8 
codec");
         } else {
+            _counter->num_rows_filtered++;
+            *success = false;
             RETURN_IF_ERROR(_state->append_error_msg_to_file(
                     [&]() -> std::string { return std::string(line.data, 
line.size); },
                     [&]() -> std::string {
@@ -727,10 +729,7 @@ Status CsvReader::_validate_line(const Slice& line, bool* 
success) {
                                        "Unable to display, only support csv 
data in utf8 codec",
                                        ", please check the data encoding");
                         return fmt::to_string(error_msg);
-                    },
-                    &_line_reader_eof));
-            _counter->num_rows_filtered++;
-            *success = false;
+                    }));
             return Status::OK();
         }
     }
@@ -748,6 +747,8 @@ Status CsvReader::_line_split_to_values(const Slice& line, 
bool* success) {
         if (_split_values.size() != _file_slot_descs.size()) {
             std::string cmp_str =
                     _split_values.size() > _file_slot_descs.size() ? "more 
than" : "less than";
+            _counter->num_rows_filtered++;
+            *success = false;
             RETURN_IF_ERROR(_state->append_error_msg_to_file(
                     [&]() -> std::string { return std::string(line.data, 
line.size); },
                     [&]() -> std::string {
@@ -771,10 +772,7 @@ Status CsvReader::_line_split_to_values(const Slice& line, 
bool* success) {
                         }
                         fmt::format_to(error_msg, "result values:[{}]", 
fmt::to_string(values));
                         return fmt::to_string(error_msg);
-                    },
-                    &_line_reader_eof));
-            _counter->num_rows_filtered++;
-            *success = false;
+                    }));
             return Status::OK();
         }
     }
@@ -797,6 +795,8 @@ Status CsvReader::_check_array_format(std::vector<Slice>& 
split_values, bool* is
         }
         const Slice& value = split_values[j];
         if (slot_desc->type().is_array_type() && !_is_null(value) && 
!_is_array(value)) {
+            _counter->num_rows_filtered++;
+            *is_success = false;
             RETURN_IF_ERROR(_state->append_error_msg_to_file(
                     [&]() -> std::string { return std::string(value.data, 
value.size); },
                     [&]() -> std::string {
@@ -804,10 +804,7 @@ Status CsvReader::_check_array_format(std::vector<Slice>& 
split_values, bool* is
                         fmt::format_to(err_msg, "Invalid format for array 
column({})",
                                        slot_desc->col_name());
                         return fmt::to_string(err_msg);
-                    },
-                    &_line_reader_eof));
-            _counter->num_rows_filtered++;
-            *is_success = false;
+                    }));
             return Status::OK();
         }
     }
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 6a75a135793..455410863d9 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -666,10 +666,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* 
eof) {
         fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
                        _origin_json_doc.GetParseError(),
                        
rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
+        _counter->num_rows_filtered++;
         RETURN_IF_ERROR(_state->append_error_msg_to_file(
                 [&]() -> std::string { return std::string((char*)json_str, 
*size); },
-                [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
-        _counter->num_rows_filtered++;
+                [&]() -> std::string { return fmt::to_string(error_msg); }));
         if (*_scanner_eof) {
             // Case A: if _scanner_eof is set to true in 
"append_error_msg_to_file", which means
             // we meet enough invalid rows and the scanner should be stopped.
@@ -687,10 +687,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* 
eof) {
         if (_json_doc == nullptr) {
             fmt::memory_buffer error_msg;
             fmt::format_to(error_msg, "{}", "JSON Root not found.");
+            _counter->num_rows_filtered++;
             RETURN_IF_ERROR(_state->append_error_msg_to_file(
                     [&]() -> std::string { return 
_print_json_value(_origin_json_doc); },
-                    [&]() -> std::string { return fmt::to_string(error_msg); 
}, _scanner_eof));
-            _counter->num_rows_filtered++;
+                    [&]() -> std::string { return fmt::to_string(error_msg); 
}));
             if (*_scanner_eof) {
                 // Same as Case A
                 *eof = true;
@@ -706,10 +706,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* 
eof) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "{}",
                        "JSON data is array-object, `strip_outer_array` must be 
TRUE.");
+        _counter->num_rows_filtered++;
         RETURN_IF_ERROR(_state->append_error_msg_to_file(
                 [&]() -> std::string { return 
_print_json_value(_origin_json_doc); },
-                [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
-        _counter->num_rows_filtered++;
+                [&]() -> std::string { return fmt::to_string(error_msg); }));
         if (*_scanner_eof) {
             // Same as Case A
             *eof = true;
@@ -722,10 +722,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* 
eof) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "{}",
                        "JSON data is not an array-object, `strip_outer_array` 
must be FALSE.");
+        _counter->num_rows_filtered++;
         RETURN_IF_ERROR(_state->append_error_msg_to_file(
                 [&]() -> std::string { return 
_print_json_value(_origin_json_doc); },
-                [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
-        _counter->num_rows_filtered++;
+                [&]() -> std::string { return fmt::to_string(error_msg); }));
         if (*_scanner_eof) {
             // Same as Case A
             *eof = true;
@@ -954,20 +954,21 @@ Status NewJsonReader::_append_error_msg(const 
rapidjson::Value& objectValue, std
         err_msg = error_msg;
     }
 
+    _counter->num_rows_filtered++;
+    if (valid != nullptr) {
+        // current row is invalid
+        *valid = false;
+    }
+
     RETURN_IF_ERROR(_state->append_error_msg_to_file(
             [&]() -> std::string { return 
NewJsonReader::_print_json_value(objectValue); },
-            [&]() -> std::string { return err_msg; }, _scanner_eof));
+            [&]() -> std::string { return err_msg; }));
 
     // TODO(ftw): check here?
     if (*_scanner_eof) {
         _reader_eof = true;
     }
 
-    _counter->num_rows_filtered++;
-    if (valid != nullptr) {
-        // current row is invalid
-        *valid = false;
-    }
     return Status::OK();
 }
 
@@ -1088,11 +1089,6 @@ Status 
NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Bl
     fmt::memory_buffer error_msg;
     fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: 
{}", error.error(),
                    error.what());
-    RETURN_IF_ERROR(_state->append_error_msg_to_file(
-            [&]() -> std::string {
-                return std::string(_simdjson_ondemand_padding_buffer.data(), 
_original_doc_size);
-            },
-            [&]() -> std::string { return fmt::to_string(error_msg); }, eof));
     _counter->num_rows_filtered++;
     // Before continuing to process other rows, we need to first clean the 
fail parsed row.
     for (int i = 0; i < block.columns(); ++i) {
@@ -1102,6 +1098,11 @@ Status 
NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Bl
         }
     }
 
+    RETURN_IF_ERROR(_state->append_error_msg_to_file(
+            [&]() -> std::string {
+                return std::string(_simdjson_ondemand_padding_buffer.data(), 
_original_doc_size);
+            },
+            [&]() -> std::string { return fmt::to_string(error_msg); }));
     return Status::OK();
 }
 
@@ -1534,6 +1535,12 @@ Status 
NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st
         err_msg = error_msg;
     }
 
+    _counter->num_rows_filtered++;
+    if (valid != nullptr) {
+        // current row is invalid
+        *valid = false;
+    }
+
     RETURN_IF_ERROR(_state->append_error_msg_to_file(
             [&]() -> std::string {
                 if (!obj) {
@@ -1543,13 +1550,7 @@ Status 
NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st
                 (void)!obj->raw_json().get(str_view);
                 return std::string(str_view.data(), str_view.size());
             },
-            [&]() -> std::string { return err_msg; }, _scanner_eof));
-
-    _counter->num_rows_filtered++;
-    if (valid != nullptr) {
-        // current row is invalid
-        *valid = false;
-    }
+            [&]() -> std::string { return err_msg; }));
     return Status::OK();
 }
 
@@ -1619,10 +1620,10 @@ Status NewJsonReader::_get_json_value(size_t* size, 
bool* eof, simdjson::error_c
     SCOPED_TIMER(_file_read_timer);
     auto return_quality_error = [&](fmt::memory_buffer& error_msg,
                                     const std::string& doc_info) -> Status {
+        _counter->num_rows_filtered++;
         RETURN_IF_ERROR(_state->append_error_msg_to_file(
                 [&]() -> std::string { return doc_info; },
-                [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
-        _counter->num_rows_filtered++;
+                [&]() -> std::string { return fmt::to_string(error_msg); }));
         if (*_scanner_eof) {
             // Case A: if _scanner_eof is set to true in 
"append_error_msg_to_file", which means
             // we meet enough invalid rows and the scanner should be stopped.
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index f4376cc1e70..dfd3397d270 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -306,7 +306,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, 
Block* block, bool* eo
     Status st = _get_block_wrapped(state, block, eof);
     if (!st.ok()) {
         // add cur path in error msg for easy debugging
-        return std::move(st.prepend("cur path: " + 
get_current_scan_range_name() + ". "));
+        return std::move(st.append(". cur path: " + 
get_current_scan_range_name()));
     }
     return st;
 }
@@ -629,6 +629,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) 
{
                     if (_strict_mode && 
(_src_slot_descs_order_by_dest[dest_index]) &&
                         
!_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index])
                                  .column->is_null_at(i)) {
+                        filter_map[i] = false;
                         RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                 [&]() -> std::string {
                                     return _src_block_ptr->dump_one_line(i,
@@ -648,10 +649,9 @@ Status VFileScanner::_convert_to_output_block(Block* 
block) {
                                                    "src value is {}",
                                                    slot_desc->col_name(), 
_strict_mode, raw_string);
                                     return fmt::to_string(error_msg);
-                                },
-                                &_scanner_eof));
-                        filter_map[i] = false;
+                                }));
                     } else if (!slot_desc->is_nullable()) {
+                        filter_map[i] = false;
                         RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                 [&]() -> std::string {
                                     return _src_block_ptr->dump_one_line(i,
@@ -664,9 +664,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) 
{
                                                    "nullable",
                                                    slot_desc->col_name());
                                     return fmt::to_string(error_msg);
-                                },
-                                &_scanner_eof));
-                        filter_map[i] = false;
+                                }));
                     }
                 }
             }
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp 
b/be/src/vec/sink/group_commit_block_sink.cpp
index 40e4b97a6e8..c510299aaa9 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -167,24 +167,24 @@ Status GroupCommitBlockSink::send(RuntimeState* state, 
vectorized::Block* input_
         for (int index = 0; index < rows; index++) {
             _vpartition->find_partition(block.get(), index, 
_partitions[index]);
         }
-        bool stop_processing = false;
         for (int row_index = 0; row_index < rows; row_index++) {
             if (_partitions[row_index] == nullptr) [[unlikely]] {
                 _filter_bitmap.Set(row_index, true);
                 LOG(WARNING) << "no partition for this tuple. tuple="
                              << block->dump_data(row_index, 1);
-                RETURN_IF_ERROR(state->append_error_msg_to_file(
+                _has_filtered_rows = true;
+                state->update_num_rows_load_filtered(1);
+                state->update_num_rows_load_total(-1);
+                // meiyi: we should ignore this error in group commit,
+                // as errors should no longer occur after the first 20,000 
rows.
+                static_cast<void>(state->append_error_msg_to_file(
                         []() -> std::string { return ""; },
                         [&]() -> std::string {
                             fmt::memory_buffer buf;
                             fmt::format_to(buf, "no partition for this tuple. 
tuple=\n{}",
                                            block->dump_data(row_index, 1));
                             return fmt::to_string(buf);
-                        },
-                        &stop_processing));
-                _has_filtered_rows = true;
-                state->update_num_rows_load_filtered(1);
-                state->update_num_rows_load_total(-1);
+                        }));
             }
         }
     }
diff --git a/be/src/vec/sink/vrow_distribution.cpp 
b/be/src/vec/sink/vrow_distribution.cpp
index 025e13edff0..e4f809988e7 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -291,9 +291,8 @@ Status 
VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
         std::vector<RowPartTabletIds>& row_part_tablet_ids) {
     auto num_rows = block->rows();
 
-    bool stop_processing = false;
     RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, 
_partitions,
-                                                 _tablet_indexes, 
stop_processing, _skip));
+                                                 _tablet_indexes, _skip));
     if (has_filtered_rows) {
         for (int i = 0; i < num_rows; i++) {
             _skip[i] = _skip[i] || _block_convertor->filter_map()[i];
@@ -359,11 +358,9 @@ Status 
VRowDistribution::_generate_rows_distribution_for_auto_partition(
     auto partition_col = block->get_by_position(partition_keys[0]);
     _missing_map.clear();
     _missing_map.reserve(partition_col.column->size());
-    bool stop_processing = false;
 
     RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, 
_partitions,
-                                                 _tablet_indexes, 
stop_processing, _skip,
-                                                 &_missing_map));
+                                                 _tablet_indexes, _skip, 
&_missing_map));
 
     // the missing vals for auto partition are also skipped.
     if (has_filtered_rows) {
@@ -390,7 +387,6 @@ Status 
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
     // for auto-partition's, find and save origins in _partitions and replace 
them. at meanwhile save the missing values for auto
     //  partition. then we find partition again to get replaced partitions in 
_partitions. this time _missing_map is ignored cuz
     //  we already saved missing values.
-    bool stop_processing = false;
     if (_vpartition->is_auto_partition() &&
         _state->query_options().enable_auto_create_when_overwrite) {
         // allow auto create partition for missing rows.
@@ -400,8 +396,7 @@ Status 
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
         _missing_map.reserve(partition_col.column->size());
 
         RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, 
_partitions,
-                                                     _tablet_indexes, 
stop_processing, _skip,
-                                                     &_missing_map));
+                                                     _tablet_indexes, _skip, 
&_missing_map));
 
         // allow and really need to create during auto-detect-overwriting.
         if (!_missing_map.empty()) {
@@ -409,7 +404,7 @@ Status 
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
         }
     } else {
         RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, 
_partitions,
-                                                     _tablet_indexes, 
stop_processing, _skip));
+                                                     _tablet_indexes, _skip));
     }
     RETURN_IF_ERROR(_replace_overwriting_partition());
 
@@ -419,8 +414,7 @@ Status 
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
         _state->query_options().enable_auto_create_when_overwrite) {
         // here _missing_map is just a placeholder
         RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, 
_partitions,
-                                                     _tablet_indexes, 
stop_processing, _skip,
-                                                     &_missing_map));
+                                                     _tablet_indexes, _skip, 
&_missing_map));
         if (VLOG_TRACE_IS_ON) {
             std::string tmp;
             for (auto v : _missing_map) {
@@ -430,7 +424,7 @@ Status 
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
         }
     } else {
         RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, 
_partitions,
-                                                     _tablet_indexes, 
stop_processing, _skip));
+                                                     _tablet_indexes, _skip));
     }
     if (has_filtered_rows) {
         for (int i = 0; i < num_rows; i++) {
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp 
b/be/src/vec/sink/vtablet_block_convertor.cpp
index 9a59d45568b..9680a930d0d 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -92,14 +92,11 @@ Status OlapTableBlockConvertor::validate_and_convert_block(
         SCOPED_RAW_TIMER(&_validate_data_ns);
         _filter_map.clear();
         _filter_map.resize(rows, 0);
-        bool stop_processing = false;
-        RETURN_IF_ERROR(_validate_data(state, block.get(), rows, 
filtered_rows, &stop_processing));
+        auto st = _validate_data(state, block.get(), rows, filtered_rows);
         _num_filtered_rows += filtered_rows;
         has_filtered_rows = filtered_rows > 0;
-        if (stop_processing) {
-            // should be returned after updating "_number_filtered_rows", to 
make sure that load job can be cancelled
-            // because of "data unqualified"
-            return Status::DataQualityError("Encountered unqualified data, 
stop processing");
+        if (!st.ok()) {
+            return st;
         }
         _convert_to_dest_desc_block(block.get());
     }
@@ -185,9 +182,8 @@ DecimalType 
OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip
 
 Status OlapTableBlockConvertor::_internal_validate_column(
         RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
-        vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
-        fmt::memory_buffer& error_prefix, const uint32_t row_count,
-        vectorized::IColumn::Permutation* rows) {
+        vectorized::ColumnPtr column, size_t slot_index, fmt::memory_buffer& 
error_prefix,
+        const uint32_t row_count, vectorized::IColumn::Permutation* rows) {
     DCHECK((rows == nullptr) || (rows->size() == row_count));
     fmt::memory_buffer error_msg;
     auto set_invalid_and_append_error_msg = [&](int row) {
@@ -196,8 +192,7 @@ Status OlapTableBlockConvertor::_internal_validate_column(
                                                    [&error_prefix, 
&error_msg]() -> std::string {
                                                        return 
fmt::to_string(error_prefix) +
                                                               
fmt::to_string(error_msg);
-                                                   },
-                                                   stop_processing);
+                                                   });
         error_msg.clear();
         return ret;
     };
@@ -396,8 +391,8 @@ Status OlapTableBlockConvertor::_internal_validate_column(
         }
         fmt::format_to(error_prefix, "ARRAY type failed: ");
         RETURN_IF_ERROR(_validate_column(state, nested_type, 
type.contains_nulls[0],
-                                         column_array->get_data_ptr(), 
slot_index, stop_processing,
-                                         error_prefix, permutation.size(), 
&permutation));
+                                         column_array->get_data_ptr(), 
slot_index, error_prefix,
+                                         permutation.size(), &permutation));
         break;
     }
     case TYPE_MAP: {
@@ -414,11 +409,11 @@ Status OlapTableBlockConvertor::_internal_validate_column(
         }
         fmt::format_to(error_prefix, "MAP type failed: ");
         RETURN_IF_ERROR(_validate_column(state, key_type, 
type.contains_nulls[0],
-                                         column_map->get_keys_ptr(), 
slot_index, stop_processing,
-                                         error_prefix, permutation.size(), 
&permutation));
+                                         column_map->get_keys_ptr(), 
slot_index, error_prefix,
+                                         permutation.size(), &permutation));
         RETURN_IF_ERROR(_validate_column(state, val_type, 
type.contains_nulls[1],
-                                         column_map->get_values_ptr(), 
slot_index, stop_processing,
-                                         error_prefix, permutation.size(), 
&permutation));
+                                         column_map->get_values_ptr(), 
slot_index, error_prefix,
+                                         permutation.size(), &permutation));
         break;
     }
     case TYPE_STRUCT: {
@@ -429,7 +424,7 @@ Status OlapTableBlockConvertor::_internal_validate_column(
         for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
             RETURN_IF_ERROR(_validate_column(state, type.children[sc], 
type.contains_nulls[sc],
                                              
column_struct->get_column_ptr(sc), slot_index,
-                                             stop_processing, error_prefix,
+                                             error_prefix,
                                              
column_struct->get_column_ptr(sc)->size()));
         }
         break;
@@ -457,8 +452,13 @@ Status OlapTableBlockConvertor::_internal_validate_column(
 }
 
 Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, 
vectorized::Block* block,
-                                               const uint32_t rows, int& 
filtered_rows,
-                                               bool* stop_processing) {
+                                               const uint32_t rows, int& 
filtered_rows) {
+    filtered_rows = 0;
+    Defer defer {[&] {
+        for (int i = 0; i < rows; ++i) {
+            filtered_rows += _filter_map[i];
+        }
+    }};
     for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
         SlotDescriptor* desc = _output_tuple_desc->slots()[i];
         block->get_by_position(i).column =
@@ -468,12 +468,7 @@ Status 
OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized::
         fmt::memory_buffer error_prefix;
         fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
         RETURN_IF_ERROR(_validate_column(state, desc->type(), 
desc->is_nullable(), column, i,
-                                         stop_processing, error_prefix, rows));
-    }
-
-    filtered_rows = 0;
-    for (int i = 0; i < rows; ++i) {
-        filtered_rows += _filter_map[i];
+                                         error_prefix, rows));
     }
     return Status::OK();
 }
diff --git a/be/src/vec/sink/vtablet_block_convertor.h 
b/be/src/vec/sink/vtablet_block_convertor.h
index 7f866c38032..fbeec07870f 100644
--- a/be/src/vec/sink/vtablet_block_convertor.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -67,27 +67,26 @@ private:
     DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
 
     Status _validate_column(RuntimeState* state, const TypeDescriptor& type, 
bool is_nullable,
-                            vectorized::ColumnPtr column, size_t slot_index, 
bool* stop_processing,
+                            vectorized::ColumnPtr column, size_t slot_index,
                             fmt::memory_buffer& error_prefix, const uint32_t 
row_count,
                             vectorized::IColumn::Permutation* rows = nullptr) {
         RETURN_IF_CATCH_EXCEPTION({
             return _internal_validate_column(state, type, is_nullable, column, 
slot_index,
-                                             stop_processing, error_prefix, 
row_count, rows);
+                                             error_prefix, row_count, rows);
         });
     }
 
     Status _internal_validate_column(RuntimeState* state, const 
TypeDescriptor& type,
                                      bool is_nullable, vectorized::ColumnPtr 
column,
-                                     size_t slot_index, bool* stop_processing,
-                                     fmt::memory_buffer& error_prefix, const 
uint32_t row_count,
+                                     size_t slot_index, fmt::memory_buffer& 
error_prefix,
+                                     const uint32_t row_count,
                                      vectorized::IColumn::Permutation* rows = 
nullptr);
 
     // make input data valid for OLAP table
     // return number of invalid/filtered rows.
     // invalid row number is set in Bitmap
-    // set stop_processing if we want to stop the whole process now.
     Status _validate_data(RuntimeState* state, vectorized::Block* block, const 
uint32_t rows,
-                          int& filtered_rows, bool* stop_processing);
+                          int& filtered_rows);
 
     // some output column of output expr may have different nullable property 
with dest slot desc
     // so here need to do the convert operation
@@ -123,4 +122,4 @@ private:
     bool _is_partial_update_and_auto_inc = false;
 };
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vtablet_finder.cpp 
b/be/src/vec/sink/vtablet_finder.cpp
index 3bfd5bb4d22..a605a005b5f 100644
--- a/be/src/vec/sink/vtablet_finder.cpp
+++ b/be/src/vec/sink/vtablet_finder.cpp
@@ -34,8 +34,8 @@
 namespace doris::vectorized {
 Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int 
rows,
                                       std::vector<VOlapTablePartition*>& 
partitions,
-                                      std::vector<uint32_t>& tablet_index, 
bool& stop_processing,
-                                      std::vector<bool>& skip, 
std::vector<int64_t>* miss_rows) {
+                                      std::vector<uint32_t>& tablet_index, 
std::vector<bool>& skip,
+                                      std::vector<int64_t>* miss_rows) {
     for (int index = 0; index < rows; index++) {
         _vpartition->find_partition(block, index, partitions[index]);
     }
@@ -50,6 +50,9 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, 
Block* block, int row
                 skip[row_index] = true;
                 continue;
             }
+            _num_filtered_rows++;
+            _filter_bitmap.Set(row_index, true);
+            skip[row_index] = true;
             RETURN_IF_ERROR(state->append_error_msg_to_file(
                     []() -> std::string { return ""; },
                     [&]() -> std::string {
@@ -57,14 +60,7 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, 
Block* block, int row
                         fmt::format_to(buf, "no partition for this tuple. 
tuple=\n{}",
                                        block->dump_data(row_index, 1));
                         return fmt::to_string(buf);
-                    },
-                    &stop_processing));
-            _num_filtered_rows++;
-            _filter_bitmap.Set(row_index, true);
-            if (stop_processing) {
-                return Status::DataQualityError("Encountered unqualified data, 
stop processing");
-            }
-            skip[row_index] = true;
+                    }));
             continue;
         }
         if (!partitions[row_index]->is_mutable) [[unlikely]] {
diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h
index 24f8e357e28..52c2b3db7e5 100644
--- a/be/src/vec/sink/vtablet_finder.h
+++ b/be/src/vec/sink/vtablet_finder.h
@@ -44,8 +44,8 @@ public:
 
     Status find_tablets(RuntimeState* state, vectorized::Block* block, int 
rows,
                         std::vector<VOlapTablePartition*>& partitions,
-                        std::vector<uint32_t>& tablet_index, bool& filtered,
-                        std::vector<bool>& skip, std::vector<int64_t>* 
miss_rows = nullptr);
+                        std::vector<uint32_t>& tablet_index, 
std::vector<bool>& skip,
+                        std::vector<int64_t>* miss_rows = nullptr);
 
     bool is_find_tablet_every_sink() {
         return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 27b40471a69..60b0710116d 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1606,7 +1606,7 @@ suite("test_stream_load", "p0") {
                 def json = parseJson(result)
                 assertEquals("fail", json.Status.toLowerCase())
                 assertTrue(result.contains("ErrorURL"))
-                assertTrue(json.Message.contains("Encountered unqualified 
data, stop processing"))
+                assertTrue(json.Message.contains("Encountered unqualified 
data, stop processing. Please"))
             }
         }
     } finally {
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_with_filtered_rows.groovy
 
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_filtered_rows.groovy
new file mode 100644
index 00000000000..1801d2be52d
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_filtered_rows.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+import java.text.SimpleDateFormat
+
+suite("test_stream_load_with_filtered_rows", "p2") {
+    sql "show tables"
+
+    // test length of input is too long than schema.
+    def tableName = "test_large_file_with_many_filtered_rows"
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE `${tableName}` (
+            `k1` int NULL,
+            `k2` tinyint NULL,
+            `k3` smallint NULL,
+            `k4` bigint NULL,
+            `k5` largeint NULL,
+            `k6` float NULL,
+            `k7` double NULL,
+            `k8` decimal(9,0) NULL,
+            `k9` char(10) NULL,
+            `k10` varchar(1024) NULL,
+            `k11` text NULL,
+            `k12` date NULL,
+            `k13` datetime NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`, `k2`, `k3`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS AUTO
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "disable_auto_compaction" = "false"
+            );
+        """
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', '|'
+            file 
"""${getS3Url()}/regression/load_p2/stream_load/test_stream_load_with_dbgen_progress.csv"""
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(result.contains("ErrorURL"))
+                assertTrue(json.Message.contains("Encountered unqualified 
data, stop processing. Please"))
+            }
+        }
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', '|'
+            file 
"""${getS3Url()}/regression/load_p2/stream_load/test_stream_load_with_dbgen_progress.json"""
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(result.contains("ErrorURL"))
+                assertTrue(json.Message.contains("Encountered unqualified 
data, stop processing. Please"))
+            }
+        }
+
+    } finally {
+        //sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+    }
+
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to