huaxingao commented on code in PR #6622: URL: https://github.com/apache/iceberg/pull/6622#discussion_r1115053598
########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java: ########## @@ -158,6 +178,134 @@ public Filter[] pushedFilters() { return pushedFilters; } + @Override + public boolean pushAggregation(Aggregation aggregation) { + if (!canPushDownAggregation(aggregation)) { + return false; + } + + AggregateEvaluator aggregateEvaluator; + try { + List<Expression> aggregates = + Arrays.stream(aggregation.aggregateExpressions()) + .map(agg -> SparkAggregates.convert(agg)) + .collect(Collectors.toList()); + aggregateEvaluator = AggregateEvaluator.create(schema, aggregates); + } catch (UnsupportedOperationException | IllegalArgumentException e) { + LOG.info("Skipped aggregate pushdown: " + e); + return false; + } + + if (!metricsModeSupportsAggregatePushDown(aggregateEvaluator.aggregates())) { + return false; + } + + TableScan scan = table.newScan(); + ((DataTableScan) scan).setStats(true); + Snapshot snapshot = readSnapshot(); + if (snapshot == null) { + LOG.info("Skipped aggregate pushdown: table snapshot is null"); + return false; + } + scan = scan.useSnapshot(snapshot.snapshotId()); + scan = configureSplitPlanning(scan); + + try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) { Review Comment: > I think that Spark will also push filters first. If we don't intend to pass filters to the scan, then we will need to detect that there are filters and return false because we don't filter right now. Spark actually checks if all the filters are completely pushed down before it calls pushDownAggregation. If there are postScanFilters, Spark doesn't call pushDownAggregation. > Also, it is possible to filter as long as the filter is entirely pushed down. You can check that using ExpressionUtil.selectsPartitions. If that returns true, then it is safe to pass the filter here! If Spark goes to the path pushDownAggregation, then all the filters must be entirely pushed down (must all be partition filters), we don't need to check in Iceberg. However, somehow the `GenericDataFile` still contains the stats that should be filtered out by partition filter. Maybe I didn't do it correctly. I will return false for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org