Shekharrajak commented on code in PR #16454:
URL: https://github.com/apache/iceberg/pull/16454#discussion_r3276627515


##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java:
##########
@@ -1170,4 +1170,176 @@ private static NamedReference fieldRef(String col) {
   private static UserDefinedScalarFunc toUDF(BoundFunction function, 
Expression[] expressions) {
     return new UserDefinedScalarFunc(function.name(), 
function.canonicalName(), expressions);
   }
+
+  // 
---------------------------------------------------------------------------
+  // SupportsReportOrdering — tests that sort_order_id is surfaced as
+  // outputOrdering so Spark can skip redundant sorts above sorted scans.
+  // Tracks https://github.com/apache/iceberg/issues/16430.
+  // 
---------------------------------------------------------------------------
+
+  @TestTemplate
+  public void testOutputOrderingForSingleAscSortKey() {
+    sql(
+        "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
+            + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
+            + " 'read.split.open-file-cost'='1')",
+        tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
+    Table table = validationCatalog.loadTable(tableIdent);
+    table.replaceSortOrder().asc("event_time").commit();
+
+    sql(
+        "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00'),"
+            + " (2, TIMESTAMP '2024-01-01 01:00:00')",
+        tableName);
+    sql(
+        "INSERT INTO %s VALUES (3, TIMESTAMP '2024-01-02 00:00:00'),"
+            + " (4, TIMESTAMP '2024-01-02 01:00:00')",
+        tableName);
+
+    String plan = explainPlan("SELECT * FROM %s ORDER BY event_time LIMIT 
100", tableName);
+    assertThat(plan)
+        .as("Sort eliminated when scan advertises outputOrdering for ASC sort 
key")
+        .doesNotContain("Sort [");
+  }
+
+  @TestTemplate
+  public void testOutputOrderingForDescSortKey() {
+    sql(
+        "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
+            + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
+            + " 'read.split.open-file-cost'='1')",
+        tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
+    
validationCatalog.loadTable(tableIdent).replaceSortOrder().desc("event_time").commit();
+
+    sql(
+        "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-02 00:00:00'),"
+            + " (2, TIMESTAMP '2024-01-01 00:00:00')",
+        tableName);
+
+    String plan = explainPlan("SELECT * FROM %s ORDER BY event_time DESC LIMIT 
100", tableName);
+    assertThat(plan).as("Sort eliminated for DESC sort 
key").doesNotContain("Sort [");
+  }
+
+  @TestTemplate
+  public void testOutputOrderingForCompositeSortKey() {
+    sql(
+        "CREATE TABLE %s (region STRING, user_id BIGINT, event_time TIMESTAMP) 
USING iceberg"
+            + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
+            + " 'read.split.open-file-cost'='1')",
+        tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
+    validationCatalog
+        .loadTable(tableIdent)
+        .replaceSortOrder()
+        .asc("region")
+        .asc("user_id")
+        .commit();
+
+    sql(
+        "INSERT INTO %s VALUES ('us', 1, TIMESTAMP '2024-01-01 00:00:00'),"
+            + " ('us', 2, TIMESTAMP '2024-01-02 00:00:00'),"
+            + " ('eu', 3, TIMESTAMP '2024-01-03 00:00:00')",
+        tableName);
+
+    String plan = explainPlan("SELECT * FROM %s ORDER BY region, user_id LIMIT 
100", tableName);
+    assertThat(plan).as("Sort eliminated for composite sort 
key").doesNotContain("Sort [");
+  }
+
+  @TestTemplate
+  public void testNoOutputOrderingForUnsortedTable() {
+    sql(
+        "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
+            + " TBLPROPERTIES ('%s'='%s')",
+        tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
+
+    sql(
+        "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00'),"
+            + " (2, TIMESTAMP '2024-01-02 00:00:00')",
+        tableName);
+
+    // Window function forces an explicit local Sort that outputOrdering would 
otherwise satisfy
+    String plan =
+        explainPlan(
+            "SELECT user_id, event_time,"
+                + " ROW_NUMBER() OVER (ORDER BY event_time) AS rn FROM %s",
+            tableName);
+    assertThat(plan).as("Sort required for unsorted table").contains("Sort [");
+  }
+
+  @TestTemplate
+  public void testNoOutputOrderingForMixedSortOrderIds() {
+    sql(
+        "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
+            + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
+            + " 'read.split.open-file-cost'='1')",
+        tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    table.replaceSortOrder().asc("event_time").commit();
+    sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", 
tableName);
+
+    table.refresh();
+    table.replaceSortOrder().asc("user_id").commit();
+    sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", 
tableName);
+
+    String plan =
+        explainPlan(
+            "SELECT user_id, event_time," + " ROW_NUMBER() OVER (ORDER BY 
user_id) AS rn FROM %s",
+            tableName);
+    assertThat(plan).as("Sort required when sort_order_id differs across 
files").contains("Sort [");
+  }
+
+  @TestTemplate
+  public void testNoOutputOrderingForBucketTransform() {
+    sql(
+        "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
+            + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
+            + " 'read.split.open-file-cost'='1')",
+        tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
+    validationCatalog
+        .loadTable(tableIdent)
+        .replaceSortOrder()
+        .asc(org.apache.iceberg.expressions.Expressions.bucket("user_id", 8))
+        .commit();
+
+    sql(
+        "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00'),"
+            + " (2, TIMESTAMP '2024-01-02 00:00:00')",
+        tableName);
+
+    String plan =
+        explainPlan(
+            "SELECT user_id, event_time," + " ROW_NUMBER() OVER (ORDER BY 
user_id) AS rn FROM %s",
+            tableName);
+    assertThat(plan).as("Sort required: bucket transform is hash, not 
range").contains("Sort [");

Review Comment:
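   Required sorting cases: the unsorted-table, mixed sort_order_id, and bucket-transform tests above check that Spark still plans a Sort when the scan cannot report an output ordering.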



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to