amogh-jahagirdar commented on code in PR #14615:
URL: https://github.com/apache/iceberg/pull/14615#discussion_r2580030907
##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -195,4 +195,18 @@ default ThisT metricsReporter(MetricsReporter reporter) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement metricsReporter");
}
+
+ /**
+ * Create a new scan that returns files with at least the given number of rows. This is used as a
+ * hint and is entirely optional in order to not have to return more rows than necessary. It is
Review Comment:
Minor: since it's a code comment, I think we could simplify the last two sentences into one:
```
This may return fewer rows if the scan does not contain that many, or it may
return more than requested.
```
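For illustration, the combined Javadoc could then read roughly like this (just a sketch; the method name and signature below are assumed from the `minRowsRequested` field exercised in the tests, not copied from this hunk):
```
  /**
   * Create a new scan that returns files with at least the given number of rows. This is used as a
   * hint and is entirely optional. This may return fewer rows if the scan does not contain that
   * many, or it may return more than requested.
   */
  default ThisT minRowsRequested(long numRows) {
    // hypothetical default, mirroring the metricsReporter default above
    throw new UnsupportedOperationException(
        this.getClass().getName() + " doesn't implement minRowsRequested");
  }
```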
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java:
##########
@@ -267,6 +269,98 @@ public void testUnpartitionedTimestampFilter() {
"ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
}
+ @TestTemplate
+ public void limitPushedDownToSparkScan() {
+ assumeThat(fileFormat)
+ .as("no need to run this across the entire test matrix")
+ .isEqualTo(FileFormat.PARQUET);
+
+ CaseInsensitiveStringMap options =
+ new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+ SparkScanBuilder builder =
+ new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+ long limit = 23;
+ // simulate Spark pushing down the limit to the scan builder
+ builder.pushLimit((int) limit);
+ assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+ // verify batch scan
+ AbstractObjectAssert<?, ?> scanAssert = assertThat(builder.build()).extracting("scan");
+ if (LOCAL == planningMode) {
+ scanAssert = scanAssert.extracting("scan");
+ }
+
+ scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+
+ // verify changelog scan
+ assertThat(builder.buildChangelogScan())
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify CoW scan
+ assertThat(builder.buildCopyOnWriteScan())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify MoR scan
+ scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");
+ if (LOCAL == planningMode) {
+ scanAssert = scanAssert.extracting("scan");
+ }
+
+ scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+ }
+
+ @TestTemplate
+ public void limitPushedDownToSparkScanForMetadataTable() {
+ assumeThat(fileFormat)
+ .as("no need to run this across the entire test matrix")
+ .isEqualTo(FileFormat.PARQUET);
+
+ CaseInsensitiveStringMap options =
+ new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+ // load the snapshots metadata table
+ SparkScanBuilder builder =
+ new SparkScanBuilder(spark, TABLES.load(options.get("path") + "#snapshots"), options);
+
+ long limit = 23;
+ // simulate Spark pushing down the limit to the scan builder
+ builder.pushLimit((int) limit);
+ assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+ // verify batch scan
+ assertThat(builder.build())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify CoW scan
+ assertThat(builder.buildCopyOnWriteScan())
Review Comment:
Ok nice, it looks like all these tests are parameterized on planning mode, which includes distributed planning.
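For reference, that parameterization presumably looks something like this (a rough sketch of the usual pattern in these Spark test classes; the exact parameter list in `TestFilteredScan` may differ):
```
  @Parameters(name = "format = {0}, planningMode = {1}")
  public static Object[][] parameters() {
    return new Object[][] {
      // LOCAL exercises the extra scan-unwrapping branch in the assertions,
      // DISTRIBUTED covers the distributed-planning path
      {FileFormat.PARQUET, PlanningMode.LOCAL},
      {FileFormat.PARQUET, PlanningMode.DISTRIBUTED}
    };
  }
```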
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java:
##########
@@ -150,6 +150,16 @@ public void testSelectRewrite() {
.isEqualTo("(float IS NOT NULL AND is_nan(float))");
}
+ @TestTemplate
+ public void selectWithLimit() {
+ Object[] first = row(1L, "a", 1.0F);
+ Object[] second = row(2L, "b", 2.0F);
+ Object[] third = row(3L, "c", Float.NaN);
+ assertThat(sql("SELECT * FROM %s LIMIT 1",
tableName)).containsExactly(first);
+ assertThat(sql("SELECT * FROM %s LIMIT 2",
tableName)).containsExactly(first, second);
+ assertThat(sql("SELECT * FROM %s LIMIT 3",
tableName)).containsExactly(first, second, third);
Review Comment:
I agree with how the new tests are set up; there's a clean separation between testing what actually gets pushed down, which verifies we're building the scans correctly, and an expectation based on the result of the pushdown.
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java:
##########
@@ -267,6 +269,98 @@ public void testUnpartitionedTimestampFilter() {
"ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
}
+ @TestTemplate
+ public void limitPushedDownToSparkScan() {
+ assumeThat(fileFormat)
+ .as("no need to run this across the entire test matrix")
+ .isEqualTo(FileFormat.PARQUET);
+
+ CaseInsensitiveStringMap options =
+ new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+ SparkScanBuilder builder =
+ new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+ long limit = 23;
+ // simulate Spark pushing down the limit to the scan builder
+ builder.pushLimit((int) limit);
+ assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+ // verify batch scan
+ AbstractObjectAssert<?, ?> scanAssert = assertThat(builder.build()).extracting("scan");
+ if (LOCAL == planningMode) {
+ scanAssert = scanAssert.extracting("scan");
+ }
+
+ scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+
+ // verify changelog scan
+ assertThat(builder.buildChangelogScan())
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify CoW scan
+ assertThat(builder.buildCopyOnWriteScan())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify MoR scan
+ scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");
+ if (LOCAL == planningMode) {
+ scanAssert = scanAssert.extracting("scan");
+ }
+
+ scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+ }
+
+ @TestTemplate
+ public void limitPushedDownToSparkScanForMetadataTable() {
+ assumeThat(fileFormat)
+ .as("no need to run this across the entire test matrix")
+ .isEqualTo(FileFormat.PARQUET);
+
+ CaseInsensitiveStringMap options =
+ new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+ // load the snapshots metadata table
+ SparkScanBuilder builder =
+ new SparkScanBuilder(spark, TABLES.load(options.get("path") + "#snapshots"), options);
+
+ long limit = 23;
+ // simulate Spark pushing down the limit to the scan builder
+ builder.pushLimit((int) limit);
+ assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+ // verify batch scan
+ assertThat(builder.build())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify CoW scan
+ assertThat(builder.buildCopyOnWriteScan())
Review Comment:
What about making sure it's pushed down for distributed planning as well?
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java:
##########
@@ -267,6 +269,98 @@ public void testUnpartitionedTimestampFilter() {
"ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
}
+ @TestTemplate
+ public void limitPushedDownToSparkScan() {
+ assumeThat(fileFormat)
+ .as("no need to run this across the entire test matrix")
+ .isEqualTo(FileFormat.PARQUET);
+
+ CaseInsensitiveStringMap options =
+ new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+ SparkScanBuilder builder =
+ new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+ long limit = 23;
+ // simulate Spark pushing down the limit to the scan builder
+ builder.pushLimit((int) limit);
+ assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+ // verify batch scan
+ AbstractObjectAssert<?, ?> scanAssert = assertThat(builder.build()).extracting("scan");
+ if (LOCAL == planningMode) {
+ scanAssert = scanAssert.extracting("scan");
+ }
+
+ scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+
+ // verify changelog scan
+ assertThat(builder.buildChangelogScan())
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify CoW scan
+ assertThat(builder.buildCopyOnWriteScan())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify MoR scan
+ scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");
+ if (LOCAL == planningMode) {
+ scanAssert = scanAssert.extracting("scan");
+ }
+
+ scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+ }
+
+ @TestTemplate
+ public void limitPushedDownToSparkScanForMetadataTable() {
+ assumeThat(fileFormat)
+ .as("no need to run this across the entire test matrix")
+ .isEqualTo(FileFormat.PARQUET);
+
+ CaseInsensitiveStringMap options =
+ new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+ // load the snapshots metadata table
+ SparkScanBuilder builder =
+ new SparkScanBuilder(spark, TABLES.load(options.get("path") + "#snapshots"), options);
+
+ long limit = 23;
+ // simulate Spark pushing down the limit to the scan builder
+ builder.pushLimit((int) limit);
+ assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+ // verify batch scan
+ assertThat(builder.build())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify CoW scan
+ assertThat(builder.buildCopyOnWriteScan())
Review Comment:
Though for this particular metadata-table test, `buildCopyOnWriteScan`/`buildMergeOnReadScan` wouldn't be relevant, right?
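If those two are dropped, the metadata-table test could shrink to just the batch scan check, along these lines (a sketch reusing the assertions already in this hunk):
```
    // metadata tables are read-only, so only the batch scan assertion applies here
    assertThat(builder.build())
        .extracting("scan")
        .extracting("scan")
        .extracting("context")
        .extracting("minRowsRequested")
        .isEqualTo(limit);
```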
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.