RussellSpitzer commented on code in PR #7636:
URL: https://github.com/apache/iceberg/pull/7636#discussion_r1202919345
##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java:
##########
@@ -535,6 +540,106 @@ public void testAggregatePushDownForTimeTravel() {
assertEquals("count push down", expected2, actual2);
}
+ @Test
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public void testAggregatePushDownForIncrementalScan() {
+ sql("CREATE TABLE %s (id LONG, data INT) USING iceberg", tableName);
+ sql(
+ "INSERT INTO TABLE %s VALUES (1, 1111), (1, 2222), (2, 3333), (2,
4444), (3, 5555), (3, 6666) ",
+ tableName);
+ long snapshotId1 =
validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+ sql("INSERT INTO %s VALUES (4, 7777), (5, 8888)", tableName);
+ long snapshotId2 =
validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+ sql("INSERT INTO %s VALUES (6, -7777), (7, 8888)", tableName);
+ long snapshotId3 =
validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+ sql("INSERT INTO %s VALUES (8, 7777), (9, 9999)", tableName);
+
+ Dataset<Row> dfWithAggPushdown1 =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.START_SNAPSHOT_ID, snapshotId2)
+ .option(SparkReadOptions.END_SNAPSHOT_ID, snapshotId3)
+ .load(tableName)
+ .agg(functions.min("data"), functions.max("data"),
functions.count("data"));
+ String explain1 =
+
dfWithAggPushdown1.queryExecution().explainString(ExplainMode.fromString("simple"));
+ boolean explainContainsPushDownAggregates1 = false;
+ if (explain1.contains("count(data)")
+ && explain1.contains("min(data)")
+ && explain1.contains("max(data)")) {
+ explainContainsPushDownAggregates1 = true;
+ }
+
+ Assert.assertTrue("aggregate pushed down",
explainContainsPushDownAggregates1);
+
+ Dataset<Row> dfWithoutAggPushdown1 =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.START_SNAPSHOT_ID, snapshotId2)
+ .option(SparkReadOptions.END_SNAPSHOT_ID, snapshotId3)
+ .option(SparkReadOptions.AGGREGATE_PUSH_DOWN_ENABLED, "false")
+ .load(tableName)
+ .agg(functions.min("data"), functions.max("data"),
functions.count("data"));
+ String explain2 =
+
dfWithoutAggPushdown1.queryExecution().explainString(ExplainMode.fromString("simple"));
+ explainContainsPushDownAggregates1 = false;
Review Comment:
We have a StringAssert.contains(strings*) you can use (which will also give
a nicer exception)
Assertions.asserThat("str").contains(.....)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]