This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpc_preview3-external
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f23247f2a1ebb409a44567d90f258acc327f508e
Author: daidai <[email protected]>
AuthorDate: Fri Dec 19 12:01:42 2025 +0800

    [Enhancement](parquet) Update runtime filter when reading the next parquet row group. (#59053) (#59181)
    
    bp #59053
---
 .../runtime_filter_consumer_helper.h               |   2 +
 .../exec/format/parquet/vparquet_group_reader.h    |   7 +
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  38 ++++-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  16 ++
 be/src/vec/exec/scan/file_scanner.cpp              |  24 ++-
 be/src/vec/exec/scan/file_scanner.h                |   3 +-
 be/src/vec/exec/scan/scanner.cpp                   |   1 +
 .../scripts/create_preinstalled_scripts/run84.hql  |  20 +++
 .../runtime_filter_dim_small/dim_small.parquet     | Bin 0 -> 4230 bytes
 .../runtime_filter_fact_big/fact_big.parquet       | Bin 0 -> 129338 bytes
 .../hive/test_parquet_join_runtime_filter.groovy   | 174 +++++++++++++++++++++
 11 files changed, 272 insertions(+), 13 deletions(-)

diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h 
b/be/src/runtime_filter/runtime_filter_consumer_helper.h
index 212df4338cb..36da3cd10c0 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -52,6 +52,8 @@ public:
     // parent_operator_profile is owned by LocalState so update it is safe at 
here.
     void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
 
+    size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); }
+
 private:
     // Append late-arrival runtime filters to the vconjunct_ctx.
     Status _append_rf_into_conjuncts(RuntimeState* state,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 265a95f4470..f81d6607349 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -79,7 +79,14 @@ public:
 
     // table name
     struct LazyReadContext {
+        // all conjuncts: in sql, join runtime filter, topn runtime filter.
         VExprContextSPtrs conjuncts;
+
+        // ParquetReader::set_fill_columns(xxx, xxx) will set these two members
+        std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>
+                fill_partition_columns;
+        std::unordered_map<std::string, VExprContextSPtr> fill_missing_columns;
+
         bool can_lazy_read = false;
         // block->rows() returns the number of rows of the first column,
         // so we should check and resize the first column
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index fb30e5d4a61..45cf3e2c5ed 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -383,11 +383,17 @@ bool ParquetReader::_type_matches(const VSlotRef* 
slot_ref) const {
            !is_complex_type(table_col_type->get_primitive_type());
 }
 
-Status ParquetReader::set_fill_columns(
-        const std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>&
-                partition_columns,
-        const std::unordered_map<std::string, VExprContextSPtr>& 
missing_columns) {
-    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& 
new_conjuncts) {
+    RowGroupReader::LazyReadContext new_lazy_read_ctx;
+    new_lazy_read_ctx.conjuncts = new_conjuncts;
+    new_lazy_read_ctx.fill_partition_columns = 
std::move(_lazy_read_ctx.fill_partition_columns);
+    new_lazy_read_ctx.fill_missing_columns = 
std::move(_lazy_read_ctx.fill_missing_columns);
+    _lazy_read_ctx = std::move(new_lazy_read_ctx);
+
+    _top_runtime_vexprs.clear();
+    _push_down_predicates.clear();
+    _useless_predicates.clear();
+
     // std::unordered_map<column_name, std::pair<col_id, slot_id>>
     std::unordered_map<std::string, std::pair<uint32_t, int>> 
predicate_columns;
     // visit_slot for lazy mat.
@@ -494,7 +500,7 @@ Status ParquetReader::set_fill_columns(
         
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
     }
 
-    for (auto& kv : partition_columns) {
+    for (auto& kv : _lazy_read_ctx.fill_partition_columns) {
         auto iter = predicate_columns.find(kv.first);
         if (iter == predicate_columns.end()) {
             _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
@@ -504,7 +510,7 @@ Status ParquetReader::set_fill_columns(
         }
     }
 
-    for (auto& kv : missing_columns) {
+    for (auto& kv : _lazy_read_ctx.fill_missing_columns) {
         auto iter = predicate_columns.find(kv.first);
         if (iter == predicate_columns.end()) {
             _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -536,6 +542,17 @@ Status ParquetReader::set_fill_columns(
         }
     }
 
+    return Status::OK();
+}
+
+Status ParquetReader::set_fill_columns(
+        const std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>&
+                partition_columns,
+        const std::unordered_map<std::string, VExprContextSPtr>& 
missing_columns) {
+    _lazy_read_ctx.fill_partition_columns = partition_columns;
+    _lazy_read_ctx.fill_missing_columns = missing_columns;
+    RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));
+
     if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || 
_range_size < 0)) {
         return Status::EndOfFile("No row group to read");
     }
@@ -673,6 +690,13 @@ Status ParquetReader::_next_row_group_reader() {
             continue;
         }
 
+        bool has_late_rf_cond = false;
+        VExprContextSPtrs new_push_down_conjuncts;
+        RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, 
new_push_down_conjuncts));
+        if (has_late_rf_cond) {
+            RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
+        }
+
         size_t before_predicate_size = _push_down_predicates.size();
         _push_down_predicates.reserve(before_predicate_size + 
_top_runtime_vexprs.size());
         for (const auto& vexpr : _top_runtime_vexprs) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index c3c73d98bea..e2ba5d82a70 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -160,6 +160,10 @@ public:
 
     bool count_read_rows() override { return true; }
 
+    void set_update_late_rf_func(std::function<Status(bool*, 
VExprContextSPtrs&)>&& func) {
+        _call_late_rf_func = std::move(func);
+    }
+
 protected:
     void _collect_profile_before_close() override;
 
@@ -252,6 +256,9 @@ private:
     bool _exists_in_file(const VSlotRef* slot) const override;
     bool _type_matches(const VSlotRef*) const override;
 
+    // update lazy read context when runtime filter changed
+    Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);
+
     RuntimeProfile* _profile = nullptr;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
