alamb commented on code in PR #21637:
URL: https://github.com/apache/datafusion/pull/21637#discussion_r3156285145


##########
benchmarks/src/bin/dfbench.rs:
##########
@@ -20,16 +20,13 @@ use datafusion::error::Result;
 
 use clap::{Parser, Subcommand};
 
-#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
-compile_error!(
-    "feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the 
same time"
-);
-
 #[cfg(feature = "snmalloc")]
 #[global_allocator]
 static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 
-#[cfg(feature = "mimalloc")]
+// `cargo clippy --all-features` enables both allocator features, so prefer

Review Comment:
   this seems unrelated to the rest of this PR -- perhaps we can pull it into its own PR for easier review and consideration



##########
datafusion/datasource-parquet/benches/parquet_fully_matched_filter.rs:
##########
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmark for skipping filter evaluation on fully matched row groups.
+//!
+//! This benchmark measures the performance improvement from skipping
+//! RowFilter evaluation when row group statistics prove that all rows
+//! in a row group satisfy the predicate.
+//!
+//! Dataset layout:
+//! - 20 row groups, each with 50_000 rows
+//! - Column `x`: i64, values in range [0, 100) for all row groups
+//! - Column `payload`: Utf8, 1 KB string (makes filter column decoding cost visible)
+//!
+//! Predicate: `x < 200`
+//! - ALL row groups are fully matched (max(x) < 200 for every row group)
+//! - Without the optimization: RowFilter decodes `x` and evaluates the predicate for every row
+//! - With the optimization: RowFilter is skipped entirely (statistics prove all rows match)
+//!
+//! Uses `ParquetPushDecoder` directly to exercise the exact code path
+//! that DataFusion's async opener uses.
+
+use std::path::PathBuf;
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{Int64Array, RecordBatch, StringBuilder};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use bytes::Bytes;
+use criterion::{Criterion, Throughput, criterion_group, criterion_main};
+use datafusion_common::ScalarValue;
+use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter};
+use datafusion_expr::{Expr, col};
+use datafusion_physical_expr::planner::logical2physical;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use parquet::DecodeResult;
+use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+use parquet::file::properties::WriterProperties;
+use parquet::{arrow::ArrowWriter, file::metadata::ParquetMetaData};
+use tempfile::TempDir;
+
+const ROW_GROUP_SIZE: usize = 50_000;
+const NUM_ROW_GROUPS: usize = 20;
+const TOTAL_ROWS: usize = ROW_GROUP_SIZE * NUM_ROW_GROUPS;
+const PAYLOAD_LEN: usize = 1024;
+
+struct BenchmarkDataset {

Review Comment:
   Rather than a targeted benchmark like this, which will likely not get run all that often, I recommend adding a new benchmark to the "clickbench_extended" suite:
   
   https://github.com/apache/datafusion/tree/main/benchmarks/queries/clickbench#extended-queries
   
   I bet you could write a pretty good one with some substring match where this optimization would help a lot.
   
   I recommend making a separate PR to add such a query so we can show off this PR's performance improvement
   
   



##########
benchmarks/src/bin/imdb.rs:
##########
@@ -21,16 +21,13 @@ use clap::{Parser, Subcommand};
 use datafusion::error::Result;
 use datafusion_benchmarks::imdb;
 
-#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
-compile_error!(
-    "feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the 
same time"
-);
-
 #[cfg(feature = "snmalloc")]
 #[global_allocator]
 static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 
-#[cfg(feature = "mimalloc")]
+// `cargo clippy --all-features` enables both allocator features, so prefer

Review Comment:
   likewise here



##########
datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt:
##########
@@ -104,7 +104,7 @@ Plan with Metrics
 03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, <slt:ignore>]
 04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>, selectivity=100% (10/10)]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=<slt:ignore>, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18.31% (210/1.15 K)]
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=<slt:ignore>, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18.31% (210/1.15 K)]

Review Comment:
   these metrics don't look quite correct -- they show that there were no pages with a page index and none matched -- but really it should be all of them matching, right?
   
   I think the same issue exists for the other explain plans below too (pages matching via "fully matched" row groups do not appear in the total statistics anymore)



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -3773,11 +3773,11 @@ impl PartialOrd for Aggregate {
 /// Returns 0 when no grouping set is duplicated.
 fn max_grouping_set_duplicate_ordinal(group_expr: &[Expr]) -> usize {
     if let Some(Expr::GroupingSet(GroupingSet::GroupingSets(sets))) = group_expr.first() {
-        let mut counts: HashMap<&[Expr], usize> = HashMap::new();
-        for set in sets {
-            *counts.entry(set).or_insert(0) += 1;
-        }
-        counts.into_values().max().unwrap_or(0).saturating_sub(1)
+        sets.iter()

Review Comment:
   this is a drive-by cleanup (not related to the other changes in this PR), right?



##########
datafusion/sqllogictest/test_files/limit_pruning.slt:
##########
@@ -63,7 +63,55 @@ set datafusion.explain.analyze_level = summary;
 query TT
explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3;
 ----
-Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (171/2.35 K)]
+Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=0 total → 0 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (171/2.35 K)]
+
+statement ok
+CREATE TABLE fully_matched_limit_source AS VALUES
+  (1),
+  (2),
+  (3),
+  (4),
+  (5),
+  (6),
+  (7),
+  (1),
+  (2);
+
+query I
+COPY (SELECT column1 as a FROM fully_matched_limit_source)
+TO 'test_files/scratch/limit_pruning/fully_matched_limit.parquet'
+STORED AS PARQUET
+OPTIONS (
+  'format.max_row_group_size' '3'
+);
+----
+9
+
+statement ok
+drop table fully_matched_limit_source;
+
+statement ok
+CREATE EXTERNAL TABLE fully_matched_limit
+STORED AS PARQUET
+LOCATION 'test_files/scratch/limit_pruning/fully_matched_limit.parquet';
+
+# One fully matched row group sits between two filtered row groups.
+# LIMIT must apply across the entire scan, not once per decoder run.
+query TT
+explain analyze select a from fully_matched_limit where a >= 3 limit 4;
+----
+Plan with Metrics DataSourceExec: <slt:ignore>metrics=[output_rows=4, <slt:ignore>row_groups_pruned_statistics=3 total → 3 matched -> 1 fully matched<slt:ignore>]
+
+query I
+select a from fully_matched_limit where a >= 3 limit 4;

Review Comment:
   perhaps we should also add an ORDER BY to this query?



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -74,9 +74,18 @@ impl RowGroupAccessPlanFilter {
         self.access_plan.row_group_index_iter()
     }
 
-    /// Returns the inner access plan
-    pub fn build(self) -> ParquetAccessPlan {
-        self.access_plan
+    /// Returns the inner access plan and the indices of fully matched row groups.
+    ///
+    /// Fully matched row groups are those where ALL rows are known to satisfy
+    /// the predicate based on row group statistics. The returned indices are
+    /// a subset of the row groups that will be scanned.
+    pub fn build(self) -> (ParquetAccessPlan, Vec<usize>) {

Review Comment:
   This API feels a little awkward (having to return both a plan and a Vec<usize>) -- and then you have to pass the row group indices to `prune_plan_with_page_index` as well
   
   What do you think about just adding the `fully_matched` field to the ParquetAccessPlan itself -- that way you could avoid an allocation here, and the API would be more encapsulated 🤔 
   
   It would also make the `fully_matched_row_groups.contains(&row_group_index)` check above faster (an index lookup rather than a linear search)
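   
   For illustration, roughly the shape I have in mind (just a sketch -- `fully_matched`, `mark_fully_matched`, and `is_fully_matched` are hypothetical names, and `RowGroupAccess` is simplified here):
   
   ```rust
   /// Simplified stand-in for the real `RowGroupAccess` enum.
   #[derive(Clone, Copy, PartialEq)]
   pub enum RowGroupAccess {
       Scan,
       Skip,
   }
   
   pub struct ParquetAccessPlan {
       row_groups: Vec<RowGroupAccess>,
       /// One flag per row group: true when statistics prove that every
       /// row in the row group satisfies the predicate.
       fully_matched: Vec<bool>,
   }
   
   impl ParquetAccessPlan {
       /// Record that all rows in `row_group_index` satisfy the predicate.
       pub fn mark_fully_matched(&mut self, row_group_index: usize) {
           self.fully_matched[row_group_index] = true;
       }
   
       /// O(1) index lookup instead of a linear `Vec::contains` scan.
       pub fn is_fully_matched(&self, row_group_index: usize) -> bool {
           self.fully_matched.get(row_group_index).copied().unwrap_or(false)
       }
   }
   ```
   
   Then `build()` could keep returning just the plan, and `prune_plan_with_page_index` could ask the plan directly.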



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -357,13 +366,38 @@ impl RowGroupAccessPlanFilter {
             return;
         };
 
+        // Collect unique column names referenced by the predicate so we can

Review Comment:
   this looks like a separate fix (NULL handling) -- can you possibly pull this part into its own PR for easier / faster review?



##########
datafusion/datasource-parquet/src/page_filter.rs:
##########
@@ -197,6 +198,11 @@ impl PagePruningAccessPlanFilter {
         // for each row group specified in the access plan
         let row_group_indexes = access_plan.row_group_indexes();
         for row_group_index in row_group_indexes {
+            // Skip page pruning for fully matched row groups: all rows are
+            // known to satisfy the predicate, so page-level pruning is wasted work.
+            if fully_matched_row_groups.contains(&row_group_index) {

Review Comment:
   💯 



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1139,33 +1154,71 @@ impl RowGroupsPrunedParquetOpen {
             reader_metadata.parquet_schema(),
         );
 
-        let mut decoder_builder =
-            ParquetPushDecoderBuilder::new_with_metadata(reader_metadata)
-                .with_projection(read_plan.projection_mask)
+        // Split into consecutive runs of row groups that share the same filter
+        // requirement. Fully matched row groups skip the RowFilter; others need it.
+        // Reverse the run order for reverse scans so the combined decoder stream
+        // preserves the requested global row group order.
+        let mut runs = if has_row_filter && !fully_matched_row_groups.is_empty() {
+            split_decoder_runs(access_plan, &fully_matched_row_groups)

Review Comment:
   If fully_matched_row_groups were a field on the access_plan I think this logic would be significantly easier to read (at least for me as a human 🤖 👴 )
   Then you could encapsulate something like
   ```rust
   let runs = access_plan.split_runs();
   ```
   
   better yet would be to then wrap each Run in some structure, and maybe add a `plan_run` method or function 🤔 
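   
   For example, something along these lines (a sketch only -- `DecoderRun` and this signature are hypothetical, not this PR's API):
   
   ```rust
   /// A maximal run of consecutive scanned row groups that share the same
   /// filter requirement, so each run can be planned with or without a
   /// RowFilter.
   pub struct DecoderRun {
       /// Row group indices in scan order.
       pub row_groups: Vec<usize>,
       /// True when the RowFilter can be skipped for the whole run.
       pub fully_matched: bool,
   }
   
   /// Split scanned row group indices into runs whose `fully_matched` flag
   /// agrees; `is_fully_matched` would be the O(1) lookup suggested above.
   pub fn split_runs(
       scanned: impl IntoIterator<Item = usize>,
       is_fully_matched: impl Fn(usize) -> bool,
   ) -> Vec<DecoderRun> {
       let mut runs: Vec<DecoderRun> = Vec::new();
       for rg in scanned {
           let fully = is_fully_matched(rg);
           match runs.last_mut() {
               // Extend the current run while the flag stays the same.
               Some(run) if run.fully_matched == fully => run.row_groups.push(rg),
               // Otherwise start a new run.
               _ => runs.push(DecoderRun {
                   row_groups: vec![rg],
                   fully_matched: fully,
               }),
           }
       }
       runs
   }
   ```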



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1120,17 +1134,18 @@ impl RowGroupsPrunedParquetOpen {
                 reader_metadata.parquet_schema(),
                 file_metadata.as_ref(),
                 &prepared.file_metrics,
+                &fully_matched_row_groups,
             );
         }
 
-        // Prepare the access plan (extract row groups and row selection)
-        let mut prepared_plan = access_plan.prepare(rg_metadata)?;
-
-        // Potentially reverse the access plan for performance.
-        // See `ParquetSource::try_pushdown_sort` for the rationale.
-        if prepared.reverse_row_groups {
-            prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
-        }
+        // Prepare access plans (extract row groups and row selection)
+        let prepare = |plan: ParquetAccessPlan| -> Result<PreparedAccessPlan> {
+            let mut pp = plan.prepare(rg_metadata)?;

Review Comment:
   I found the naming of "prepared" vs "pp" hard to follow here 🤔 



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -357,13 +366,38 @@ impl RowGroupAccessPlanFilter {
             return;
         };
 
+        // Collect unique column names referenced by the predicate so we can
+        // check for NULLs. Rows with NULL predicate columns evaluate to NULL
+        // (not true), so a row group with NULLs cannot be "fully matched."
+        let predicate_columns =
+            datafusion_physical_expr::utils::collect_columns(predicate.orig_expr());
+
+        let null_count_converters: Vec<StatisticsConverter> = predicate_columns
+            .iter()
+            .filter_map(|col| {
+                StatisticsConverter::try_new(col.name(), arrow_schema, parquet_schema)

Review Comment:
   Converting statistics again for the column takes substantial time (especially for things like string columns) -- and we should already have converted the statistics once when we created the pruning predicate. Is there any way to reuse those statistics (or cache the row group null counts)?
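   
   For illustration only, one possible shape for such a cache (a sketch -- `NullCountCache` is a hypothetical name, and the real integration with `StatisticsConverter` would look different):
   
   ```rust
   use std::collections::HashMap;
   
   /// Hypothetical per-file cache of row group null counts, keyed by column
   /// name, so statistics are converted once and then shared by the pruning
   /// predicate and the fully-matched check.
   #[derive(Default)]
   pub struct NullCountCache {
       /// column name -> null count per row group (None when stats are absent)
       counts: HashMap<String, Vec<Option<u64>>>,
   }
   
   impl NullCountCache {
       /// Return the cached counts, running `convert` only on first use.
       pub fn get_or_convert(
           &mut self,
           column: &str,
           convert: impl FnOnce() -> Vec<Option<u64>>,
       ) -> &[Option<u64>] {
           self.counts.entry(column.to_string()).or_insert_with(convert)
       }
   }
   ```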



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

