RussellSpitzer commented on code in PR #7636:
URL: https://github.com/apache/iceberg/pull/7636#discussion_r1204425085


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java:
##########
@@ -535,6 +540,106 @@ public void testAggregatePushDownForTimeTravel() {
     assertEquals("count push down", expected2, actual2);
   }
 
+  @Test
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public void testAggregatePushDownForIncrementalScan() {
+    sql("CREATE TABLE %s (id LONG, data INT) USING iceberg", tableName);
+    sql(
+        "INSERT INTO TABLE %s VALUES (1, 1111), (1, 2222), (2, 3333), (2, 
4444), (3, 5555), (3, 6666) ",
+        tableName);
+    long snapshotId1 = 
validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    sql("INSERT INTO %s VALUES (4, 7777), (5, 8888)", tableName);
+    long snapshotId2 = 
validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    sql("INSERT INTO %s VALUES (6, -7777), (7, 8888)", tableName);
+    long snapshotId3 = 
validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    sql("INSERT INTO %s VALUES (8, 7777), (9, 9999)", tableName);
+
+    Dataset<Row> dfWithAggPushdown1 =
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.START_SNAPSHOT_ID, snapshotId2)
+            .option(SparkReadOptions.END_SNAPSHOT_ID, snapshotId3)
+            .load(tableName)
+            .agg(functions.min("data"), functions.max("data"), 
functions.count("data"));
+    String explain1 =
+        
dfWithAggPushdown1.queryExecution().explainString(ExplainMode.fromString("simple"));
+    boolean explainContainsPushDownAggregates1 = false;
+    if (explain1.contains("count(data)")
+        && explain1.contains("min(data)")
+        && explain1.contains("max(data)")) {
+      explainContainsPushDownAggregates1 = true;
+    }
+
+    Assert.assertTrue("aggregate pushed down", 
explainContainsPushDownAggregates1);
+
+    Dataset<Row> dfWithoutAggPushdown1 =
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.START_SNAPSHOT_ID, snapshotId2)
+            .option(SparkReadOptions.END_SNAPSHOT_ID, snapshotId3)
+            .option(SparkReadOptions.AGGREGATE_PUSH_DOWN_ENABLED, "false")
+            .load(tableName)
+            .agg(functions.min("data"), functions.max("data"), 
functions.count("data"));
+    String explain2 =
+        
dfWithoutAggPushdown1.queryExecution().explainString(ExplainMode.fromString("simple"));
+    explainContainsPushDownAggregates1 = false;
+    if (explain2.contains("count(data)")
+        || explain2.contains("min(data)")
+        || explain2.contains("max(data)")) {
+      explainContainsPushDownAggregates1 = true;
+    }
+
+    Assert.assertFalse("aggregate pushed down", 
explainContainsPushDownAggregates1);
+    assertEquals(
+        "Aggregate pushdown and non-aggregate pushdown should have the same 
results",
+        rowsToJava(dfWithAggPushdown1.collectAsList()),
+        rowsToJava(dfWithoutAggPushdown1.collectAsList()));
+
+    Dataset<Row> dfWithAggPushdown2 =
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.START_SNAPSHOT_ID, snapshotId1)
+            .load(tableName)
+            .agg(functions.min("data"), functions.max("data"), 
functions.count("data"));
+
+    String explain3 =
+        
dfWithAggPushdown2.queryExecution().explainString(ExplainMode.fromString("simple"));
+    boolean explainContainsPushDownAggregates2 = false;
+    if (explain3.contains("count(data)")
+        && explain3.contains("min(data)")
+        && explain3.contains("max(data)")) {
+      explainContainsPushDownAggregates2 = true;
+    }
+
+    Assert.assertTrue("aggregate pushed down", 
explainContainsPushDownAggregates2);
+
+    Dataset<Row> dfWithoutAggPushdown2 =

Review Comment:
   unboundedNoPushdownResult



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to