huaxingao commented on code in PR #6622:
URL: https://github.com/apache/iceberg/pull/6622#discussion_r1115053598


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -158,6 +178,134 @@ public Filter[] pushedFilters() {
     return pushedFilters;
   }
 
+  @Override
+  public boolean pushAggregation(Aggregation aggregation) {
+    if (!canPushDownAggregation(aggregation)) {
+      return false;
+    }
+
+    AggregateEvaluator aggregateEvaluator;
+    try {
+      List<Expression> aggregates =
+          Arrays.stream(aggregation.aggregateExpressions())
+              .map(agg -> SparkAggregates.convert(agg))
+              .collect(Collectors.toList());
+      aggregateEvaluator = AggregateEvaluator.create(schema, aggregates);
+    } catch (UnsupportedOperationException | IllegalArgumentException e) {
+      LOG.info("Skipped aggregate pushdown: " + e);
+      return false;
+    }
+
+    if (!metricsModeSupportsAggregatePushDown(aggregateEvaluator.aggregates())) {
+      return false;
+    }
+
+    TableScan scan = table.newScan();
+    ((DataTableScan) scan).setStats(true);
+    Snapshot snapshot = readSnapshot();
+    if (snapshot == null) {
+      LOG.info("Skipped aggregate pushdown: table snapshot is null");
+      return false;
+    }
+    scan = scan.useSnapshot(snapshot.snapshotId());
+    scan = configureSplitPlanning(scan);
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
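
For context, the excerpt cuts off mid-method at the `planFiles()` call. A plausible shape for the rest of the method, sketched purely as an illustration and assuming `AggregateEvaluator` exposes an `update(DataFile)` hook that folds per-file column metrics into the running aggregates:

```java
try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
  for (FileScanTask task : fileScanTasks) {
    // Row-level deletes would invalidate file-level stats, so give up.
    if (!task.deletes().isEmpty()) {
      LOG.info("Skipped aggregate pushdown: delete files are present");
      return false;
    }
    // Fold this file's column metrics (value counts, lower/upper
    // bounds) into the running MIN/MAX/COUNT aggregates.
    aggregateEvaluator.update(task.file());
  }
} catch (IOException e) {
  LOG.info("Skipped aggregate pushdown: failed to plan files", e);
  return false;
}
```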

Review Comment:
   > I think that Spark will also push filters first. If we don't intend to pass filters to the scan, then we will need to detect that there are filters and return false because we don't filter right now.
   
   Spark actually checks whether all the filters are completely pushed down before it calls `pushAggregation`. If there are any `postScanFilters`, Spark doesn't call `pushAggregation` at all.
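   
   To make that contract explicit on the Iceberg side anyway, a cheap defensive guard could sit at the top of `pushAggregation`. This is only a sketch: `dataFilters` is a hypothetical stand-in for whatever residual-filter state the builder tracks, not a real field of `SparkScanBuilder`.
   
```java
// Hypothetical guard: refuse aggregate pushdown if any filter was not
// fully pushed down ("dataFilters" is a stand-in name for illustration).
if (dataFilters != null && dataFilters.length > 0) {
  LOG.info("Skipped aggregate pushdown: post-scan filters are present");
  return false;
}
```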
   
   > Also, it is possible to filter as long as the filter is entirely pushed down. You can check that using ExpressionUtil.selectsPartitions. If that returns true, then it is safe to pass the filter here!
   
   If Spark takes the `pushAggregation` path, then all the filters must already be entirely pushed down (they must all be partition filters), so we don't need to check that again in Iceberg. However, the `GenericDataFile` somehow still contains the stats that should have been filtered out by the partition filter. Maybe I didn't do it correctly; I will return false for now.
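   
   For reference, the `ExpressionUtil.selectsPartitions` check suggested above might be wired in roughly like this. It is a sketch under assumptions: `filterExpression()` is a hypothetical accessor for the pushed-down filter, `caseSensitive` is assumed to be available on the builder, and the exact `selectsPartitions` signature should be checked against the Iceberg version in use.
   
```java
Expression filter = filterExpression(); // hypothetical accessor
// Only pass the filter to the stats-only scan if it selects whole
// partitions; otherwise file-level metrics could still include rows
// that the filter was supposed to remove.
if (ExpressionUtil.selectsPartitions(filter, table.spec(), caseSensitive)) {
  scan = scan.filter(filter);
} else {
  LOG.info("Skipped aggregate pushdown: filter does not select whole partitions");
  return false;
}
```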



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