@@ -337,6 +344,15 @@ private:
     std::vector<std::unique_ptr<MutilColumnBlockPredicate>> 
_push_down_predicates;
     std::vector<std::unique_ptr<ColumnPredicate>> _useless_predicates;
     Arena _arena;
+
+    // When creating a new row group reader, call this function to get the latest runtime filter conjuncts.
+    // The default implementation does nothing: it sets 'changed' to false and returns OK.
+    // This default is used, for example, when Iceberg reads a position delete file.
+    static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
+        *changed = false;
+        return Status::OK();
+    }
+    std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = 
default_late_rf_func;
 };
 #include "common/compile_check_end.h"
 
diff --git a/be/src/vec/exec/scan/file_scanner.cpp 
b/be/src/vec/exec/scan/file_scanner.cpp
index c7d10c89dc0..8629737a320 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -357,8 +357,11 @@ Status FileScanner::_process_conjuncts() {
     return Status::OK();
 }
 
-Status FileScanner::_process_late_arrival_conjuncts() {
+Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
+                                                    VExprContextSPtrs& 
new_push_down_conjuncts) {
+    *changed = false;
     if (_push_down_conjuncts.size() < _conjuncts.size()) {
+        *changed = true;
         _push_down_conjuncts.clear();
         _push_down_conjuncts.resize(_conjuncts.size());
         for (size_t i = 0; i != _conjuncts.size(); ++i) {
@@ -366,6 +369,7 @@ Status FileScanner::_process_late_arrival_conjuncts() {
         }
         RETURN_IF_ERROR(_process_conjuncts());
         _discard_conjuncts();
+        new_push_down_conjuncts = _push_down_conjuncts;
     }
     if (_applied_rf_num == _total_rf_num) {
         
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", 
"True");
@@ -1045,9 +1049,17 @@ Status FileScanner::_get_next_reader() {
             // ATTN: the push down agg type may be set back to NONE,
             // see IcebergTableReader::init_row_filters for example.
             parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
-            if (push_down_predicates) {
-                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
-            }
+
+            std::function<Status(bool*, VExprContextSPtrs&)> update_late_rf =
+                    [&](bool* changed, VExprContextSPtrs& 
new_push_down_conjuncts) -> Status {
+                if (!_is_load) {
+                    RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
+                    RETURN_IF_ERROR(
+                            _process_late_arrival_conjuncts(changed, 
new_push_down_conjuncts));
+                }
+                return Status::OK();
+            };
+            parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
             RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), 
file_meta_cache_ptr));
 
             need_to_get_parsed_schema = true;
