amogh-jahagirdar commented on code in PR #14615:
URL: https://github.com/apache/iceberg/pull/14615#discussion_r2580030907


##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -195,4 +195,18 @@ default ThisT metricsReporter(MetricsReporter reporter) {
     throw new UnsupportedOperationException(
         this.getClass().getName() + " doesn't implement metricsReporter");
   }
+
+  /**
+   * Create a new scan that returns files with at least the given number of 
rows. This is used as a
+   * hint and is entirely optional in order to not have to return more rows 
than necessary. It is

Review Comment:
   minor, as it's a code comment I think we could simplify the last 2 sentences 
in this code comment into 1:
   
   ```
   This may return fewer rows if the scan does not contain that many, or it may 
return more than requested.
   ```



##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java:
##########
@@ -267,6 +269,98 @@ public void testUnpartitionedTimestampFilter() {
             "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
   }
 
+  @TestTemplate
+  public void limitPushedDownToSparkScan() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", 
unpartitioned.toString()));
+
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    AbstractObjectAssert<?, ?> scanAssert = 
assertThat(builder.build()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    
scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+
+    // verify changelog scan
+    assertThat(builder.buildChangelogScan())
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify MoR scan
+    scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    
scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+  }
+
+  @TestTemplate
+  public void limitPushedDownToSparkScanForMetadataTable() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", 
unpartitioned.toString()));
+
+    // load the snapshots metadata table
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path") + 
"#snapshots"), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    assertThat(builder.build())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())

Review Comment:
   Ok nice, looks like all these tests are parameterized on planning mode which 
includes distributed



##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java:
##########
@@ -150,6 +150,16 @@ public void testSelectRewrite() {
         .isEqualTo("(float IS NOT NULL AND is_nan(float))");
   }
 
+  @TestTemplate
+  public void selectWithLimit() {
+    Object[] first = row(1L, "a", 1.0F);
+    Object[] second = row(2L, "b", 2.0F);
+    Object[] third = row(3L, "c", Float.NaN);
+    assertThat(sql("SELECT * FROM %s LIMIT 1", 
tableName)).containsExactly(first);
+    assertThat(sql("SELECT * FROM %s LIMIT 2", 
tableName)).containsExactly(first, second);
+    assertThat(sql("SELECT * FROM %s LIMIT 3", 
tableName)).containsExactly(first, second, third);

Review Comment:
   I agree with how the new tests are set up; there's a clean separation 
between testing what actually gets pushed down, which verifies we're building 
the scans correctly and an expectation based off the result of the pushdown.



##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java:
##########
@@ -267,6 +269,98 @@ public void testUnpartitionedTimestampFilter() {
             "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
   }
 
+  @TestTemplate
+  public void limitPushedDownToSparkScan() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", 
unpartitioned.toString()));
+
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    AbstractObjectAssert<?, ?> scanAssert = 
assertThat(builder.build()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    
scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+
+    // verify changelog scan
+    assertThat(builder.buildChangelogScan())
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify MoR scan
+    scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    
scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+  }
+
+  @TestTemplate
+  public void limitPushedDownToSparkScanForMetadataTable() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", 
unpartitioned.toString()));
+
+    // load the snapshots metadata table
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path") + 
"#snapshots"), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    assertThat(builder.build())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())

Review Comment:
   What about making sure it's pushed for distributed planning?



##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java:
##########
@@ -267,6 +269,98 @@ public void testUnpartitionedTimestampFilter() {
             "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
   }
 
+  @TestTemplate
+  public void limitPushedDownToSparkScan() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", 
unpartitioned.toString()));
+
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    AbstractObjectAssert<?, ?> scanAssert = 
assertThat(builder.build()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    
scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+
+    // verify changelog scan
+    assertThat(builder.buildChangelogScan())
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify MoR scan
+    scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    
scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+  }
+
+  @TestTemplate
+  public void limitPushedDownToSparkScanForMetadataTable() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", 
unpartitioned.toString()));
+
+    // load the snapshots metadata table
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path") + 
"#snapshots"), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    assertThat(builder.build())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())

Review Comment:
   Though for this particular test for metadata tables, 
`buildCopyOnWriteScan`/`buildMergeOnReadScan` wouldn't be relevant right? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to