hemanthboyina commented on code in PR #16176:
URL: https://github.com/apache/iceberg/pull/16176#discussion_r3212643563


##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -186,6 +196,214 @@ public boolean pushAggregation(Aggregation aggregation) {
     return true;
   }
 
+  /**
+   * Push down aggregation with GROUP BY on identity partition columns. When 
all GROUP BY columns
+   * are identity partition fields, aggregates can be computed from file 
metadata grouped by
+   * partition values, avoiding reading any data files.
+   */
+  private boolean pushGroupByAggregation(
+      Aggregation aggregation, List<BoundAggregate<?, ?>> boundAggregates) {
+    Schema tableSchema = table().schema();
+
+    // resolve GROUP BY columns to source field IDs (not positions, for spec 
evolution safety)
+    List<Integer> groupBySourceIds = Lists.newArrayList();
+    List<Types.NestedField> groupByFields = Lists.newArrayList();
+    if (!resolveGroupByFields(aggregation, tableSchema, groupBySourceIds, 
groupByFields)) {
+      return false;
+    }
+
+    Map<List<Object>, AggregateEvaluator> evaluatorsByPartition =
+        groupFilesByPartition(groupBySourceIds, boundAggregates);
+    if (evaluatorsByPartition == null) {
+      return false;
+    }
+
+    localScan = buildGroupedLocalScan(groupByFields, evaluatorsByPartition);
+    return localScan != null;
+  }
+
+  private boolean resolveGroupByFields(
+      Aggregation aggregation,
+      Schema tableSchema,
+      List<Integer> groupBySourceIds,
+      List<Types.NestedField> groupByFields) {
+    PartitionSpec currentSpec = table().spec();
+    for (org.apache.spark.sql.connector.expressions.Expression groupByExpr :
+        aggregation.groupByExpressions()) {
+      String colName =
+          SparkUtil.toColumnName(
+              (org.apache.spark.sql.connector.expressions.NamedReference) 
groupByExpr);
+      Types.NestedField sourceField = tableSchema.findField(colName);
+      if (sourceField == null) {
+        LOG.info("Skipping grouped aggregate pushdown: cannot find field {}", 
colName);
+        return false;
+      }
+
+      // verify the field is an identity partition in the current spec
+      if (findIdentityPartitionPosition(currentSpec, sourceField.fieldId()) < 
0) {
+        LOG.info(
+            "Skipping grouped aggregate pushdown: {} is not an identity 
partition field", colName);
+        return false;
+      }
+
+      groupBySourceIds.add(sourceField.fieldId());
+      groupByFields.add(sourceField);
+    }
+
+    return true;
+  }
+
+  private Map<List<Object>, AggregateEvaluator> groupFilesByPartition(
+      List<Integer> groupBySourceIds, List<BoundAggregate<?, ?>> 
boundAggregates) {
+    Map<List<Object>, AggregateEvaluator> evaluatorsByPartition = 
Maps.newLinkedHashMap();
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = planFilesWithStats()) 
{
+      for (FileScanTask task : fileScanTasks) {
+        if (!task.deletes().isEmpty()) {
+          LOG.info("Skipping grouped aggregate pushdown: detected row level 
deletes");
+          return null;
+        }
+
+        // resolve partition values using the file's own spec (handles spec 
evolution)
+        PartitionSpec fileSpec = table().specs().get(task.file().specId());
+        StructLike partition = task.file().partition();
+        List<Object> key = 
Lists.newArrayListWithCapacity(groupBySourceIds.size());
+
+        for (int sourceId : groupBySourceIds) {
+          int pos = findIdentityPartitionPosition(fileSpec, sourceId);
+          if (pos < 0) {
+            LOG.info(
+                "Skipping grouped aggregate pushdown: field {} not in spec {}",
+                sourceId,
+                fileSpec.specId());
+            return null;
+          }
+          key.add(partition.get(pos, Object.class));
+        }
+
+        evaluatorsByPartition
+            .computeIfAbsent(key, k -> 
AggregateEvaluator.create(boundAggregates))
+            .update(task.file());
+      }
+    } catch (IOException e) {
+      LOG.info("Skipping grouped aggregate pushdown: ", e);
+      return null;
+    }
+
+    if (evaluatorsByPartition.isEmpty()) {
+      return null;
+    }
+
+    for (AggregateEvaluator evaluator : evaluatorsByPartition.values()) {
+      if (!evaluator.allAggregatorsValid()) {
+        return null;
+      }
+    }
+
+    return evaluatorsByPartition;
+  }
+
+  private SparkLocalScan buildGroupedLocalScan(
+      List<Types.NestedField> groupByFields,
+      Map<List<Object>, AggregateEvaluator> evaluatorsByPartition) {
+    AggregateEvaluator firstEvaluator = 
evaluatorsByPartition.values().iterator().next();
+    List<Types.NestedField> resultFields = Lists.newArrayList();
+    int fieldId = 0;
+
+    for (Types.NestedField field : groupByFields) {
+      resultFields.add(Types.NestedField.optional(fieldId++, field.name(), 
field.type()));
+    }
+
+    for (Types.NestedField field : firstEvaluator.resultType().fields()) {
+      resultFields.add(Types.NestedField.optional(fieldId++, field.name(), 
field.type()));
+    }
+
+    Types.StructType resultType = Types.StructType.of(resultFields);
+    List<InternalRow> resultRows = Lists.newArrayList();
+
+    for (Map.Entry<List<Object>, AggregateEvaluator> entry : 
evaluatorsByPartition.entrySet()) {
+      List<Object> partitionValues = entry.getKey();
+      StructLike aggResult = entry.getValue().result();
+
+      Object[] combined = new Object[resultFields.size()];
+      for (int i = 0; i < partitionValues.size(); i++) {
+        combined[i] = partitionValues.get(i);
+      }
+
+      for (int i = 0; i < aggResult.size(); i++) {
+        combined[partitionValues.size() + i] = aggResult.get(i, Object.class);
+      }
+
+      resultRows.add(new StructInternalRow(resultType).setStruct(new 
ArrayStructLike(combined)));
+    }
+
+    StructType pushedSchema = SparkSchemaUtil.convert(new 
Schema(resultFields));
+    return new SparkLocalScan(
+        table(), pushedSchema, resultRows.toArray(new InternalRow[0]), 
filters());
+  }
+
+  private int findIdentityPartitionPosition(PartitionSpec spec, int 
sourceFieldId) {
+    List<PartitionField> fields = spec.fields();
+    for (int i = 0; i < fields.size(); i++) {
+      PartitionField field = fields.get(i);
+      if (field.sourceId() == sourceFieldId && field.transform().isIdentity()) 
{
+        return i;
+      }
+    }
+
+    return -1;
+  }
+
+  private boolean allGroupByAreIdentityPartitionFields(Aggregation 
aggregation) {
+    PartitionSpec spec = table().spec();
+    Schema tableSchema = table().schema();
+
+    for (org.apache.spark.sql.connector.expressions.Expression groupByExpr :
+        aggregation.groupByExpressions()) {
+      if (!(groupByExpr instanceof 
org.apache.spark.sql.connector.expressions.NamedReference)) {
+        return false;
+      }
+
+      String colName =
+          SparkUtil.toColumnName(
+              (org.apache.spark.sql.connector.expressions.NamedReference) 
groupByExpr);
+      Types.NestedField sourceField = tableSchema.findField(colName);
+      if (sourceField == null) {
+        return false;
+      }
+
+      if (findIdentityPartitionPosition(spec, sourceField.fieldId()) < 0) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static class ArrayStructLike implements StructLike {

Review Comment:
   AggregateEvaluator.ArrayStructLike is private static in the api module. 
Since SparkScanBuilder is in the spark module, I assume even package-private 
access wouldn't help; we'd need to make it public. I kept the change as is to 
avoid API surface changes. Happy to follow up separately if preferred.
   



##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java:
##########
@@ -568,11 +568,9 @@ public void testAggregationPushdownOnBucketedColumn() {
     sql(
         "CREATE TABLE %s (id BIGINT, struct_with_int STRUCT<c1:INT>) USING 
iceberg PARTITIONED BY (bucket(8, id))",
         tableName);
-

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to