huaxingao commented on code in PR #7636:
URL: https://github.com/apache/iceberg/pull/7636#discussion_r1203054834
##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java:
##########
@@ -535,6 +540,106 @@ public void testAggregatePushDownForTimeTravel() {
assertEquals("count push down", expected2, actual2);
}
+ @Test
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public void testAggregatePushDownForIncrementalScan() {
+ sql("CREATE TABLE %s (id LONG, data INT) USING iceberg", tableName);
+ sql(
+ "INSERT INTO TABLE %s VALUES (1, 1111), (1, 2222), (2, 3333), (2, 4444), (3, 5555), (3, 6666) ",
+ tableName);
+ long snapshotId1 = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+ sql("INSERT INTO %s VALUES (4, 7777), (5, 8888)", tableName);
+ long snapshotId2 = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+ sql("INSERT INTO %s VALUES (6, -7777), (7, 8888)", tableName);
+ long snapshotId3 = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+ sql("INSERT INTO %s VALUES (8, 7777), (9, 9999)", tableName);
+
+ Dataset<Row> dfWithAggPushdown1 =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.START_SNAPSHOT_ID, snapshotId2)
+ .option(SparkReadOptions.END_SNAPSHOT_ID, snapshotId3)
+ .load(tableName)
+ .agg(functions.min("data"), functions.max("data"), functions.count("data"));
+ String explain1 =
+ dfWithAggPushdown1.queryExecution().explainString(ExplainMode.fromString("simple"));
+ boolean explainContainsPushDownAggregates1 = false;
+ if (explain1.contains("count(data)")
+ && explain1.contains("min(data)")
+ && explain1.contains("max(data)")) {
+ explainContainsPushDownAggregates1 = true;
+ }
+
+ Assert.assertTrue("aggregate pushed down", explainContainsPushDownAggregates1);
+
+ Dataset<Row> dfWithoutAggPushdown1 =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.START_SNAPSHOT_ID, snapshotId2)
+ .option(SparkReadOptions.END_SNAPSHOT_ID, snapshotId3)
+ .option(SparkReadOptions.AGGREGATE_PUSH_DOWN_ENABLED, "false")
+ .load(tableName)
+ .agg(functions.min("data"), functions.max("data"), functions.count("data"));
+ String explain2 =
+ dfWithoutAggPushdown1.queryExecution().explainString(ExplainMode.fromString("simple"));
+ explainContainsPushDownAggregates1 = false;
Review Comment:
I don't have a good way to verify that the pushed-down aggregate expressions are in the TableRelation.
Here is the plan with aggregate push down:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[min(agg_func_0#51), max(agg_func_1#52), sum(agg_func_2#53L)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=65]
      +- HashAggregate(keys=[], functions=[partial_min(agg_func_0#51), partial_max(agg_func_1#52), partial_sum(agg_func_2#53L)])
         +- Project [min(data)#54 AS agg_func_0#51, max(data)#55 AS agg_func_1#52, count(data)#56L AS agg_func_2#53L]
            +- LocalTableScan [min(data)#54, max(data)#55, count(data)#56L]
```
Here is the plan without aggregate push down:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[min(data#414), max(data#414), count(data#414)])
   +- HashAggregate(keys=[], functions=[partial_min(data#414), partial_max(data#414), partial_count(data#414)])
      +- BatchScan spark_catalog.default.table[data#414] spark_catalog.default.table (branch=null) [filters=, groupedBy=] RuntimeFilters: []
```
I think it is actually easier to check for `LocalTableScan` instead, since it
appears only in the pushed-down plan. I have changed this test to check for
`LocalTableScan`, and I will open a follow-up to change the other tests to
check for it as well.
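
For illustration, the `LocalTableScan` check could look roughly like the sketch below. This is not the code in the PR: the class `PushDownPlanCheck` and the method `isAggregatePushedDown` are hypothetical names. The sketch assumes the explain string has the shape of the two plans above, where the pushed-down plan has a `LocalTableScan` leaf and the other plan has a `BatchScan` leaf.

```java
// Hypothetical helper, not part of the PR: decides from an explain string
// whether the aggregates were pushed down to the Iceberg scan.
final class PushDownPlanCheck {
  private PushDownPlanCheck() {}

  // Assumption taken from the two plans above: with push down the leaf node is
  // a LocalTableScan holding the precomputed aggregate row, and without push
  // down the leaf node is a BatchScan over the table.
  static boolean isAggregatePushedDown(String explain) {
    return explain.contains("LocalTableScan") && !explain.contains("BatchScan");
  }

  public static void main(String[] args) {
    // Leaf lines copied from the plans above.
    String pushed = "+- LocalTableScan [min(data)#54, max(data)#55, count(data)#56L]";
    String notPushed = "+- BatchScan spark_catalog.default.table[data#414]";

    System.out.println(isAggregatePushedDown(pushed));
    System.out.println(isAggregatePushedDown(notPushed));
  }
}
```

In the test this would replace the three `contains` calls and the boolean flag, for example `Assert.assertTrue("aggregate pushed down", PushDownPlanCheck.isAggregatePushedDown(explain1))`, with `assertFalse` for the plan produced with `AGGREGATE_PUSH_DOWN_ENABLED` set to `"false"`.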
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]