huaxingao commented on code in PR #6252: URL: https://github.com/apache/iceberg/pull/6252#discussion_r1063701748
########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java: ########## @@ -148,6 +159,115 @@ public Filter[] pushedFilters() { return pushedFilters; } + @Override + public boolean pushAggregation(Aggregation aggregation) { + if (!pushDownAggregate(aggregation)) { + return false; + } + + List<BoundAggregate> boundExpressions = + Lists.newArrayListWithExpectedSize(aggregation.aggregateExpressions().length); + for (AggregateFunc aggregate : aggregation.aggregateExpressions()) { + Expression expr = SparkAggregates.convert(aggregate); + if (expr != null) { + try { + boundExpressions.add( + (BoundAggregate) Binder.bind(schema.asStruct(), expr, caseSensitive)); + } catch (ValidationException e) { + // binding to the table schema failed, so this expression cannot be pushed down + // disable aggregate push down + LOG.info("Failed to convert aggregate expression: {}. {}", aggregate, e.getMessage()); + return false; + } + } else { + // only push down aggregates iff all of them can be pushed down. + LOG.info("Cannot push down aggregate (failed to bind): {}", aggregate); + return false; + } + } + + if (!SparkPushedDownAggregateUtil.metricsModeSupportsAggregatePushDown( + table, boundExpressions)) { + LOG.info("The MetricsMode doesn't support aggregate push down."); + return false; + } + + try { + List<Types.NestedField> aggFields = Lists.newArrayList(); + List<Integer> aggregateIndexInTableSchema = Lists.newArrayList(); + for (int index = 0; index < boundExpressions.size(); index++) { + // Get the type for each of the pushed down aggregate, and use these Types.NestedField to + // build the schema of this data source scan, which is different from the schema of the + // table. + // e.g. 
SELECT COUNT(*), MAX(col1), MIN(col1), MAX(col2), MIN(col3) FROM table; + // the schema of the table is + // col1 IntegerType, col2 FloatType, col3 DecimalType + // the schema of the data source scan is + // count(*) LongType, max(col1) IntegerType, max(col2) FloatType, min(col3) DecimalType + BoundAggregate aggregate = boundExpressions.get(index); + Types.NestedField field = AggregateUtil.buildAggregateNestedField(aggregate, index + 1); + if (field.type().isNestedType()) { + // Statistics (upper_bounds and lower_bounds, null_value_counts) are not + // available for top columns, so for top columns, we can only push down Count(*). + // Statistics (upper_bounds and lower_bounds, null_value_counts) are available for + // subfields inside nested columns. Will enable push down Max, Min, Count in + // nested column in next phase. + // TODO: enable push down Count(*) for nested column and Max, Min, Count + // for subfields in nested columns. + LOG.info("Aggregate pushed down is not supported for nested type yet {}", aggregate); + return false; + } + + aggFields.add(field); + aggregateIndexInTableSchema.add( + AggregateUtil.columnIndexInTableSchema(aggregate, table, caseSensitive)); + } + + pushedAggregateSchema = SparkSchemaUtil.convert(new Schema(aggFields)); + this.pushedAggregateRows = + SparkPushedDownAggregateUtil.constructInternalRowForPushedDownAggregate( + spark, table, boundExpressions, aggregateIndexInTableSchema); + } catch (Exception e) { + LOG.info("Aggregate can't be pushed down", e.getMessage()); + return false; + } + + return true; + } + + private boolean pushDownAggregate(Aggregation aggregation) { + if (!(table instanceof BaseTable)) { + return false; + } + + if (!readConf.aggregatePushDown()) { + return false; + } + + Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot != null) { + Map<String, String> map = currentSnapshot.summary(); + // if there are row-level deletes in current snapshot, the statics + // maybe changed, so disable 
push down aggregate. if (Integer.parseInt(map.getOrDefault("total-position-deletes", "0")) > 0 || Integer.parseInt(map.getOrDefault("total-equality-deletes", "0")) > 0) { return false; } Review Comment: @RussellSpitzer Thank you very much for your review! I will fix this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org