wombatu-kun opened a new pull request, #16384: URL: https://github.com/apache/iceberg/pull/16384
## What

Enables Spark aggregate push-down for `GROUP BY` queries whose grouping columns are identity-partition columns, e.g. `SELECT p, COUNT(*), MIN(x), MAX(x) FROM t GROUP BY p` where `p` is an identity-partitioned column. These are now answered from manifest metadata instead of scanning data files, producing one row per partition group via a local scan.

Previously, `SparkScanBuilder.pushAggregation` rejected any aggregation with a `GROUP BY` clause outright (the existing `// TODO: enable aggregate push down for partition col group by expression`), so only ungrouped aggregates were optimized.

## Why

For a table partitioned by an identity transform, every row in a data file shares the same value for that partition column, so the file's manifest stats (`record_count`, `lower_bounds`, `upper_bounds`, `value_counts`) belong entirely to a single group. `COUNT`/`MIN`/`MAX` per group can therefore be computed from manifests alone, turning a full table scan into a metadata-only operation for a common reporting pattern.

## How

`pushAggregation` now resolves the grouping expressions to schema fields and, when they map to identity partition columns, accumulates a per-group `AggregateEvaluator` keyed by the file's partition tuple, then emits the grouped result as a local scan. The output schema follows the Spark `SupportsPushDownAggregates` contract: group-by columns first (in order), then the aggregate expressions (in order).

Push-down is conservatively skipped (the query falls back to a normal scan, with identical results) when any of the following hold:

- A grouping column is not an identity partition column in the spec of every scanned data file (covers non-identity transforms such as bucket/truncate/temporal, non-partition columns, and partition-spec evolution).
- Any scanned file has delete files (position or equality deletes), which would invalidate metadata-derived counts.
- Any group's aggregate cannot be satisfied from metrics (e.g. NaN bounds, or a metrics mode that does not retain the needed bounds).
- A non-partition data filter remains (Spark keeps the filter above the scan, so the aggregate is not pushed).

## Scope

Applied identically to all four supported Spark trees: `spark/v3.4`, `spark/v3.5`, `spark/v4.0`, `spark/v4.1`.

## Testing

New cases in `TestAggregatePushDown` (all four Spark versions): single and multiple identity-partition GROUP BY, non-partition and non-identity-transform fallback, partition-spec-evolution fallback and the stays-identity-across-specs positive case, null partition value as its own group, row-level deletes fallback, partition filter, empty table, non-partition data-filter fallback, NaN fallback, and time-travel / incremental-scan grouped push-down.

A differential oracle re-runs each positive query with `spark.sql.iceberg.aggregate-push-down.enabled=false` and asserts the metadata fast-path result equals the data-scan result.

All `TestAggregatePushDown` tests pass on Spark 3.4, 3.5, 4.0, and 4.1; `spotlessCheck` is clean.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
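For readers who want a concrete picture of the per-group accumulation described under "How", here is a minimal, self-contained sketch. It is not the code in this PR and does not use Iceberg's `AggregateEvaluator`. `FileStats`, `GroupAgg`, and `aggregate` are hypothetical names invented for illustration, the aggregated column is assumed to be a `long`, and treating any missing bound as a fallback is a simplification of the metrics check.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class GroupedMetadataAggSketch {

  /** Hypothetical stand-in for one data file's manifest entry; not an Iceberg class. */
  public static final class FileStats {
    final List<Object> partition; // identity-partition tuple shared by every row in the file
    final long recordCount;       // rows in the file
    final Long lowerBound;        // null when the metrics mode retained no bound
    final Long upperBound;        // null when the metrics mode retained no bound
    final boolean hasDeletes;     // true if position or equality deletes apply

    public FileStats(
        List<Object> partition, long recordCount, Long lowerBound, Long upperBound,
        boolean hasDeletes) {
      this.partition = partition;
      this.recordCount = recordCount;
      this.lowerBound = lowerBound;
      this.upperBound = upperBound;
      this.hasDeletes = hasDeletes;
    }
  }

  /** Running COUNT(*), MIN(x), MAX(x) for one partition group. */
  public static final class GroupAgg {
    public long count;
    public long min = Long.MAX_VALUE;
    public long max = Long.MIN_VALUE;
  }

  /**
   * Returns one aggregate per partition tuple, or Optional.empty() to signal that the
   * caller should fall back to a normal data scan.
   */
  public static Optional<Map<List<Object>, GroupAgg>> aggregate(List<FileStats> files) {
    Map<List<Object>, GroupAgg> groups = new LinkedHashMap<>();
    for (FileStats file : files) {
      // Deletes make record counts unreliable; missing bounds make MIN/MAX unanswerable.
      if (file.hasDeletes || file.lowerBound == null || file.upperBound == null) {
        return Optional.empty();
      }
      GroupAgg agg = groups.computeIfAbsent(file.partition, key -> new GroupAgg());
      agg.count += file.recordCount;
      agg.min = Math.min(agg.min, file.lowerBound);
      agg.max = Math.max(agg.max, file.upperBound);
    }
    return Optional.of(groups);
  }

  public static void main(String[] args) {
    List<FileStats> files = Arrays.asList(
        new FileStats(Arrays.asList((Object) "a"), 10, 1L, 5L, false),
        new FileStats(Arrays.asList((Object) "a"), 20, 3L, 9L, false),
        new FileStats(Arrays.asList((Object) "b"), 5, 7L, 7L, false),
        // A null partition value forms its own group.
        new FileStats(Collections.<Object>singletonList(null), 2, 4L, 4L, false));

    // Output order mirrors the contract: group-by columns first, then the aggregates.
    aggregate(files).ifPresent(groups ->
        groups.forEach((partition, agg) ->
            System.out.println(partition + " " + agg.count + " " + agg.min + " " + agg.max)));
  }
}
```

Returning `Optional.empty()` for the whole query, rather than skipping only the affected group, mirrors the all-or-nothing fallback described in the list above: a single file with deletes or unusable metrics sends the entire query back to a regular scan.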
