huaxingao commented on code in PR #6622:
URL: https://github.com/apache/iceberg/pull/6622#discussion_r1115053598
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -158,6 +178,134 @@ public Filter[] pushedFilters() {
return pushedFilters;
}
+ @Override
+ public boolean pushAggregation(Aggregation aggregation) {
+ if (!canPushDownAggregation(aggregation)) {
+ return false;
+ }
+
+ AggregateEvaluator aggregateEvaluator;
+ try {
+ List<Expression> aggregates =
+ Arrays.stream(aggregation.aggregateExpressions())
+ .map(agg -> SparkAggregates.convert(agg))
+ .collect(Collectors.toList());
+ aggregateEvaluator = AggregateEvaluator.create(schema, aggregates);
+ } catch (UnsupportedOperationException | IllegalArgumentException e) {
+ LOG.info("Skipped aggregate pushdown: " + e);
+ return false;
+ }
+
+ if (!metricsModeSupportsAggregatePushDown(aggregateEvaluator.aggregates())) {
+ return false;
+ }
+
+ TableScan scan = table.newScan();
+ ((DataTableScan) scan).setStats(true);
+ Snapshot snapshot = readSnapshot();
+ if (snapshot == null) {
+ LOG.info("Skipped aggregate pushdown: table snapshot is null");
+ return false;
+ }
+ scan = scan.useSnapshot(snapshot.snapshotId());
+ scan = configureSplitPlanning(scan);
+
+ try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
Review Comment:
> I think that Spark will also push filters first. If we don't intend to
> pass filters to the scan, then we will need to detect that there are filters
> and return false because we don't filter right now.
Spark actually checks whether all the filters are completely pushed down before
it calls pushDownAggregation. If there are any postScanFilters, Spark doesn't
call pushDownAggregation.
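
To make the ordering concrete, here is a minimal sketch of the check described above. It is a simplified model under my own assumptions, not Spark's actual code: `AggregatePushdownGuard`, `FilterPushdownResult`, and `shouldAttemptAggregatePushdown` are hypothetical names, and filters are represented as plain strings.

```java
import java.util.List;

// Hypothetical model of the ordering described above; these are NOT Spark or Iceberg classes.
final class AggregatePushdownGuard {

  /** Stand-in for the outcome of filter pushdown: what the source accepted and what is left over. */
  public static final class FilterPushdownResult {
    private final List<String> pushedFilters;
    private final List<String> postScanFilters;

    public FilterPushdownResult(List<String> pushedFilters, List<String> postScanFilters) {
      this.pushedFilters = List.copyOf(pushedFilters);
      this.postScanFilters = List.copyOf(postScanFilters);
    }

    public List<String> pushedFilters() {
      return pushedFilters;
    }

    public List<String> postScanFilters() {
      return postScanFilters;
    }
  }

  private AggregatePushdownGuard() {}

  /**
   * Aggregate pushdown is only attempted when no filter remains to be evaluated after the scan.
   * Any leftover filter means rows could still be dropped later, so metadata-based
   * aggregates could be wrong.
   */
  public static boolean shouldAttemptAggregatePushdown(FilterPushdownResult result) {
    return result.postScanFilters().isEmpty();
  }

  public static void main(String[] args) {
    FilterPushdownResult fullyPushed =
        new FilterPushdownResult(List.of("part = 1"), List.of());
    FilterPushdownResult partiallyPushed =
        new FilterPushdownResult(List.of("part = 1"), List.of("value > 5"));

    System.out.println(shouldAttemptAggregatePushdown(fullyPushed));
    System.out.println(shouldAttemptAggregatePushdown(partiallyPushed));
  }
}
```

In this model the source never sees an aggregation request while a post-scan filter is outstanding, which is why the check would not need to be repeated on the Iceberg side.
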
> Also, it is possible to filter as long as the filter is entirely pushed
> down. You can check that using ExpressionUtil.selectsPartitions. If that
> returns true, then it is safe to pass the filter here!
If Spark takes the pushDownAggregation path, then all the filters must have
been entirely pushed down (they must all be partition filters), so we don't
need to check in Iceberg.
However, the `GenericDataFile` somehow still contains the stats that should
have been filtered out by the partition filter. Maybe I didn't do it
correctly. I will return false for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]