This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch tpc_preview4-external
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpc_preview4-external by this push:
new 7f2512b0e3a [Enhancement](parquet) update runtime filter when reading the next parquet row group. (#59053) (#59181)
7f2512b0e3a is described below
commit 7f2512b0e3a0f76047e13b412cc7cea62951fc0d
Author: daidai <[email protected]>
AuthorDate: Fri Dec 19 12:01:42 2025 +0800
[Enhancement](parquet) update runtime filter when reading the next parquet row group. (#59053) (#59181)
backport of #59053
---
.../runtime_filter_consumer_helper.h | 2 +
.../exec/format/parquet/vparquet_group_reader.h | 7 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 38 ++++-
be/src/vec/exec/format/parquet/vparquet_reader.h | 16 ++
be/src/vec/exec/scan/file_scanner.cpp | 24 ++-
be/src/vec/exec/scan/file_scanner.h | 3 +-
be/src/vec/exec/scan/scanner.cpp | 1 +
.../scripts/create_preinstalled_scripts/run84.hql | 20 +++
.../runtime_filter_dim_small/dim_small.parquet | Bin 0 -> 4230 bytes
.../runtime_filter_fact_big/fact_big.parquet | Bin 0 -> 129338 bytes
.../hive/test_parquet_join_runtime_filter.groovy | 174 +++++++++++++++++++++
11 files changed, 272 insertions(+), 13 deletions(-)
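For readers skimming the diff, the mechanism is: FileScanner installs a callback on the ParquetReader, the reader invokes it each time it advances to the next row group, and the callback reports whether runtime filters that arrived after the scan started have changed the push-down conjuncts. If they have, the reader rebuilds its lazy-read context so the new filters can prune the remaining row groups. Below is a minimal, self-contained sketch of that callback contract; Status, VExprContext, and RowGroupReaderSketch are simplified stand-ins for the real Doris types, and only the control flow mirrors the patch.

    // Sketch of the late runtime-filter hook (stand-in types, not Doris headers).
    #include <functional>
    #include <memory>
    #include <utility>
    #include <vector>

    struct Status {
        bool ok = true;
        static Status OK() { return {}; }
    };
    struct VExprContext {};
    using VExprContextSPtrs = std::vector<std::shared_ptr<VExprContext>>;

    class RowGroupReaderSketch {
    public:
        // FileScanner installs the hook; anyone else keeps the no-op default.
        void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
            _call_late_rf_func = std::move(func);
        }

        // Called once per row-group boundary, mirroring _next_row_group_reader().
        Status next_row_group() {
            bool has_late_rf_cond = false;
            VExprContextSPtrs new_conjuncts;
            Status st = _call_late_rf_func(&has_late_rf_cond, new_conjuncts);
            if (!st.ok) return st;
            if (has_late_rf_cond) {
                _conjuncts = std::move(new_conjuncts);  // ~ _update_lazy_read_ctx(...)
            }
            return Status::OK();
        }

    private:
        // Default: no filters arrived, nothing changes (~ default_late_rf_func).
        static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
            *changed = false;
            return Status::OK();
        }
        std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func;
        VExprContextSPtrs _conjuncts;
    };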
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h b/be/src/runtime_filter/runtime_filter_consumer_helper.h
index 212df4338cb..36da3cd10c0 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -52,6 +52,8 @@ public:
// parent_operator_profile is owned by LocalState so update it is safe at here.
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
+ size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); }
+
private:
// Append late-arrival runtime filters to the vconjunct_ctx.
Status _append_rf_into_conjuncts(RuntimeState* state,
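The new runtime_filter_nums() accessor simply exposes how many runtime filter descriptors this consumer expects; the Scanner constructor change further down uses it to initialize _total_rf_num, which is what lets _process_late_arrival_conjuncts decide when every expected filter has been applied.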
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 265a95f4470..f81d6607349 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -79,7 +79,14 @@ public:
// table name
struct LazyReadContext {
+ // all conjuncts: predicates from the SQL, join runtime filters, and topn runtime filters.
VExprContextSPtrs conjuncts;
+
+ // ParquetReader::set_fill_columns(xxx, xxx) sets these two members
+ std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+         fill_partition_columns;
+ std::unordered_map<std::string, VExprContextSPtr> fill_missing_columns;
+
bool can_lazy_read = false;
// block->rows() returns the number of rows of the first column,
// so we should check and resize the first column
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index fb30e5d4a61..45cf3e2c5ed 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -383,11 +383,17 @@ bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
!is_complex_type(table_col_type->get_primitive_type());
}
-Status ParquetReader::set_fill_columns(
- const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
-         partition_columns,
- const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
- SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts) {
+ RowGroupReader::LazyReadContext new_lazy_read_ctx;
+ new_lazy_read_ctx.conjuncts = new_conjuncts;
+ new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
+ new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
+ _lazy_read_ctx = std::move(new_lazy_read_ctx);
+
+ _top_runtime_vexprs.clear();
+ _push_down_predicates.clear();
+ _useless_predicates.clear();
+
// std::unordered_map<column_name, std::pair<col_id, slot_id>>
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
// visit_slot for lazy mat.
@@ -494,7 +500,7 @@ Status ParquetReader::set_fill_columns(
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
}
- for (auto& kv : partition_columns) {
+ for (auto& kv : _lazy_read_ctx.fill_partition_columns) {
auto iter = predicate_columns.find(kv.first);
if (iter == predicate_columns.end()) {
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
@@ -504,7 +510,7 @@ Status ParquetReader::set_fill_columns(
}
}
- for (auto& kv : missing_columns) {
+ for (auto& kv : _lazy_read_ctx.fill_missing_columns) {
auto iter = predicate_columns.find(kv.first);
if (iter == predicate_columns.end()) {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -536,6 +542,17 @@ Status ParquetReader::set_fill_columns(
}
}
+ return Status::OK();
+}
+
+Status ParquetReader::set_fill_columns(
+ const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+         partition_columns,
+ const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
+ _lazy_read_ctx.fill_partition_columns = partition_columns;
+ _lazy_read_ctx.fill_missing_columns = missing_columns;
+ RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));
+
if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
return Status::EndOfFile("No row group to read");
}
@@ -673,6 +690,13 @@ Status ParquetReader::_next_row_group_reader() {
continue;
}
+ bool has_late_rf_cond = false;
+ VExprContextSPtrs new_push_down_conjuncts;
+ RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, new_push_down_conjuncts));
+ if (has_late_rf_cond) {
+ RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
+ }
+
size_t before_predicate_size = _push_down_predicates.size();
_push_down_predicates.reserve(before_predicate_size + _top_runtime_vexprs.size());
for (const auto& vexpr : _top_runtime_vexprs) {
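The rebuild pattern in _update_lazy_read_ctx above is the core of the change: everything derived from the conjunct list (_top_runtime_vexprs, _push_down_predicates, _useless_predicates) is cleared and recomputed, while the fill-column maps that set_fill_columns already computed are moved into the fresh context instead of being rebuilt. A simplified sketch of that pattern, with stand-in member types (only the move/reset structure mirrors the patch):

    #include <string>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    struct LazyReadCtxSketch {
        std::vector<int> conjuncts;                            // stand-in for VExprContextSPtrs
        std::unordered_map<std::string, int> fill_partition_columns;
        std::unordered_map<std::string, int> fill_missing_columns;
        std::vector<int> derived_predicates;                   // stand-in for push-down state
    };

    void update_lazy_read_ctx(LazyReadCtxSketch& ctx, std::vector<int> new_conjuncts) {
        LazyReadCtxSketch fresh;
        fresh.conjuncts = std::move(new_conjuncts);
        // Carry the fill maps over; they do not depend on the conjuncts.
        fresh.fill_partition_columns = std::move(ctx.fill_partition_columns);
        fresh.fill_missing_columns = std::move(ctx.fill_missing_columns);
        ctx = std::move(fresh);                                // derived state now starts empty
        // ... re-derive ctx.derived_predicates from ctx.conjuncts here ...
    }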
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index c3c73d98bea..e2ba5d82a70 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -160,6 +160,10 @@ public:
bool count_read_rows() override { return true; }
+ void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
+ _call_late_rf_func = std::move(func);
+ }
+
protected:
void _collect_profile_before_close() override;
@@ -252,6 +256,9 @@ private:
bool _exists_in_file(const VSlotRef* slot) const override;
bool _type_matches(const VSlotRef*) const override;
+ // update the lazy read context when the runtime filters change
+ Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);
+
RuntimeProfile* _profile = nullptr;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
@@ -337,6 +344,15 @@ private:
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
std::vector<std::unique_ptr<ColumnPredicate>> _useless_predicates;
Arena _arena;
+
+ // When creating a new row group reader, call this function to get the latest runtime filter conjuncts.
+ // The default implementation does nothing: it sets 'changed' to false and returns OK.
+ // This is used when Iceberg reads position delete files ...
+ static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
+ *changed = false;
+ return Status::OK();
+ }
+ std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func;
};
#include "common/compile_check_end.h"
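Because _call_late_rf_func defaults to the no-op default_late_rf_func, a ParquetReader constructed outside FileScanner, such as the Iceberg position-delete path the comment mentions, behaves exactly as before without having to install a hook.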
diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index c7d10c89dc0..8629737a320 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -357,8 +357,11 @@ Status FileScanner::_process_conjuncts() {
return Status::OK();
}
-Status FileScanner::_process_late_arrival_conjuncts() {
+Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
+         VExprContextSPtrs& new_push_down_conjuncts) {
+ *changed = false;
if (_push_down_conjuncts.size() < _conjuncts.size()) {
+ *changed = true;
_push_down_conjuncts.clear();
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
@@ -366,6 +369,7 @@ Status FileScanner::_process_late_arrival_conjuncts() {
}
RETURN_IF_ERROR(_process_conjuncts());
_discard_conjuncts();
+ new_push_down_conjuncts = _push_down_conjuncts;
}
if (_applied_rf_num == _total_rf_num) {
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
@@ -1045,9 +1049,17 @@ Status FileScanner::_get_next_reader() {
// ATTN: the push down agg type may be set back to NONE,
// see IcebergTableReader::init_row_filters for example.
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
- if (push_down_predicates) {
- RETURN_IF_ERROR(_process_late_arrival_conjuncts());
- }
+
+ std::function<Status(bool*, VExprContextSPtrs&)> update_late_rf =
+         [&](bool* changed, VExprContextSPtrs& new_push_down_conjuncts) -> Status {
+ if (!_is_load) {
+ RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
+ RETURN_IF_ERROR(
+         _process_late_arrival_conjuncts(changed, new_push_down_conjuncts));
+ }
+ return Status::OK();
+ };
+ parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
need_to_get_parsed_schema = true;
@@ -1068,7 +1080,9 @@ Status FileScanner::_get_next_reader() {
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
- RETURN_IF_ERROR(_process_late_arrival_conjuncts());
+ bool changed = false;
+ VExprContextSPtrs new_push_down_conjuncts;
+ RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, new_push_down_conjuncts));
}
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
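Note the asymmetry this hunk introduces: the parquet path now defers _process_late_arrival_conjuncts into the update_late_rf lambda, so it re-runs at every row-group boundary, while the ORC path below still processes late-arrival conjuncts once, eagerly, when the reader is created.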
diff --git a/be/src/vec/exec/scan/file_scanner.h b/be/src/vec/exec/scan/file_scanner.h
index d26186eeef6..1cbe9c1bbcf 100644
--- a/be/src/vec/exec/scan/file_scanner.h
+++ b/be/src/vec/exec/scan/file_scanner.h
@@ -251,7 +251,8 @@ private:
void _init_runtime_filter_partition_prune_block();
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
Status _process_conjuncts();
- Status _process_late_arrival_conjuncts();
+ Status _process_late_arrival_conjuncts(bool* changed,
+         VExprContextSPtrs& new_push_down_conjuncts);
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index 5dced63feb6..2857738297f 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
_output_tuple_desc(_local_state->output_tuple_desc()),
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
_has_prepared(false) {
+ _total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
DorisMetrics::instance()->scanner_cnt->increment(1);
}
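The constructor change ties back to the first hunk: _total_rf_num is captured up front so the late-arrival path can compare it against _applied_rf_num. A tiny sketch of that accounting (the two field names follow the patch; everything else is illustrative):

    #include <cassert>

    struct ScannerSketch {
        int _total_rf_num = 0;    // from runtime_filter_nums() in the constructor
        int _applied_rf_num = 0;  // grows as late-arrival filters are appended

        // ~ the check guarding the ApplyAllRuntimeFilters profile entry
        bool all_filters_applied() const { return _applied_rf_num == _total_rf_num; }
    };

    int main() {
        ScannerSketch s;
        s._total_rf_num = 2;
        assert(!s.all_filters_applied());  // still waiting on late filters
        s._applied_rf_num = 2;
        assert(s.all_filters_applied());   // profile would now say "True"
        return 0;
    }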
diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql
new file mode 100644
index 00000000000..4b4e7b6e549
--- /dev/null
+++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql
@@ -0,0 +1,20 @@
+use `default`;
+
+create table fact_big (
+ k INT,
+ c1 INT,
+ c2 BIGINT,
+ c3 DOUBLE,
+ c4 STRING
+)stored as parquet
+LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_fact_big';
+
+create table dim_small (
+ k INT,
+ c1 INT,
+ c2 BIGINT
+)stored as parquet
+LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_dim_small';
+
+msck repair table fact_big;
+msck repair table dim_small;
diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_dim_small/dim_small.parquet b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_dim_small/dim_small.parquet
new file mode 100644
index 00000000000..e998f3c817a
Binary files /dev/null and b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_dim_small/dim_small.parquet differ
diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_fact_big/fact_big.parquet b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_fact_big/fact_big.parquet
new file mode 100644
index 00000000000..b3ad736022e
Binary files /dev/null and b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_fact_big/fact_big.parquet differ
diff --git a/regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy b/regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy
new file mode 100644
index 00000000000..8c0b1516459
--- /dev/null
+++ b/regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonSlurper
+
+suite("test_parquet_join_runtime_filter",
"p0,external,hive,external_docker,external_docker_hive") {
+
+ def getProfileList = {
+ def dst = 'http://' + context.config.feHttpAddress
+ def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+ conn.setRequestMethod("GET")
+ def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+         (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+ conn.setRequestProperty("Authorization", "Basic ${encoding}")
+ return conn.getInputStream().getText()
+ }
+
+ def getProfile = { id ->
+ def dst = 'http://' + context.config.feHttpAddress
+ def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
+ conn.setRequestMethod("GET")
+ def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+         (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+ conn.setRequestProperty("Authorization", "Basic ${encoding}")
+ return conn.getInputStream().getText()
+ }
+
+
+ def extractFilteredGroupsValue = { String profileText ->
+ def values = (profileText =~ /FilteredGroups:\s*(\d+)/).collect { it[1].toLong() }
+ return values.sort { a, b -> b <=> a }
+ }
+
+ def getProfileWithToken = { token ->
+ String profileId = ""
+ int attempts = 0
+ while (attempts < 10 && (profileId == null || profileId == "")) {
+ List profileData = new JsonSlurper().parseText(getProfileList()).data.rows
+ for (def profileItem in profileData) {
+ if (profileItem["Sql Statement"].toString().contains(token)) {
+ profileId = profileItem["Profile ID"].toString()
+ break
+ }
+ }
+ if (profileId == null || profileId == "") {
+ Thread.sleep(300)
+ }
+ attempts++
+ }
+ assertTrue(profileId != null && profileId != "")
+ Thread.sleep(800)
+ return getProfile(profileId).toString()
+ }
+ // session vars
+ sql "unset variable all;"
+ sql "set profile_level=2;"
+ sql "set enable_profile=true;"
+
+
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (!"true".equalsIgnoreCase(enabled)) {
+ return;
+ }
+ for (String hivePrefix : ["hive2"]) {
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+ String catalog_name = "test_parquet_join_runtime_filter"
+
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog if not exists ${catalog_name} properties (
+ 'type'='hms',
+ 'hadoop.username' = 'hadoop',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}'
+ );
+ """
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch ${catalog_name};"""
+ logger.info("switched to catalog " + catalog_name)
+
+ sql """ use `default` """
+
+
+ for (int wait_time : [0, 10, 100]) {
+ sql """ set runtime_filter_wait_time_ms = ${wait_time}; """
+
+ def f1 = {
+ def t1 = UUID.randomUUID().toString()
+ def sql_result = sql """
+ select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c1 = 5
+ """
+ def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
+ logger.info("sql_result = ${sql_result}");
+ logger.info("filter_result = ${filter_result}");
+
+ assertTrue(filter_result.size() == 2)
+ assertTrue(filter_result[0] > 40)
+ }
+
+ def f2 = {
+ def t1 = UUID.randomUUID().toString()
+ def sql_result = sql """
+ select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c1 in (1,2)
+ """
+ def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
+ logger.info("sql_result = ${sql_result}");
+ logger.info("filter_result = ${filter_result}");
+
+ assertTrue(filter_result.size() == 2)
+ assertTrue(filter_result[0] > 30)
+ }
+
+ def f3 = {
+ def t1 = UUID.randomUUID().toString()
+ def sql_result = sql """
+ select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c1 < 3
+ """
+ def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
+ logger.info("sql_result = ${sql_result}");
+ logger.info("filter_result = ${filter_result}");
+
+ assertTrue(filter_result.size() == 2)
+ assertTrue(filter_result[0] > 30)
+ }
+
+ def f4 = {
+ def t1 = UUID.randomUUID().toString()
+ def sql_result = sql """
+ select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c2 >= 50
+ """
+ def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
+ logger.info("sql_result = ${sql_result}");
+ logger.info("filter_result = ${filter_result}");
+
+ assertTrue(filter_result.size() == 2)
+ assertTrue(filter_result[0] > 40)
+ }
+
+
+ f1()
+ f2()
+ f3()
+ f4()
+ }
+
+ sql """drop catalog ${catalog_name};"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]