This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fa7a38b5874 [fix](runtime filter) append late arrival runtime filters 
in vfilescanner (#25996)
fa7a38b5874 is described below

commit fa7a38b5874357297bee53f00918f85404b9617e
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Tue Nov 7 09:50:35 2023 +0800

    [fix](runtime filter) append late arrival runtime filters in vfilescanner 
(#25996)
    
    `VFileScanner` tries to append late-arriving runtime filters on each 
iteration of `ScannerScheduler::_scanner_scan`. However, 
`VFileScanner::_get_next_reader` only generated `_push_down_conjuncts` on 
the first iteration, so runtime filters that arrived later were ignored. 
This change rebuilds `_push_down_conjuncts` whenever `_conjuncts` has grown.
---
 be/src/vec/exec/scan/vfile_scanner.cpp | 44 +++++++++++++++++++++-------------
 be/src/vec/exec/scan/vfile_scanner.h   |  2 ++
 2 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 35e1d3dff53..8eda9c1714b 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -166,6 +166,8 @@ Status VFileScanner::prepare(
                 ADD_TIMER(_parent->_scanner_profile, 
"FileScannerConvertOuputBlockTime");
         _empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, 
"EmptyFileNum", TUnit::UNIT);
         _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", 
TUnit::UNIT);
+        _has_fully_rf_file_counter =
+                ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", 
TUnit::UNIT);
     } else {
         _get_block_timer = ADD_TIMER(_local_state->scanner_profile(), 
"FileScannerGetBlockTime");
         _open_reader_timer =
@@ -182,6 +184,8 @@ Status VFileScanner::prepare(
         _empty_file_counter =
                 ADD_COUNTER(_local_state->scanner_profile(), "EmptyFileNum", 
TUnit::UNIT);
         _file_counter = ADD_COUNTER(_local_state->scanner_profile(), 
"FileNumber", TUnit::UNIT);
+        _has_fully_rf_file_counter =
+                ADD_COUNTER(_local_state->scanner_profile(), 
"HasFullyRfFileNumber", TUnit::UNIT);
     }
 
     _file_cache_statistics.reset(new io::FileCacheStatistics());
@@ -222,7 +226,9 @@ Status VFileScanner::prepare(
 }
 
 Status VFileScanner::_process_conjuncts_for_dict_filter() {
-    for (auto& conjunct : _conjuncts) {
+    _slot_id_to_filter_conjuncts.clear();
+    _not_single_slot_filter_conjuncts.clear();
+    for (auto& conjunct : _push_down_conjuncts) {
         auto impl = conjunct->root()->get_impl();
         // If impl is not null, which means this a conjuncts from runtime 
filter.
         auto cur_expr = impl ? impl : conjunct->root();
@@ -250,6 +256,22 @@ Status VFileScanner::_process_conjuncts_for_dict_filter() {
     return Status::OK();
 }
 
+Status VFileScanner::_process_late_arrival_conjuncts() {
+    if (_push_down_conjuncts.size() < _conjuncts.size()) {
+        _push_down_conjuncts.clear();
+        _push_down_conjuncts.resize(_conjuncts.size());
+        for (size_t i = 0; i != _conjuncts.size(); ++i) {
+            RETURN_IF_ERROR(_conjuncts[i]->clone(_state, 
_push_down_conjuncts[i]));
+        }
+        RETURN_IF_ERROR(_process_conjuncts_for_dict_filter());
+        _discard_conjuncts();
+    }
+    if (_applied_rf_num == _total_rf_num) {
+        COUNTER_UPDATE(_has_fully_rf_file_counter, 1);
+    }
+    return Status::OK();
+}
+
 void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
     for (auto& child_expr : expr->children()) {
         if (child_expr->is_slot_ref()) {
@@ -766,12 +788,8 @@ Status VFileScanner::_get_next_reader() {
                 SCOPED_TIMER(_open_reader_timer);
                 RETURN_IF_ERROR(parquet_reader->open());
             }
-            if (push_down_predicates && _push_down_conjuncts.empty() && 
!_conjuncts.empty()) {
-                _push_down_conjuncts.resize(_conjuncts.size());
-                for (size_t i = 0; i != _conjuncts.size(); ++i) {
-                    RETURN_IF_ERROR(_conjuncts[i]->clone(_state, 
_push_down_conjuncts[i]));
-                }
-                _discard_conjuncts();
+            if (push_down_predicates) {
+                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
             }
             if (range.__isset.table_format_params &&
                 range.table_format_params.table_format_type == "iceberg") {
@@ -802,12 +820,8 @@ Status VFileScanner::_get_next_reader() {
             std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                     _profile, _state, *_params, range, 
_state->query_options().batch_size,
                     _state->timezone(), _io_ctx.get(), 
_state->query_options().enable_orc_lazy_mat);
-            if (push_down_predicates && _push_down_conjuncts.empty() && 
!_conjuncts.empty()) {
-                _push_down_conjuncts.resize(_conjuncts.size());
-                for (size_t i = 0; i != _conjuncts.size(); ++i) {
-                    RETURN_IF_ERROR(_conjuncts[i]->clone(_state, 
_push_down_conjuncts[i]));
-                }
-                _discard_conjuncts();
+            if (push_down_predicates) {
+                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
             }
             if (range.__isset.table_format_params &&
                 range.table_format_params.table_format_type == 
"transactional_hive") {
@@ -1080,10 +1094,6 @@ Status VFileScanner::_init_expr_ctxes() {
             }
         }
     }
-    // TODO: It should can move to scan node to process.
-    if (!_conjuncts.empty()) {
-        static_cast<void>(_process_conjuncts_for_dict_filter());
-    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.h 
b/be/src/vec/exec/scan/vfile_scanner.h
index b7ceefe775c..4524abb1fdb 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -176,6 +176,7 @@ private:
     RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
     RuntimeProfile::Counter* _empty_file_counter = nullptr;
     RuntimeProfile::Counter* _file_counter = nullptr;
+    RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
 
     const std::unordered_map<std::string, int>* _col_name_to_slot_id;
     // single slot filter conjuncts
@@ -206,6 +207,7 @@ private:
     Status _generate_fill_columns();
     Status _handle_dynamic_block(Block* block);
     Status _process_conjuncts_for_dict_filter();
+    Status _process_late_arrival_conjuncts();
     void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
 
     void _reset_counter() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to