@@ -1068,7 +1080,9 @@ Status FileScanner::_get_next_reader() {
 
             orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
             if (push_down_predicates) {
-                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
+                bool changed = false;
+                VExprContextSPtrs new_push_down_conjuncts;
+                RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, 
new_push_down_conjuncts));
             }
             RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), 
file_meta_cache_ptr));
 
diff --git a/be/src/vec/exec/scan/file_scanner.h 
b/be/src/vec/exec/scan/file_scanner.h
index d26186eeef6..1cbe9c1bbcf 100644
--- a/be/src/vec/exec/scan/file_scanner.h
+++ b/be/src/vec/exec/scan/file_scanner.h
@@ -251,7 +251,8 @@ private:
     void _init_runtime_filter_partition_prune_block();
     Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
     Status _process_conjuncts();
-    Status _process_late_arrival_conjuncts();
+    Status _process_late_arrival_conjuncts(bool* changed,
+                                           VExprContextSPtrs& 
new_push_down_conjuncts);
     void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
     Status _generate_truncate_columns(bool need_to_get_parsed_schema);
     Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index 5dced63feb6..2857738297f 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state, 
pipeline::ScanLocalStateBase* local_state,
           _output_tuple_desc(_local_state->output_tuple_desc()),
           
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
           _has_prepared(false) {
+    _total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
     DorisMetrics::instance()->scanner_cnt->increment(1);
 }
 
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql
 
b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql
new file mode 100644
index 00000000000..4b4e7b6e549
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql
@@ -0,0 +1,20 @@
+use `default`;
+
+create table fact_big (
+  k        INT,
+  c1       INT,
+  c2       BIGINT,
+  c3       DOUBLE,
+  c4       STRING
+)stored as parquet
+LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_fact_big';
+
+create table dim_small (
+  k        INT,
+  c1       INT,
+  c2       BIGINT
+)stored as parquet
+LOCATION 
'/user/doris/preinstalled_data/parquet_table/runtime_filter_dim_small';
+
+msck repair table fact_big;
+msck repair table dim_small;
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_dim_small/dim_small.parquet
 
b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_dim_small/dim_small.parquet
new file mode 100644
index 00000000000..e998f3c817a
Binary files /dev/null and 
b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_dim_small/dim_small.parquet
 differ
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_fact_big/fact_big.parquet
 
b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_fact_big/fact_big.parquet
new file mode 100644
index 00000000000..b3ad736022e
Binary files /dev/null and 
b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_fact_big/fact_big.parquet
 differ
diff --git 
a/regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy
 
