alamb commented on code in PR #21637:
URL: https://github.com/apache/datafusion/pull/21637#discussion_r3156285145
##########
benchmarks/src/bin/dfbench.rs:
##########
@@ -20,16 +20,13 @@ use datafusion::error::Result;
use clap::{Parser, Subcommand};
-#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
-compile_error!(
- "feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the
same time"
-);
-
#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
-#[cfg(feature = "mimalloc")]
+// `cargo clippy --all-features` enables both allocator features, so prefer
Review Comment:
this seems unrelated to the rest of this PR -- perhaps we can pull it into
its own PR for easier review and consideration
##########
datafusion/datasource-parquet/benches/parquet_fully_matched_filter.rs:
##########
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmark for skipping filter evaluation on fully matched row groups.
+//!
+//! This benchmark measures the performance improvement from skipping
+//! RowFilter evaluation when row group statistics prove that all rows
+//! in a row group satisfy the predicate.
+//!
+//! Dataset layout:
+//! - 20 row groups, each with 50_000 rows
+//! - Column `x`: i64, values in range [0, 100) for all row groups
+//! - Column `payload`: Utf8, 1 KB string (makes filter column decoding cost
visible)
+//!
+//! Predicate: `x < 200`
+//! - ALL row groups are fully matched (max(x) < 200 for every row group)
+//! - Without the optimization: RowFilter decodes `x` and evaluates predicate
for every row
+//! - With the optimization: RowFilter is skipped entirely (statistics prove
all rows match)
+//!
+//! Uses `ParquetPushDecoder` directly to exercise the exact code path
+//! that DataFusion's async opener uses.
+
+use std::path::PathBuf;
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{Int64Array, RecordBatch, StringBuilder};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use bytes::Bytes;
+use criterion::{Criterion, Throughput, criterion_group, criterion_main};
+use datafusion_common::ScalarValue;
+use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter};
+use datafusion_expr::{Expr, col};
+use datafusion_physical_expr::planner::logical2physical;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use parquet::DecodeResult;
+use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+use parquet::file::properties::WriterProperties;
+use parquet::{arrow::ArrowWriter, file::metadata::ParquetMetaData};
+use tempfile::TempDir;
+
+const ROW_GROUP_SIZE: usize = 50_000;
+const NUM_ROW_GROUPS: usize = 20;
+const TOTAL_ROWS: usize = ROW_GROUP_SIZE * NUM_ROW_GROUPS;
+const PAYLOAD_LEN: usize = 1024;
+
+struct BenchmarkDataset {
Review Comment:
Rather than a targeted benchmark like this that will likely not get run all
that often, I recommend adding a new benchmark to the "clickbench_extended"
https://github.com/apache/datafusion/tree/main/benchmarks/queries/clickbench#extended-queries
I bet you could write a pretty good one with some substring match where this
optimization would help a lot.
I recommend making a separate PR to add such a query so we can show off this
PR's performance improvement
##########
benchmarks/src/bin/imdb.rs:
##########
@@ -21,16 +21,13 @@ use clap::{Parser, Subcommand};
use datafusion::error::Result;
use datafusion_benchmarks::imdb;
-#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
-compile_error!(
- "feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the
same time"
-);
-
#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
-#[cfg(feature = "mimalloc")]
+// `cargo clippy --all-features` enables both allocator features, so prefer
Review Comment:
likewise here
##########
datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt:
##########
@@ -104,7 +104,7 @@ Plan with Metrics
03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as
name], metrics=[output_rows=10, <slt:ignore>]
04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>,
selectivity=100% (10/10)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
-06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]},
projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND
DynamicFilter [ value@1 IS NULL OR value@1 > 800 ],
pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND
(value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 >
800), required_guarantees=[], metrics=[output_rows=10,
elapsed_compute=<slt:ignore>, output_bytes=80.0 B,
files_ranges_pruned_statistics=1 total → 1 matched,
row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched,
row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1
total → 1 matched, limit_pruned_row_groups=0 total → 0 matched,
bytes_scanned=210, metadata_load_time=<slt:ignore>,
scan_efficiency_ratio=18.31% (210/1.15 K)]
+06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]},
projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND
DynamicFilter [ value@1 IS NULL OR value@1 > 800 ],
pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND
(value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 >
800), required_guarantees=[], metrics=[output_rows=10,
elapsed_compute=<slt:ignore>, output_bytes=80.0 B,
files_ranges_pruned_statistics=1 total → 1 matched,
row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched,
row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0
total → 0 matched, limit_pruned_row_groups=0 total → 0 matched,
bytes_scanned=210, metadata_load_time=<slt:ignore>,
scan_efficiency_ratio=18.31% (210/1.15 K)]
Review Comment:
these metrics don't look quite correct -- it shows that there were no pages
with page index and none matched -- but really it should be all of them
matching, right?
I think the same issue exists for the other explain plans below too (pages
matching via "fully matching" do not appear in the total statistics anymore)
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -3773,11 +3773,11 @@ impl PartialOrd for Aggregate {
/// Returns 0 when no grouping set is duplicated.
fn max_grouping_set_duplicate_ordinal(group_expr: &[Expr]) -> usize {
if let Some(Expr::GroupingSet(GroupingSet::GroupingSets(sets))) =
group_expr.first() {
- let mut counts: HashMap<&[Expr], usize> = HashMap::new();
- for set in sets {
- *counts.entry(set).or_insert(0) += 1;
- }
- counts.into_values().max().unwrap_or(0).saturating_sub(1)
+ sets.iter()
Review Comment:
this is a drive by cleanup (not related to the other changes in this PR,
right)?
##########
datafusion/sqllogictest/test_files/limit_pruning.slt:
##########
@@ -63,7 +63,55 @@ set datafusion.explain.analyze_level = summary;
query TT
explain analyze select * from tracking_data where species > 'M' AND s >= 50
limit 3;
----
-Plan with Metrics DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M
AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND
species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50,
required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>,
output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched,
row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched,
row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2
total → 2 matched, limit_pruned_row_groups=2 total → 0 matched,
bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>,
scan_efficiency_ratio=<slt:ignore> (171/2.35 K)]
+Plan with Metrics DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M
AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND
species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50,
required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>,
output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched,
row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched,
row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=0
total → 0 matched, limit_pruned_row_groups=2 total → 0 matched,
bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>,
scan_efficiency_ratio=<slt:ignore> (171/2.35 K)]
+
+statement ok
+CREATE TABLE fully_matched_limit_source AS VALUES
+ (1),
+ (2),
+ (3),
+ (4),
+ (5),
+ (6),
+ (7),
+ (1),
+ (2);
+
+query I
+COPY (SELECT column1 as a FROM fully_matched_limit_source)
+TO 'test_files/scratch/limit_pruning/fully_matched_limit.parquet'
+STORED AS PARQUET
+OPTIONS (
+ 'format.max_row_group_size' '3'
+);
+----
+9
+
+statement ok
+drop table fully_matched_limit_source;
+
+statement ok
+CREATE EXTERNAL TABLE fully_matched_limit
+STORED AS PARQUET
+LOCATION 'test_files/scratch/limit_pruning/fully_matched_limit.parquet';
+
+# One fully matched row group sits between two filtered row groups.
+# LIMIT must apply across the entire scan, not once per decoder run.
+query TT
+explain analyze select a from fully_matched_limit where a >= 3 limit 4;
+----
+Plan with Metrics DataSourceExec: <slt:ignore>metrics=[output_rows=4,
<slt:ignore>row_groups_pruned_statistics=3 total → 3 matched -> 1 fully
matched<slt:ignore>]
+
+query I
+select a from fully_matched_limit where a >= 3 limit 4;
Review Comment:
perhaps we should also add an ORDER BY to this query?
##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -74,9 +74,18 @@ impl RowGroupAccessPlanFilter {
self.access_plan.row_group_index_iter()
}
- /// Returns the inner access plan
- pub fn build(self) -> ParquetAccessPlan {
- self.access_plan
+ /// Returns the inner access plan and the indices of fully matched row
groups.
+ ///
+ /// Fully matched row groups are those where ALL rows are known to satisfy
+ /// the predicate based on row group statistics. The returned indices are
+ /// a subset of the row groups that will be scanned.
+ pub fn build(self) -> (ParquetAccessPlan, Vec<usize>) {
Review Comment:
This API feels a little awkward (to have to return both a plan and a
Vec<usize>) -- and then you have to pass the row group indices as well to
`prune_plan_with_page_index`
What do you think about just passing the `fully_matched` field to the
ParquetAccessPlan itself -- that way you could avoid an allocation here, and the
API would be more encapsulated 🤔
It would also make the `fully_matched_row_groups.contains(&row_group_index)
{` check above faster (index lookup rather than linear search)
##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -357,13 +366,38 @@ impl RowGroupAccessPlanFilter {
return;
};
+ // Collect unique column names referenced by the predicate so we can
Review Comment:
this looks like a separate fix (Null handling) -- can you possibly pull this
part into its own PR for easier / faster review?
##########
datafusion/datasource-parquet/src/page_filter.rs:
##########
@@ -197,6 +198,11 @@ impl PagePruningAccessPlanFilter {
// for each row group specified in the access plan
let row_group_indexes = access_plan.row_group_indexes();
for row_group_index in row_group_indexes {
+ // Skip page pruning for fully matched row groups: all rows are
+ // known to satisfy the predicate, so page-level pruning is wasted
work.
+ if fully_matched_row_groups.contains(&row_group_index) {
Review Comment:
💯
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1139,33 +1154,71 @@ impl RowGroupsPrunedParquetOpen {
reader_metadata.parquet_schema(),
);
- let mut decoder_builder =
- ParquetPushDecoderBuilder::new_with_metadata(reader_metadata)
- .with_projection(read_plan.projection_mask)
+ // Split into consecutive runs of row groups that share the same filter
+ // requirement. Fully matched row groups skip the RowFilter; others
need it.
+ // Reverse the run order for reverse scans so the combined decoder
stream
+ // preserves the requested global row group order.
+ let mut runs = if has_row_filter &&
!fully_matched_row_groups.is_empty() {
+ split_decoder_runs(access_plan, &fully_matched_row_groups)
Review Comment:
If fully_matched_row_groups were a field on the access_plan I think this
logic would be significantly easier to read (at least for me as a human 🤖 👴 )
Then you could encapsulate something like
```rust
let runs = access_plan.split_runs();
```
better yet would be to then wrap the Run in some structure and maybe have a
`plan_run` method or function 🤔
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1120,17 +1134,18 @@ impl RowGroupsPrunedParquetOpen {
reader_metadata.parquet_schema(),
file_metadata.as_ref(),
&prepared.file_metrics,
+ &fully_matched_row_groups,
);
}
- // Prepare the access plan (extract row groups and row selection)
- let mut prepared_plan = access_plan.prepare(rg_metadata)?;
-
- // Potentially reverse the access plan for performance.
- // See `ParquetSource::try_pushdown_sort` for the rationale.
- if prepared.reverse_row_groups {
- prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
- }
+ // Prepare access plans (extract row groups and row selection)
+ let prepare = |plan: ParquetAccessPlan| -> Result<PreparedAccessPlan> {
+ let mut pp = plan.prepare(rg_metadata)?;
Review Comment:
I found the "prepared" vs "pp" hard to understand here 🤔
##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -357,13 +366,38 @@ impl RowGroupAccessPlanFilter {
return;
};
+ // Collect unique column names referenced by the predicate so we can
+ // check for NULLs. Rows with NULL predicate columns evaluate to NULL
+ // (not true), so a row group with NULLs cannot be "fully matched."
+ let predicate_columns =
+
datafusion_physical_expr::utils::collect_columns(predicate.orig_expr());
+
+ let null_count_converters: Vec<StatisticsConverter> = predicate_columns
+ .iter()
+ .filter_map(|col| {
+ StatisticsConverter::try_new(col.name(), arrow_schema,
parquet_schema)
Review Comment:
Converting statistics again for the column takes substantial time (esp for
things like string columns) -- and we should already have converted statistics
once when we create the pruning predicate. Is there any way to reuse those
statistics (or cache the row group null counts?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]