b/regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy
new file mode 100644
index 00000000000..8c0b1516459
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonSlurper
+
+suite("test_parquet_join_runtime_filter", 
"p0,external,hive,external_docker,external_docker_hive") {
+
+    def getProfileList = {
+        def dst = 'http://' + context.config.feHttpAddress
+        def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+        conn.setRequestMethod("GET")
+        def encoding = 
Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+                (context.config.feHttpPassword == null ? "" : 
context.config.feHttpPassword)).getBytes("UTF-8"))
+        conn.setRequestProperty("Authorization", "Basic ${encoding}")
+        return conn.getInputStream().getText()
+    }
+
+    def getProfile = { id ->
+        def dst = 'http://' + context.config.feHttpAddress
+        def conn = new URL(dst + 
"/api/profile/text/?query_id=$id").openConnection()
+        conn.setRequestMethod("GET")
+        def encoding = 
Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+                (context.config.feHttpPassword == null ? "" : 
context.config.feHttpPassword)).getBytes("UTF-8"))
+        conn.setRequestProperty("Authorization", "Basic ${encoding}")
+        return conn.getInputStream().getText()
+    }
+
+
+    def extractFilteredGroupsValue = { String profileText ->
+        def values = (profileText =~ /FilteredGroups:\s*(\d+)/).collect { 
it[1].toLong() }
+        return values.sort { a, b -> b <=> a }
+    }
+
+    def getProfileWithToken = { token ->
+        String profileId = ""
+        int attempts = 0
+        while (attempts < 10 && (profileId == null || profileId == "")) {
+            List profileData = new 
JsonSlurper().parseText(getProfileList()).data.rows
+            for (def profileItem in profileData) {
+                if (profileItem["Sql Statement"].toString().contains(token)) {
+                    profileId = profileItem["Profile ID"].toString()
+                    break
+                }
+            }
+            if (profileId == null || profileId == "") {
+                Thread.sleep(300)
+            }
+            attempts++
+        }
+        assertTrue(profileId != null && profileId != "")
+        Thread.sleep(800)
+        return getProfile(profileId).toString()
+    }
+    // session vars
+    sql "unset variable all;"
+    sql "set profile_level=2;"
+    sql "set enable_profile=true;"
+
+
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (!"true".equalsIgnoreCase(enabled)) {
+        return;
+    }
+    for (String hivePrefix : ["hive2"]) {
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String hmsPort = context.config.otherConfigs.get(hivePrefix + 
"HmsPort")
+        String catalog_name = "test_parquet_join_runtime_filter"
+
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hadoop.username' = 'hadoop',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}'
+            );
+        """
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch ${catalog_name};"""
+        logger.info("switched to catalog " + catalog_name)
+        
+        sql """ use `default` """
+
+
+        for (int wait_time : [0, 10, 100]) {
+            sql """ set runtime_filter_wait_time_ms = ${wait_time}; """ 
+
+            def f1 = {
+                def t1 = UUID.randomUUID().toString()
+                def sql_result = sql """
+                    select *, "${t1}" from fact_big as a  join dim_small as b 
on a.k = b.k  where b.c1 = 5
+                """
+                def filter_result = 
extractFilteredGroupsValue(getProfileWithToken(t1));
+                logger.info("sql_result = ${sql_result}");
+                logger.info("filter_result = ${filter_result}");
+
+                assertTrue(filter_result.size() == 2)
+                assertTrue(filter_result[0] > 40)
+            }
+
+
+
+            def f2 = {
+                def t1 = UUID.randomUUID().toString()
+                def sql_result = sql """
+                    select *, "${t1}" from fact_big as a  join dim_small as b 
on a.k = b.k  where b.c1 in (1,2)
+                """
+                def filter_result = 
extractFilteredGroupsValue(getProfileWithToken(t1));
+                logger.info("sql_result = ${sql_result}");
+                logger.info("filter_result = ${filter_result}");
+
+                assertTrue(filter_result.size() == 2)
+                assertTrue(filter_result[0] > 30)
+            }
+
+
+
+
+            def f3 = {
+                def t1 = UUID.randomUUID().toString()
+                def sql_result = sql """
+                    select *, "${t1}" from fact_big as a  join dim_small as b 
on a.k = b.k  where b.c1 < 3  
+                """
+                def filter_result = 
extractFilteredGroupsValue(getProfileWithToken(t1));
+                logger.info("sql_result = ${sql_result}");
+                logger.info("filter_result = ${filter_result}");
+
+                assertTrue(filter_result.size() == 2)
+                assertTrue(filter_result[0] > 30)
+            }
+
+
+
+            def f4 = {
+                def t1 = UUID.randomUUID().toString()
+                def sql_result = sql """
+                    select *, "${t1}" from fact_big as a  join dim_small as b 
on a.k = b.k  where b.c2 >= 50   
+                """
+                def filter_result = 
extractFilteredGroupsValue(getProfileWithToken(t1));
+                logger.info("sql_result = ${sql_result}");
+                logger.info("filter_result = ${filter_result}");
+
+                assertTrue(filter_result.size() == 2)
+                assertTrue(filter_result[0] > 40)
+            }
+
+
+            f1()
+            f2()
+            f3()
+            f4()
+        }     
+
+        sql """drop catalog ${catalog_name};"""
+    }
+
+
+
+
+  
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to